【Dubbo源码阅读系列】之远程服务调用(上)

今天打算来讲一讲Dubbo服务远程调用。笔者在开始看Dubbo远程服务相关源码的时候,看的有点迷糊。后来慢慢明白Dubbo远程服务的调用的本质就是动态代理模式的一种实现。本地消费者无须知道远程服务具体的实现,消费者和提供者通过代理类来进行交互!!一、JAVA动态代理简单看一段代码回顾一下动态代理:publicclassMyInvocationHandlerimplementsInvocationH...

【Dubbo源码阅读系列】之远程服务调用(上)

今天打算来讲一讲 Dubbo 服务远程调用。笔者在开始看 Dubbo 远程服务相关源码的时候,看的有点迷糊。后来慢慢明白 Dubbo 远程服务的调用的本质就是动态代理模式的一种实现。本地消费者无须知道远程服务具体的实现,消费者和提供者通过代理类来进行交互!!

一、JAVA 动态代理

简单看一段代码回顾一下动态代理:

public class MyInvocationHandler implements InvocationHandler{ private Object object;  public MyInvocationHandler(Object object){  this.object = object; }  @Override public Object invoke(Object proxy, Method method, Object[] args)throws Throwable {  Object result = method.invoke(object, args);  return result; }}public static void main(String[] args) { MyInvocationHandler handler = new MyInvocationHandler(stu); // 生成代理类 Student proxy = (Student)Proxy.newProxyInstance(loader, interfaces, handler); // 通过代理类调用 sayHello 方法 proxy.sayHello(message);}

实现动态代理的核心步骤有两步:
1、自定义实现了 InvocationHandler 接口的 handler 类,并重写 invoke() 方法
2、调用 Proxy.newProxyInstance 方法创建代理类

我们最后在调用 proxy.sayHello() 的时候,代理类会调用 MyInvocationHandler 类的 invoke() 方法,invoke() 方法通过反射机制最终调用 sayHello() 方法。既然对动态代理的核心组成已经了然,接下来我们就结合这两点分析下 Dubbo 远程服务调用的实现。

二、远程服务代理类的创建

创建时机

在 【Dubbo源码阅读系列】之 Dubbo XML 配置加载 中我们分析了 Dubbo 解析 XML 配置文件的相关流程,其中 标签会被解析为 ServiceBean 对象。类似的, 标签会被解析为 ReferenceBean 对象。ReferenceBean 类继承自 ReferenceConfig 类,仔细观察 ReferenceConfig 类不难发现 ReferenceConfig 中存在这样一条调用链。
get() ==> init() ==> createProxy()
不要怀疑...crateProxy() 方法就是我们今天的主角...更令人激动的是,我们可以在该方法中找到与前文归纳的动态代理实现核心步骤相对应的代码实现:

  • 创建 invoker 对象
    javainvoker = refprotocol.refer(interfaceClass, urls.get(0));
    这里返回的对象为 Invoker 对象。Invoke 类是一个接口类,里面定义了一个 invoke() 方法。
  • 调用工厂类创建代理对象
    javareturn (T) proxyFactory.getProxy(invoker);
    在这一小节,我们只需要对代理类创建流程有个大致的印象即可,我们在后文深入分析具体流程。

    创建 invoker

 invoker = refprotocol.refer(interfaceClass, urls.get(0)); 

熟悉的配方熟悉的料,通过 Dubbo SPI 机制我们发现这里调用的实际为 RegistryProtocol.refer(),问我为啥?详见:【Dubbo源码阅读系列】之 Dubbo SPI 机制
refprotocol.refer() 流程比较长,先放张时序图让大家有个基本的印象:

这里简单概括下上图中的重点内容:

  1. refProtocolProtocol.refer()
    上面我们已经提了这里的 refer() 方法最终调用的是 RegistryProtocol.refer() 方法。
    ** 在 refer() 方法中首先会调用 registryFactory.getRegistry(url) 获取 Registry 对象(Dubbo SPI 机制);
    ** 接着调用 doRefer() 方法。
  2. doRefer()
    • registry.register() 在 step3 介绍
    • directory.subscribe() 在 step4 介绍
  3. registry.register()
    笔者在调试时,用的注册中心为 zookeeper(实际官方也推荐),因此这里会在 zookeeper 上创建一个节点。节点类似:/dubbo/org.apache.dubbo.service.DemoService/consumers/url,如果没有设置 dynamic 参数,默认为临时节点;
  4. RegistryDirectory.subscribe(url)
    • 当前 url 中的 category 参数值被设置成了:providers,consumers,routers
      接着调用 registry.subscribe(url, this) 方法,不难分析最后调用的是 FailbackRegistry 类的 subscribe() 方法。
    • 另外需要注意 RegistryDirectory 类实现了 NotifyListener 接口中的 notify() 方法:
  5. FailbackRegistry.subscribe()
    • 调用 doSubscribe() 方法,在 Step6 介绍
  6. ZookeeperRegistry.doSubscribe()
    • url 会被 toCategoriesPath() 方法转换类似如下形式的 path 集合
      /dubbo/org.apache.dubbo.service.DemoService/providers
      /dubbo/org.apache.dubbo.service.DemoService/consumers
      /dubbo/org.apache.dubbo.service.DemoService/routers
    • 在 zookeeper 上创建对应的路径节点,同时添加监听器,这里监听器检测到变化会执行 ZookeeperRegistry 类的 notify() 方法
    • 如果对应 path 节点子节点为空,设置 url 的 protocol 值为 empty;子节点不为空,符合条件的 url 会被添加到 urls 集合中 。PS:如果服务提供方服务已经成功启动,/dubbo/org.apache.dubbo.service.DemoService/providers 路径下应该会子节点。
    • 执行 notify 方法
  7. notify(url, listener, urls);
    执行 doNotify() 方法,见 step8
  8. doNotify(url, listener, urls)
    调用父类的 notify() 方法
  9. AbstractRegistry.notify(URL url, NotifyListener listener, List urls)
    • 这里拿到的 urls 是 step6 中生成的。我们根据 url 中的 category 值对其分类,最后放到一个 map 集合中(key 为 category,value 为 url 集合)
    • 遍历上面生成的 map 集合,执行 listener.notify(categoryList) 方法。这里的 listener 为 RegistryDirecotry 对象,在 step4 中作为参数开始传递;
  10. RegistryDirectory.notify(categoryList)
    • 遍历 categoryList,将 category 值为 providers 的 url 添加到 invokerUrls 集合中
    • 执行 refreshInvoker(invokerUrls) 方法
  11. refreshInvoker(invokerUrls)
    refreshInvoker 非常重要,用于将 invokerUrls 转换为 invoker 对象。
    • 如果 invokerUrls 只有一条记录,且该条记录的 protocol 参数值为 empty,禁止访问
    • 调用 toInvokers() 方法
  12. toInvokers(List urls)
    这里会维护一个本地缓存 urlInvokerMap,key 值为 url 字符串;
    • 遍历 urls ,如果 urlInvokerMap 集合中 url 对应的 value 不为空,执行如下代码:
      javainvoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
      这里会新建一个 DubboInvoker 对象并返回,我们会在后文详细分析;
    • 直接取缓存中的值,不会重新构造 invoker 对象;
  13. toMethodInvokers
    step12 中最终会生成一个 key 为 url,value 为 invoker 集合。在这里进行处理后最后返回的集合 key 值为 method,value 为 invoker 集合
  14. cluster.join(directory);
    这里最后又用到 Dubbo SPI 机制,实际调用流程为:
    Cluster$Adaptive.join() ==》MockClusterInvoker.join() ==> FailoverCluster().join() ==> FailoverClusterInvoker() ==> AbstractClusterInvoker()
    最后返回的为一个 MockClusterInvoker 对象

DubboInvoker 对象创建流程

在上节中关于 invoker 的创建我们留了个小尾巴没有讲完。代码如下:

invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
  1. protocol.refer()
    这里又用到了 Dubbo SPI 机制,照例给出简单的调用流程:
    Protocol$Adaptive.refer() ==》 ProtocolListenerWrapper.refer() ==》 ProtocolFilterWrapper.refer() ==》 DubboProtocol.refer()
    其中在 DubboProtocol.refer() 方法中会构建 DubboInvoker 对象。
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    整体流程比较简单,但是注意看,这里有个很重要的方法:getClient(url)。它是用来干啥的?还记得我们再服务暴露之远程暴露那一节启动了 Netty 服务端吗?当时留了个关于 Netty 客户端在哪里启动的坑。这里的 getClients() 就是用来开启 Netty 客户端的。
  2. getClients(url)
    如果 url 中没有设置 connections 参数,默认共享链接,调用 getSharedClient() 获取 ExchangeClient 对象。
  3. getSharedClient(URL url)
    getSharedClient() 顾名思义是用于获取共享客户端的。referenceClientMap 集合用于缓存 client,key 值为 url 的 address 参数。如果取缓存时对应值为 null ,会调用 initClient(url) 方法新建 ExchangeClient
  4. initClient(url)
    调用 Exchangers.connect() 方法构建 client ,最后返回的 client 会通过构造方法被赋值到到 DubboInvoker 类的 clients 成员变量中;
  5. Exchangers.connect()
    java return getExchanger(url).connect(url, handler);
    • 调用 getExchanger(url) 获取 HeaderExchanger 类(Dubbo SPI 机制)
    • 调用 HeaderExchanger.connet() 方法建立连接
  6. HeaderExchanger.connect()
    java public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }
    核心方法为 Transporters.connect()
  7. Transporters.connect()
    java return getTransporter().connect(url, handler);
    • 调用 getTransporter() 方法获取 NettyTransporter 类(Dubbo SPI 机制)
    • 调用 NettyTransporter.connet() 方法建立连接
  8. NettyTransporter.connet()
    java public Client connect(URL url, Chan
源文地址:https://www.guoxiongfei.cn/cntech/9532.html