源码分析Dubbo消费端启动流程

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

转载声明:转载请注明出处,本技术博客是本人原创文章

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo消费端启动流程

微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者;

通过前面文章详解,我们知道Dubbo服务消费者标签dubbo:reference最终会在Spring容器中创建一个对应的ReferenceBean实例,而ReferenceBean实现了Spring生命周期接口:InitializingBean,接下来应该看一下其afterPropertiesSet方法的实现。

源码分析ReferenceBean#afterPropertiesSet

ReferenceBean#afterPropertiesSet


 1if (getConsumer() == null) {
 2            MapString, ConsumerConfig consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, 
 3                 ConsumerConfig.class, false, false);
 4            if (consumerConfigMap != null && consumerConfigMap.size()  0) {
 5                ConsumerConfig consumerConfig = null;
 6                for (ConsumerConfig config : consumerConfigMap.values()) {
 7                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
 8                        if (consumerConfig != null) {
 9                            throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);
10                        }
11                        consumerConfig = config;
12                    }
13                }
14                if (consumerConfig != null) {
15                    setConsumer(consumerConfig);
16                }
17            }
18        }

Step1:如果consumer为空,说明dubbo:reference标签未设置consumer属性,如果一个dubbo:consumer标签,则取该实例,如果存在多个dubbo:consumer 配置,则consumer必须设置,否则会抛出异常:”Duplicate consumer configs”。

Step2:如果application为空,则尝试从BeanFactory中查询dubbo:application实例,如果存在多个dubbo:application配置,则抛出异常:”Duplicate application configs”。

Step3:如果ServiceBean的module为空,则尝试从BeanFactory中查询dubbo:module实例,如果存在多个dubbo:module,则抛出异常:”Duplicate module configs: “。

Step4:尝试从BeanFactory中加载所有的注册中心,注意ServiceBean的List RegistryConfig registries属性,为注册中心集合。

Step5:尝试从BeanFacotry中加载一个监控中心,填充ServiceBean的MonitorConfig monitor属性,如果存在多个dubbo:monitor配置,则抛出”Duplicate monitor configs: “。

ReferenceBean#afterPropertiesSet


1Boolean b = isInit();
2if (b == null && getConsumer() != null) {
3      b = getConsumer().isInit();
4}
5if (b != null && b.booleanValue()) {
6      getObject();
7}

Step6:判断是否初始化,如果为初始化,则调用getObject()方法,该方法也是FactoryBean定义的方法,ReferenceBean是dubbo:reference所真实引用的类(interface)的实例工程,getObject发返回的是interface的实例,而不是ReferenceBean实例。

源码分析getObject()


1public Object getObject() throws Exception {
2        return get();
3}

ReferenceBean#getObject()方法直接调用其父类的get方法,get方法内部调用init()方法进行初始化

源码分析ReferenceConfig#init方法

ReferenceConfig#init


1if (initialized) {
2     return;
3 }
4initialized = true;
5if (interfaceName == null || interfaceName.length() == 0) {
6      throw new IllegalStateException("dubbo:reference interface="" / interface not allow null!");
7}

Step1:如果已经初始化,直接返回,如果interfaceName为空,则抛出异常。

ReferenceConfig#init调用ReferenceConfig#checkDefault


1private void checkDefault() {
2        if (consumer == null) {
3            consumer = new ConsumerConfig();
4        }
5        appendProperties(consumer);
6    }

Step2:如果dubbo:reference标签也就是ReferenceBean的consumer属性为空,调用appendProperties方法,填充默认属性,其具体加载顺序:

  • 从系统属性加载对应参数值,参数键:dubbo.consumer.属性名,从系统属性中获取属性值的方法为:System.getProperty(key)。
  • 加载属性配置文件的值。属性配置文件,可通过系统属性:dubbo.properties.file,如果该值未配置,则默认取dubbo.properties属性配置文件。 ReferenceConfig#init

  • 
    1 appendProperties(this);
    

    Step3:调用appendProperties方法,填充ReferenceBean的属性,属性值来源与step2一样,当然只填充ReferenceBean中属性为空的属性。

    ReferenceConfig#init

    
     1if (getGeneric() == null && getConsumer() != null) {
     2      setGeneric(getConsumer().getGeneric());
     3}
     4if (ProtocolUtils.isGeneric(getGeneric())) {
     5      interfaceClass = GenericService.class;
     6} else {
     7      try {
     8              interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
     9      } catch (ClassNotFoundException e) {
    10                throw new IllegalStateException(e.getMessage(), e);
    11      }
    12      checkInterfaceAndMethods(interfaceClass, methods);
    13}
    

    Step4:如果使用返回引用,将interface值替换为GenericService全路径名,如果不是,则加载interfacename,并检验dubbo:reference子标签dubbo:method引用的方法是否在interface指定的接口中存在。

    ReferenceConfig#init

    
     1String resolve = System.getProperty(interfaceName);      // @1
     2String resolveFile = null;
     3if (resolve == null || resolve.length() == 0) {                       // @2
     4     resolveFile = System.getProperty("dubbo.resolve.file");    // @3 start
     5     if (resolveFile == null || resolveFile.length() == 0) {
     6          File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
     7          if (userResolveFile.exists()) {
     8               resolveFile = userResolveFile.getAbsolutePath();
     9          }
    10     }    // @3 end
    11     if (resolveFile != null && resolveFile.length()  0) {    // @4
    12          Properties properties = new Properties();
    13          FileInputStream fis = null;
    14          try {
    15               fis = new FileInputStream(new File(resolveFile));
    16               properties.load(fis);
    17           } catch (IOException e) {
    18               throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
    19           } finally {
    20               try {
    21                    if (null != fis) fis.close();
    22                } catch (IOException e) {
    23                    logger.warn(e.getMessage(), e);
    24               }
    25           }
    26          resolve = properties.getProperty(interfaceName);
    27      }
    28 }
    29 if (resolve != null && resolve.length()  0) {  // @5
    30      url = resolve;
    31      if (logger.isWarnEnabled()) {
    32         if (resolveFile != null && resolveFile.length()  0) {
    33             logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
    34         } else {
    35            logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
    36        }
    37    }
    38}
    

    Step5:处理dubbo服务消费端resolve机制,也就是说消息消费者只连服务提供者,绕过注册中心。
    代码@1:从系统属性中获取该接口的直连服务提供者,如果存在 -Dinterface=dubbo://127.0.0.1:20880,其中interface为dubbo:reference interface属性的值。
    代码@2:如果未指定-D属性,尝试从resolve配置文件中查找,从这里看出-D的优先级更高。
    代码@3:首先尝试获取resolve配置文件的路径,其来源可以通过-Ddubbo.resolve.file=文件路径名来指定,如果未配置该系统参数,则默认从${user.home}/dubbo-resolve.properties,如果过文件存在,则设置resolveFile的值,否则resolveFile为null。
    代码@4:如果resolveFile不为空,则加载resolveFile文件中内容,然后通过interface获取其配置的直连服务提供者URL。
    代码@5:如果resolve不为空,则填充ReferenceBean的url属性为resolve(点对点服务提供者URL),打印日志,点对点URL的来源(系统属性、resolve配置文件)。

    ReferenceConfig#init

    
    1checkApplication();
    2checkStubAndMock(interfaceClass);
    

    Step6:校验ReferenceBean的application是否为空,如果为空,new 一个application,并尝试从系统属性(优先)、资源文件中填充其属性;同时校验stub、mock实现类与interface的兼容性。系统属性、资源文件属性的配置如下:
    application     dubbo.application.属性名,例如    dubbo.application.name

    ReferenceConfig#init

    
    1MapString, String map = new HashMapString, String();
    2MapObject, Object attributes = new HashMapObject, Object();
    3map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    4map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
    5map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    6if (ConfigUtils.getPid()  0) {
    7     map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    8}
    

    Step7:构建Map,封装服务消费者引用服务提供者URL的属性,这里主要填充side:consume(消费端)、dubbo:2.0.0(版本)、timestamp、pid:进程ID。

    ReferenceConfig#init

    
     1if (!isGeneric()) {
     2    String revision = Version.getVersion(interfaceClass, version);
     3    if (revision != null && revision.length()  0) {
     4         map.put("revision", revision);
     5    }
     6    String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
     7    if (methods.length == 0) {
     8           logger.warn("NO method found in service interface " + interfaceClass.getName());
     9           map.put("methods", Constants.ANY_VALUE);
    10     } else {
    11          map.put("methods", StringUtils.join(new HashSetString(Arrays.asList(methods)), ","));
    12     }
    13
    14}
    

    Step8:如果不是泛化引用,增加methods:interface的所有方法名,多个用逗号隔开。
    ReferenceConfig#init

    
    1map.put(Constants.INTERFACE_KEY, interfaceName);
    2appendParameters(map, application);
    3appendParameters(map, module);
    4appendParameters(map, consumer, Constants.DEFAULT_KEY);
    5appendParameters(map, this);
    

    Step9:用Map存储application配置、module配置、默认消费者参数(ConsumerConfig)、服务消费者dubbo:reference的属性。
    ReferenceConfig#init

    
     1String prefix = StringUtils.getServiceKey(map);
     2if (methods != null && !methods.isEmpty()) {
     3    for (MethodConfig method : methods) {
     4         appendParameters(map, method, method.getName());
     5         String retryKey = method.getName() + ".retry";
     6         if (map.containsKey(retryKey)) {
     7              String retryValue = map.remove(retryKey);
     8              if ("false".equals(retryValue)) {
     9                  map.put(method.getName() + ".retries", "0");
    10              }
    11         }
    12         appendAttributes(attributes, method, prefix + "." + method.getName());
    13         checkAndConvertImplicitConfig(method, map, attributes);
    14   }
    15}
    

    Step10:获取服务键值 /{group}/interface:版本,如果group为空,则为interface:版本,其值存为prifex,然后将dubbo:method的属性名称也填入map中,键前缀为dubbo.method.methodname.属性名。dubbo:method的子标签dubbo:argument标签的属性也追加到attributes map中,键为 prifex + methodname.属性名。

    ReferenceConfig#init

    
    1String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    2if (hostToRegistry == null || hostToRegistry.length() == 0) {
    3      hostToRegistry = NetUtils.getLocalHost();
    4} else if (isInvalidLocalHost(hostToRegistry)) {
    5      throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + 
    6           hostToRegistry);
    7}
    8map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
    

    Step11:填充register.ip属性,该属性是消息消费者连接注册中心的IP,并不是注册中心自身的IP。
    ReferenceConfig#init

    
    1ref = createProxy(map);
    

    Step12:调用createProxy方法创建消息消费者代理,下面详细分析其实现细节。
    ReferenceConfig#init

    
    1ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    2ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    

    Step13:将消息消费者缓存在ApplicationModel中。

    源码分析ReferenceConfig#createProxy方法

    ReferenceConfig#createProxy

    
     1URL tmpUrl = new URL("temp", "localhost", 0, map);
     2final boolean isJvmRefer;
     3if (isInjvm() == null) {
     4    if (url != null && url.length()  0) { // if a url is specified, don't do local reference
     5          isJvmRefer = false;
     6    } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
     7         // by default, reference local service if there is
     8         isJvmRefer = true;
     9    } else {
    10         isJvmRefer = false;
    11    }
    12} else {
    13    isJvmRefer = isInjvm().booleanValue();
    14}
    

    Step1:判断该消费者是否是引用本(JVM)内提供的服务。
    如果dubbo:reference标签的injvm(已过期,被local属性替换)如果不为空,则直接取该值,如果该值未配置,则判断ReferenceConfig的url属性是否为空,如果不为空,则isJvmRefer =false,表明该服务消费者将直连该URL的服务提供者;如果url属性为空,则判断该协议是否是isInjvm,其实现逻辑:获取dubbo:reference的scop属性,根据其值判断:

  • 如果为空,isJvmRefer为false。
  • 如果协议为injvm,就是表示为本地协议,既然提供了本地协议的实现,则无需配置isJvmRefer该标签为true,故,isJvmRerfer=false。
  • 如果scope=local或injvm=true,isJvmRefer=true。
  • 如果scope=remote,isJvmRefer设置为false。
  • 如果是泛化引用,isJvmRefer设置为false。
  • 其他默认情况,isJvmRefer设置为true。
  • 如果协议为injvm,就是表示为本地协议,既然提供了本地协议的实现,则无需配置isJvmRefer该标签为true,故,isJvmRerfer=false。

    如果scope=remote,isJvmRefer设置为false。

    其他默认情况,isJvmRefer设置为true。

    ReferenceConfig#createProxy

    
    1if (isJvmRefer) {
    2   URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
    3   invoker = refprotocol.refer(interfaceClass, url);
    4   if (logger.isInfoEnabled()) {
    5         logger.info("Using injvm service " + interfaceClass.getName());
    6   }
    7} 
    

    Step2:如果消费者引用本地JVM中的服务,则利用InjvmProtocol创建Invoker,dubbo中的invoker主要负责服务调用的功能,是其核心实现,后续会在专门的章节中详细分析,在这里我们需要知道,会创建于协议相关的Invoker即可。

    ReferenceConfig#createProxy

    
     1if (url != null && url.length()  0) { // user specified URL, could be peer-to-peer address, or register center's address.
     2     String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);   // @1
     3     if (us != null && us.length  0) {
     4           for (String u : us) {
     5                  URL url = URL.valueOf(u);
     6                  if (url.getPath() == null || url.getPath().length() == 0) {
     7                       url = url.setPath(interfaceName);
     8                  }
     9                 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {   // @2
    10                      urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
    11                  } else {
    12                      urls.add(ClusterUtils.mergeUrl(url, map));  // @3
    13                 }
    14             }
    15       }
    16} 
    

    Step3:处理直连情况,与step2互斥。
    代码@1:对直连URL进行分割,多个直连URL用分号隔开,如果URL中不包含path属性,则为URL设置path属性为interfaceName。
    代码@2:如果直连提供者的协议为registry,则对url增加refer属性,其值为消息消费者所有的属性。(表示从注册中心发现服务提供者)
    代码@3:如果是其他协议提供者,则合并服务提供者与消息消费者的属性,并移除服务提供者默认属性。以default开头的属性。

    ReferenceConfig#createProxy

    
     1ListURL us = loadRegistries(false);   // @1
     2if (us != null && !us.isEmpty()) {
     3     for (URL u : us) {
     4           URL monitorUrl = loadMonitor(u);   // @2
     5           if (monitorUrl != null) {
     6                 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));   // @3
     7            }
     8            urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));  // @4
     9       }
    10}
    11if (urls == null || urls.isEmpty()) {
    12       throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config dubbo:registry 
    13          address="..." / to your spring config.");
    14}
    

    Step4:普通消息消费者,从注册中心订阅服务。
    代码@1:获取所有注册中心URL,其中参数false表示消费端,需要排除dubbo:registry subscribe=false的注册中心,其值为false表示不接受订阅。
    代码@2:根据注册中心URL,构建监控中心URL。
    代码@3:如果监控中心不为空,在注册中心URL后增加属性monitor。
    代码@4:在注册中心URL中,追加属性refer,其值为消费端的所有配置组成的URL。

    ReferenceConfig#createProxy

    
     1if (urls.size() == 1) {
     2    invoker = refprotocol.refer(interfaceClass, urls.get(0));     // @1
     3} else {
     4    ListInvoker? invokers = new ArrayListInvoker?();    // @2,多个服务提供者URL,集群模式
     5    URL registryURL = null;
     6    for (URL url : urls) {
     7         invokers.add(refprotocol.refer(interfaceClass, url));    // @2
     8         if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
     9               registryURL = url; // use last registry url
    10          }
    11     }
    12     if (registryURL != null) { // registry url is available
    13          // use AvailableCluster only when register's cluster is available
    14          URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
    15          invoker = cluster.join(new StaticDirectory(u, invokers));    // @3
    16     } else { // not a registry url
    17          invoker = cluster.join(new StaticDirectory(invokers));
    18     }
    19 }
    

    Step5:根据URL获取对应协议的Invoker。
    代码@1:如果只有一个服务提供者URL,则直接根据协议构建Invoker,具体有如下协议:

    源码分析Dubbo消费端启动流程 源码分析Dubbo消费端启动流程

    集群模式的Invoker和单个协议Invoker一样实现Invoker接口,然后在集群Invoker中利用Directory保证一个一个协议的调用器,十分的巧妙,在后续章节中将重点分析Dubbo Invoker实现原理,包含集群实现机制。

    ReferenceConfig#createProxy

    
     1Boolean c = check;
     2if (c == null && consumer != null) {
     3      c = consumer.isCheck();
     4}
     5if (c == null) {
     6      c = true; // default true
     7}
     8if (c && !invoker.isAvailable()) {
     9       throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group   
    10              == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + 
    11             NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    12}
    

    代码@4:如果dubbo:referecnce的check=true或默认为空,则需要判断服务提供者是否存在。

    ReferenceConfig#createProxy

    
     1return (T) proxyFactory.getProxy(invoker);
     2AbstractProxyFactory#getProxy
     3public  T getProxy(Invoker invoker) throws RpcException {
     4        Class?[] interfaces = null;
     5        String config = invoker.getUrl().getParameter("interfaces");     // @1
     6        if (config != null && config.length()  0) {
     7            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
     8            if (types != null && types.length  0) {
     9                interfaces = new Class?[types.length + 2];
    10                interfaces[0] = invoker.getInterface();
    11                interfaces[1] = EchoService.class;        // @2
    12                for (int i = 0; i  types.length; i++) {
    13                    interfaces[i + 1] = ReflectUtils.forName(types[i]);
    14                }
    15            }
    16        }
    17        if (interfaces == null) {
    18            interfaces = new Class?[]{invoker.getInterface(), EchoService.class};
    19        }
    20        return getProxy(invoker, interfaces);    // @3
    21    }
    

    根据invoker获取代理类,其实现逻辑如下:
    代码@1:从消费者URL中获取interfaces的值,用,分隔出单个服务应用接口。
    代码@2:增加默认接口EchoService接口。
    代码@3:根据需要实现的接口,使用jdk或Javassist创建代理类。
    最后给出消息消费者启动时序图:

    源码分析Dubbo消费端启动流程

    本节关于Dubbo服务消费者(服务调用者)的启动流程就梳理到这里,下一篇将重点关注Invoker(服务调用相关的实现细节)。

    广告:作者新书《RocketMQ技术内幕》已上市

    源码分析Dubbo消费端启动流程

    《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
    新书7折优惠!7折优惠!7折优惠!

    更多文章请关注微信公众号:

    源码分析Dubbo消费端启动流程

    推荐关注微信公众号:RocketMQ官方微信公众号

    源码分析Dubbo消费端启动流程

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    转载声明:转载请注明出处,本技术博客是本人原创文章

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo消费端启动流程


     上一篇
    源码分析Dubbo服务注册与发现机制RegistryDirectory) 源码分析Dubbo服务注册与发现机制RegistryDirectory)
    微信公众号:**[中间件兴趣圈]** 作者介绍:丁威,《RocketMQ技术内幕》作者; RegistryDirectory,基于注册中心的服务发现,本文将重点探讨Dubbo是如何实现服务的自动注册与发现。从上篇文章,得知在消息消费
    2021-04-05
    下一篇 
    源码分析Dubbo服务提供者启动流程-下篇 源码分析Dubbo服务提供者启动流程-下篇
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者; 本文继续上文Dubbo服务提供者启动流程,在上篇文章中详细梳理了基于dubbo spring文件的配置方式,Dubbo是如何加载配置文件,服务提供者
    2021-04-05