前导文章:

  1. 【34】Java NIO 学习笔记(基础)
  2. 【31】常见IO模型浅析
  3. 【25】说说Redis的非阻塞IO多路复用技术

1 相关属性设置

先来看JDK NIO启动服务端的过程:

  1. 首先先创建一个server channel,是ServerSocketChannel类型
  2. 将ServerSocketChannel设置为非阻塞
  3. 将ServerSocketChannel绑定监听的接口
  4. 创建一个多路复用选择器Selector
  5. 将ServerSocketChannel注册到Selector
  6. 轮询获取选择器上 准备就绪 的操作,并分配到相应的channel执行

下面是一个NIO服务器的代码,这段代码在我看来有一些问题,下面的代码所有的操作使用一个主线程,根据Reactor模式,应该至少有两个线程,一个Boss线程专门用于监控IO就绪事件,一个Work线程池负责具体的IO读写处理,Boss线程检测到新的IO就绪事件后,根据事件类型,完成IO操作任务的分配,并将具体的操作交由Work线程处理。

【纠错】这是Reactor单线程模型,只需要一个线程!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void server() throws Exception {
//1 获取server通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2 切换非阻塞模式
ssChannel.configureBlocking(false);
//3 绑定链接
ssChannel.bind(new InetSocketAddress(9898));
//4 获取选择器
Selector selector = Selector.open();
//5 将通道注册到选择器上
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
//6 轮询获取选择器上 准备就绪 的操作
while(selector.select() > 0 ) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey sk = it.next();
// 判断具体是什么事件准备就绪
if(sk.isAcceptable()){
SocketChannel sChanel = ssChannel.accept();
// 切换客户端非阻塞模式
sChanel.configureBlocking(false);
//将 该通道注册到选择器上
sChanel.register(selector, SelectionKey.OP_READ);
}else if(sk.isReadable()){
SocketChannel sChannel = (SocketChannel) sk.channel();
ByteBuffer buf = ByteBuffer.allocate(1024);
int len = 0 ;
while((len = sChannel.read(buf)) > 0){
buf.flip();
System.out.println(new String(buf.array(),0,len));
buf.clear();
}
}
// 取消选择键
it.remove();
}
}
}
}

下载编译好Netty4.1.6源码,来看下Netty源码中的example模块,找一个Netty Server的例子,以io.netty.example.echo.EchoServer为例子,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 省略了部分代码
public final class EchoServer {
...
public static void main(String[] args) throws Exception {
...
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

本文就来分析上面这部分服务端启动的代码,ServerBootstrap是一个辅助服务端启动的类,前一部分只是对ServerBootstrap类中的相关属性进行配置。相对应的还有一个辅助客户端启动的类,就是Bootstrap,类图关系如下:
在这里插入图片描述
ServerBootstrap和Bootstrap都继承自AbstractBootstrap。group方法是将Boss和work分别设置到ServerBootstrap.childGroup属性和AbstractBootstrap.group属性。ServerBootstrap.channel方法设置AbstractBootstrap.channelFactory属性,来看下ChannelFactory接口:

1
2
3
4
5
6
7
public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> {
/**
* Creates a new channel.
*/
@Override
T newChannel();
}

只有一个newChannel方法,用于创建一个channel。该接口有一个实现类ReflectiveChannelFactory,实现了newChannel方法,如下:

1
2
3
4
5
6
7
8
9
// ReflectiveChannelFactory.newChannel
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}

从ReflectiveChannelFactory类的名称就能看出来,是通过反射机制来创建实例的,ReflectiveChannelFactory中有一个clazz属性用来保存Class。AbstractBootstrap.channelFactory就是一个ReflectiveChannelFactory类型,ServerBootstrap.channel方法会创建一个ReflectiveChannelFactory实例,并将Clazz属性设置为NioServerSocketChannel.class。

ServerBootstrap.option方法用于设置AbstractBootstrap.options属性。ServerBootstrap.handler方法设置AbstractBootstrap.handler属性。ServerBootstrap.childHandler方法用于设置ServerBootstrap.childHandler属性。很明显,一些通用的配置都在AbstractBootstrap类中,child的属性都在ServerBootstrap中。(暂且不管各种属性的作用)

2 channel的创建及初始化

在这里插入图片描述

各种属性配置好后,下面就是重头戏了。ServerBootstrap.bind方法不仅仅只是绑定接口的作用,还有以下作用:

  1. channel的创建及初始化
  2. 注册selector
  3. 注册感兴趣的事件
  4. 绑定接口

经常看源码,肯定知道真正绑定肯定不是在bind方法中,而是在doBind方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// AbstractBootstrap.doBind
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
...
}
}

doBind方法首先调用initAndRegister方法进行两件事,第一件事channel的创建及初始化;第二件事创建一个多路复用选择器Selector,将channel注册到Selector。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}

newChannel方法其实会创建一个NioServerSocketChannel实例,netty中的channel类图结构如下:
在这里插入图片描述
NioServerSocketChannel继承AbstractNioChannel抽象类,其中AbstractNioChannel类中保存着一个JDK底层的channel,所以说,创建的NioServerSocketChannel实例,其实是对JDK底层channel的封装,最终还是创建的JDK底层的channel。

channel的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// ServerBootstrap.init
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

ServerBootstrap.init方法还是挺复杂的,主要完成以下几件事:

  1. 配置NioServerSocketChannel相关属性,options,attrs,NioServerSocketChannel中有一个配置类ServerSocketChannelConfig;
  2. 配置一个ChannelInitializer-handler,然后在channel初始化时,将ServerBootstrap .handler方法配置的handler设置到pipeline中。(然后启动一个线程将ServerBootstrapAcceptor加入到pipeline中?)

3 注册selector

在这里插入图片描述

注册selector是在AbstractChannel.register方法中实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// AbstractChannel.register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
...
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

AbstractChannel.register0方法主要是三个方法:

  1. doRegister():调用JDK底层进行selector注册
  2. pipeline.invokeHandlerAddedIfNeeded():回调handler的HandlerAdded方法
  3. pipeline.fireChannelRegistered():回调handler的ChannelRegistered方法

此时的isActive()方法返回false,因为channel还没有active,还没有进行端口的绑定。来看下AbstractNioChannel.doRegister方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// AbstractNioChannel.doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}

上面说过NioServerSocketChannel继承AbstractNioChannel抽象类,其中AbstractNioChannel类中保存着一个JDK底层的channel,javaChannel()方法就是用来获取JDK底层的channel,所以说selector的注册,也是调用的JDK底层的方法。注册的channel感兴趣的事件为0,表示该channel对任何事件都不感兴趣。

4 绑定端口

回到AbstractBootstrap.doBind方法,进行doBind0方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// AbstractBootstrap.doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

调用channel.bind方法进行端口绑定,进入AbstractChannel.bind方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// AbstractChannel.bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
...
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}

AbstractChannel.bind方法方法主要有两个作用:

  1. 调用JDK底层bind方法
  2. 回调handler的ChannelActive方法

isActive()方法的返回值为false,需要执行完doBind方法,isActive()方法才会返回true,doBind方法如下:

1
2
3
4
5
6
7
8
9
// NioServerSocketChannel.doBind
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

pipeline.fireChannelActive()方法是回调pipeline中的handler的 ChannelActive方法,调用链比较长,最终会调用AbstractNioChannel.doBeginRead方法,在这个方法中,注册channel感兴趣的事件,selectionKey.interestOps,readInterestOp就是ACCEPT事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// AbstractNioChannel.doBeginRead
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}

5 总结

今天这篇文章的类图和时序图,都是用新的工具画的,感觉高大上了许多!!!

本文一窥了Netty创建并启动服务器端的过程,主要包括Server Channel的创建及初始化,端口的绑定,Selector的注册。涉及到了一点EventLoopGroup和pipeline,但是都没有做详细的分析,放在后面分析。可以很明显的感觉到,Netty是对JDK底层NIO相关类的封装,真正使用的还是JDK的底层!