源码分析Dubbo线程池实现原理

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

转载声明:转载请注明出处,本技术博客是本人原创文章

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo线程池实现原理

微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者

本文主要分析Dubbo线程池的构建过程,主要介绍官方文档中有关于ThreadPool的种类:

  • fixed 固定大小线程池,启动时建立线程,不关闭,一致持有。(缺省)

  • cached 缓存线程池,空闲一分钟,线程会消费,需要时重新创建新线程。

  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。

  • eager 优先使用线程来执行新提交任务。(渴望立即执行,而不是进入队列排队执行)。 配置标签: dubbo:protocol threadpool = "fixed" ../

  • cached
    缓存线程池,空闲一分钟,线程会消费,需要时重新创建新线程。

    eager
    优先使用线程来执行新提交任务。(渴望立即执行,而不是进入队列排队执行)。
    配置标签: dubbo:protocol threadpool = “fixed” ../

    由于各种类型的线程池,内部是根据规则创建不同的ThreadPoolExecutor对象,那我们先简单回顾一下线程池的基本知识。

    线程池核心属性

  • int corePoolSize 线程池核心线程数、常驻线程数

  • int maximumPoolSize 线程池中最大线程数量

  • long keepAliveTime                                     线程保持活跃时间,(如果线程创建,并空闲指定值后,线程会被回收,0表示不开启该特性,其范围针对超过corePoolSize的线程)

  • TimeUnit unit keepAliveTime的时间单位。
  • BlockingQueue Runnable workQueue 任务队列

  • ThreadFactory threadFactory 线程工厂类,一般通过该线程工厂,为线程命名,以便区分线程。

  • RejectedExecutionHandler handler) 拒绝策略。

  • int maximumPoolSize
    线程池中最大线程数量

    TimeUnit unit

    BlockingQueue Runnable workQueue
    任务队列

    RejectedExecutionHandler handler)
    拒绝策略。

    线程池任务提交流程

  • 如果线程池中线程数量小于corePoolSize,则创建一个线程来执行该任务。
  • 如果线程池中的线程大于等于corePoolSize,则尝试将任务放入队列中。
  • 如果成功将任务放入队列,则本次提交任务正常结束,如果放入任务队列失败则继续下一步。
  • 如果线程池中的线程数量小于最大线程数,则创建先的线程,否则执行拒绝策略。
  • 更多有关线程池,请参考作者的另外一篇博文:https://blog.csdn.net/prestigeding/article/details/53929713

    解析来我们将一一分析Dubbo支持的线程池类型。

    fixed

    固定大小线程池。

    
     1public class FixedThreadPool implements ThreadPool {
     2    @Override
     3    public Executor getExecutor(URL url) {
     4        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
     5        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
     6        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
     7        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
     8                queues == 0 ? new SynchronousQueueRunnable() :
     9                        (queues  0 ? new LinkedBlockingQueueRunnable()
    10                                : new LinkedBlockingQueueRunnable(queues)),
    11                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    12    }
    13}
    

    实现要点:
      1、首先获取可配置参数threadname、threads、queues三个参数,分别代表线程池中线程名前缀、线程中最大线程数量、任务队列长度。

      2、要实现fixed固定大小线程池,故名思议,就是线程池自创建以来,线程数量始终保持一致。其实现要点是,corePoolSize、maximumPoolSize相等,并且其值等于threads(默认200),并且keepAliveTime=0,表示线程始终活跃。

      3、任务队列,如果queues 为0,则使用SynchronousQueue,如果小于0,则使用无界队列,如果大于0,则创建容量为LinkedBlockingQueue的队列,超过容量,则拒绝入队。

    4、线程工厂,NamedThreadFactory,主要设置线程名称,默认为Dubbo-thread-序号。

    5、拒绝策略AbortPolicyWithReport,其主要是如果拒绝任务,首先会打印出详细日志,包含线程池的核心参数,并且会dump jstack日志,日志文件默认存储在user.home/Dubbo_JStack.log.timestamp,可以通过dump.directory属性配置,可通过dubbo:protocol dubbo:parameter key =“” value = “”/ /dubbo:protocol。

    与该线程池相关的配置属性:threadname、theadpool、threads、queues。

    这里再简单介绍如果队列长度为0(默认),为什么是选用SynchronousQueue队列。

    SynchronousQueue的一个简单理解:调用offer、put之前,必须先调用take,也就是先调用take方法的线程阻塞,然后当别的线程调用offer之后,调用take的线程被唤醒,如果没有线程调用take方法,一个线程调用offer方法,则会返回false,并不会将元素添加到SynchronousQueue队列中,因为SynchronousQueue内部的队列长度为0。

    cached

    缓存线程池,线程空闲后会被回收。

    
     1public class CachedThreadPool implements ThreadPool {
     2    @Override
     3    public Executor getExecutor(URL url) {
     4        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
     5        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
     6        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
     7        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
     8        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
     9        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
    10                queues == 0 ? new SynchronousQueueRunnable() :
    11                        (queues  0 ? new LinkedBlockingQueueRunnable()
    12                                : new LinkedBlockingQueueRunnable(queues)),
    13                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    14    }
    15}
    

    实现要点:既然要实现线程可以被回收,则必然要设置keepAliveTime。
    故对应线程池核心参数设置,对应如下:

  • corePoolSize:通过参数corethreads设置,默认为0
  • maximumPoolSize:通过参数threads设置,默认200
  • keepAliveTime:通过参数alive设置,默认为60 * 1000
  • workQueue  :通过queues参数设置,默认为0
  • 其他与fixed相同,则不重复介绍
  • maximumPoolSize:通过参数threads设置,默认200

    workQueue  :通过queues参数设置,默认为0

    limited

    可伸缩线程池,其特征:线程数只增不减。

    
     1public class LimitedThreadPool implements ThreadPool {
     2    @Override
     3    public Executor getExecutor(URL url) {
     4        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
     5        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
     6        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
     7        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
     8        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
     9                queues == 0 ? new SynchronousQueueRunnable() :
    10                        (queues  0 ? new LinkedBlockingQueueRunnable()
    11                                : new LinkedBlockingQueueRunnable(queues)),
    12                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    13    }
    14}
    

    与cached不同的是,limited线程池中线程一旦创建,就不回收,与cached不同的就是keepAliveTime的取值不同,limited取值为:Long.MAX_VALUE,其他与cached相同。

    eager

    其核心实现主要由TaskQueue、EagerThreadPoolExecutor共同完成。

    首先,我们关注一下TaskQueued的offer方法。

    
     1public boolean offer(Runnable runnable) {
     2        if (executor == null) {
     3            throw new RejectedExecutionException("The task queue does not have executor!");
     4        }
     5
     6        int currentPoolThreadSize = executor.getPoolSize();     // @1
     7        // have free worker. put task into queue to let the worker deal with task.
     8        if (executor.getSubmittedTaskCount()  currentPoolThreadSize) {   // @2
     9            return super.offer(runnable);
    10        }
    11
    12        // return false to let executor create new worker.
    13        if (currentPoolThreadSize  executor.getMaximumPoolSize()) {    // @3
    14            return false;
    15        }
    16
    17        // currentPoolThreadSize = max     // @4
    18        return super.offer(runnable); 
    19    }
    

    代码@1:获取当前线程池中线程的数量。

    代码@2:如果当前已提交到线程池中的任务数量小于当前存在在的线程数,则走默认的提交流程。

    代码@3:如果当前已提交到线程中的数量大于当前的线程池,并线程池中数量并未达到线程池允许创建的最大线程数时,则返回false,并不入队,其效果是会创建新的线程来执行。

    代码@4:如果当前线程池中的线程已达到允许创建的最大线程数后,走默认的提交任务逻辑。

    其次看一下EagerThreadPoolExecutor#execute

    
     1public void execute(Runnable command) {
     2        if (command == null) {
     3            throw new NullPointerException();
     4        }
     5        // do not increment in method beforeExecute!
     6        submittedTaskCount.incrementAndGet();       // @1 
     7        try {
     8            super.execute(command);
     9        } catch (RejectedExecutionException rx) {
    10            // retry to offer the task into queue.
    11            final TaskQueue queue = (TaskQueue) super.getQueue();
    12            try {
    13                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
    14                    submittedTaskCount.decrementAndGet();
    15                    throw new RejectedExecutionException("Queue capacity is full.");
    16                }
    17            } catch (InterruptedException x) {
    18                submittedTaskCount.decrementAndGet();
    19                throw new RejectedExecutionException(x);
    20            }
    21        } catch (Throwable t) {
    22            // decrease any way
    23            submittedTaskCount.decrementAndGet();   // @2
    24        }
    25    }
    

    其核心实现逻辑:如果提交任务失败,则再走一次默认的任务提交流程。
    最总后结一下Eager的核心特性。

    
     1public class EagerThreadPool implements ThreadPool {
     2    @Override
     3    public Executor getExecutor(URL url) {
     4        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
     5        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
     6        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
     7        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
     8        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
     9
    10        // init queue and executor
    11        TaskQueueRunnable taskQueue = new TaskQueueRunnable(queues = 0 ? 1 : queues);
    12        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
    13                threads,
    14                alive,
    15                TimeUnit.MILLISECONDS,
    16                taskQueue,
    17                new NamedThreadFactory(name, true),
    18                new AbortPolicyWithReport(name, url));
    19        taskQueue.setExecutor(executor);
    20        return executor;
    21    }
    22}
    

    其核心特性如下:
    1、首先,其配置参数与cached类型的线程池相同,说明eager也是基于缓存的。

    2、eager与cached类型线程池不同的一点是,提交任务后,线程优先于队列,默认的提交流程是如果线程数达到核心线程数后,新提交的任务是首先进入队列,但eager是优先创建线程来执行,这有点与公平锁,非公平锁一样的概念了。

    Dubbo的线程池种类就介绍到这里。

    广告:作者新书《RocketMQ技术内幕》已上市

    源码分析Dubbo线程池实现原理

    《RocketMQ技术内幕》已出版上市,目前可在主流购物平台(京东、天猫等)购买,本书从源码角度深度分析了RocketMQ NameServer、消息发送、消息存储、消息消费、消息过滤、主从同步HA、事务消息;在实战篇重点介绍了RocketMQ运维管理界面与当前支持的39个运维命令;并在附录部分罗列了RocketMQ几乎所有的配置参数。本书得到了RocketMQ创始人、阿里巴巴Messaging开源技术负责人、Linux OpenMessaging 主席的高度认可并作序推荐。目前是国内第一本成体系剖析RocketMQ的书籍。
    新书7折优惠!7折优惠!7折优惠!

    更多文章请关注微信公众号:

    源码分析Dubbo线程池实现原理

    推荐关注微信公众号:RocketMQ官方微信公众号:

    源码分析Dubbo线程池实现原理

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    转载声明:转载请注明出处,本技术博客是本人原创文章

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 源码分析Dubbo线程池实现原理


     上一篇
    源码分析Dubbo NettyServer与HeaderExchangeServer 源码分析Dubbo NettyServer与HeaderExchangeServer
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者 本文主要分析一下NettyServer,HeaderExchangeServer实现细节。 NettyServerNettyServer整个类图如下:
    2021-04-05
    下一篇 
    源码分析Dubbo事件派发机制 源码分析Dubbo事件派发机制
    微信公众号:**[中间件兴趣圈]** 作者简介:《RocketMQ技术内幕》作者 本节将主要学习Dubbo是如何使用Netty来实现网络通讯的。 从官网我们得知,Dubbo协议是使用单一长连接来进行网络传输,也就是说服务调用方持久与
    2021-04-05