更新時間:2022-09-19 來源:黑馬程序員 瀏覽量:
概述
dubbo是一個簡單易用的RPC框架,通過簡單的提供者,消費(fèi)者配置就能完成無感的網(wǎng)絡(luò)調(diào)用。那么在dubbo中是如何將提供者的服務(wù)暴露出去,消費(fèi)者又是如何獲取到提供者相關(guān)信息的呢?這就是本章我們要討論的內(nèi)容。
dubbo與spring的整合
在了解dubbo的服務(wù)注冊和服務(wù)發(fā)現(xiàn)之前,我們首先需要掌握一個知識點(diǎn):Spring中自定義Schema。
Spring自定義Schema
Dubbo 現(xiàn)在的設(shè)計(jì)是完全無侵入,也就是使用者只依賴于配置契約。在 Dubbo 中,可以使用 XML 配置相關(guān)信息,也可以用來引入服務(wù)或者導(dǎo)出服務(wù)。配置完成,啟動工程,Spring 會讀取配置文件,生成注入相關(guān)Bean。那 Dubbo 如何實(shí)現(xiàn)自定義 XML 被 Spring 加載讀取呢?
從 Spring 2.0 開始,Spring 開始提供了一種基于 XML Schema 格式擴(kuò)展機(jī)制,用于定義和配置 bean。
入門案例
學(xué)習(xí)和使用Spring XML Schema 擴(kuò)展機(jī)制并不難,需要下面幾個步驟:
1. 創(chuàng)建配置屬性的JavaBean對象
2. 創(chuàng)建一個 XML Schema 文件,描述自定義的合法構(gòu)建模塊,也就是xsd文件。
3. 自定義處理器類,并實(shí)現(xiàn)`NamespaceHandler`接口。
4. 自定義解析器,實(shí)現(xiàn)`BeanDefinitionParser`接口(最關(guān)鍵的部分)。
5. 編寫Spring.handlers和Spring.schemas文件配置所有部件
定義JavaBean對象,在spring中此對象會根據(jù)配置自動創(chuàng)建
public class User { private String id; private String name; private Integer age; //省略getter setter方法 }
在META-INF下定義`user.xsd`文件,使用xsd用于描述標(biāo)簽的規(guī)則
<?xml version="1.0" encoding="UTF-8"?> <xsd:schema xmlns="http://3rdspacecomics.com/schema/user" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:beans="http://www.springframework.org/schema/beans" targetNamespace="http://3rdspacecomics.com/schema/user" elementFormDefault="qualified" attributeFormDefault="unqualified"> <xsd:import namespace="http://www.springframework.org/schema/beans" /> <xsd:element name="user"> <xsd:complexType> <xsd:complexContent> <xsd:extension base="beans:identifiedType"> <xsd:attribute name="name" type="xsd:string" /> <xsd:attribute name="age" type="xsd:int" /> </xsd:extension> </xsd:complexContent> </xsd:complexType> </xsd:element> </xsd:schema>
Spring讀取xml文件時,會根據(jù)標(biāo)簽的命名空間找到其對應(yīng)的NamespaceHandler,我們在NamespaceHandler內(nèi)會注冊標(biāo)簽對應(yīng)的解析器BeanDefinitionParser。
package com.itheima.schema; import org.springframework.beans.factory.xml.NamespaceHandlerSupport; public class UserNamespaceHandler extends NamespaceHandlerSupport { public void init() { registerBeanDefinitionParser("user", new UserBeanDefinitionParser()); } }
BeanDefinitionParser是標(biāo)簽對應(yīng)的解析器,Spring讀取到對應(yīng)標(biāo)簽時會使用該類進(jìn)行解析;
public class UserBeanDefinitionParser extends AbstractSingleBeanDefinitionParser { protected Class getBeanClass(Element element) { return User.class; } protected void doParse(Element element, BeanDefinitionBuilder bean) { String name = element.getAttribute("name"); String age = element.getAttribute("age"); String id = element.getAttribute("id"); if (StringUtils.hasText(id)) { bean.addPropertyValue("id", id); } if (StringUtils.hasText(name)) { bean.addPropertyValue("name", name); } if (StringUtils.hasText(age)) { bean.addPropertyValue("age", Integer.valueOf(age)); } } }
定義spring.handlers文件,內(nèi)部保存命名空間與NamespaceHandler類的對應(yīng)關(guān)系;必須放在classpath下的META-INF文件夾中。
```proprties
http\://3rdspacecomics.com/schema/user=com.itheima.schema.UserNamespaceHandler
```
定義spring.schemas文件,內(nèi)部保存命名空間對應(yīng)的xsd文件位置;必須放在classpath下的META-INF文件夾中。
http\://3rdspacecomics.com/schema/user.xsd=META-INF/user.xsd
代碼準(zhǔn)備好了之后,就可以在spring工程中進(jìn)行使用和測試,定義spring配置文件,導(dǎo)入對應(yīng)約束。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xmlns:task="http://www.springframework.org/schema/task" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:itheima="http://3rdspacecomics.com/schema/user" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://3rdspacecomics.com/schema/user http://3rdspacecomics.com/schema/user.xsd"> <itheima:user id="user" name="zhangsan" age="12"></itheima:user> </beans>
編寫測試類,通過spring容器獲取對象user
public class SchemaDemo { public static void main(String[] args) { ApplicationContext ctx = new ClassPathXmlApplicationContext("/spring/applicationContext.xml"); User user = (User)ctx.getBean("user"); System.out.println(user); } }
dubbo中的相關(guān)對象
Dubbo是運(yùn)行在spring容器中,dubbo的配置文件也是通過spring的配置文件applicationContext.xml來加載,所以dubbo的自定義配置標(biāo)簽實(shí)現(xiàn),其實(shí)同樣依賴spring的xml schema機(jī)制
可以看出Dubbo所有的組件都是由`DubboBeanDefinitionParser`解析,并通過registerBeanDefinitionParser方法來注冊到spring中最后解析對應(yīng)的對象。這些對象中我們重點(diǎn)關(guān)注的有以下兩個:
ServiceBean:服務(wù)提供者暴露服務(wù)的核心對象
ReferenceBean:服務(wù)消費(fèi)者發(fā)現(xiàn)服務(wù)的核心對象
RegistryConfig:定義注冊中心的核心配置對象
服務(wù)暴露
前面主要探討了 Dubbo 中 schema 、 XML 的相關(guān)原理 , 這些內(nèi)容對理解框架整體至關(guān)重要 , 在此基礎(chǔ)上我們繼續(xù)探討服務(wù)是如何依靠前面的配置進(jìn)行服務(wù)暴露。
名詞解釋
在 Dubbo 的核心領(lǐng)域模型中:
Invoker 是實(shí)體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉(zhuǎn)換成它,它代表一個可執(zhí)行體,可向它發(fā)起 invoke 調(diào)用,它有可能是一個本地的實(shí)現(xiàn),也可能是一個遠(yuǎn)程的實(shí)現(xiàn),也可能一個集群實(shí)現(xiàn)。在服務(wù)提供方,Invoker用于調(diào)用服務(wù)提供類。在服務(wù)消費(fèi)方,Invoker用于執(zhí)行遠(yuǎn)程調(diào)用。
- Protocol 是服務(wù)域,它是 Invoker 暴露和引用的主功能入口,它負(fù)責(zé) Invoker 的生命周期管理。
- export:暴露遠(yuǎn)程服務(wù)
- refer:引用遠(yuǎn)程服務(wù)
- proxyFactory:獲取一個接口的代理類
- getInvoker:針對server端,將服務(wù)對象,如DemoServiceImpl包裝成一個Invoker對象
- getProxy:針對client端,創(chuàng)建接口的代理對象,例如DemoService的接口。
- Invocation 是會話域,它持有調(diào)用過程中的變量,比如方法名,參數(shù)等
整體流程
在詳細(xì)探討服務(wù)暴露細(xì)節(jié)之前 , 我們先看一下整體duubo的服務(wù)暴露原理
在整體上看,Dubbo 框架做服務(wù)暴露分為兩大部分 , 第一步將持有的服務(wù)實(shí)例通過代理轉(zhuǎn)換成 Invoker, 第二步會把 Invoker 通過具體的協(xié)議 ( 比如 Dubbo ) 轉(zhuǎn)換成 Exporter, 框架做了這層抽象也大大方便了功能擴(kuò)展 。
服務(wù)提供方暴露服務(wù)的藍(lán)色初始化鏈,時序圖如下:
源碼分析
(1) 導(dǎo)出入口
服務(wù)導(dǎo)出的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一個事件響應(yīng)方法,該方法會在收到 Spring 上下文刷新事件后執(zhí)行服務(wù)導(dǎo)出操作。方法代碼如下:
public void onApplicationEvent(ContextRefreshedEvent event) { // 是否有延遲導(dǎo)出 && 是否已導(dǎo)出 && 是不是已被取消導(dǎo)出 if (isDelay() && !isExported() && !isUnexported()) { // 導(dǎo)出服務(wù) export(); } }
onApplicationEvent 方法在經(jīng)過一些判斷后,會決定是否調(diào)用 export 方法導(dǎo)出服務(wù)。在export 根據(jù)配置執(zhí)行相應(yīng)的動作。最終進(jìn)入到doExportUrls導(dǎo)出服務(wù)方法。
private void doExportUrls() { // 加載注冊中心鏈接 List<URL> registryURLs = loadRegistries(true); // 遍歷 protocols,并在每個協(xié)議下導(dǎo)出服務(wù) for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
關(guān)于多協(xié)議多注冊中心導(dǎo)出服務(wù)首先是根據(jù)配置,以及其他一些信息組裝 URL。前面說過,URL 是 Dubbo 配置的載體,通過 URL 可讓 Dubbo 的各種配置在各個模塊之間傳遞。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); // 如果協(xié)議名為空,或空串,則將協(xié)議名變量設(shè)置為 dubbo if (name == null || name.length() == 0) { name = "dubbo"; } Map<String, String> map = new HashMap<String, String>(); //略 // 獲取上下文路徑 String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } // 獲取 host 和 port String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); // 組裝 URL URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); // 省略無關(guān)代碼 }
上面的代碼首先是將一些信息,比如版本、時間戳、方法名以及各種配置對象的字段信息放入到 map 中,最后將 map 和主機(jī)名等數(shù)據(jù)傳給 URL 構(gòu)造方法創(chuàng)建 URL 對象。前置工作做完,接下來就可以進(jìn)行服務(wù)導(dǎo)出了。服務(wù)導(dǎo)出分為導(dǎo)出到本地 (JVM),和導(dǎo)出到遠(yuǎn)程。在深入分析服務(wù)導(dǎo)出的源碼前,我們先來從宏觀層面上看一下服務(wù)導(dǎo)出邏輯。如下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 省略無關(guān)代碼 String scope = url.getParameter(Constants.SCOPE_KEY); // 如果 scope = none,則什么都不做 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // scope != remote,導(dǎo)出到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // scope != local,導(dǎo)出到遠(yuǎn)程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (registryURLs != null && !registryURLs.isEmpty()) { for (URL registryURL : registryURLs) { //省略無關(guān)代碼 // 為服務(wù)提供類(ref)生成 Invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 導(dǎo)出服務(wù),并生成 Exporter Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } // 不存在注冊中心,僅導(dǎo)出服務(wù) } else { //略 } } } this.urls.add(url); }
上面代碼根據(jù) url 中的 scope 參數(shù)決定服務(wù)導(dǎo)出方式,分別如下:
- scope = none,不導(dǎo)出服務(wù)
- scope != remote,導(dǎo)出到本地
- scope != local,導(dǎo)出到遠(yuǎn)程
不管是導(dǎo)出到本地,還是遠(yuǎn)程。進(jìn)行服務(wù)導(dǎo)出之前,均需要先創(chuàng)建 Invoker,這是一個很重要的步驟。因此下面先來分析 Invoker 的創(chuàng)建過程。Invoker 是由 ProxyFactory 創(chuàng)建而來,Dubbo 默認(rèn)的 ProxyFactory 實(shí)現(xiàn)類是 JavassistProxyFactory。下面我們到 JavassistProxyFactory 代碼中,探索 Invoker 的創(chuàng)建過程。如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 為目標(biāo)類創(chuàng)建 Wrapper final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 創(chuàng)建匿名 Invoker 類對象,并實(shí)現(xiàn) doInvoke 方法。 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 調(diào)用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調(diào)用目標(biāo)方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
如上,JavassistProxyFactory 創(chuàng)建了一個繼承自 AbstractProxyInvoker 類的匿名對象,并覆寫了抽象方法 doInvoke。
(2) 導(dǎo)出服務(wù)到本地
Invoke創(chuàng)建成功之后,接下來我們來看本地導(dǎo)出
private void exportLocal(URL url) { // 如果 URL 的協(xié)議頭等于 injvm,說明已經(jīng)導(dǎo)出到本地了,無需再次導(dǎo)出 if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) // 設(shè)置協(xié)議頭為 injvm .setHost(LOCALHOST) .setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // 創(chuàng)建 Invoker,并導(dǎo)出服務(wù),這里的 protocol 會在運(yùn)行時調(diào)用 InjvmProtocol 的 export 方法 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); } }
exportLocal 方法比較簡單,首先根據(jù) URL 協(xié)議頭決定是否導(dǎo)出服務(wù)。若需導(dǎo)出,則創(chuàng)建一個新的 URL 并將協(xié)議頭、主機(jī)名以及端口設(shè)置成新的值。然后創(chuàng)建 Invoker,并調(diào)用 InjvmProtocol 的 export 方法導(dǎo)出服務(wù)。下面我們來看一下 InjvmProtocol 的 export 方法都做了哪些事情。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 創(chuàng)建 InjvmExporter return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
如上,InjvmProtocol 的 export 方法僅創(chuàng)建了一個 InjvmExporter,無其他邏輯。到此導(dǎo)出服務(wù)到本地就分析完了。
(3) 導(dǎo)出服務(wù)到遠(yuǎn)程
接下來,我們繼續(xù)分析導(dǎo)出服務(wù)到遠(yuǎn)程的過程。導(dǎo)出服務(wù)到遠(yuǎn)程包含了服務(wù)導(dǎo)出與服務(wù)注冊兩個過程。先來分析服務(wù)導(dǎo)出邏輯。我們把目光移動到 RegistryProtocol 的 export 方法上。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 導(dǎo)出服務(wù) final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // 獲取注冊中心 URL URL registryUrl = getRegistryUrl(originInvoker); // 根據(jù) URL 加載 Registry 實(shí)現(xiàn)類,比如 ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // 獲取已注冊的服務(wù)提供者 URL,比如: final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); // 獲取 register 參數(shù) boolean register = registeredProviderUrl.getParameter("register", true); // 向服務(wù)提供者與消費(fèi)者注冊表中注冊服務(wù)提供者 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // 根據(jù) register 的值決定是否注冊服務(wù) if (register) { // 向注冊中心注冊服務(wù) register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // 獲取訂閱 URL,比如: final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); // 創(chuàng)建監(jiān)聽器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 向注冊中心進(jìn)行訂閱 override 數(shù)據(jù) registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 創(chuàng)建并返回 DestroyableExporter return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
上面代碼看起來比較復(fù)雜,主要做如下一些操作:
1. 調(diào)用 doLocalExport 導(dǎo)出服務(wù)
2. 向注冊中心注冊服務(wù)
3. 向注冊中心進(jìn)行訂閱 override 數(shù)據(jù)
4. 創(chuàng)建并返回 DestroyableExporter
下面先來分析 doLocalExport 方法的邏輯,如下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); // 訪問緩存 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { // 創(chuàng)建 Invoker 為委托類對象 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // 調(diào)用 protocol 的 export 方法導(dǎo)出服務(wù) exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // 寫緩存 bounds.put(key, exporter); } } } return exporter; }
接下來,我們把重點(diǎn)放在 Protocol 的 export 方法上。假設(shè)運(yùn)行時協(xié)議為 dubbo,此處的 protocol 變量會在運(yùn)行時加載 DubboProtocol,并調(diào)用 DubboProtocol 的 export 方法。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // 獲取服務(wù)標(biāo)識,理解成服務(wù)坐標(biāo)也行。由服務(wù)組名,服務(wù)名,服務(wù)版本號以及端口組成。比如: // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880 String key = serviceKey(url); // 創(chuàng)建 DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 將 <key, exporter> 鍵值對放入緩存中 exporterMap.put(key, exporter); //省略無關(guān)代碼 // 啟動服務(wù)器 openServer(url); // 優(yōu)化序列化 optimizeSerialization(url); return exporter; }
(4) 開啟Netty服務(wù)
如上,我們重點(diǎn)關(guān)注 DubboExporter 的創(chuàng)建以及 openServer 方法,其他邏輯看不懂也沒關(guān)系,不影響理解服務(wù)導(dǎo)出過程。下面分析 openServer 方法。
private void openServer(URL url) { // 獲取 host:port,并將其作為服務(wù)器實(shí)例的 key,用于標(biāo)識當(dāng)前的服務(wù)器實(shí)例 String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { // 訪問緩存 ExchangeServer server = serverMap.get(key); if (server == null) { // 創(chuàng)建服務(wù)器實(shí)例 serverMap.put(key, createServer(url)); } else { // 服務(wù)器已創(chuàng)建,則根據(jù) url 中的配置重置服務(wù)器 server.reset(url); } } }
接下來分析服務(wù)器實(shí)例的創(chuàng)建過程。如下:
private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, // 添加心跳檢測配置到 url 中 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 獲取 server 參數(shù),默認(rèn)為 netty String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // 通過 SPI 檢測是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // 添加編碼解碼器參數(shù) url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // 創(chuàng)建 ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server..."); } // 獲取 client 參數(shù),可指定 netty,mina str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { // 獲取所有的 Transporter 實(shí)現(xiàn)類名稱集合,比如 supportedTypes = [netty, mina] Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); // 檢測當(dāng)前 Dubbo 所支持的 Transporter 實(shí)現(xiàn)類名稱列表中, // 是否包含 client 所表示的 Transporter,若不包含,則拋出異常 if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type..."); } } return server; }
如上,createServer 包含三個核心的邏輯。第一是檢測是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常。第二是創(chuàng)建服務(wù)器實(shí)例。第三是檢測是否支持 client 參數(shù)所表示的 Transporter 拓展,不存在也是拋出異常。兩次檢測操作所對應(yīng)的代碼比較直白了,無需多說。但創(chuàng)建服務(wù)器的操作目前還不是很清晰,我們繼續(xù)往下看。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 獲取 Exchanger,默認(rèn)為 HeaderExchanger。 // 緊接著調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實(shí)例 return getExchanger(url).bind(url, handler); }
上面代碼比較簡單,就不多說了。下面看一下 HeaderExchanger 的 bind 方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { // 創(chuàng)建 HeaderExchangeServer 實(shí)例,該方法包含了多個邏輯,分別如下: // 1. new HeaderExchangeHandler(handler) // 2. new DecodeHandler(new HeaderExchangeHandler(handler)) // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))) return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
HeaderExchanger 的 bind 方法包含的邏輯比較多,但目前我們僅需關(guān)心 Transporters 的 bind 方法邏輯即可。該方法的代碼如下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { // 如果 handlers 元素?cái)?shù)量大于1,則創(chuàng)建 ChannelHandler 分發(fā)器 handler = new ChannelHandlerDispatcher(handlers); } // 獲取自適應(yīng) Transporter 實(shí)例,并調(diào)用實(shí)例方法 return getTransporter().bind(url, handler); }
如上,getTransporter() 方法獲取的 Transporter 是在運(yùn)行時動態(tài)創(chuàng)建的,類名為 TransporterAdaptive,也就是自適應(yīng)拓展類。TransporterAdaptive 會在運(yùn)行時根據(jù)傳入的 URL 參數(shù)決定加載什么類型的 Transporter,默認(rèn)為 NettyTransporter。調(diào)用`NettyTransporter.bind(URL, ChannelHandler)`方法。創(chuàng)建一個`NettyServer`實(shí)例。調(diào)用`NettyServer.doOPen()`方法,服務(wù)器被開啟,服務(wù)也被暴露出來了。
(5) 服務(wù)注冊
本節(jié)內(nèi)容以 Zookeeper 注冊中心作為分析目標(biāo),其他類型注冊中心大家可自行分析。下面從服務(wù)注冊的入口方法開始分析,我們把目光再次移到 RegistryProtocol 的 export 方法上。如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // ${導(dǎo)出服務(wù)} // 省略其他代碼 boolean register = registeredProviderUrl.getParameter("register", true); if (register) { // 注冊服務(wù) register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 訂閱 override 數(shù)據(jù) registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 省略部分代碼 }
RegistryProtocol 的 export 方法包含了服務(wù)導(dǎo)出,注冊,以及數(shù)據(jù)訂閱等邏輯。其中服務(wù)導(dǎo)出邏輯上一節(jié)已經(jīng)分析過了,本節(jié)將分析服務(wù)注冊邏輯,相關(guān)代碼如下:
public void register(URL registryUrl, URL registedProviderUrl) { // 獲取 Registry Registry registry = registryFactory.getRegistry(registryUrl); // 注冊服務(wù) registry.register(registedProviderUrl); }
register 方法包含兩步操作,第一步是獲取注冊中心實(shí)例,第二步是向注冊中心注冊服務(wù)。接下來分兩節(jié)內(nèi)容對這兩步操作進(jìn)行分析。
這里以 Zookeeper 注冊中心為例進(jìn)行分析。下面先來看一下 getRegistry 方法的源碼,這個方法由 AbstractRegistryFactory 實(shí)現(xiàn)。如下:
public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); LOCK.lock(); try { // 訪問緩存 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // 緩存未命中,創(chuàng)建 Registry 實(shí)例 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry..."); } // 寫入緩存 REGISTRIES.put(key, registry); return registry; } finally { LOCK.unlock(); } } protected abstract Registry createRegistry(URL url);
如上,getRegistry 方法先訪問緩存,緩存未命中則調(diào)用 createRegistry 創(chuàng)建 Registry。在此方法中就是通過`new ZookeeperRegistry(url, zookeeperTransporter)`實(shí)例化一個注冊中心
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 獲取組名,默認(rèn)為 dubbo String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { // group = "/" + group group = Constants.PATH_SEPARATOR + group; } this.root = group; // 創(chuàng)建 Zookeeper 客戶端,默認(rèn)為 CuratorZookeeperTransporter zkClient = zookeeperTransporter.connect(url); // 添加狀態(tài)監(jiān)聽器 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
在上面的代碼代碼中,我們重點(diǎn)關(guān)注 ZookeeperTransporter 的 connect 方法調(diào)用,這個方法用于創(chuàng)建 Zookeeper 客戶端。創(chuàng)建好 Zookeeper 客戶端,意味著注冊中心的創(chuàng)建過程就結(jié)束了。接下來,再來分析一下 Zookeeper 客戶端的創(chuàng)建過程。
public ZookeeperClient connect(URL url) { // 創(chuàng)建 CuratorZookeeperClient return new CuratorZookeeperClient(url); }
繼續(xù)向下看。
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> { private final CuratorFramework client; public CuratorZookeeperClient(URL url) { super(url); try { // 創(chuàng)建 CuratorFramework 構(gòu)造器 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } // 構(gòu)建 CuratorFramework 實(shí)例 client = builder.build(); //省略無關(guān)代碼 // 啟動客戶端 client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }
CuratorZookeeperClient 構(gòu)造方法主要用于創(chuàng)建和啟動 CuratorFramework 實(shí)例。至此Zookeeper客戶端就已經(jīng)啟動了
下面我們將 Dubbo 的 demo 跑起來,然后通過 Zookeeper 可視化客戶端 [ZooInspector](https://github.com/apache/zookeeper/tree/b79af153d0f98a4f3f3516910ed47234d7b3d74e/src/contrib/zooinspector) 查看節(jié)點(diǎn)數(shù)據(jù)。如下:
![img](http://dubbo.apache.org/docs/zh-cn/source_code_guide/sources/images/service-registry.png)
從上圖中可以看到DemoService 這個服務(wù)對應(yīng)的配置信息最終被注冊到了zookeeper節(jié)點(diǎn)下。搞懂了服務(wù)注冊的本質(zhì),那么接下來我們就可以去閱讀服務(wù)注冊的代碼了。
protected void doRegister(URL url) { try { // 通過 Zookeeper 客戶端創(chuàng)建節(jié)點(diǎn),節(jié)點(diǎn)路徑由 toUrlPath 方法生成,路徑格式如下: // /${group}/${serviceInterface}/providers/${url} // 比如 // /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1...... zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register..."); } }
如上,ZookeeperRegistry 在 doRegister 中調(diào)用了 Zookeeper 客戶端創(chuàng)建服務(wù)節(jié)點(diǎn)。節(jié)點(diǎn)路徑由 toUrlPath 方法生成,該方法邏輯不難理解,就不分析了。接下來分析 create 方法,如下:
public void create(String path, boolean ephemeral) { if (!ephemeral) { // 如果要創(chuàng)建的節(jié)點(diǎn)類型非臨時節(jié)點(diǎn),那么這里要檢測節(jié)點(diǎn)是否存在 if (checkExists(path)) { return; } } int i = path.lastIndexOf('/'); if (i > 0) { // 遞歸創(chuàng)建上一級路徑 create(path.substring(0, i), false); } // 根據(jù) ephemeral 的值創(chuàng)建臨時或持久節(jié)點(diǎn) if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } }
好了,到此關(guān)于服務(wù)注冊的過程就分析完了。整個過程可簡單總結(jié)為:先創(chuàng)建注冊中心實(shí)例,之后再通過注冊中心實(shí)例注冊服務(wù)。
總結(jié)
1. 在有注冊中心,需要注冊提供者地址的情況下,ServiceConfig 解析出的 URL 格式為:`registry:// registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/{服務(wù)名}/{版本號}")`
2. 基于 Dubbo SPI 的自適應(yīng)機(jī)制,通過 URL `registry://` 協(xié)議頭識別,就調(diào)用 RegistryProtocol#export() 方法
1. 將具體的服務(wù)類名,比如 `DubboServiceRegistryImpl`,通過 ProxyFactory 包裝成 Invoker 實(shí)例
2. 調(diào)用 doLocalExport 方法,使用 DubboProtocol 將 Invoker 轉(zhuǎn)化為 Exporter 實(shí)例,并打開 Netty 服務(wù)端監(jiān)聽客戶請求
3. 創(chuàng)建 Registry 實(shí)例,連接 Zookeeper,并在服務(wù)節(jié)點(diǎn)下寫入提供者的 URL 地址,注冊服務(wù)
4. 向注冊中心訂閱 override 數(shù)據(jù),并返回一個 Exporter 實(shí)例
3. 根據(jù) URL 格式中的 `"dubbo://service-host/{服務(wù)名}/{版本號}"`中協(xié)議頭 `dubbo://` 識別,調(diào)用 DubboProtocol#export() 方法,開發(fā)服務(wù)端口
4. RegistryProtocol#export() 返回的 Exporter 實(shí)例存放到 ServiceConfig 的 `List exporters` 中
服務(wù)發(fā)現(xiàn)
在學(xué)習(xí)了服務(wù)暴露原理之后 , 接下來重點(diǎn)探討服務(wù)是如何消費(fèi)的 。 這里主要講解如何通過注冊中心進(jìn)行服務(wù)發(fā)現(xiàn)進(jìn)行遠(yuǎn)程服務(wù)調(diào)用等細(xì)節(jié) 。
服務(wù)發(fā)現(xiàn)流程
在詳細(xì)探討服務(wù)暴露細(xì)節(jié)之前 , 我們先看一下整體duubo的服務(wù)消費(fèi)原理
在整體上看 , Dubbo 框架做服務(wù)消費(fèi)也分為兩大部分 , 第一步通過持有遠(yuǎn)程服務(wù)實(shí)例生成Invoker, 這個 Invoker 在客戶端是核心的遠(yuǎn)程代理對象 。 第二步會把 Invoker 通過動態(tài)代理轉(zhuǎn)換成實(shí)現(xiàn)用戶接口的動態(tài)代理引用 。
服務(wù)消費(fèi)方引用服務(wù)的藍(lán)色初始化鏈,時序圖如下:
源碼分析
(1) 引用入口
服務(wù)引用的入口方法為 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實(shí)現(xiàn)了這個方法。
public Object getObject() throws Exception { return get(); } public synchronized T get() { // 檢測 ref 是否為空,為空則通過 init 方法創(chuàng)建 if (ref == null) { // init 方法主要用于處理配置,以及調(diào)用 createProxy 生成代理類 init(); } return ref; }
Dubbo 提供了豐富的配置,用于調(diào)整和優(yōu)化框架行為,性能等。Dubbo 在引用或?qū)С龇?wù)時,首先會對這些配置進(jìn)行檢查和處理,以保證配置的正確性。
private void init() { // 創(chuàng)建代理類 ref = createProxy(map);
此方法代碼很長,主要完成的配置加載,檢查,以及創(chuàng)建引用的代理對象。這里要從 createProxy 開始看起。從字面意思上來看,createProxy 似乎只是用于創(chuàng)建代理對象的。但實(shí)際上并非如此,該方法還會調(diào)用其他方法構(gòu)建以及合并 Invoker 實(shí)例。具體細(xì)節(jié)如下。
private T createProxy(Map<String, String> map) { URL tmpUrl = new URL("temp", "localhost", 0, map); ........... isDvmRefer = InjvmProtocol . getlnjvmProtocol( ) . islnjvmRefer(tmpUrl) // 本地引用略 if (isJvmRefer) { } else { // 點(diǎn)對點(diǎn)調(diào)用略 if (url != null && url.length() > 0) { } else { // 加載注冊中心 url List<URL> us = loadRegistries(false); if (us != null && !us.isEmpty()) { for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 添加 refer 參數(shù)到 url 中,并將 url 添加到 urls 中 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } } // 單個注冊中心或服務(wù)提供者(服務(wù)直連,下同) if (urls.size() == 1) { // 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例 invoker = refprotocol.refer(interfaceClass, urls.get(0)); // 多個注冊中心或多個服務(wù)提供者,或者兩者混合 } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; // 獲取所有的 Invoker for (URL url : urls) { // 通過 refprotocol 調(diào)用 refer 構(gòu)建 Invoker,refprotocol 會在運(yùn)行時 // 根據(jù) url 協(xié)議頭加載指定的 Protocol 實(shí)例,并調(diào)用實(shí)例的 refer 方法 invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; } } if (registryURL != null) { // 如果注冊中心鏈接不為空,則將使用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); // 創(chuàng)建 StaticDirectory 實(shí)例,并由 Cluster 對多個 Invoker 進(jìn)行合并 invoker = cluster.join(new StaticDirectory(u, invokers)); } else { invoker = cluster.join(new StaticDirectory(invokers)); } } } //省略無關(guān)代碼 // 生成代理類 return (T) proxyFactory.getProxy(invoker); }
上面代碼很多,不過邏輯比較清晰。
1、如果是本地調(diào)用,直接jvm 協(xié)議從內(nèi)存中獲取實(shí)例
2、如果只有一個注冊中心,直接通過 Protocol 自適應(yīng)拓展類構(gòu)建 Invoker 實(shí)例接口
3、如果有多個注冊中心,此時先根據(jù) url 構(gòu)建 Invoker。然后再通過 Cluster 合并多個 Invoker,最后調(diào)用 ProxyFactory 生成代理類。
(2) 創(chuàng)建客戶端
在服務(wù)消費(fèi)方,Invoker 用于執(zhí)行遠(yuǎn)程調(diào)用。Invoker 是由 Protocol 實(shí)現(xiàn)類構(gòu)建而來。Protocol 實(shí)現(xiàn)類有很多,這里分析DubboProtocol。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { optimizeSerialization(url); // 創(chuàng)建 DubboInvoker DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
上面方法看起來比較簡單,創(chuàng)建一個DubboInvoker。通過構(gòu)造方法傳入遠(yuǎn)程調(diào)用的client對象。默認(rèn)情況下,Dubbo 使用 NettyClient 進(jìn)行通信。接下來,我們簡單看一下 getClients 方法的邏輯。
private ExchangeClient[] getClients(URL url) { // 是否共享連接 boolean service_share_connect = false; // 獲取連接數(shù),默認(rèn)為0,表示未配置 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // 如果未配置 connections,則共享連接 if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { // 獲取共享客戶端 clients[i] = getSharedClient(url); } else { // 初始化新的客戶端 clients[i] = initClient(url); } } return clients; }
這里根據(jù) connections 數(shù)量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實(shí)例,getSharedClient 方法中也會調(diào)用 initClient 方法,因此下面我們一起看一下這個方法。
private ExchangeClient initClient(URL url) { // 獲取客戶端類型,默認(rèn)為 netty String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); //省略無關(guān)代碼 ExchangeClient client; try { // 獲取 lazy 配置,并根據(jù)配置值決定創(chuàng)建的客戶端類型 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { // 創(chuàng)建懶加載 ExchangeClient 實(shí)例 client = new LazyConnectExchangeClient(url, requestHandler); } else { // 創(chuàng)建普通 ExchangeClient 實(shí)例 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service..."); } return client; }
initClient 方法首先獲取用戶配置的客戶端類型,默認(rèn)為 netty。下面我們分析一下 Exchangers 的 connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 獲取 Exchanger 實(shí)例,默認(rèn)為 HeaderExchangeClient return getExchanger(url).connect(url, handler); }
如上,getExchanger 會通過 SPI 加載 HeaderExchangeClient 實(shí)例,這個方法比較簡單,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實(shí)現(xiàn)。
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 這里包含了多個調(diào)用,分別如下: // 1. 創(chuàng)建 HeaderExchangeHandler 對象 // 2. 創(chuàng)建 DecodeHandler 對象 // 3. 通過 Transporters 構(gòu)建 Client 實(shí)例 // 4. 創(chuàng)建 HeaderExchangeClient 對象 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
這里的調(diào)用比較多,我們這里重點(diǎn)看一下 Transporters 的 connect 方法。如下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { // 如果 handler 數(shù)量大于1,則創(chuàng)建一個 ChannelHandler 分發(fā)器 handler = new ChannelHandlerDispatcher(handlers); } // 獲取 Transporter 自適應(yīng)拓展類,并調(diào)用 connect 方法生成 Client 實(shí)例 return getTransporter().connect(url, handler); }
如上,getTransporter 方法返回的是自適應(yīng)拓展類,該類會在運(yùn)行時根據(jù)客戶端類型加載指定的 Transporter 實(shí)現(xiàn)類。若用戶未配置客戶端類型,則默認(rèn)加載 NettyTransporter,并調(diào)用該類的 connect 方法。如下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException { // 創(chuàng)建 NettyClient 對象 return new NettyClient(url, listener); }
(3) 注冊
這里就已經(jīng)創(chuàng)建好了NettyClient對象。關(guān)于 DubboProtocol 的 refer 方法就分析完了。接下來,繼續(xù)分析 RegistryProtocol 的 refer 方法邏輯。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 取 registry 參數(shù)值,并將其設(shè)置為協(xié)議頭 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 獲取注冊中心實(shí)例 Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // 將 url 查詢字符串轉(zhuǎn)為 Map Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); // 獲取 group 配置 String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // 通過 SPI 加載 MergeableCluster 實(shí)例,并調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯 return doRefer(getMergeableCluster(), registry, type, url); } } // 調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯 return doRefer(cluster, registry, type, url); }
上面代碼首先為 url 設(shè)置協(xié)議頭,然后根據(jù) url 參數(shù)加載注冊中心實(shí)例。然后獲取 group 配置,根據(jù) group 配置決定 doRefer 第一個參數(shù)的類型。這里的重點(diǎn)是 doRefer 方法,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 創(chuàng)建 RegistryDirectory 實(shí)例 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); // 設(shè)置注冊中心和協(xié)議 directory.setRegistry(registry); directory.setProtocol(protocol); Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 生成服務(wù)消費(fèi)者鏈接 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // 注冊服務(wù)消費(fèi)者,在 consumers 目錄下新節(jié)點(diǎn) if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 訂閱 providers、configurators、routers 等節(jié)點(diǎn)數(shù)據(jù) directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 一個注冊中心可能有多個服務(wù)提供者,因此這里需要將多個服務(wù)提供者合并為一個 Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
如上,doRefer 方法創(chuàng)建一個 RegistryDirectory 實(shí)例,然后生成服務(wù)者消費(fèi)者鏈接,并向注冊中心進(jìn)行注冊。注冊完畢后,緊接著訂閱 providers、configurators、routers 等節(jié)點(diǎn)下的數(shù)據(jù)。完成訂閱后,RegistryDirectory 會收到這幾個節(jié)點(diǎn)下的子節(jié)點(diǎn)信息。由于一個服務(wù)可能部署在多臺服務(wù)器上,這樣就會在 providers 產(chǎn)生多個節(jié)點(diǎn),這個時候就需要 Cluster 將多個服務(wù)節(jié)點(diǎn)合并為一個,并生成一個 Invoker。
(4)創(chuàng)建代理對象
Invoker 創(chuàng)建完畢后,接下來要做的事情是為服務(wù)接口生成代理對象。有了代理對象,即可進(jìn)行遠(yuǎn)程調(diào)用。代理對象生成的入口方法為 ProxyFactory 的 getProxy,接下來進(jìn)行分析。
public <T> T getProxy(Invoker<T> invoker) throws RpcException { // 調(diào)用重載方法 return getProxy(invoker, false); } public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { Class<?>[] interfaces = null; // 獲取接口列表 String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { // 切分接口列表 String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class<?>[types.length + 2]; // 設(shè)置服務(wù)接口類和 EchoService.class 到 interfaces 中 interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { // 加載接口類 interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } // 為 http 和 hessian 協(xié)議提供泛化調(diào)用支持,參考 pull request #1827 if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class<?>[] temp = interfaces; // 創(chuàng)建新的 interfaces 數(shù)組 interfaces = new Class<?>[len + 1]; System.arraycopy(temp, 0, interfaces, 0, len); // 設(shè)置 GenericService.class 到數(shù)組中 interfaces[len] = GenericService.class; } // 調(diào)用重載方法 return getProxy(invoker, interfaces); } public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
如上,上面大段代碼都是用來獲取 interfaces 數(shù)組的,我們繼續(xù)往下看。getProxy(Invoker, Class[]) 這個方法是一個抽象方法,下面我們到 JavassistProxyFactory 類中看一下該方法的實(shí)現(xiàn)代碼。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { // 生成 Proxy 子類(Proxy 是抽象類)。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
上面代碼并不多,首先是通過 Proxy 的 getProxy 方法獲取 Proxy 子類,然后創(chuàng)建 InvokerInvocationHandler 對象,并將該對象傳給 newInstance 生成 Proxy 實(shí)例。InvokerInvocationHandler 實(shí)現(xiàn) JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調(diào)用。下面以 org.apache.dubbo.demo.DemoService 這個接口為例,來看一下該接口代理類代碼大致是怎樣的(忽略 EchoService 接口)。
package org.apache.dubbo.common.bytecode; public class proxy0 implements org.apache.dubbo.demo.DemoService { public static java.lang.reflect.Method[] methods; private java.lang.reflect.InvocationHandler handler; public proxy0() { } public proxy0(java.lang.reflect.InvocationHandler arg0) { handler = $1; } public java.lang.String sayHello(java.lang.String arg0) { Object[] args = new Object[1]; args[0] = ($w) $1; Object ret = handler.invoke(this, methods[0], args); return (java.lang.String) ret; } }
好了,到這里代理類生成邏輯就分析完了。整個過程比較復(fù)雜,大家需要耐心看一下。
總結(jié)
1. 從注冊中心發(fā)現(xiàn)引用服務(wù):在有注冊中心,通過注冊中心發(fā)現(xiàn)提供者地址的情況下,ReferenceConfig 解析出的 URL 格式為:`registry://registry-host:/org.apache.registry.RegistryService?refer=URL.encode("conumer-host/com.foo.FooService?version=1.0.0")`。
2. 通過 URL 的registry://協(xié)議頭識別,就會調(diào)用RegistryProtocol#refer()方法
(1). 查詢提供者 URL,如 `dubbo://service-host/com.foo.FooService?version=1.0.0` ,來獲取注冊中心
(2). 創(chuàng)建一個 RegistryDirectory 實(shí)例并設(shè)置注冊中心和協(xié)議
(3). 生成 conusmer 連接,在 consumer 目錄下創(chuàng)建節(jié)點(diǎn),向注冊中心注冊
(4). 注冊完畢后,訂閱 providers,configurators,routers 等節(jié)點(diǎn)的數(shù)據(jù)
(5). 通過 URL 的 `dubbo://` 協(xié)議頭識別,調(diào)用 `DubboProtocol#refer()` 方法,創(chuàng)建一個 ExchangeClient 客戶端并返回 DubboInvoker 實(shí)例
3. 由于一個服務(wù)可能會部署在多臺服務(wù)器上,這樣就會在 providers 產(chǎn)生多個節(jié)點(diǎn),這樣也就會得到多個 DubboInvoker 實(shí)例,就需要 RegistryProtocol 調(diào)用 Cluster 將多個服務(wù)提供者節(jié)點(diǎn)偽裝成一個節(jié)點(diǎn),并返回一個 Invoker
4. Invoker 創(chuàng)建完畢后,調(diào)用 ProxyFactory 為服務(wù)接口生成代理對象,返回提供者引用