写了一个Netty Server的demo,点击查看demo源码。这篇文章分析provider和consumer中如何使用Netty的,是不是和我写的demo类似呢?


1 Netty Server端

在demo中,Server.java的四要素分别是:

  1. NioServerSocketChannelFactory:用于创建基于NIO的服务端
  2. ServerBootstrap:用于帮助服务器启动
  3. ServerChannelPiplineFactory:在该类中设置编解码和逻辑处理类
  4. Channel:监听IP和端口

查看dubbo的NettyServer类中的doOpen方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// com.alibaba.dubbo.remoting.transport.netty
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}

四要素一个都不缺,dubbo Netty Server的逻辑处理类为NettyHandler,同样是继承SimpleChannelHandler,和demo中也一样,实现了messageReceived和writeRequested方法,分别用于消息的接收与发送。

但是具体的逻辑的处理NettyHandler类并不做,真正做的是ChannelHandler接口,调用链非常复杂,我们只关心两个方法:消息的接收与发送。

1.1 消息的接收

Netty 消息接收的处理过程,调用链很复杂。举个例子来说,就像请假条的审批过程,首先需要开发小组长签字,然后部门经理签字,然后大部门经理签字,然后技术总监签字,然后,然后,然后…。一个请假条传过了数十人之手,Netty消息接收的处理过程于此类似,只不过人变成了类,dubbo开发人员真是把责任链设计模式玩的炉火纯青。

来看下我画的消息接收处理流程:
在这里插入图片描述
责任链上的类大多都实现了ChannelHandler接口,每一个实现类都有相应的责任,处理完成后调用下一位处理者。

上面的处理过程中有一点值得注意,就是HeaderExchangeHandler.received方法。该方法是责任链的最后,肯定要做的就是对请求做处理。然后从该类的名称中也能找到这个类还可以完成消息的交换,也就是说如果这个请求需要有应答,HeaderExchangeHandler还需要返回一个应答。

1.2 消息的发送

Netty 的消息发送,应该始于NettyHandler类的writeRequested方法。当然了,真正完成消息发送的也不是NettyHandler类。同样是委托给了ChannelHandler接口的实现类。

但是,需要注意是是Netty Server应该很少会通过NettyHandler类的writeRequested方法进行消息发送,一般会直接在接收消息的方法中就进行消息返回。(不知道心跳检测会不会通过writeRequested方法)

有一点不太明白:DecodeHandler.received方法的作用?不是在bootstrap.setPipelineFactory中设置了编解码类了吗?为什么这里又解码了一次?

2 Netty Client端

在demo中,Client.java只有三个要素,没有Channel:

  1. NioClientSocketChannelFactory:同服务端相同
  2. ClientBootstrap:用于帮助客户端启动
  3. ChannelPipelineFactory:在该类中设置编解码和逻辑处理类

查看dubbo的NettyClient类中的doOpen方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}

和demo中Client.java中的代码类似,逻辑处理类同样是NettyHandler。

2.1 消息的接收

在这里插入图片描述

Client端消息的接收与Server端消息接收直到HeaderExchangeHandler.received都是一样是,再往下走就不一样了,Server端HeaderExchangeHandler.received方法处理请求并判断是否需要给予request应答;Client端HeaderExchangeHandler.received方法仅仅只是接收结果或者等待结果到达。

2.2 消息的发送

client端消息的发送一般也不会调用NettyHandler类的writeRequested方法,详细信息请参考我们的这篇文章:
【65 有错误】Dubbo 2.5源码分析——远程服务引用(向Netty Server发送请求)