前言
消费者初始化也就是对<dubbo:reference>中的内容进行解析和初始化,根据Dubbo的官方文档描述,其对应的配置类为com.alibaba.dubbo.config.ReferenceConfig,但它的入口在哪里呢?
由于Dubbo和Spring是高度整合的,因此Spring中实现InitializingBean的方式是一个很好的初始化方式,找了一下确实找到了一个类ReferenceBean,让我们来看一下它的定义
1 | public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean |
我们看到了InitializingBean加上ReferenceConfig,几乎可以肯定这就是我们要找的入口,那么开始吧
源码分析
说到InitializingBean第一反应肯定是找afterPropertiesSet()方法,来看一下
1 | ({ "unchecked"}) |
前面一大片的代码都是在做赋值,分别调用了setConsumer、setApplication、setModule、setRegistries和setMonitor这几个方法,全部都是AbstractInterfaceConfig类中的set方法,最核心的还是最后的getObject()方法
1 | public Object getObject() throws Exception { |
这里终于调用到了父类ReferenceConfig的get()方法,继续来看
1 | public synchronized T get() { |
这里的ref定义如下
1 | // 接口代理类引用 |
具体来看看init()方法
1 | private void init() { |
首先看一下checkDefault()这个方法
1 | private void checkDefault() { |
这里生成了一个ConsumerConfig对象,根据官方文档说明这个类对应了<dubbo:consumer>这个标签中的配置项,顺带提一句该标签是<dubbo:reference>标签的缺省值,因此会覆盖<dubbo:reference>中的设置。
接着看一下appendProperties(AbstractConfig config)方法做了些什么
1 | protected static void appendProperties(AbstractConfig config) { |
看着比较长,其实主要做的事情就是执行入参config中的set方法来初始化传入的对象,这里就不深入分析了。
继续往下看其中最重要的操作ref = createProxy(map),用于生成代理对象,具体来看一下其实现
1 | @SuppressWarnings({ "unchecked", "rawtypes", "deprecation" }) |
这个方法里面最值得关注的地方有三个,分别是
refprotocol.refer(interfaceClass, url)cluster.join(new StaticDirectory(u, invokers))proxyFactory.getProxy(invoker)
这三个对象很重要,来看看其定义
1 | private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); |
这三个对象的生成使用的都是Dubbo的SPI机制,具体可以参考我之前发的讲解Dubbo中SPI的那篇文章,以proxyFactory为例,它的本质是ProxyFactory的一个代理类proxyFactory$Adpative,@Adaptive标注方法的具体实现由真正的实现类完成。
1 | ("javassist") |
stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory
来看看JavassistProxyFactory类的源码
在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.ProxyFactory中的内容如上所示,可以看到javassist是ProxyFactory接口的默认选择,对应的实现类就是JavassistProxyFactory;dubbo是Protocol接口的默认选项,对应实现类是DubboProtocol;failover是Cluster接口的默认选项,对应实现类是FailoverCluster。
了解了三个对象的实现后,首先看refprotocol.refer(interfaceClass, url方法,其具体实现由url中对象的protocol属性决定
1 | for (URL url : urls) { |
有上述代码看出refprotocol的实现由url决定,此处url又由loadRegistries(false)方法获得,其protocol属性为registry。loadRegistries(false)方法获取了所有<dubbo:registry>注册的信息,根据其返回的类型List<URL>,我们也可以发现Dubbo是可以配置多个注册中心的。
registry对应实现类为RegistryProtocol,根据上一篇关于SPI文章的讲解,此处的refprotocol是一个代理类,其refer方法的调用链为
Protocol$Adpative -> ProtocolListenerWrapper -> ProtocolFilterWrapper -> RegistryProtocol
来看一下RegistryProtocol的refer方法
1 | @SuppressWarnings("unchecked") |
首先registryFactory.getRegistry(url)用于注册注册中心,这里的getRegistry方法属于AbstractRegistryFactory类,来看一下具体实现
1 | public Registry getRegistry(URL url) { |
这里通过加锁的方式往REGISTRIES这个map里添加createRegistry(url)方法生成的registry对象,createRegistry(url)方法的具体实现同样由SPI方式生成的代理对象执行,我们直接看zookeeper情况下在ZookeeperRegistryFactory这个类中的实现
1 | private ZookeeperTransporter zookeeperTransporter |
这里的zookeeperTransporter是一个ZookeeperTransporter类型的对象,该对象同样是由SPI机制管理的,默认实现方式是zkclient的,对应实现类是ZkclientZookeeperTransporter
接着看一下new ZookeeperRegistry(url, zookeeperTransporter)在做什么
1 | public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { |
这里ZookeeperRegistry的构造方法调用了父类FailbackRegistry的构造方法,然后做了两件事,1.在zookeeper上创建一个ZkClient节点,2.监听该节点的RECONNECTED事,下面来看一看FailbackRegistry的构造方法
1 | public FailbackRegistry(URL url) { |
这里又调用了父类AbstractRegistry的构造方法,并启用了一个线程池来定时重建与注册中心的连接,从这个类的名字FailbackRegistry就可以看出来它的作用。接着来看AbstractRegistry的构造方法
1 | public AbstractRegistry(URL url) { |
这里首先从System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache"这个用户本地目录加载缓存的文件到properties中,接着调用了notify(List<URL> urls)方法
1 | protected void notify(List<URL> urls) { |
这里用到了观察者模式,当服务变更时,通过getSubscribed()方法获取订阅者列表,并通知所有的订阅者更新。
回到RegistryProtocol类的refer(Class<T> type, URL url)方法,最后返回的是doRefer(cluster, registry, type, url),来看一下里面做了什么
1 | private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { |
这里首先调用了register方法在注册中心进行了订阅,并在注册中心的写入了消费者的信息。这里调用层级较深,就不再深入展开。
接着看一下subscribe方法,
1 | public void subscribe(URL url) { |
调用了ZookeeperRegistry的subscribe方法,也就是FailbackRegistry的subscribe方法
1 | @Override |
可以看到首先会调用父类AbstractRegistry的subscribe方法添加监听器,然后通过doSubscribe(url, listener)向服务端发送订阅请求,该方法在ZookeeperRegistry类中实现,来重点看一下这个方法
1 | protected void doSubscribe(final URL url, final NotifyListener listener) { |
方法很长,主要做的事情是通过zkClient.create(path, false)在zookeeper中创建监听的节点,然后调用notify(url, listener, urls)方法,其具体实现在FailbackRegistry中
1 |
|
跟进doNotify方法
1 | protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { |
调用了父类AbstractRegistry的notify方法
1 | protected void notify(URL url, NotifyListener listener, List<URL> urls) { |
最后真正的实现在listener.notify(categoryList)中,跟到RegistryDirectory的notify(List
1 | public synchronized void notify(List<URL> urls) { |
这里做的事情主要是更新configurators和routers,然后调用refreshInvoker(invokerUrls),这个方法用于更新服务提供者相关的invoker,因此非常重要
1 | private void refreshInvoker(List<URL> invokerUrls){ |
这里的核心是toInvokers(invokerUrls)方法将URL列表转成Invoker列表
1 | private Map<String, Invoker<T>> toInvokers(List<URL> urls) { |
这里返回的是Map<String, Invoker<T>>的map对象,由invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl)方法创建
这里的protocol同样由SPI机制创建,最终该refer方法会由DubboProtocol实现,来看一下
1 | public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { |
可以看到创建的是一个DubboInvoker的对象,这个对象中是存有和服务提供方的连接的,由getClients(url)方法得到
1 | private ExchangeClient[] getClients(URL url){ |
返回值是ExchangeClient[]类型的,初始化由initClient(url)方法创建,默认的底层连接实现是通过Netty的,为了控制篇幅这里就不深入具体连接的细节了,有机会单开一篇讲一下。
继续跳转回ReferenceConfig的createProxy方法,到此refprotocol.refer(interfaceClass, urls.get(0))就解析完了接着看后续的代码
1 | if (registryURL != null) { // 有 注册中心协议的URL |
这里的cluster对象之前也提过了,同样是SPI机制管理的,从其选项failover、failfast及failback等基本就可以推测出来这个对象和接口的负载均衡机制有关,以默认的FailoverCluster类为例,来看一下join方法
1 | public <T> Invoker<T> join(Directory<T> directory) throws RpcException { |
没有做什么特别得事情,就是根据invokers合并生成一个FailoverClusterInvoker类型的invoker对象给消费者。
最后来看proxyFactory.getProxy(invoker)方法,前面已经提过了proxyFactory对象的具体实现在JavassistProxyFactory中,来看一下这个类
1 | public class JavassistProxyFactory extends AbstractProxyFactory { |
在这里我们并没有发现getProxy(Invoker<T> invoker)的方法,那么往父类去找,果然在AbstractProxyFactory类中找到了,我们来看一下
1 | public abstract class AbstractProxyFactory implements ProxyFactory { |
从这里可以看到单参的getProxy方法中调用了双参的getProxy方法,该方法的具体实现又要根据子类来定,这又是一个经典的模板模式实现。
回过头看JavassistProxyFactory中的getProxy方法,这里涉及到了Javassist字节码技术,所以光看这个类的getProxy方法可能会有些看不懂,那么我们来看看另一个同样继承了AbstractProxyFactory类的工厂类JdkProxyFactory
1 | public class JdkProxyFactory extends AbstractProxyFactory { |
看到这个类的getProxy方法就很熟悉了,这就是标准的jdk的动态代理的实现,也就是说该方法返回的对象被InvokerInvocationHandler类代理了。由此我们推断,JavassistProxyFactory中的getProxy也是起到了动态代理的效果。也就是说我们对invoker的调用都会被代理类InvokerInvocationHandler所代理。
总结
消费者初始化的流程非常长,但其实概括一下一共是做了三件事:
- 监听注册中心
- 连接服务提供者端进行服务引用
- 创建服务代理并返回
当然因为篇幅缘故很多细节没有写到位,不过大致的脉络应该是都没有落下了,这次的写作让我又一次对框架的开发者肃然起敬。