1 TCP的粘包拆包问题

我们知道Dubbo的网络通信框架Netty是基于TCP协议的,TCP协议的网络通信会存在粘包和拆包的问题,先看下为什么会出现粘包和拆包:

  1. 当要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包
  2. 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包
  3. 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包
  4. 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包

2 Dubbo消息协议头规范

针对粘包和拆包问题,dubbo使用了魔法数和数据长度作为分割符进行解决。下面是协议头规范:
在这里插入图片描述
dubbo消息规范由4部分组成:IP + TCP + Dubbo + Serialized Data,TCP与IP由Netty负责,其中Dubbo和Serialized Data是否Dubbo框架负责。Dubbo框架严格区分了每个字节的作用及长度。

  1. 0~15(magic High & Magic Low):类似java字节码文件里的魔数,用来判断是不是dubbo协议的数据包,就是一个固定的数字
  2. 1~20(Serialization id):序列ID
  3. 21(event):是否单向
  4. 22(Two way):是否双向
  5. 23(Req/Res):是request还是response
  6. 24~31(status):状态位,设置请求响应状态,request为空,response才有值
  7. 32~95(ID):request ID
  8. 96~127(data length):数据长度

3 源码分析

观察NettyServer与NettyClient中的doOpen方法啊,发现服务端与客户端使用相同的编解码类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// NettyServer.doOpen || NettyClient.doOpen
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(),
getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
// 编码
pipeline.addLast("decoder", adapter.getDecoder());
// 解码
pipeline.addLast("encoder", adapter.getEncoder());
// 逻辑处理类
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});

3.1 request 编码
在这里插入图片描述
这个InternalEncoder是一个NettyCodecAdapter的内部类,我们看到codec.encode(channel, buffer, msg)这里,这个时候codec=DubboCountCodec,这个是在构造方法中传入的,DubboCountCodec.encode–>ExchangeCodec.encode–>ExchangeCodec.encodeRequest,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// header.16字节,128位
byte[] header = new byte[HEADER_LENGTH];
// set magic number.16位魔法数0xdabb
//short2bytes 将16位数存到两个byte中
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
// set request id.
//long 64位8字节,从第5个字节
Bytes.long2bytes(req.getId(), header, 4);
//buffer这里写入buffer并不是直接写入到netty,是一个新建的buffer
//写完需要netty发送
// encode request data. 保存当前写入位置
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
//编码写入请求数据DubboCodec
encodeRequestData(channel, out, req.getData());
}
out.flushBuffer();
bos.flush();
bos.close();
//获取写入数据的长度
int len = bos.writtenBytes();
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
//写header
buffer.writeBytes(header); // write header.
//设置当前写入位置
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

3.2 request 解码
在这里插入图片描述
解码流程就是接收到数据之后,先读取header长度的字节。判断开头两字节是不是magicNumber,如果不是则遍历数据的每一位来判断魔法数,校验header长度,数据长度,根据header的属性构造Response、Request等对象,然后读取序列化器来解析消息体。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// ExchangeCodec.decode
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
//读取header
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
//每次解码先判断是否是magic——number
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
//数据的起始位置不是header
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
//读取完整数据
buffer.readBytes(header, length, readable - length);
}
//遍历每一位检查
for (int i = 1; i < header.length - 1; i ++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
//检查到magic
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// check length.数据长度不够继续等待读取数据
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// get data length.消息体长度
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
if( readable < tt ) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
//body解析
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}

3.3 response编码与解码

response编码与request编码类似,区别有:

  1. request编码调用ExchangeCodec.encodeRequest – > DubboCodec.encodeRequestData,response编码调用ExchangeCodec.encodeResponse – > DubboCodec.encodeResponseData
  2. 上面介绍的body消息头的 第24~31(status):状态位,设置请求响应状态,request为空,response才有值
  3. 如果response消息发送异常,则继续发送失败信息给Consumer

response解码与request解码基本一致。

4 前面文章中两个疑问的解答

1.在之前的一篇文章中【65 有错误】Dubbo 2.5源码分析——远程服务引用(向Netty Server发送请求)有提到这样一个问题:不同的请求如何能和相应的provider返回结果对应上?

答:这个问题是通过Dubbo消息头中的requestID解决的,客户端的每一个request,都对应一个requestID,并封装在消息中;消息返回时,也会带上requestID,这样就能对应上了。

2.在之前的文章中【64 有错误】Dubbo 2.5源码分析———Netty在provider和consumer中的应用有提到这样一个问题:有一点不太明白:DecodeHandler.received方法的作用?不是在bootstrap.setPipelineFactory中设置了编解码类了吗?为什么这里又解码了一次?

答:我们知道接收到的数据类型,不是Response就是Request,消息体的内容是存放在Request.mData或者Response.mResult中的,mData是DecodeableRpcInvocation类型;mResult是DecodeableRpcResult类型。DecodeableRpcInvocation或DecodeableRpcResult中都有一个hasDecoded变量,用于记录消息体是否解码。

DecodeableRpcInvocation和DecodeableRpcResult都实现了Decodeable接口,这两个类都有解码的功能。

在pipeline中设置的解码类和DecodeHandler类功能上有重合的地方。DecodeHandler的作用是对消息体做解码操作,pipeline中的解码类既可以对消息头解码,也可以对消息体解码。我们发现这两个类都有对消息体解码的作用,那到底在pipeline中对消息体进行解码还是在DecodeHandler中对消息体进行解码呢?

如果观察DubboCodec.decodeBody就会发现,到底在哪对消息体进行解码,要看DECODE_IN_IO_THREAD_KEY变量与DEFAULT_DECODE_IN_IO_THREAD变量的设置,默认情况下是在pipeline中设置的解码类中进行消息体的解码,DecodeHandler会判断DecodeableRpcInvocation或DecodeableRpcResult的hasDecoded变量是否为true,如果未解码,则再对DecodeableRpcInvocation或DecodeableRpcResult进行解码。
在这里插入图片描述
关于消息体的解码,是调用DecodeableRpcResult.decode或DecodeableRpcInvocation.decode方法,DecodeableRpcResult.decode方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// DecodeableRpcResult.decode
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
byte flag = in.readByte();
switch (flag) {
// 返回值为null,不做处理
case DubboCodec.RESPONSE_NULL_VALUE:
break;
// 返回值是正常值
case DubboCodec.RESPONSE_VALUE:
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
// 返回异常
case DubboCodec.RESPONSE_WITH_EXCEPTION:
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false)
throw new IOException("Response data error, expect Throwable, but get " + obj);
setException((Throwable) obj);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
}
return this;
}

对不同的消息体类型做不同的处理,有三种类型,分别是:

  1. RESPONSE_WITH_EXCEPTION:消息体的是个异常,调用父类的setException方法,设置exception变量值
  2. RESPONSE_NULL_VALUE:消息体为null,不做处理
  3. RESPONSE_VALUE:消息体是个正常值,将该值转化为对象,并调用父类的setValue方法,这是value变量值
  4. 否则抛出IOException异常