1 前言

很久之前这篇文章【50】Dubbo 2.5源码分析——远程服务引用分析了远程的服务调用的过程,看到了DubboInvoker的doInvoker方法,这篇文章直接往下看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// DubboInvoker.doInvoker
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) { // 只发送请求
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) { // 异步返回结果
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else { // 请求异步,返回结果同步
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
...
}

RpcInvocation类用来封装调用远程服务的哪个方法、方法需要的参数以及invoker,还可以通过setAttachment方法设置当前请求的附加信息。ExchangeClient 为一个netty服务器的客户端,是之前获取到的。

后半部分的代码就是用来向provider发送请求,有三种方式:

  1. isOneway:只发送请求,client不接收provider返回值
  2. isAsync:异步,client可以接收provider返回值
  3. others:异步转同步,client接收provider返回值,并返回

从方法的名称也能看出来,isOneway是currentClient.send,仅仅是把消息发出去;isAsync是currentClient.request,发送一个请求,会有返回值。

关于isAsync,如果来设置为true,则接收provider返回值的方式有点不一样。将isAsync设置为true,如下:

1
2
3
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" async="true"></dubbo:method>
</dubbo:reference>

接收结果如下:

1
2
Futrue<String> temp = RpcContext.getContext().getFuture();
temp.get();

这里有个问题,不同的请求如何能和相应的provider返回结果对应上?

关于isOneway,可以通过如下方式设置:

1
2
3
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" return="false"></dubbo:method>
</dubbo:reference>

2 isOneway方式,ExchangeClient.send

如果设置了isOneway方式,调用远程sayHello无返回值。

1
2
3
4
5
6
7
// DubboInvoker.doInvoker 代码片段
if (isOneway) { // 只发送请求
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
...

在上面的代码片段中,还获取了另外一个可配置参数sent,用于表示是否等到消息发出再返回,默认情况下,sent为false。下面是请求发送的调用链:
在这里插入图片描述在HeaderExchangeChannel.send方法中对message进行了简单的封装,将message封装成三种类型:Request、Response、或者String。debug发现message的类型为RpcInvocation。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// HeaderExchangeChannel.send
public void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
}
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion("2.0.0");
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
}
}

在NettyChannel的send方法中,向Netty服务器发送消息,如果sent为true,则阻塞当前线程,成功后再返回,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// NettyChannel.send
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 向netty服务器发送消息
ChannelFuture future = channel.write(message);
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 等待消息发送成功
success = future.await(timeout);
}
...
} catch (Throwable e) {
...
}

3 isAsync方式,ExchangeClient.request

下面是请求发送的调用链:
在这里插入图片描述
同样是在HeaderExchangeChannel类中对message进行了封装,debug发现message的类型为RpcInvocation,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// HeaderExchangeChannel.request
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

HeaderExchangeChannel.send方法和request方法有明显的不同,request方法中使用一个future对返回结果进行接收。并且request方法有返回值,send方法没有返回值。后面的内容和isOneway方式一样。

4 异步转同步方式

这里所说的异步是netty的异步,同步是指消费者远程调用服务同步等待结果。

1
2
3
4
5
6
7
// DubboInvoker.doInvoker
...
} else { // 请求异步,返回结果同步
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
...

在isAsync方式中,虽然使用了一个future对返回结果进行接收,但并未调用ResponseFuture的get方法,直接返回了一个new RpcResult。但是isAsync方式会将future放到RpcContext中,只可以直接从RpcContext中获取future,进而得到远程调用结果。

异步转同步方式中直接调用了get方法,等待获取远程调用返回的结果,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// DefaultFuture.get
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}

如果远程调用结果response没有返回,则阻塞当前线程进行等待。当该线程被唤醒后,依然没有返回结果,则再次阻塞当前线程,直到塞时间超过设置的超时时间还没有收到结果,抛出TimeoutException。

dubbo异步转同步的相关类,如SimpleFuture、DefaultFuture等为啥以Futrue结尾呢。仔细比对 java.util.concurrent 包下的Future接口及相关类,就会发现两者实现了相近的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// dubbo
public interface ResponseFuture {
/**
* get result.
*
* @return result.
*/
Object get() throws RemotingException;
/**
* get result with the specified timeout.
*
* @param timeoutInMillis timeout.
* @return result.
*/
Object get(int timeoutInMillis) throws RemotingException;
/**
* set callback.
*
* @param callback
*/
void setCallback(ResponseCallback callback);
/**
* check is done.
*
* @return done or not.
*/
boolean isDone();
}
1
2
3
4
5
6
7
8
9
// java.util.concurrent
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}