Dubbo(三):深入理解Dubbo源码之如何将服务发布到注册中心,
一、前言
前面有说到Dubbo的服务发现机制,也就是SPI,那既然Dubbo内部实现了更加强大的服务发现机制,现在我们就来一起看看Dubbo在发现服务后需要做什么才能将服务注册到注册中心中。
二、Dubbo服务注册简介
首先需要明白的是Dubbo是依赖于Spring容器的(至于为什么在上篇博客中有介绍),Dubbo服务注册过程也是始于Spring容器发布刷新事件。而后Dubbo在接收到事件后,就会进行服务注册,整个逻辑大致分为三个部分:
1、检查参数,组装URL:服务消费方是通过URL拿到服务提供者的,所以我们需要为服务提供者配置好对应的URL。
2、导出服务到本地和远程:这里的本地指的是JVM,远程指的是实现invoke,使得服务消费方能够通过invoke调用到服务。
3、向注册中心注册服务:能够让服务消费方知道服务提供方提供了那个服务。
三、接收Spring容器刷新事件
在简介中我们提到Dubbo服务注册是始于Spring容器发布刷新事件,那么Dubbo是如何接收该事件的呢?
在我们平常编写provider的接口实现类时,都会打上@Service注解,从而这个标注这个类属于ServiceBean。在ServiceBean中有这样一个方法onApplicationEvent。该方法会在收到 Spring 上下文刷新事件后执行服务注册操作

1 public void onApplicationEvent(ContextRefreshedEvent event) {
2 //是否已导出 && 是不是已被取消导出
3 if (!
this.isExported() && !
this.isUnexported()) {
4 if (logger.isInfoEnabled()) {
5 logger.info("The service ready on spring started. service: " +
this.getInterface());
6 }
7
8 this.export();
9 }
10
11 }
View Code
注意这里是2.7.3的Dubbo,接收Spring上下文刷新事件已经不需要设置延迟导出,而是在导出的时候检查配置再决定是否需要延时,所以只有两个判断。而在2.6.x版本的Dubbo存在着isDelay的判断。这个是判断服务是否延时导出。这里说个题外话2.6.x的版本是com.alibaba.dubbo的,而2.7.x是org.apache.dubbo的,而2.7.0也开始代表dubbo从Apache里毕业了。
在这里就是Dubbo服务导出到注册中心过程的起点。需要我们在服务接口实现类上打上@Service。ServiceBean是Dubbo与Spring 框架进行整合的关键,可以看做是两个框架之间的桥梁。具有同样作用的类还有ReferenceBean。
四、检查配置参数以及URL装配
1、检查配置
在这一阶段Dubbo需要检查用户的配置是否合理,或者为用户补充缺省配置。就是从刷新事件开始,进入export()方法,源码解析如下:

1 public void export() {
2 super.export();
3 this.publishExportEvent();
4 }
5
6 //进入到ServiceConfig.class中的export。
7
8 public synchronized void export() {
9 //检查并且更新配置
10 this.checkAndUpdateSubConfigs();
11 //是否需要导出
12 if (
this.shouldExport()) {
13 //是否需要延时
14 if (
this.shouldDelay()) {
15 DELAY_EXPORT_EXECUTOR.schedule(
this::doExport, (
long)
this.getDelay(), TimeUnit.MILLISECONDS);
16 }
else {
17 //立刻导出
18 this.doExport();
19 }
20
21 }
22 }
23
24 //进入checkAndUpdateSubConfigs。
25
26 public void checkAndUpdateSubConfigs() {
27 //检查配置项包括provider是否存在,导出端口是否可用,注册中心是否可以连接等等
28 this.completeCompoundConfigs();
29 this.startConfigCenter();
30 this.checkDefault();
31 this.checkProtocol();
32 this.checkApplication();
33 if (!
this.isOnlyInJvm()) {
34 this.checkRegistry();
35 }
36 //检查接口内部方法是否不为空
37 this.refresh();
38 this.checkMetadataReport();
39 if (StringUtils.isEmpty(
this.interfaceName)) {
40 throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!"
);
41 }
else {
42 if (
this.ref
instanceof GenericService) {
43 this.interfaceClass = GenericService.
class;
44 if (StringUtils.isEmpty(
this.generic)) {
45 this.generic =
Boolean.TRUE.toString();
46 }
47 }
else {
48 try {
49 this.interfaceClass = Class.forName(
this.interfaceName,
true, Thread.currentThread().getContextClassLoader());
50 }
catch (ClassNotFoundException var5) {
51 throw new IllegalStateException(var5.getMessage(), var5);
52 }
53
54 this.checkInterfaceAndMethods(
this.interfaceClass,
this.methods);
55 this.checkRef();
56 this.generic =
Boolean.FALSE.toString();
57 }
58 //是否需要导出服务或者只是在本地运行测试
59 Class stubClass;
60 if (
this.local !=
null) {
61 if ("true".equals(
this.local)) {
62 this.local =
this.interfaceName + "Local"
;
63 }
64
65 try {
66 stubClass = ClassUtils.forNameWithThreadContextClassLoader(
this.local);
67 }
catch (ClassNotFoundException var4) {
68 throw new IllegalStateException(var4.getMessage(), var4);
69 }
70
71 if (!
this.interfaceClass.isAssignableFrom(stubClass)) {
72 throw new IllegalStateException("The local implementation class " + stubClass.getName() + " not implement interface " +
this.interfaceName);
73 }
74 }
75
76 if (
this.stub !=
null) {
77 if ("true".equals(
this.stub)) {
78 this.stub =
this.interfaceName + "Stub"
;
79 }
80
81 try {
82 stubClass = ClassUtils.forNameWithThreadContextClassLoader(
this.stub);
83 }
catch (ClassNotFoundException var3) {
84 throw new IllegalStateException(var3.getMessage(), var3);
85 }
86
87 if (!
this.interfaceClass.isAssignableFrom(stubClass)) {
88 throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " +
this.interfaceName);
89 }
90 }
91
92 this.checkStubAndLocal(
this.interfaceClass);
93 this.checkMock(
this.interfaceClass);
94 }
95 }
View Code
上面的源码分析可看出。export方法主要检查的配置项有@Service标签的类是否属性合法。服务提供者是否存在,是否有对应的Application启动,端口是否能连接,是否有对应的注册中心等等一些配置,在检查完这些配置后Dubbo会识别我们此次启动服务是想在本地启动进行一些调试,还是将服务暴露给别人。不想暴露出去可以进行配置
1 <dubbo:provider export="false" />
2、URL装配
在Dubbo中的URL一般包括以下字段:protocol,host,port,path,parameters。在检查配置后会进入到doExport中。
protocol:就是URL最前面的字段,表示的是协议,一般是:dubbo thrift http zk
host.port:就是对应的IP地址和端口
path:接口名称
parameters:参数键值对

1 protected synchronized void doExport() {
2 if (
this.unexported) {
3 throw new IllegalStateException("The service " +
this.interfaceClass.getName() + " has already unexported!"
);
4 }
else if (!
this.exported) {
5 this.exported =
true;
6 if (StringUtils.isEmpty(
this.path)) {
7 this.path =
this.interfaceName;
8 }
9
10 this.doExportUrls();
11 }
12 }
13
14 //进入到doExportUrls
15 private void doExportUrls() {
16 //加载注册中心链接
17 List<URL> registryURLs =
this.loadRegistries(
true);
18 //使用遍历器遍历protocols,并在每个协议下导出服务
19 Iterator var2 =
this.protocols.iterator();
20
21 while(var2.hasNext()) {
22 ProtocolConfig protocolConfig =
(ProtocolConfig)var2.next();
23 String pathKey = URL.buildKey((String)
this.getContextPath(protocolConfig).map((p) ->
{
24 return p + "/" +
this.path;
25 }).orElse(
this.path),
this.group,
this.version);
26 ProviderModel providerModel =
new ProviderModel(pathKey,
this.ref,
this.interfaceClass);
27 ApplicationModel.initProviderModel(pathKey, providerModel);
28 this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
29 }
30
31 }
32
33 //进入到加载注册中心链接的方法
34
35 protected List<URL> loadRegistries(
boolean provider) {
36 List<URL> registryList =
new ArrayList();
37 if (CollectionUtils.isNotEmpty(
this.registries)) {
38 Iterator var3 =
this.registries.iterator();
39 //循环的从注册链表中拿取地址及配置
40 label47:
41 while(
true) {
42 RegistryConfig config;
43 String address;
44 do {
45 if (!
var3.hasNext()) {
46 return registryList;
47 }
48
49 config =
(RegistryConfig)var3.next();
50 address =
config.getAddress();
51 //address为空就默认为0.0.0.0
52 if (StringUtils.isEmpty(address)) {
53 address = "0.0.0.0"
;
54 }
55 }
while("N/A"
.equalsIgnoreCase(address));
56
57 Map<String, String> map =
new HashMap();
58 // 添加 ApplicationConfig 中的字段信息到 map 中
59 appendParameters(map,
this.application);
60 // 添加 RegistryConfig 字段信息到 map 中
61 appendParameters(map, config);
62 // 添加 path,protocol 等信息到 map 中
63 map.put("path", RegistryService.
class.getName());
64 appendRuntimeParameters(map);
65 if (!map.containsKey("protocol"
)) {
66 map.put("protocol", "dubbo"
);
67 }
68 // 解析得到 URL 列表,address 可能包含多个注册中心 ip,
69 // 因此解析得到的是一个 URL 列表
70 List<URL> urls =
UrlUtils.parseURLs(address, map);
71 Iterator var8 =
urls.iterator();
72
73 while(
true) {
74 URL url;
75 do {
76 if (!
var8.hasNext()) {
77 continue label47;
78 }
79
80 url =
(URL)var8.next();
81 //// 将 URL 协议头设置为 registry
82 url = URLBuilder.from(url).addParameter("registry", url.getProtocol()).setProtocol("registry"
).build();
83 // 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
84 // (服务提供者 && register = true 或 null) || (非服务提供者 && subscribe = true 或 null)
85 }
while((!provider || !url.getParameter("register",
true)) && (provider || !url.getParameter("subscribe",
true)));
86
87 //添加url到registryList中
88 registryList.add(url);
89 }
90 }
91 }
else {
92 return registryList;
93 }
94 }
View Code
loadRegistries方法主要包含如下的逻辑:
1、构建参数映射集合,也就是 map
2、构建注册中心链接列表
3、遍历链接列表,并根据条件决定是否将其添加到 registryList 中
实际上因为Dubbo现如今支持很多注册中心,所以对于一些注册中心的URL也要进行遍历构建。这里是生成注册中心的URL。还未生成Dubbo服务的URL。比如说使用的是Zookeeper注册中心,可能从loadRegistries中拿到的就是:
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.7.3&pid=1528&qos.port=22222®istry=zookeeper×tamp=1530743640901
这种类型的URL,表示这是一个注册协议,现在可以根据这个URL定位到注册中心去了。服务接口是RegistryService,registry的类型为zookeeper。可是我们还未生成Dubbo服务提供方的URL所以接着看下面代码
然后进行到doExportUrlsFor1Protocol(装配Dubbo服务的URL并且实行发布)

1 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL>
registryURLs) {
2 //首先是将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map 中
3 //map 中的内容将作为 URL 的查询字符串。构建好 map 后,紧接着是获取上下文路径、主机名以及端口号等信息。
4 //最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象。需要注意的是,这里出现的 URL 并非 java.net.URL,而是 com.alibaba.dubbo.common.URL。
5 String name =
protocolConfig.getName();
6 // 如果协议名为空,或空串,则将协议名变量设置为 dubbo
7 if (StringUtils.isEmpty(name)) {
8 name = "dubbo"
;
9 }
10
11 Map<String, String> map =
new HashMap();
12 // 添加 side、版本、时间戳以及进程号等信息到 map 中
13 map.put("side", "provider"
);
14 appendRuntimeParameters(map);
15 // 通过反射将对象的字段信息添加到 map 中
16 appendParameters(map,
this.metrics);
17 appendParameters(map,
this.application);
18 appendParameters(map,
this.module);
19 appendParameters(map,
this.provider);
20 appendParameters(map, protocolConfig);
21 appendParameters(map,
this);
22 String scope;
23 Iterator metadataReportService;
24 // methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
25 if (CollectionUtils.isNotEmpty(
this.methods)) {
26 Iterator var5 =
this.methods.iterator();
27 //检测 <dubbo:method> 标签中的配置信息,并将相关配置添加到 map 中
28 label166:
29 while(
true) {
30 MethodConfig method;
31 List arguments;
32 do {
33 if (!
var5.hasNext()) {
34 break label166;
35 }
36
37 method =
(MethodConfig)var5.next();
38 appendParameters(map, method, method.getName());
39 String retryKey = method.getName() + ".retry"
;
40 if (map.containsKey(retryKey)) {
41 scope =
(String)map.remove(retryKey);
42 if ("false"
.equals(scope)) {
43 map.put(method.getName() + ".retries", "0"
);
44 }
45 }
46
47 arguments =
method.getArguments();
48 }
while(!
CollectionUtils.isNotEmpty(arguments));
49
50 metadataReportService =
arguments.iterator();
51
52 while(
true) {
53 ArgumentConfig argument;
54 Method[] methods;
55 do {
56 do {
57 while(
true) {
58 if (!
metadataReportService.hasNext()) {
59 continue label166;
60 }
61
62 argument =
(ArgumentConfig)metadataReportService.next();
63 if (argument.getType() !=
null && argument.getType().length() > 0
) {
64 methods =
this.interfaceClass.getMethods();
65 break;
66 }
67
68 if (argument.getIndex() == -1
) {
69 throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>"
);
70 }
71
72 appendParameters(map, argument, method.getName() + "." +
argument.getIndex());
73 }
74 }
while(methods ==
null);
75 }
while(methods.length <= 0
);
76
77 for(
int i = 0; i < methods.length; ++
i) {
78 String methodName =
methods[i].getName();
79 if (methodName.equals(method.getName())) {
80 Class<?>[] argtypes =
methods[i].getParameterTypes();
81 if (argument.getIndex() != -1
) {
82 if (!
argtypes[argument.getIndex()].getName().equals(argument.getType())) {
83 throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" +
argument.getType());
84 }
85
86 appendParameters(map, argument, method.getName() + "." +
argument.getIndex());
87 }
else {
88 for(
int j = 0; j < argtypes.length; ++
j) {
89 Class<?> argclazz =
argtypes[j];
90 if (argclazz.getName().equals(argument.getType())) {
91 appendParameters(map, argument, method.getName() + "." +
j);
92 if (argument.getIndex() != -1 && argument.getIndex() !=
j) {
93 throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" +
argument.getType());
94 }
95 }
96 }
97 }
98 }
99 }
100 }
101 }
102 }
103
104 String host;
105 // 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息
106 if (ProtocolUtils.isGeneric(
this.generic)) {
107 map.put("generic",
this.generic);
108 map.put("methods", "*"
);
109 }
else {
110 host = Version.getVersion(
this.interfaceClass,
this.version);
111 if (host !=
null && host.length() > 0
) {
112 map.put("revision"
, host);
113 }
114 // 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
115 String[] methods = Wrapper.getWrapper(
this.interfaceClass).getMethodNames();
116 if (methods.length == 0
) {
117 logger.warn("No method found in service interface " +
this.interfaceClass.getName());
118 map.put("methods", "*"
);
119 }
else {
120 // 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中
121 map.put("methods", StringUtils.join(
new HashSet(Arrays.asList(methods)), ","
));
122 }
123 }
124 // 添加 token 到 map 中
125 if (!ConfigUtils.isEmpty(
this.token)) {
126 if (ConfigUtils.isDefault(
this.token)) {
127 map.put("token"
, UUID.randomUUID().toString());
128 }
else {
129 map.put("token",
this.token);
130 }
131 }
132 //获取host和port
133 host =
this.findConfigedHosts(protocolConfig, registryURLs, map);
134 Integer port =
this.findConfigedPorts(protocolConfig, name, map);
135 // 获取上下文路径并且组装URL
136 URL url =
new URL(name, host, port, (String)
this.getContextPath(protocolConfig).map((p) ->
{
137 return p + "/" +
this.path;
138 }).orElse(
this.path), map);
139 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.
class).hasExtension(url.getProtocol())) {
140 // 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url,使用了前面提到的SPI机制
141 url = ((ConfiguratorFactory)ExtensionLoader.getExtensionLoader(ConfiguratorFactory.
class).getExtension(url.getProtocol())).getConfigurator(url).configure(url);
142 }
143 //下面逻辑主要分三步
144 // 如果 scope = none,则什么都不做
145 // scope != remote,导出到本地
146 // scope != local,导出到远程
147 scope = url.getParameter("scope"
);
148 if (!"none"
.equalsIgnoreCase(scope)) {
149 if (!"remote"
.equalsIgnoreCase(scope)) {
150 this.exportLocal(url);
151 }
152
153 if (!"local"
.equalsIgnoreCase(scope)) {
154 if (!
this.isOnlyInJvm() &&
logger.isInfoEnabled()) {
155 logger.info("Export dubbo service " +
this.interfaceClass.getName() + " to url " +
url);
156 }
157
158 if (CollectionUtils.isNotEmpty(registryURLs)) {
159 metadataReportService =
registryURLs.iterator();
160
161 while(metadataReportService.hasNext()) {
162 URL registryURL =
(URL)metadataReportService.next();
163 if (!"injvm"
.equalsIgnoreCase(url.getProtocol())) {
164 url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"
));
165 URL monitorUrl =
this.loadMonitor(registryURL);
166 if (monitorUrl !=
null) {
167 url = url.addParameterAndEncoded("monitor"
, monitorUrl.toFullString());
168 }
169
170 if (logger.isInfoEnabled()) {
171 logger.info("Register dubbo service " +
this.interfaceClass.getName() + " url " + url + " to registry " +
registryURL);
172 }
173
174 String proxy = url.getParameter("proxy"
);
175 if (StringUtils.isNotEmpty(proxy)) {
176 registryURL = registryURL.addParameter("proxy"
, proxy);
177 }
178 // 为服务提供类(ref)生成 Invoker
179 Invoker<?> invoker = PROXY_FACTORY.getInvoker(
this.ref,
this.interfaceClass, registryURL.addParameterAndEncoded("export"
, url.toFullString()));
180 DelegateProviderMetaDataInvoker wrapperInvoker =
new DelegateProviderMetaDataInvoker(invoker,
this);
181 // 导出服务,并生成 Exporter
182 Exporter<?> exporter =
protocol.export(wrapperInvoker);
183 this.exporters.add(exporter);
184 }
185 }
186 // 不存在注册中心,仅导出服务
187 }
else {
188 Invoker<?> invoker = PROXY_FACTORY.getInvoker(
this.ref,
this.interfaceClass, url);
189 DelegateProviderMetaDataInvoker wrapperInvoker =
new DelegateProviderMetaDataInvoker(invoker,
this);
190 Exporter<?> exporter =
protocol.export(wrapperInvoker);
191 this.exporters.add(exporter);
192 }
193
194 metadataReportService =
null;
195 MetadataReportService metadataReportService;
196 if ((metadataReportService =
this.getMetadataReportService()) !=
null) {
197 metadataReportService.publishProvider(url);
198 }
199 }
200 }
201
202 this.urls.add(url);
203 }
View Code
上面的源码前半段是进行URL装配,这个URL就是Dubbo服务的URL,大致如下:
dubbo://192.168.1.6:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.1.6&bind.port=20880&dubbo=2.7.3&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=5744&qos.port=22222&side=provider×tamp=1530746052546
这个URL表示它是一个dubbo协议(DubboProtocol),地址是当前服务器的ip,端口是要暴露的服务的端口号,可以从dubbo:protocol配置,服务接口为dubbo:service配置发布的接口。
后半段主要是判断scope变量来决定是否将服务导出远程或者本地,导出到本地实际上很简单只需要生成Invoker。当导出到远程就需要添加监视器还要生成invoker。监视器能让Dubbo定时查看注册中心挂了没。会抛出指定异常,而invoker使得服务消费方能够远程调用到服务。并且还会进行注册到注册中心下面我们接着来看看服务的发布。因为Invoker比较重要在消费者和提供者中都有,所以这个后面会单独拿出来进行探讨。
五、服务发布本地与远程
1、服务发布到本地

1 private void exportLocal(URL url) {
2 //进行本地URL的构建
3 URL local = URLBuilder.from(url).setProtocol("injvm").setHost("127.0.0.1").setPort(0
).build();
4 //根据本地的URL来实现对应的Invoker
5 Exporter<?> exporter = protocol.export(PROXY_FACTORY.getInvoker(
this.ref,
this.interfaceClass, local));
6 this.exporters.add(exporter);
7 logger.info("Export dubbo service " +
this.interfaceClass.getName() + " to local registry url : " +
local);
8 }
View Code
可见发布到本地是重新构建了protocol,injvm就是代表在本地的JVM里,host与port都统一默认127.0.0.1:0。
2、服务发布到远程

1 public <T> Exporter<T> export(Invoker<T> originInvoker)
throws RpcException {
2 //获取注册中心的URL,比如:zookeeper://127.0.0.1:2181/......
3 URL registryUrl =
this.getRegistryUrl(originInvoker);
4 //获取所有服务提供者的URL,比如:dubbo://192.168.1.6:20880/.......
5 URL providerUrl =
this.getProviderUrl(originInvoker);
6 //获取订阅URL,比如:provider://192.168.1.6:20880/......
7 URL overrideSubscribeUrl =
this.getSubscribedOverrideUrl(providerUrl);
8 //创建监听器
9 RegistryProtocol.OverrideListener overrideSubscribeListener =
new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
10 //向订阅中心推送监听器
11 this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
12 providerUrl =
this.overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
13 //导出服务
14 RegistryProtocol.ExporterChangeableWrapper<T> exporter =
this.doLocalExport(originInvoker, providerUrl);
15 Registry registry =
this.getRegistry(originInvoker);
16 //获取已注册的服务提供者的URL,比如dubbo://192.168.1.6:20880/.......
17 URL registeredProviderUrl =
this.getRegisteredProviderUrl(providerUrl, registryUrl);
18 // 向服务提供者与消费者注册表中注册服务提供者
19 ProviderInvokerWrapper<T> providerInvokerWrapper =
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
20 // 获取 register 参数
21 boolean register = registeredProviderUrl.getParameter("register",
true);
22 // 根据 register 的值决定是否注册服务
23 if (register) {
24 this.register(registryUrl, registeredProviderUrl);
25 providerInvokerWrapper.setReg(
true);
26 }
27 // 向注册中心进行订阅 override 数据
28 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
29 exporter.setRegisterUrl(registeredProviderUrl);
30 exporter.setSubscribeUrl(overrideSubscribeUrl);
31 // 创建并返回 DestroyableExporter
32 return new RegistryProtocol.DestroyableExporter(exporter);
33 }
View Code
上面的源码主要是根据前面生成的URL进行服务的发布和注册(注册在下一节展开源码)。当执行到doLocalExport也就是发布本地服务到远程时候会调用 DubboProtocol 的 export 方法大致会经历下面一些步骤来导出服务
到这里大致的服务发布图如下:
1 public void register(URL url) {
2 super.register(url);
3 failedRegistered.remove(url);
4 failedUnregistered.remove(url);
5 try {
6 // 模板方法,由子类实现
7 doRegister(url);
8 } catch (Exception e) {
9 Throwable t = e;
10
11 // 获取 check 参数,若 check = true 将会直接抛出异常
12 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
13 && url.getParameter(Constants.CHECK_KEY, true)
14 && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
15 boolean skipFailback = t instanceof SkipFailbackWrapperException;
16 if (check || skipFailback) {
17 if (skipFailback) {
18 t = t.getCause();
19 }
20 throw new IllegalStateException("Failed to register");
21 } else {
22 logger.error("Failed to register");
23 }
24
25 // 记录注册失败的链接
26 failedRegistered.add(url);
27 }
28 }
29
30 //进入doRegister方法
31
32 protected void doRegister(URL url) {
33 try {
34 // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
35 // /${group}/${serviceInterface}/providers/${url}
36 // 比如
37 // /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
38 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
39 } catch (Throwable e) {
40 throw new RpcException("Failed to register...");
41 }
42 }
43
44 //进入create方法
45
46 public void create(String path, boolean ephemeral) {
47 if (!ephemeral) {
48 // 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
49 if (checkExists(path)) {
50 return;
51 }
52 }
53 int i = path.lastIndexOf('/');
54 if (i > 0) {
55 // 递归创建上一级路径
56 create(path.substring(0, i), false);
57 }
58
59 // 根据 ephemeral 的值创建临时或持久节点
60 if (ephemeral) {
61 createEphemeral(path);
62 } else {
63 createPersistent(path);
64 }
65 }
66
67 //进入createEphemeral
68
69 public void createEphemeral(String path) {
70 try {
71 // 通过 Curator 框架创建节点
72 client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
73 } catch (NodeExistsException e) {
74 } catch (Exception e) {
75 throw new IllegalStateException(e.getMessage(), e);
76 }
77 }
View Code
根据上面的方法,可以将当前服务对应的配置信息(存储在URL中的)注册到注册中心/dubbo/org.apache.dubbo.demo.DemoService/providers/ 。里面直接使用了Curator进行创建节点(Curator是Netflix公司开源的一套zookeeper客户端框架)
七、总结
到这里Dubbo的服务注册流程终于是解释完。核心在于Dubbo使用规定好的URL+SPI进行寻找和发现服务,通过URL定位注册中心,再通过将服务的URL发布到注册中心从而使得消费者可以知道服务的有哪些,里面可以看见对于URL这种复杂的对象并且需要经常更改的,通常采用建造者模式。而2.7.3版本的Dubbo源码也使用了Java8以后的新特性Lambda表达式来构建隐式函数。而一整套流程下来可以在ZooInspector这个zk可视化客户端看见我们创建的节点,前提是注册中心为zk。
用户点评