1 NioEventLoopGroup初始化

构造函数调用链:

1
2
3
[1]NioEventLoopGroup
[2]MultithreadEventLoopGroup
[3]MultithreadEventExecutorGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(
int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
...
} finally {
...
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
  1. 调用newChild()方法初始化MultithreadEventExecutorGroup.children属性,children是一个数组用于保存EventExecutor实例。至于children数组的大小可以指定,如果未指定,则为CPU*2;
  2. 创建线程选择器,对于children数组其实放的都是线程,线程选择器从数组中选择一个线程使用,线程选择器分成两种PowerOfTowEventExecutorChooser和GenericEventExecutorChooser;如果children数组的大小为2的幂次方,则创建PowerOfTowEventExecutorChooser选择器,否则创建GenericEventExecutorChooser选择器。PowerOfTowEventExecutorChooser能够优化线程的选择效率。

下面进行详细的分析。

1.1 ThreadPerTaskExecutor的创建

在上面MultithreadEventExecutorGroup的构造函数中,首先创建了一个ThreadPerTaskExecutor实例,上一篇文章中已经说过了ThreadPerTaskExecutor是用来创建线程实例的。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}

每一个NioEventLoop中都会保存一个ThreadPerTaskExecutor实例,当调用NioEventLoop的execute()方法是,会调用ThreadPerTaskExecutor的execute()方法。另外,还需要注意children及chooser都是MultithreadEventExecutorGroup类的属性。

1.2 两种线程选择器的比较

前面说了,相比较GenericEventExecutorChooser,PowerOfTowEventExecutorChooser能够优化线程选择的效率。下面来看下GenericEventExecutorChooser的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// DefaultEventExecutorChooserFactory
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
// DefaultEventExecutorChooserFactory
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

GenericEventExecutorChooser类与PowerOfTowEventExecutorChooser均为DefaultEventExecutorChooserFactory的内部类。我们看到在创建两个类的实例时,都需要将children作为参数传入,并把children保存在实例的executors属性中。

我们主要关注next()方法,即从数组中取得一个executors。GenericEventExecutorChooser 类的next()就是简单的取余(%)从数组中循环取;PowerOfTowEventExecutorChooser的next()方法不使用取余操作,而使用了与(&),通过与操作同样能达到循环取数组,且与的效率比取余要高的多。

1.3 children数组赋值

在给children数组赋值时,调用了newChild(executor, args),newChild方法需要把前面创建的ThreadPerTaskExecutor实例传入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// NioEventLoopGroup
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
// NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}

我们看到最终创建children数组保存的是NioEventLoop实例,传入的ThreadPerTaskExecutor实例会保存在SingleThreadEventExecutor类中。每个NioEventLoop实例中还有一个selector

下面简单画一个NioEventLoopGroup初始化的时序图:

在这里插入图片描述

2 Channel与NioEventLoop的绑定

上一篇文章梳理了Channel与EventLoop的关系:

EventLoop接口的一个实现类ThreadPerChannelEventLoop中有一个成员变量ch可以保存一个Channel,没在NioEventLoop类中发现用于保存Channel的属性。NioServerSocketChannel的父类AbstractChannel中有一个成员变量eventLoop可以保存一个EventLoop,通过eventLoop()方法可获得实例。

也就是说每一个Channel对应一个NioEventLoop,关于每一个NioEventLoop是否对应一个Channel这就不好说了。

那么来看看NioServerSocketChannel的父类AbstractChannel中成员变量eventLoop是什么时候保存一个NioEventLoop实例的?

在上上篇文章 【68】Netty 4.1.6源码分析———服务端启动过程中,说道AbstractBootstrap.initAndRegister方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
...
return regFuture;
}

其中有这么一段代码ChannelFuture regFuture = config().group().register(channel),group()方法返回的是NioEventLoopGroup,NioEventLoopGroup实现MultithreadEventLoopGroup类,所以最终调用的是MultithreadEventLoopGroup.register方法,如下:

1
2
3
4
5
6
7
8
9
// MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}

MultithreadEventLoopGroup的register方法中,首先调用next(),从children数组中选择一个线程,也就是选择一个NioEventLoop实例,next()方法调用了chooser的next()方法,如下:

1
2
3
4
5
// MultithreadEventExecutorGroup
@Override
public EventExecutor next() {
return chooser.next();
}

然后调用NioEventLoop父类SingleThreadEventLoop的register方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
// SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

SingleThreadEventLoop.register(final ChannelPromise promise) 方法中,promise.channel()获取的就是NioServerSocketChannel实例,unsafe()方法获取到channel的unsafe,debug发现最终调用的是AbstractChannel.register方法,后面的上上篇文章 【68】Netty 4.1.6源码分析———服务端启动过程已经分析过了,在AbstractChannel.register方法中,将从children数组中获取到的NioEventLoop赋值给AbstractChannel成员变量eventLoop,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
// AbstractChannel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 赋值eventLoop
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
...
}
}

3 NioEventLoop的启动过程

NioEventLoop的启动在执行AbstractBootstrap.doBind0方法时触发,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// AbstractBootstrap
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

在doBind0方法中,这段代码channel.eventLoop().execute(...,eventLoop()方法获取与channel绑定的NioEventLoop,然后调用NioEventLoop的execute方法。可以理解为在为channel绑定端口时,需要执行(启动)一下该channel的NioEventLoop线程。

NioEventLoop的继承SingleThreadEventExecutor抽象类,调用的是SingleThreadEventExecutor.execute方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

SingleThreadEventExecutor方法做了以下三件事:

  1. 执行inEventLoop()方法,???
  2. 执行startThread()方法 ,调用ThreadPerTaskExecutor,创建一个线程并启动
  3. 执行addTask(task)方法,添加一个任务到taskQueue

3.1 startThread()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// SingleThreadEventExecutor
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 重要
thread = Thread.currentThread();
...
try {
// 核心
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
...
} finally {
...
}
}
});
}

在SingleThreadEventExecutor的doStartThread方法中,首先把当前线程赋值给SingleThreadEventExecutor.thread属性,该属性是inEventLoop()方法执行的关键。然后调用NioEventLoop的run()方法。注意executor.execute(…)调用的是ThreadPerTaskExecutor类的execute方法。下面来看看NioEventLoop的run()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// NioEventLoop
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
...
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
...
}
}
}

这是一个死循环,主要做了一下三件事:

  1. 通过 select/selectNow 调用查询当前是否有就绪的 IO 事件,select(wakenUp.getAndSet(false))
  2. 查询就绪的 IO 事件, 然后处理它,processSelectedKeys()
  3. 运行 taskQueue 中的任务,runAllTasks()