NIOEventLoopGroup源码?
NioEventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children [], 默认大小是处理器核数 * 2, 这样就构成了一个线程池,初始化EventExecutor时NioEventLoopGroup重载newChild方法,所以children元素的实际类型为NioEventLoop。
线程启动时调用SingleThreadEventExecutor的构造方法,执行NioEventLoop类的run方法,首先会调用hasTasks()方法判断当前taskQueue是否有元素。
如果taskQueue中有元素,执行 selectNow() 方法,最终执行selector.selectNow(),该方法会立即返回。如果taskQueue没有元素,执行 select(oldWakenUp) 方法
select ( oldWakenUp) 方法解决了 Nio 中的 bug,selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow()。
若触发了epoll的空轮询bug,则会反复执行selector.select(timeoutMillis),变量selectCnt 会逐渐变大,当selectCnt 达到阈值(默认512),则执行rebuildSelector方法,进行selector重建,解决cpu占用100%的bug。
rebuildSelector方法先通过openSelector方法创建一个新的selector。然后将old selector的selectionKey执行cancel。
最后将old selector的channel重新注册到新的selector中。rebuild后,需要重新执行方法selectNow,检查是否有已ready的selectionKey。
接下来调用processSelectedKeys 方法(处理I/O任务),当selectedKeys != null时,调用processSelectedKeysOptimized方法,迭代 selectedKeys 获取就绪的 IO 事件的selectkey存放在数组selectedKeys中, 然后为每个事件都调用 processSelectedKey 来处理它,processSelectedKey 中分别处理OP_READ;OP_WRITE;OP_CONNECT事件。
最后调用runAllTasks方法(非IO任务),该方法首先会调用fetchFromScheduledTaskQueue方法,把scheduledTaskQueue中已经超过延迟执行时间的任务移到taskQueue中等待被执行,然后依次从taskQueue中取任务执行,每执行64个任务,进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。
每个NioEventLoop对应一个线程和一个Selector,NioServerSocketChannel会主动注册到某一个NioEventLoop的Selector上,NioEventLoop负责事件轮询。
Outbound 事件都是请求事件, 发起者是 Channel,处理者是 unsafe,通过 Outbound 事件进行通知,传播方向是 tail到head。
Inbound 事件发起者是 unsafe,事件的处理者是 Channel, 是通知事件,传播方向是从头到尾。
内存管理机制,首先会预申请一大块内存Arena,Arena由许多Chunk组成,而每个Chunk默认由2048个page组成。
Chunk通过AVL树的形式组织Page,每个叶子节点表示一个Page,而中间节点表示内存区域,节点自己记录它在整个Arena中的偏移地址。
当区域被分配出去后,中间节点上的标记位会被标记,这样就表示这个中间节点以下的所有节点都已被分配了。
大于8k的内存分配在poolChunkList中,而PoolSubpage用于分配小于8k的内存,它会把一个page分割成多段,进行内存分配。
ByteBuf的特点:支持自动扩容(4M),保证put方法不会抛出异常、通过内置的复合缓冲类型,实现零拷贝(zero-copy);不需要调用flip()来切换读/写模式,读取和写入索引分开;
方法链;引用计数基于AtomicIntegerFieldUpdater用于内存回收;PooledByteBuf采用二叉树来实现一个内存池,集中管理内存的分配和释放,不用每次使用都新建一个缓冲区对象。
UnpooledHeapByteBuf每次都会新建一个缓冲区对象。