您好,欢迎来到尔游网。
搜索
您的当前位置:首页Netty源码分析之ByteBufAllocator

Netty源码分析之ByteBufAllocator

来源:尔游网

一、ByteBufAllocator

前文分析可知ByteBufAllocator接口是ByteBuf的分配器,主要用来分配各种ByteBuf对象,本文将重点分析Allocator的分配过程。
接口定义:

package io.netty.buffer;

public interface ByteBufAllocator {
    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
    ByteBuf buffer();
    ByteBuf buffer(int var1);
    ByteBuf buffer(int var1, int var2);
    ByteBuf ioBuffer();
    ByteBuf ioBuffer(int var1);
    ByteBuf ioBuffer(int var1, int var2);
    ByteBuf heapBuffer();
    ByteBuf heapBuffer(int var1);
    ByteBuf heapBuffer(int var1, int var2);
    ByteBuf directBuffer();
    ByteBuf directBuffer(int var1);
    ByteBuf directBuffer(int var1, int var2);
    CompositeByteBuf compositeBuffer();
    CompositeByteBuf compositeBuffer(int var1);
    CompositeByteBuf compositeHeapBuffer();
    CompositeByteBuf compositeHeapBuffer(int var1);
    CompositeByteBuf compositeDirectBuffer();
    CompositeByteBuf compositeDirectBuffer(int var1);
    boolean isDirectBufferPooled();
    int calculateNewCapacity(int var1, int var2);
}

接口方法几乎全是各种ByteBuf的分配方法,有直接内存的、堆内存的、以及CompositeByteBuf类型等。其属性DEFAULT的默认是实现,由前文分析可知是PooledByteBufAllocator(可以通过参数io.netty.allocator.type来修改(参数值为pooled和unpooled,也就是对应的PooledByteBufAllocator和UnpooledByteBufAllocator))。抽象类中对接口中的一些方法进行了实现,以及新增了toLeakAwareBuffer()方法,这个方法的主要目的是监测直接内存的泄露情况,以及新增一些如下静态属性值。

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
    static final int DEFAULT_INITIAL_CAPACITY = 256;//默认的初始化容量大小
    static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;//默认的最大容量大小
    static final int DEFAULT_MAX_COMPONENTS = 16;//最大组件数
    static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page

    static {
        ResourceLeakDetector.addExclusions(AbstractByteBufAllocator.class, "toLeakAwareBuffer");
    }
}

二、PooledByteBufAllocator

如果你认真阅读PooledByteBufAllocator源码(Unpooled非池化类型不做分析,相对简单),就会发现其最重要的两个地方是静态代码块和构造方法。
静态代码块: 初始化静态属性值(如下所示)
DEFAULT_NUM_HEAP_ARENA: 默认PoolArena堆内存数组大小
DEFAULT_NUM_DIRECT_ARENA: 默认PoolArena直接内存数组大小
以上可用io.netty.allocator.numHeapArenas和io.netty.allocator.numDirectArenas参数配置,默认取CPU核心数的两倍和PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3(根据(Assuming each arena has 3 chunks, the pool should not consume more than 50% of max memory.每一个arena 可以划分三个chunk快,并且确保每个arena不超过最大内存的二分之一)),从这里可以知道/2是保证分配的arena不会超过最大内存的50%,/3是保证每个arena可分配三个chunk块。
DEFAULT_PAGE_SIZE: 可通过参数io.netty.allocator.pageSize配置,默认页大小是8KB
DEFAULT_MAX_ORDER: 可通过参数io.netty.allocator.maxOrder配置,默认是11。最大值是13,和page_size组合使用,得到chunkSize,从validateAndCalculateChunkSize(int pageSize, int maxOrder)方法可以知道,当page_size左移max_order位之后,得到的数据不能超过1GB
DEFAULT_SMALL_CACHE_SIZE: 可参数io.netty.allocator.smallCacheSize配置,默认是256
DEFAULT_NORMAL_CACHE_SIZE: 可参数io.netty.allocator.normalCacheSize配置,默认是
DEFAULT_MAX_CACHED_BUFFER_CAPACITY: 可参数io.netty.allocator.maxCachedBufferCapacity配置,默认是321024
DEFAULT_CACHE_TRIM_INTERVAL: 可参数io.netty.allocator.maxCachedBufferCapacity配置,默认是32
1024
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS: 可参数io.netty.allocator.cacheTrimInterval配置,默认是8192
DEFAULT_USE_CACHE_FOR_ALL_THREADS: 可参数io.netty.allocator.useCacheForAllThreads配置,默认是true
DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT: 可参数io.netty.allocator.directMemoryCacheAlignment配置,默认是0
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK: 可参数io.netty.allocator.maxCachedByteBuffersPerChunk配置,默认是1023
构造方法: chunkSize属性是根据page_size左移max_order位得到16MB,并且构造方法主要是初始化heapArenas和directArenas数组。

PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);

pageShifts通过pageSize计算出,计算方法是算出是2的多少次幂,默认是13。还有就是初始化threadCache这里我们主要关注他的initialValue()方法。

        protected synchronized PoolThreadCache initialValue() {
            //取出最新被使用的PoolArena,其中heapArenas和directArenas是
            //PooledByteBufAllocator的两个属性(数组),其实例化在它的构造方法中,但是数组的大小值由DEFAULT_NUM_HEAP_ARENA和DEFAULT_NUM_DIRECT_ARENA确定
            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

            final Thread current = Thread.currentThread();
            if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
                final PoolThreadCache cache = new PoolThreadCache(
                        heapArena, directArena, smallCacheSize, normalCacheSize,
                        DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);

                if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
                    final EventExecutor executor = ThreadExecutorMap.currentExecutor();
                    if (executor != null) {
                        executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
                                DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                    }
                }
                return cache;
            }
            // No caching so just use 0 as sizes.
            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0);
        }

接下来就是分配的主题:

    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        //从缓存中取值
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
           //进行cache分配
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

PoolThreadCache cache = threadCache.get():threadCache是PoolThreadLocalCache实例,他是PooledByteBufAllocator的一个内部类实现了FastThreadLocal(这是netty自己实现的一个ThreadLocal类,后续做单独分析)。
从上方法可知道,先取出cache,然后从cache取出directArena,如果存在那么可以进行分配,然后通过内存监测包装。

以上,有任何不对的地方,请指正,敬请谅解。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- axer.cn 版权所有 湘ICP备2023022495号-12

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务