上一篇文章中分析了ReferenceConfig.createProxy方法,分析了如果是本地服务引用Invoker的获取方式。这篇文章来分析远程服务引用Invoker的获取方式及Invoker代理类的生成。

再次贴出ReferenceConfig.createProxy代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// ReferenceConfig.createProxy
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
...
if (isJvmRefer) {// 如果是本地引用
..
} else {
if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 通过注册中心配置拼装URL
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
...
}
}
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
...
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}

1 获取invoker

首先是对服务直连的处理,dubbo可以直接指定服务提供者地址,而不需要通过注册中心,如通过下面的配置直接指定url:

1
2
<dubbo:reference id="xxxService"
interface="com.alibaba.xxx.XxxService" url="dubbo://localhost:20890" />

然后再通过注册中心获取服务提供者url,loadRegistries(false)是获取dubbo:registry标签的内容。获取注册中心信息后,执行下面的代码:

1
2
3
4
// ReferenceConfig.createProxy
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}

最终是执行RegistryProtocol.refer方法,代码如下啊:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// RegistryProtocol.refer
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 获取zookeeper连接
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}

registryFactory.getRegistry方法前面的文章已经分析过了,就是通过zkClient获取一个zookeeper连接。在上面的代码中,还对分组聚合做了一些处理,如果是分组聚合走doRefer(getMergeableCluster(), registry, type, url)代码,否则走doRefer(cluster, registry, type, url),下面来看不走分组聚合的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// RegistryProtocol.doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}

在doRefer,主要做了以下三件事:

  1. 向zookeeper注册消费
  2. 订阅zookeeper节点,添加监听器,触发监听器
  3. 集群相关处理 cluster.join(暂不详细分析)

2 注册与订阅

前两件事的处理过程我们之前分析过,看下向zookeeper注册了哪些节点,debug发现向zookeeper中注册了下面的节点:

1
2
3
4
/dubbo
/com.alibaba.dubbo.demo.DemoService
/consumers
/consumer%3A%2F%2F192.168.43.139%2Fcom.alibaba.dubbo.demo.DemoService%3Fapplication%3Ddemo-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D2420%26side%3Dconsumer%26timestamp%3D1548062659119

前三个是持久化节点,最后一个为临时节点。

再看下订阅了哪些节点,debug发现订阅的节点如下3个节点:

1
2
3
1./dubbo/com.alibaba.dubbo.demo.DemoService/providers
2./dubbo/com.alibaba.dubbo.demo.DemoService/configurators
3./dubbo/com.alibaba.dubbo.demo.DemoService/routers

3 触发监听器

触发监听器是在订阅完成之后做的,我们来看下监听器的触发流程:
在这里插入图片描述监听器的触发过程中做了一个非常重要的事情——获取远程Invoker。RegistryDirectory类中使用urlInvokerMap缓存service url与远程Invoker之间的对应关系,RegistryDirectory.notify方法的一个重要作用就是查看service url对应的Invoker是否获取到,如果未获取到,创建一个DubboInvoker(包含netty client),然后保存到缓存中。

之前已经从zookeeper中获取了服务提供者的地址,通过地址获取一个netty client。下面是远程netty client获取及DubboInvoker生成的时序图:
在这里插入图片描述

4 proxyFactory.getProxy生成代理类

获取到Invoker后,生成的Invoker代理类,该代理类实现DemoService接口,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
// methods包含proxy0实现的所有接口方法(去重)
public static Method[] methods;
private InvocationHandler handler;
public String sayHello(String arg0) {
Object[] args = new Object[1];
args[0] = arg0;
Object localObject = this.handler.invoke(this, methods[0], args);
return (String)localObject;
}
public Object $echo(Object arg0) {
Object[] args = new Object[1];
args[0] = arg0;
Object localObject = this.handler.invoke(this, methods[1], args);
return (Object)localObject;
}
public proxy0() {
}
public proxy0(InvocationHandler paramInvocationHandler) {
this.handler = paramInvocationHandler;
}

我们发现代理类中有一个对象是InvocationHandler,sayHello方法调用的就是java.lang.reflect.InvocationHandler的Invoker方法。我们来看在创建代理类时,java.lang.reflect.InvocationHandler传的是什么。

1
2
3
4
5
JavassistProxtFactory{
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}

com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler实现了java.lang.reflect.InvocationHandler接口,并封装了com.alibaba.dubbo.rpc.Invoker,最终调用的是com.alibaba.dubbo.rpc.Invoker.invoker()。

我们知道封装的Invoker是一个DubboInvoker实例。DubboInvoker继承AbstractInvoker类,调用链如下:

在这里插入图片描述下面来看DubboInvoker的doInvoker方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
// 获取netty client,之前就取得了连接
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
...
}

5 总结

本文分析了远程服务引用的整体过程,内容包括:

  1. 获取服务提供者netty服务器的连接
  2. 消费端,接口代理类的创建过程
  3. 消费端,代理类调用服务提供者的部分过程