源码分析Dubbo服务提供者、服务消费者并发度控制机制

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

转载声明:转载请注明出处,本技术博客是本人原创文章

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo服务提供者、服务消费者并发度控制机制

微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》

本文将详细分析dubbo:service executes=””/与dubbo:reference actives = “”/的实现机制,深入探讨Dubbo自身的保护机制。

源码分析ExecuteLimitFilter

@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY )

  • 过滤器作用 服务调用方并发度控制。

  • 使用场景 对Dubbo服务提供者实现的一种保护机制,控制每个服务的最大并发度。

  • 阻断条件 当服务调用超过允许的并发度后,直接抛出RpcException异常。 接下来源码分析ExecuteLimitFilter的实现细节。

  • 使用场景
    对Dubbo服务提供者实现的一种保护机制,控制每个服务的最大并发度。

    ExecuteLimitFilter#invoke

    
     1public Result invoke(Invoker? invoker, Invocation invocation) throws RpcException {
     2        URL url = invoker.getUrl();
     3        String methodName = invocation.getMethodName();
     4        Semaphore executesLimit = null;
     5        boolean acquireResult = false;
     6        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);      // @1
     7        if (max  0) {
     8            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());             // @2
     9            executesLimit = count.getSemaphore(max);                                                              // @3
    10            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {              // @4
    11                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads 
    12                      greater than dubbo:service executes="" + max + "" / limited.");
    13            }
    14        }
    15        boolean isSuccess = true;
    16        try {
    17            Result result = invoker.invoke(invocation);                 // @5
    18            return result;
    19        } catch (Throwable t) {
    20            isSuccess = false;
    21            if (t instanceof RuntimeException) {
    22                throw (RuntimeException) t;
    23            } else {
    24                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
    25            }
    26        } finally {
    27            if(acquireResult) {                                   // @6
    28                executesLimit.release();
    29            }
    30        }
    31    }
    

    代码@1:从服务提供者列表中获取参数executes的值,如果该值小于等于0,表示不启用并发度控制,直接沿着调用链进行调用。

    代码@2:根据服务提供者url和服务调用方法名,获取RpcStatus。

    
     1public static RpcStatus getStatus(URL url, String methodName) {
     2        String uri = url.toIdentityString();      
     3        ConcurrentMapString, RpcStatus map = METHOD_STATISTICS.get(uri);         
     4        if (map == null) {
     5            METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMapString, RpcStatus());    
     6            map = METHOD_STATISTICS.get(uri);
     7        }
     8        RpcStatus status = map.get(methodName);          /
     9        if (status == null) {
    10            map.putIfAbsent(methodName, new RpcStatus());
    11            status = map.get(methodName);
    12        }
    13        return status;
    14    }
    

    这里是并发容器ConcurrentHashMap的经典使用,从这里可以看出ConcurrentMapString, ConcurrentMap String, RpcStatus METHOD_STATISTICS的存储结构为 {  服务提供者URL唯一字符串:{方法名:RpcStatus} }。

    代码@3:根据服务提供者配置的最大并发度,创建该服务该方法对应的信号量对象。

    
     1public Semaphore getSemaphore(int maxThreadNum) {
     2        if(maxThreadNum = 0) {
     3            return null;
     4        }
     5        if (executesLimit == null || executesPermits != maxThreadNum) {
     6            synchronized (this) {
     7                if (executesLimit == null || executesPermits != maxThreadNum) {
     8                    executesLimit = new Semaphore(maxThreadNum);
     9                    executesPermits = maxThreadNum;
    10                }
    11            }
    12        }
    13        return executesLimit;
    14    }
    

    使用了双重检测来创建executesLimit 信号量。
    代码@4:如果获取不到锁,并不会阻塞等待,而是直接抛出RpcException,服务端的策略是快速抛出异常,供服务调用方(消费者)根据集群策略进行执行,例如重试其他服务提供者。

    代码@5:执行真实的服务调用。

    代码@6:如果成功申请到信号量,在服务调用结束后,释放信号量。

    总结:dubbo:service executes=””/的含义是,针对每个服务每个方法的最大并发度。如果超过该值,则直接抛出RpcException。

    源码分析ActiveLimitFilter

    @Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY )

  • 过滤器作用 消费端调用服务的并发控制。

  • 使用场景 控制同一个消费端对服务端某一服务的并发调用度,通常该值应该小于 dubbo:service executes=""/

  • 阻断条件 非阻断,但如果超过允许的并发度会阻塞,超过超时时间后将不再调用服务,而是直接抛出超时。

  • 使用场景
    控制同一个消费端对服务端某一服务的并发调用度,通常该值应该小于 dubbo:service executes=””/

    ActiveLimitFilter#invoke

    
     1public Result invoke(Invoker? invoker, Invocation invocation) throws RpcException {
     2        URL url = invoker.getUrl();
     3        String methodName = invocation.getMethodName();
     4        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);    // @1
     5        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());           // @2
     6        if (max  0) {                                          
     7            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);   // @3
     8            long start = System.currentTimeMillis();
     9            long remain = timeout;
    10            int active = count.getActive();                                                                                                                                          // @4
    11            if (active = max) {                                                                                                                                                          // @5
    12                synchronized (count) {                                                                                                                                                                      
    13                    while ((active = count.getActive()) = max) {                                                                                                     
    14                        try {
    15                            count.wait(remain);                                                                                                                                      
    16                        } catch (InterruptedException e) {
    17                        }
    18                        long elapsed = System.currentTimeMillis() - start;                               
    19                        remain = timeout - elapsed;
    20                        if (remain = 0) {                                                                                                                                             // @6
    21                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
    22                                    + invoker.getInterface().getName() + ", method: "
    23                                    + invocation.getMethodName() + ", elapsed: " + elapsed
    24                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
    25                                    + ". max concurrent invoke limit: " + max);
    26                        }
    27                    }
    28                }
    29            }
    30        }
    31        try {
    32            long begin = System.currentTimeMillis();
    33            RpcStatus.beginCount(url, methodName);        // @7
    34            try {
    35                Result result = invoker.invoke(invocation);     // @8
    36                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);    // @9
    37                return result;
    38            } catch (RuntimeException t) {
    39                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
    40                throw t;
    41            }
    42        } finally {
    43            if (max  0) {
    44                synchronized (count) {
    45                    count.notify();     // @10
    46                }
    47            }
    48        }
    49    }
    

    代码@1:从Invoker中获取消息端URL中的配置的actives参数,为什么从Invoker中获取的Url是消费端的Url呢?这是因为在消费端根据服务提供者URL创建调用Invoker时,会用服务提供者URL,然后合并消费端的配置属性,其优先级 -D 消费端 服务端。其代码位于:
    RegistryDirectory#toInvokers

    
    1URL url = mergeUrl(providerUrl);
    

    代码@2:根据服务提供者URL和调用服务提供者方法,获取RpcStatus。

    代码@3:获取接口调用的超时时间,默认为1s。

    代码@4:获取当前消费者,针对特定服务,特定方法的并发调用度,active值。

    代码@5:如果当前的并发 调用大于等于允许的最大值,则针对该RpcStatus申请锁,并调用其wait(timeout)进行等待,也就是在接口调用超时时间内,还是未被唤醒,则直接抛出超时异常。

    代码@6:判断被唤醒的原因是因为等待超时,还是由于调用结束,释放了”名额“,如果是超时唤醒,则直接抛出异常。

    代码@7:在一次服务调用前,先将 服务名+方法名对应的RpcStatus的active加一。

    代码@8:执行RPC服务调用。

    代码@9:记录成功调用或失败调用,并将active减一。

    代码@10:最终成功执行,如果开启了actives机制(dubbo:referecnce actives=””)时,唤醒等待者。

    总结:dubbo:reference actives=””/ 是控制消费端对单个服务提供者单个服务允许调用的最大并发度。该值的取值不应该大于dubbo:service executes=””/的值,并且如果消费者机器的配置,如果性能不尽相同,不建议对该值进行设置。

    广告:作者的新书《RocketMQ技术内幕》已上市

    源码分析Dubbo服务提供者、服务消费者并发度控制机制

    《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
    新书7折优惠!7折优惠!7折优惠!

    更多文章请关注微信公众号:

    源码分析Dubbo服务提供者、服务消费者并发度控制机制

    推荐关注微信公众号:RocketMQ官方微信公众号

    源码分析Dubbo服务提供者、服务消费者并发度控制机制

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    转载声明:转载请注明出处,本技术博客是本人原创文章

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo服务提供者、服务消费者并发度控制机制


     上一篇
    源码分析Dubbo tps过滤器器实现原理 源码分析Dubbo tps过滤器器实现原理
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者 本文将重点分析一下dubbo限流的另外一个方式,tps过滤器。@Activate(group = Constants.PROVIDER, value
    2021-04-05
    下一篇 
    源码分析Dubbo服务调用日志(accesslog参数)实现原理 源码分析Dubbo服务调用日志(accesslog参数)实现原理
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》 谈到服务调用日志,大家恐怕第一想到就是如果开启了这个参数,会影响性能。那真实的情况是怎么样了?性能损耗到底有多大呢?在实践中我们如何使用该功能呢?本文将详
    2021-04-05