前言
消费者初始化也就是对<dubbo:reference>
中的内容进行解析和初始化,根据Dubbo的官方文档描述,其对应的配置类为com.alibaba.dubbo.config.ReferenceConfig
,但它的入口在哪里呢?
由于Dubbo和Spring是高度整合的,因此Spring中实现InitializingBean
的方式是一个很好的初始化方式,找了一下确实找到了一个类ReferenceBean
,让我们来看一下它的定义
1 | public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean |
我们看到了InitializingBean
加上ReferenceConfig
,几乎可以肯定这就是我们要找的入口,那么开始吧
源码分析
说到InitializingBean
第一反应肯定是找afterPropertiesSet()
方法,来看一下
1 | "unchecked"}) ({ |
前面一大片的代码都是在做赋值,分别调用了setConsumer
、setApplication
、setModule
、setRegistries
和setMonitor
这几个方法,全部都是AbstractInterfaceConfig
类中的set方法,最核心的还是最后的getObject()
方法
1 | public Object getObject() throws Exception { |
这里终于调用到了父类ReferenceConfig
的get()
方法,继续来看
1 | public synchronized T get() { |
这里的ref定义如下
1 | // 接口代理类引用 |
具体来看看init()
方法
1 | private void init() { |
首先看一下checkDefault()
这个方法
1 | private void checkDefault() { |
这里生成了一个ConsumerConfig
对象,根据官方文档说明这个类对应了<dubbo:consumer>
这个标签中的配置项,顺带提一句该标签是<dubbo:reference>
标签的缺省值,因此会覆盖<dubbo:reference>
中的设置。
接着看一下appendProperties(AbstractConfig config)
方法做了些什么
1 | protected static void appendProperties(AbstractConfig config) { |
看着比较长,其实主要做的事情就是执行入参config
中的set方法来初始化传入的对象,这里就不深入分析了。
继续往下看其中最重要的操作ref = createProxy(map)
,用于生成代理对象,具体来看一下其实现
1 | @SuppressWarnings({ "unchecked", "rawtypes", "deprecation" }) |
这个方法里面最值得关注的地方有三个,分别是
refprotocol.refer(interfaceClass, url)
cluster.join(new StaticDirectory(u, invokers))
proxyFactory.getProxy(invoker)
这三个对象很重要,来看看其定义
1 | private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); |
这三个对象的生成使用的都是Dubbo的SPI机制,具体可以参考我之前发的讲解Dubbo中SPI的那篇文章,以proxyFactory
为例,它的本质是ProxyFactory
的一个代理类proxyFactory$Adpative
,@Adaptive
标注方法的具体实现由真正的实现类完成。
1 | "javassist") ( |
stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory
来看看JavassistProxyFactory类的源码
在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.ProxyFactory
中的内容如上所示,可以看到javassist
是ProxyFactory
接口的默认选择,对应的实现类就是JavassistProxyFactory
;dubbo
是Protocol
接口的默认选项,对应实现类是DubboProtocol
;failover
是Cluster
接口的默认选项,对应实现类是FailoverCluster
。
了解了三个对象的实现后,首先看refprotocol.refer(interfaceClass, url
方法,其具体实现由url
中对象的protocol
属性决定
1 | for (URL url : urls) { |
有上述代码看出refprotocol的实现由url
决定,此处url
又由loadRegistries(false)
方法获得,其protocol
属性为registry
。loadRegistries(false)
方法获取了所有<dubbo:registry>
注册的信息,根据其返回的类型List<URL>
,我们也可以发现Dubbo是可以配置多个注册中心的。
registry
对应实现类为RegistryProtocol
,根据上一篇关于SPI文章的讲解,此处的refprotocol
是一个代理类,其refer
方法的调用链为
Protocol$Adpative -> ProtocolListenerWrapper -> ProtocolFilterWrapper -> RegistryProtocol
来看一下RegistryProtocol
的refer
方法
1 | @SuppressWarnings("unchecked") |
首先registryFactory.getRegistry(url)
用于注册注册中心,这里的getRegistry
方法属于AbstractRegistryFactory
类,来看一下具体实现
1 | public Registry getRegistry(URL url) { |
这里通过加锁的方式往REGISTRIES
这个map里添加createRegistry(url)
方法生成的registry
对象,createRegistry(url)
方法的具体实现同样由SPI方式生成的代理对象执行,我们直接看zookeeper
情况下在ZookeeperRegistryFactory
这个类中的实现
1 | private ZookeeperTransporter zookeeperTransporter |
这里的zookeeperTransporter
是一个ZookeeperTransporter
类型的对象,该对象同样是由SPI机制管理的,默认实现方式是zkclient
的,对应实现类是ZkclientZookeeperTransporter
接着看一下new ZookeeperRegistry(url, zookeeperTransporter)
在做什么
1 | public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { |
这里ZookeeperRegistry
的构造方法调用了父类FailbackRegistry
的构造方法,然后做了两件事,1.在zookeeper上创建一个ZkClient
节点,2.监听该节点的RECONNECTED事,下面来看一看FailbackRegistry
的构造方法
1 | public FailbackRegistry(URL url) { |
这里又调用了父类AbstractRegistry
的构造方法,并启用了一个线程池来定时重建与注册中心的连接,从这个类的名字FailbackRegistry
就可以看出来它的作用。接着来看AbstractRegistry
的构造方法
1 | public AbstractRegistry(URL url) { |
这里首先从System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache"
这个用户本地目录加载缓存的文件到properties
中,接着调用了notify(List<URL> urls)
方法
1 | protected void notify(List<URL> urls) { |
这里用到了观察者模式,当服务变更时,通过getSubscribed()
方法获取订阅者列表,并通知所有的订阅者更新。
回到RegistryProtocol
类的refer(Class<T> type, URL url)
方法,最后返回的是doRefer(cluster, registry, type, url)
,来看一下里面做了什么
1 | private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { |
这里首先调用了register
方法在注册中心进行了订阅,并在注册中心的写入了消费者的信息。这里调用层级较深,就不再深入展开。
接着看一下subscribe
方法,
1 | public void subscribe(URL url) { |
调用了ZookeeperRegistry
的subscribe
方法,也就是FailbackRegistry
的subscribe
方法
1 | @Override |
可以看到首先会调用父类AbstractRegistry
的subscribe
方法添加监听器,然后通过doSubscribe(url, listener)
向服务端发送订阅请求,该方法在ZookeeperRegistry
类中实现,来重点看一下这个方法
1 | protected void doSubscribe(final URL url, final NotifyListener listener) { |
方法很长,主要做的事情是通过zkClient.create(path, false)
在zookeeper中创建监听的节点,然后调用notify(url, listener, urls)方法,其具体实现在FailbackRegistry
中
1 |
|
跟进doNotify
方法
1 | protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { |
调用了父类AbstractRegistry
的notify
方法
1 | protected void notify(URL url, NotifyListener listener, List<URL> urls) { |
最后真正的实现在listener.notify(categoryList)中,跟到RegistryDirectory
的notify(List
1 | public synchronized void notify(List<URL> urls) { |
这里做的事情主要是更新configurators
和routers
,然后调用refreshInvoker(invokerUrls)
,这个方法用于更新服务提供者相关的invoker,因此非常重要
1 | private void refreshInvoker(List<URL> invokerUrls){ |
这里的核心是toInvokers(invokerUrls)
方法将URL列表转成Invoker列表
1 | private Map<String, Invoker<T>> toInvokers(List<URL> urls) { |
这里返回的是Map<String, Invoker<T>>
的map对象,由invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl)
方法创建
这里的protocol
同样由SPI机制创建,最终该refer
方法会由DubboProtocol
实现,来看一下
1 | public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { |
可以看到创建的是一个DubboInvoker
的对象,这个对象中是存有和服务提供方的连接的,由getClients(url)
方法得到
1 | private ExchangeClient[] getClients(URL url){ |
返回值是ExchangeClient[]
类型的,初始化由initClient(url)
方法创建,默认的底层连接实现是通过Netty的,为了控制篇幅这里就不深入具体连接的细节了,有机会单开一篇讲一下。
继续跳转回ReferenceConfig
的createProxy
方法,到此refprotocol.refer(interfaceClass, urls.get(0))
就解析完了接着看后续的代码
1 | if (registryURL != null) { // 有 注册中心协议的URL |
这里的cluster
对象之前也提过了,同样是SPI机制管理的,从其选项failover
、failfast
及failback
等基本就可以推测出来这个对象和接口的负载均衡机制有关,以默认的FailoverCluster
类为例,来看一下join
方法
1 | public <T> Invoker<T> join(Directory<T> directory) throws RpcException { |
没有做什么特别得事情,就是根据invokers
合并生成一个FailoverClusterInvoker
类型的invoker
对象给消费者。
最后来看proxyFactory.getProxy(invoker)
方法,前面已经提过了proxyFactory
对象的具体实现在JavassistProxyFactory
中,来看一下这个类
1 | public class JavassistProxyFactory extends AbstractProxyFactory { |
在这里我们并没有发现getProxy(Invoker<T> invoker)
的方法,那么往父类去找,果然在AbstractProxyFactory
类中找到了,我们来看一下
1 | public abstract class AbstractProxyFactory implements ProxyFactory { |
从这里可以看到单参的getProxy
方法中调用了双参的getProxy
方法,该方法的具体实现又要根据子类来定,这又是一个经典的模板模式实现。
回过头看JavassistProxyFactory
中的getProxy
方法,这里涉及到了Javassist字节码技术,所以光看这个类的getProxy方法可能会有些看不懂,那么我们来看看另一个同样继承了AbstractProxyFactory
类的工厂类JdkProxyFactory
1 | public class JdkProxyFactory extends AbstractProxyFactory { |
看到这个类的getProxy
方法就很熟悉了,这就是标准的jdk的动态代理的实现,也就是说该方法返回的对象被InvokerInvocationHandler
类代理了。由此我们推断,JavassistProxyFactory
中的getProxy
也是起到了动态代理的效果。也就是说我们对invoker
的调用都会被代理类InvokerInvocationHandler
所代理。
总结
消费者初始化的流程非常长,但其实概括一下一共是做了三件事:
- 监听注册中心
- 连接服务提供者端进行服务引用
- 创建服务代理并返回
当然因为篇幅缘故很多细节没有写到位,不过大致的脉络应该是都没有落下了,这次的写作让我又一次对框架的开发者肃然起敬。