注册中心 Eureka 源码解析 —— 任务批处理

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

转载声明:转载请注明出处,本技术博客是本人原创文章

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 注册中心 Eureka 源码解析 —— 任务批处理


**本文主要基于 Eureka 1.8.X 版本**

    1. 概述
    1. 整体流程
    1. 任务处理器
    2. 4. 创建任务分发器
  • 4.1 批量任务执行分发器

  • 4.2 单任务执行分发器

  • 6.1 创建批量任务执行器

  • 6.2 创建单任务执行器

  • 6.3 工作线程抽象类

  • 10.1 批量任务工作线程

  • 10.2 单任务工作线程

1. 概述

本文主要分享 任务批处理。Eureka-Server 集群通过任务批处理同步应用实例注册实例,所以本文也是为 Eureka-Server 集群同步的分享做铺垫。本文涉及类在  com.netflix.eureka.util.batcher 包下,涉及到主体类的类图如下( 打开大图 ):

  • 紫色部分 —— 任务分发器
  • 蓝色部分 —— 任务接收器
  • 红色部分 —— 任务执行器
  • 绿色部分 —— 任务处理器
  • 黄色部分 —— 任务持有者( 任务 )

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

推荐 Spring Cloud 视频

  • Java 微服务实践 - Spring Boot
  • Java 微服务实践 - Spring Cloud
  • Java 微服务实践 - Spring Boot / Spring Cloud

2. 整体流程

任务执行的整体流程如下( 打开大图 ):

  • 细箭头 —— 任务执行经历的操作

  • 粗箭头 —— 任务队列流转的方向

  • **不同于**一般情况下,任务提交了**立即**同步或异步执行,任务的执行拆分了**三层队列**:
    • 蓝线:分发器在收到任务执行请求后,提交到接收队列,任务实际未执行

    • 黄线:执行器的工作线程处理任务失败,将符合条件( 见 「3. 任务处理器」 )的失败任务提交到重新执行队列。

    • 第一层,接收队列(  acceptorQueue ),重新处理队列(  reprocessQueue )。

    • 接收队列,避免处理任务的阻塞等待。

    • 接收线程( Runner )合并任务,将相同任务编号( 是的,任务是带有编号的 )的任务合并,只执行一次。

    • Eureka-Server 为集群同步提供批量操作多个应用实例的接口,一个批量任务可以一次调度接口完成,避免多次调用的开销。当然,这样做的前提是合并任务,这也导致 Eureka-Server 集群之间对应用实例的注册和下线带来更大的延迟。毕竟,Eureka 是在 CAP 之间,选择了 AP

    3. 任务处理器

    com.netflix.eureka.util.batcher.TaskProcessor ,任务处理器接口。接口代码如下:

    
    // ... 省略代码,超过微信文章上限
    
  • ProcessingResult ,处理任务结果。
      - `Success` ,成功。 - `Congestion` ,拥挤错误,**任务将会被重试**。例如,请求被限流。 - `TransientError` ,瞬时错误,**任务将会被重试**。例如,网络请求超时。 - `PermanentError` ,永久错误,**任务将会被丢弃**。例如,执行时发生程序异常。

      4. 创建任务分发器

      com.netflix.eureka.util.batcher.TaskDispatcher ,任务分发器接口。接口代码如下:

      
      // ... 省略代码,超过微信文章上限
      
      • #process(…) 方法,提交任务编号,任务,任务过期时间给任务分发器处理。

      com.netflix.eureka.util.batcher.TaskDispatchers ,任务分发器工厂类,用于创建任务分发器。其内部提供两种任务分发器的实现:

      • 批量任务执行的分发器,用于 Eureka-Server 集群注册信息的同步任务。
      • 单任务执行的分发器,用于 Eureka-Server 向亚马逊 AWS 的 ASG ( Autoscaling Group ) 同步状态。虽然本系列暂时对 AWS 相关的不做解析,从工具类的角度来说,本文会对该分发器进行分享。

      com.netflix.eureka.cluster.ReplicationTaskProcessor ,实现 TaskDispatcher ,Eureka-Server 集群任务处理器。感兴趣的同学,可以点击链接自己研究,我们将在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。

      4.1 批量任务执行分发器

      调用  TaskDispatchers#createBatchingTaskDispatcher(...) 方法,创建批量任务执行的分发器,实现代码如下:

      
      // TaskDispatchers.java
        1: /**
        2:  * 创建批量任务执行的分发器
        3:  *
        4:  * @param id 任务执行器编号
        5:  * @param maxBufferSize 待执行队列最大数量
        6:  * @param workloadSize 单个批量任务包含任务最大数量
        7:  * @param workerCount 任务执行器工作线程数
        8:  * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
        9:  * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
       10:  * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
       11:  * @param taskProcessor 任务处理器
       12:  * @param ID 任务编号泛型
       13:  * @param  任务泛型
       14:  * @return 批量任务执行的分发器
       15:  */
       // ... 省略代码,超过微信文章上限
      
    • 第 1 至 23 行 :方法参数。比较多哈,请耐心理解。
        - `workloadSize` 参数,单个批量任务包含任务最大数量。 - `taskProcessor` 参数,**自定义任务执行器实现**。
        • 第 32 至 35 行 : #process() 方法的实现,调用 AcceptorExecutor#process(…) 方法,提交 [ 任务编号 , 任务 , 任务过期时间 ] 给任务分发器处理。

        4.2 单任务执行分发器

        调用  TaskDispatchers#createNonBatchingTaskDispatcher(...) 方法,创建单任务执行的分发器,实现代码如下:

        
          1: /**
          2:  * 创建单任务执行的分发器
          3:  *
          4:  * @param id 任务执行器编号
          5:  * @param maxBufferSize 待执行队列最大数量
          6:  * @param workerCount 任务执行器工作线程数
          7:  * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
          8:  * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
          9:  * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
         10:  * @param taskProcessor 任务处理器
         11:  * @param ID 任务编号泛型
         12:  * @param  任务泛型
         13:  * @return 单任务执行的分发器
         14:  */
         15: public static ID,  TaskDispatcherID,  createNonBatchingTaskDispatcher(String id,
         16:                                                                             int maxBufferSize,
         17:                                                                             int workerCount,
         18:                                                                             long maxBatchingDelay,
         19:                                                                             long congestionRetryDelayMs,
         20:                                                                             long networkFailureRetryMs,
         21:                                                                             TaskProcessor taskProcessor) {
         22:     // 创建 任务接收执行器
         23:     final AcceptorExecutorID,  acceptorExecutor = new AcceptorExecutor(
         24:             id, maxBufferSize, /* workloadSize = 1 */1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
         25:     );
         26:     final TaskExecutorsID,  taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
         27:     return new TaskDispatcherID, () {
         28:         @Override
         29:         public void process(ID id, T task, long expiryTime) {
         30:             acceptorExecutor.process(id, task, expiryTime);
         31:         }
         32: 
         33:         @Override
         34:         public void shutdown() {
         35:             acceptorExecutor.shutdown();
         36:             taskExecutor.shutdown();
         37:         }
         38:     };
         39: }
        
      • 第 1 至 21 行 :方法参数。比较多哈,请耐心理解。
          - `workloadSize` 参数,相比 `#createBatchingTaskDispatcher(…)` 少这个参数。**在第 24 行,你会发现该参数传递给 AcceptorExecutor 使用 1 噢**。 - `taskProcessor` 参数,**自定义任务执行器实现**。

          5. 创建任务接收执行器

          com.netflix.eureka.util.batcher.AcceptorExecutor ,任务接收执行器。创建构造方法代码如下:

          
          // ... 省略代码,超过微信文章上限
          
        • 第 5 至 61 行 :属性。比较多哈,请耐心理解。
            - 眼尖如你,会发现 AcceptorExecutor 即存在单任务工作队列( `singleItemWorkQueue` ),又存在批量任务工作队列( `batchWorkQueue` ) ,在 「9. 任务接收线程【调度任务】」 会解答这个疑惑。

            6. 创建任务执行器

            com.netflix.eureka.util.batcher.TaskExecutors ,任务执行器。其内部提供创建单任务和批量任务执行器的两种方法。TaskExecutors 构造方法如下:

            
            // ... 省略代码,超过微信文章上限
            
            • workerThreads 属性,工作线程工作任务队列会被工作线程池并发拉取,并发执行
            • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnableFactory ,创建工作线程工厂接口。单任务和批量任务执行器的工作线程实现不同,通过自定义工厂实现类创建。

            6.1 创建批量任务执行器

            调用  TaskExecutors#batchExecutors(...) 方法,创建批量任务执行器。实现代码如下:

            
            /**
            * 创建批量任务执行器
            *
            * @param name 任务执行器名
            * @param workerCount 任务执行器工作线程数
            * @param processor 任务处理器
            * @param acceptorExecutor 接收任务执行器
            * @param ID 任务编号泛型
            * @param  任务泛型
            * @return 批量任务执行器
            */
            // ... 省略代码,超过微信文章上限
            
            • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.BatchWorkerRunnable ,批量任务工作线程。

            6.2 创建单任务执行器

            调用  TaskExecutors#singleItemExecutors(...) 方法,创建批量任务执行器。实现代码如下:

            
            /**
            * 创建单任务执行器
            *
            * @param name 任务执行器名
            * @param workerCount 任务执行器工作线程数
            * @param processor 任务处理器
            * @param acceptorExecutor 接收任务执行器
            * @param ID 任务编号泛型
            * @param  任务泛型
            * @return 单任务执行器
            */
            // ... 省略代码,超过微信文章上限
            
            • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.SingleTaskWorkerRunnable ,单任务工作线程。

            6.3 工作线程抽象类

            com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable ,任务工作线程抽象类。BatchWorkerRunnable 和 SingleTaskWorkerRunnable 都实现该类,差异在  #run() 的自定义实现。WorkerRunnable 实现代码如下:

            
            // ... 省略代码,超过微信文章上限
            

            7. 网络通信整形器

            com.netflix.eureka.util.batcher.TrafficShaper ,网络通信整形器。当任务执行发生请求限流,或是请求网络失败的情况,则延时 AcceptorRunner 将任务提交到工作任务队列,从而避免任务很快去执行,再次发生上述情况。TrafficShaper 实现代码如下:

            
            // ... 省略代码,超过微信文章上限
            
            • #registerFailure(…) ,在任务执行失败时,提交任务结果给 TrafficShaper ,记录发生时间。在 「10. 任务执行器【执行任务】」 会看到调用该方法。
            • #transmissionDelay(…) ,计算提交延迟,单位:毫秒。「9. 任务接收线程【调度任务】」 会看到调用该方法。

            8. 任务接收执行器【处理任务】

            调用  AcceptorExecutor#process(...) 方法,添加任务到接收任务队列。实现代码如下:

            
            // AcceptorExecutor.java
            // ... 省略代码,超过微信文章上限
            
          • `com.netflix.eureka.util.batcher.TaskHolder` ,任务持有者,实现代码如下:
            `// ... 省略代码,超过微信文章上限`
          • 9. 任务接收线程【调度任务】

            后台线程执行  AcceptorRunner#run(...) 方法,调度任务。实现代码如下:

            
            // ... 省略代码,超过微信文章上限
            
            • 第 4 行 :无限循环执行调度,直到关闭。
            • 第 6 至 7 行 :调用 `#drainInputQueues()` 方法,**循环**处理完输入队列( 接收队列 + 重新执行队列 ),**直到**有待执行的任务。实现代码如下:
              // ... 省略代码,超过微信文章上限
              
              • 第 4 行 :优先从重新执行任务的队尾拿较新的任务,从而实现保留更新的任务在待执行任务映射(  pendingTasks ) 里。
              • 第 12 行 :添加任务编号到待执行队列( `processingOrder` ) 的头部。效果如下图:
              注册中心 Eureka 源码解析 —— 任务批处理
            • - 第 15 至 18 行 :如果待执行队列( `pendingTasks` )已满,清空重新执行队列( `processingOrder` ),放弃较早的任务。 - 重新执行队列( `reprocessQueue` ) 和接收队列( `acceptorQueue` )为空 - 待执行任务映射( `pendingTasks` )**不为空** - 第 2 行 && 第 18 行 :**循环**,直到**同时**满足如下全部条件:
            • 第 3 至 4 行 :处理完重新执行队列( `reprocessQueue` )。实现代码如下:
              `// ... 省略代码,超过微信文章上限`
            • 第 5 至 6 行 :处理完接收队列( `acceptorQueue` ),实现代码如下:
              `// ... 省略代码,超过微信文章上限`
            • - 第 8 至 17 行 :当所有队列为空,阻塞从接收队列( `acceptorQueue` ) 拉取任务 10 ms。若拉取到,添加到待执行队列( `processingOrder` )。
              
              // ... 省略代码,超过微信文章上限
              
              • 当  scheduleTime 小于当前时间,不重新计算,即此时需要延迟等待调度。
              • 当  scheduleTime 大于等于当前时间,配合  TrafficShaper#transmissionDelay(…) 重新计算。
              
              // ... 省略代码,超过微信文章上限
              
              • x
              • 第 2 行 :调用 `#hasEnoughTasksForNextBatch()` 方法,判断是否有足够任务进行下一次批量任务调度:1)待执行任务( `processingOrder` )映射已满;或者 2)到达批量任务处理最大等待延迟。实现代码如下:
                `// ... 省略代码,超过微信文章上限`
              • 第 5 至 17 行 :获取批量任务(  holders )。😈 你会发现,本文说了半天的批量任务,实际是  Listtaskholderid, t="" style="font-size: inherit;color: inherit;line-height: inherit;"/id,>/taskholderid,/id,哈。
              • 第 4 行 :获取批量任务工作请求信号量(  batchWorkRequests ) 。在任务执行器的批量任务执行器,每次执行时,发出  batchWorkRequests 。每一个信号量需要保证获取到一个批量任务
              • 第 19 至 20 行 :未调度到批量任务,释放请求信号量,代表请求实际未完成,每一个信号量需要保证获取到一个批量任务
              • 第 21 至 24 行 :添加批量任务到批量任务工作队列。
              • 第 23 行 :调用  #assignSingleItemWork() 方法,调度单任务。
              
              // ... 省略代码,超过微信文章上限
              
              • x

              10. 任务执行器【执行任务】

              10.1 批量任务工作线程

              批量任务工作后台线程( BatchWorkerRunnable )执行  #run(...) 方法,调度任务。实现代码如下:

              
              // 
              // ... 省略代码,超过微信文章上限
              
              • 第 4 行 :无限循环执行调度,直到关闭。
              • 第 6 行 :调用 `getWork()` 方法,获取**一个**批量任务直到成功。实现代码如下:
                `// ... 省略代码,超过微信文章上限`
                • 注意,批量任务工作队列(  batchWorkQueue ) 和单任务工作队列(  singleItemWorkQueue ) 是不同的队列
                • 第 3 行 :调用 `TaskDispatcher#requestWorkItems()` 方法,发起请求信号量,并获得批量任务的工作队列。实现代码如下:
                  // TaskDispatcher.java
                  // ... 省略代码,超过微信文章上限
                • 第 5 至 8 行 :循环获取一个批量任务,直到成功。
                
                // TaskDispatcher.java
                // ... 省略代码,超过微信文章上限
                
                
                // ... 省略代码,超过微信文章上限
                
                • x
                
                // AcceptorExecutor.java
                // ... 省略代码,超过微信文章上限
                

                10.2 单任务工作线程

                单任务工作后台线程( SingleTaskWorkerRunnable )执行  #run(...) 方法,调度任务,和  BatchWorkerRunnable#run(...) 基本类似,就不啰嗦了。实现代码如下:

                
                @Override
                // SingleTaskWorkerRunnable.java
                // ... 省略代码,超过微信文章上限
                

                666. 彩蛋

                😈 又是一篇长文。建议边看代码,边对照着整体流程图,理解实际不难。当然,欢迎你有任何疑问,在我的公众号( 芋道源码 ) 留言。胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

                如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。注册中心 Eureka 源码解析 —— 任务批处理

                目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:01. 调试环境搭建

                02. 项目结构一览

                1. 配置 Configuration

                04. 核心流程一览05. 拓展机制 SPI06. 线程池07. 服务暴露 Export08. 服务引用 Refer09. 注册中心 Registry10. 动态编译 Compile11. 动态代理 Proxy12. 服务调用 Invoke13. 调用特性 14. 过滤器 Filter15. NIO 服务器16. P2P 服务器17. HTTP 服务器18. 序列化 Serialization19. 集群容错 Cluster20. 优雅停机21. 日志适配22. 状态检查23. 监控中心 Monitor24. 管理中心 Admin25. 运维命令 QOS26. 链路追踪 Tracing…

                一共 60 篇++

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

转载声明:转载请注明出处,本技术博客是本人原创文章

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 注册中心 Eureka 源码解析 —— 任务批处理


 上一篇
关于Eureka 2.x,别再人云亦云了! 关于Eureka 2.x,别再人云亦云了!
原文:http://www.itmuch.com/spring-cloud/eureka-2-news/ ,转载请说明出处。 最近朋友圈被Eureka 2.x停止开发的新闻刷屏,例如: Eureka 2.0 开源工作宣告停止,继续使用
2021-04-05
下一篇 
注册中心 Eureka源码解析 —— 应用实例注册发现 (九)之岁月是把萌萌的读写锁 注册中心 Eureka源码解析 —— 应用实例注册发现 (九)之岁月是把萌萌的读写锁
精尽 Dubbo 原理与源码专栏( 已经完成 69+ 篇,预计总共 75+ 篇 ) 中文详细注释的开源项目 Java 并发源码合集 RocketMQ 源码合集 Sharding-JDBC 源码解析合集 Spring MVC 和 Secur
2021-04-05