note:很早之前的一篇文章【50】Dubbo 2.5源码分析——远程服务引用粗略的分析远程服务引用的过程。这篇文章是对那一篇文章内容的补充,补充的内容包括:路由配置处理、动态配置处理、集群容错处理、负载均衡处理,当然了,按照我一贯的作风,只大致分析处理的过程,不做详细分析。

1 写在前面

因为要分析集群相关内容,所以我启动了两个provider的实例,端口分析是20880和20881。启动一个consumer,轮询调用两个provider实例,如下:
在这里插入图片描述
来看zookeeper的节点信息:
在这里插入图片描述
能够很清楚的看到provider有两个,下面对dubbo的节点再做一个解释:

  1. consumers:消费者目录
  2. configurators:使用dubbo-admin设置的动态配置信息
  3. routers:使用dubbo-admin设置的动态路由信息
  4. providers:服务提供者

就消费者来说,远程服务调用时,会订阅providers,routers,configurators三个zookeeper节点,也就是消费者会监听这三个节点,这三个节点的任何改变都会引起消费者监听器的注意。

也就是下面这张Dubbo架构图中的2.subscribe和3.notify。
在这里插入图片描述
如果同一个接口服务提供者有多个,也就是集群的情况下,需要考虑的问题还要再复杂点。例如:如何通过路由规则适配合适的provider?如果一个provider访问出错,应该怎么办?怎么做负载均衡?

这篇文章就是对以上问题的简略分析,下面开始吧。

2 源码分析

还是要从上一篇文章的RegistryProtocol.doRefer方法讲起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// RegistryProtocol.doRefer
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}

2.1 RegistryDirectory类

该类实现Directory接口,这个类的作用就是解决一个服务有多个provider的问题,该类使用methodInvokerMap变量记录一个远程方法与多个提供者之间的关系。RegistryDirectory像一个目录一样,远程服务调用时,查找目录获取provider。如下图:
在这里插入图片描述
另外,RegistryDirectory类还实现了NotifyListener接口,所以当消费者订阅的三个zookeeper节点数据发生变化时,会调用RegistryDirectory类的notify方法。例如某一个provider停止了服务的提供,那么RegistryDirectory类的methodInvokerMap变量内容一定会变化。

从上一篇文章的分析我们知道当消费者完成zookeeper的订阅后,会第一次调用RegistryDirectory类的notify方法。

我们知道通过dubbo-admin后台可以动态配置路由,如下:
在这里插入图片描述
所以,如果sayHello方法有三个provider,但是配置了路由规则,过滤掉provider3,如果最终methodInvokerMap的内容只有两个

1
2
sayHello---->provider1
----->provider2

所以,在methodInvokerMap的内容是和路由规则相关的。下面来看RegistryDirectory.notify:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// RegistryDirectory.notify
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
// configurators
if (configuratorUrls != null && configuratorUrls.size() > 0) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && routerUrls.size() > 0) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// 合并override参数
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers
refreshInvoker(invokerUrls);
}

主要是对订阅的三个节点进行处理,methodInvokerMap变量内容的变化在refreshInvoker方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// RegistryDirectory.refreshInvoker
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止访问
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 关闭所有Invoker
} else {
...
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL列表转成Invoker列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
...
}
}

debug发现newUrlInvokerMap变量内容就是我们两个provider与对应的Invoker,如下:
在这里插入图片描述
toMethodInvokers方法对methodInvokerMap变量内容进行填充,如下:
在这里插入图片描述2.2 集群容错

RegistryProtocol.doRefer方法的最后一行代码return cluster.join(directory),cluster.join是干嘛的?

我们知道directory中对应多个provider,但是RegistryProtocol.doRefer方法的返回值只是一个Invoker,所以可以大胆的推测cluster.join方法中肯定对多个Invoker进行了选择(进行了负载均衡)

Cluster是一个SPI接口,默认实现类是FailoverCluster,如下:

1
2
3
4
5
6
7
8
9
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}

其中join方法返回了一个FailoverClusterInvoker实例,FailoverClusterInvoker实现了Invoker接口。所以join方法的作用就是从Directory返回一个适配的Invoker。

在上一篇文章中画的时序图[链接]少画了一些步骤,应该是先调用MockClusterInvoker的invoke和doMockInvoke方法,再调用AbstractClusterInvoker的Invoke方法,再调用FailoverClusterInvoker的doInvoker方法,再调用AbstractInvoker的invoke方法,再调用DubboInvoker的doInvoker方法。如下:
在这里插入图片描述

为什么会先调用MockClusterInvoker?
因为MockClusterInvoker是一个wrapper类,构造函数有一个Invoker类型的参数

FailoverClusterInvoker.doInvoker方法做了一下两件事:

  1. 集群容错方案实现
  2. 负载均衡实现

从FailoverClusterInvoker类的名称了我们能够推断,FailoverClusterInvoker类的集群容错方案是失败转移(出现失败,重试其它服务器)。其他的容错方案如下:
在这里插入图片描述FailoverClusterInvoker类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
...
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重试时,进行重新选择,避免重试时invoker列表已发生变化.
//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
//重新检查一下
checkInvokers(copyinvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
...
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le != null ? le.getCode() ...
}
}

容错的方案是当一个provider调用失败,尝试其他provider,默认重试次数为3次。

再看FailfastClusterInvoker的容错解决方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailfastClusterInvoker(Directory<T> directory) {
super(directory);
}
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException...
}
}
}

对fastfail比较熟悉,和HashMap源码中的意思类似。

其他容错方案不做分析

2.3 负载均衡

不论是在FailoverClusterInvoker的doInvoker方法中,还是在FailfastClusterInvoker的doInvoker方法中,都调用了select方法,select方法再调用doSelect方法,作用就是进行负载均衡,挑选出一个合适的Invoker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// AbstractClusterInvoker.doselect
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
// 如果只有两个invoker,退化成轮循
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
// 负载均衡处理
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
...

所有负载均衡算法实现LoadBalance接口及select方法,select方法即为算法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
/**
* select one invoker in list.
*
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}

dubbo默认的负载均衡算法是RandomLoadBalance。实现类如下:
在这里插入图片描述
正是dubbo文档中所说的四种负载均衡算法:随机、轮循、最少活跃调用数和一致性hash,下面就不对具体算法的实现再做分析。

就写这么多吧,结束。