注册中心 Eureka 源码解析 —— 应用实例注册发现 (二)之续租

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

转载声明:转载请注明出处,本技术博客是本人原创文章

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 注册中心 Eureka 源码解析 —— 应用实例注册发现 (二)之续租

本文主要基于 Eureka 1.8.X 版本

    1. 概述
    1. Eureka-Client 发起续租
  • 2.1 初始化定时任务
  • 2.2 HeartbeatThread
  • 2.3 TimedSupervisorTask
    1. Eureka-Server 接收续租
  • 3.1 接收续租请求
  • 3.2 续租应用实例信息
    1. 彩蛋

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 续租应用实例的过程

FROM 《深度剖析服务发现组件Netflix Eureka》 二次编辑

注册中心 Eureka 源码解析 —— 应用实例注册发现 (二)之续租
  • 蓝框部分,为本文重点。
  • 蓝框部分,Eureka-Server 集群间复制注册的应用实例信息,不在本文内容范畴。

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

推荐 Spring Cloud 视频

  • Java 微服务实践 - Spring Boot
  • Java 微服务实践 - Spring Cloud
  • Java 微服务实践 - Spring Boot / Spring Cloud

2. Eureka-Client 发起续租

Eureka-Client 向 Eureka-Server 发起注册应用实例成功后获得租约 ( Lease )。
Eureka-Client 固定间隔向 Eureka-Server 发起续租( renew ),避免租约过期。

默认情况下,租约有效期为 90 秒,续租频率为 30 秒。两者比例为 1 : 3 ,保证在网络异常等情况下,有三次重试的机会。

2.1 初始化定时任务

Eureka-Client 在初始化过程中,创建心跳线程,固定间隔向 Eureka-Server 发起续租( renew )。实现代码如下:


// DiscoveryClient.java

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
               ProviderBackupRegistry backupRegistryProvider) {
    // ... 省略无关代码
    scheduler = Executors.newScheduledThreadPool(2,
               new ThreadFactoryBuilder()
                       .setNameFormat("DiscoveryClient-%d")
                       .setDaemon(true)
                       .build());

    heartbeatExecutor = new ThreadPoolExecutor(
              1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
              new SynchronousQueueRunnable(),
              new ThreadFactoryBuilder()
                      .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                      .setDaemon(true)
                      .build()
    );  // use direct handoff

    // ... 省略无关代码

    // 【3.2.14】初始化定时任务
    initScheduledTasks();
}

private void initScheduledTasks() {

    // 向 Eureka-Server 心跳(续租)执行器
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); // 续租频率
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
               new TimedSupervisorTask(
                       "heartbeat",
                       scheduler,
                       heartbeatExecutor,
                       renewalIntervalInSecs,
                       TimeUnit.SECONDS,
                       expBackOffBound,
                       new HeartbeatThread()
               ),
               renewalIntervalInSecs, TimeUnit.SECONDS);

          // ... 省略无关代码
     }
     // ... 省略无关代码
}
  • scheduler,定时任务服务,用于定时触发心跳( 续租 )。细心如你,会发现任务提交的方式是  ScheduledExecutorService#schedule(...) 方法,只延迟执行一次心跳,说好的固定频率执行心跳呢!!!答案在 「2.3 TimedSupervisorTask」 揭晓。
  • heartbeatExecutor,心跳任务执行线程池。为什么有  scheduler 的情况下,还有  heartbeatExecutor ???答案也在 「2.3 TimedSupervisorTask」 揭晓。
  • HeartbeatThread,心跳线程,在「2.2 TimedSupervisorTask」 详细解析。

2.2 HeartbeatThread

com.netflix.discovery.DiscoveryClient.HeartbeatThread,心跳线程,实现执行 Eureka-Client 向 Eureka-Server 发起续租( renew )请求。实现代码如下:


// DiscoveryClient.java
/**
* 最后成功向 Eureka-Server 心跳时间戳
*/
private volatile long lastSuccessfulHeartbeatTimestamp = -1;

private class HeartbeatThread implements Runnable {

   public void run() {
       if (renew()) {
           lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
       }
   }
}
  • 调用 `#renew` 方法,执行续租逻辑。实现代码如下:
    // DiscoveryClient.java
    boolean renew() {
       EurekaHttpResponseInstanceInfo httpResponse;
       try {
           httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
           logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
           if (httpResponse.getStatusCode() == 404) {
               REREGISTER_COUNTER.increment();
               logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
               long timestamp = instanceInfo.setIsDirtyWithTime();
               // 发起注册
               boolean success = register();
               if (success) {
                   instanceInfo.unsetIsDirty(timestamp);
               }
               return success;
           }
           return httpResponse.getStatusCode() == 200;
       } catch (Throwable e) {
           logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
           return false;
       }
    }
    
      - PUT 请求 Eureka-Server 的 `apps/${APP_NAME}/${INSTANCE_INFO_ID}` 接口,参数为 `status`、`lastDirtyTimestamp`、`overriddenstatus`,实现续租。
    • 调用 `AbstractJerseyEurekaHttpClient#sendHeartBeat(...)` 方法,发起**续租请求**,实现代码如下:
      // AbstractJerseyEurekaHttpClient.java
      @Override
      public EurekaHttpResponseInstanceInfo sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
         String urlPath = "apps/" + appName + '/' + id;
         ClientResponse response = null;
         try {
             WebResource webResource = jerseyClient.resource(serviceUrl)
                     .path(urlPath)
                     .queryParam("status", info.getStatus().toString())
                     .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
             if (overriddenStatus != null) {
                 webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
             }
             Builder requestBuilder = webResource.getRequestBuilder();
             addExtraHeaders(requestBuilder);
             response = requestBuilder.put(ClientResponse.class);
             EurekaHttpResponseBuilderInstanceInfo eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
             if (response.hasEntity()) {
                 eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
             }
             return eurekaResponseBuilder.build();
         } finally {
             if (logger.isDebugEnabled()) {
                 logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
             }
             if (response != null) {
                 response.close();
             }
         }
      }
      
    • 调用 `AbstractJerseyEurekaHttpClient#register(...)` 方法,当 Eureka-Server **不存在租约**时,重新发起注册,在《Eureka 源码解析 —— 应用实例注册发现 (一)之注册》有详细解析。
    • 调用  AbstractJerseyEurekaHttpClient#sendHeartBeat(...) 方法,发起续租请求,实现代码如下:

      
      // AbstractJerseyEurekaHttpClient.java
      @Override
      public EurekaHttpResponseInstanceInfo sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
         String urlPath = "apps/" + appName + '/' + id;
         ClientResponse response = null;
         try {
             WebResource webResource = jerseyClient.resource(serviceUrl)
                     .path(urlPath)
                     .queryParam("status", info.getStatus().toString())
                     .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
             if (overriddenStatus != null) {
                 webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
             }
             Builder requestBuilder = webResource.getRequestBuilder();
             addExtraHeaders(requestBuilder);
             response = requestBuilder.put(ClientResponse.class);
             EurekaHttpResponseBuilderInstanceInfo eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
             if (response.hasEntity()) {
                 eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
             }
             return eurekaResponseBuilder.build();
         } finally {
             if (logger.isDebugEnabled()) {
                 logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
             }
             if (response != null) {
                 response.close();
             }
         }
      }
      

      2.3 TimedSupervisorTask

      com.netflix.discovery.TimedSupervisorTask,监管定时任务的任务。

      A supervisor task that schedules subtasks while enforce a timeout.

      创建 TimedSupervisorTask 代码如下:

      
      public class TimedSupervisorTask extends TimerTask {
      
          private final Counter timeoutCounter;
          private final Counter rejectedCounter;
          private final Counter throwableCounter;
          private final LongGauge threadPoolLevelGauge;
      
          /**
           * 定时任务服务
           */
          private final ScheduledExecutorService scheduler;
          /**
           * 执行子任务线程池
           */
          private final ThreadPoolExecutor executor;
          /**
           * 子任务执行超时时间
           */
          private final long timeoutMillis;
          /**
           * 子任务
           */
          private final Runnable task;
          /**
           * 当前任子务执行频率
           */
          private final AtomicLong delay;
          /**
           * 最大子任务执行频率
           *
           * 子任务执行超时情况下使用
           */
          private final long maxDelay;
      
          public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                                     int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
              this.scheduler = scheduler;
              this.executor = executor;
              this.timeoutMillis = timeUnit.toMillis(timeout);
              this.task = task;
              this.delay = new AtomicLong(timeoutMillis);
              this.maxDelay = timeoutMillis * expBackOffBound;
      
              // Initialize the counters and register.
              timeoutCounter = Monitors.newCounter("timeouts");
              rejectedCounter = Monitors.newCounter("rejectedExecutions");
              throwableCounter = Monitors.newCounter("throwables");
              threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
              Monitors.registerObject(name, this);
          }
      
      }
      
      • scheduler ,定时任务服务,用于定时【发起】子任务。

      • executor ,执行子任务线程池,用于【提交】子任务执行。

      • task ,子任务。

      • timeoutMillis ,子任务执行超时时间,单位:毫秒。

      • delay ,当前子任务执行频率,单位:毫秒。值等于  timeout 参数。

      • maxDelay ,最大子任务执行频率,单位:毫秒。值等于  timeout * expBackOffBound 参数。

      • scheduler 初始化延迟执行 TimedSupervisorTask 。

      • TimedSupervisorTask 执行时,提交 `task` 到 `executor` 执行任务。
        • 当  task 执行正常,TimedSupervisorTask 再次提交自己 scheduler 延迟  timeoutMillis 执行。

        • 当  task 执行超时,重新计算延迟时间( 不允许超过  maxDelay ),再次提交自己 scheduler 延迟执行。

        实现代码如下:

        
        // TimedSupervisorTask.java
          1: @Override
          2: public void run() {
          3:     Future? future = null;
          4:     try {
          5:         // 提交 任务
          6:         future = executor.submit(task);
          7:         //
          8:         threadPoolLevelGauge.set((long) executor.getActiveCount());
          9:         // 等待任务 执行完成 或 超时
         10:         future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
         11:         // 设置 下一次任务执行频率
         12:         delay.set(timeoutMillis);
         13:         //
         14:         threadPoolLevelGauge.set((long) executor.getActiveCount());
         15:     } catch (TimeoutException e) {
         16:         logger.error("task supervisor timed out", e);
         17:         timeoutCounter.increment(); //
         18: 
         19:         // 设置 下一次任务执行频率
         20:         long currentDelay = delay.get();
         21:         long newDelay = Math.min(maxDelay, currentDelay * 2);
         22:         delay.compareAndSet(currentDelay, newDelay);
         23: 
         24:     } catch (RejectedExecutionException e) {
         25:         if (executor.isShutdown() || scheduler.isShutdown()) {
         26:             logger.warn("task supervisor shutting down, reject the task", e);
         27:         } else {
         28:             logger.error("task supervisor rejected the task", e);
         29:         }
         30: 
         31:         rejectedCounter.increment(); //
         32:     } catch (Throwable e) {
         33:         if (executor.isShutdown() || scheduler.isShutdown()) {
         34:             logger.warn("task supervisor shutting down, can't accept the task");
         35:         } else {
         36:             logger.error("task supervisor threw an exception", e);
         37:         }
         38: 
         39:         throwableCounter.increment(); //
         40:     } finally {
         41:         // 取消 未完成的任务
         42:         if (future != null) {
         43:             future.cancel(true);
         44:         }
         45: 
         46:         // 调度 下次任务
         47:         if (!scheduler.isShutdown()) {
         48:             scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
         49:         }
         50:     }
         51: }
        
        • 第 5 至 6 行 :提交子任务  task 到执行子任务线程池  executor 。
        • 第 9 至 10 行 :等待子任务  task 执行完成或执行超时。
        • 第 11 至 12 行 :子任务  task 执行完成,设置下一次执行延迟  delay 。
        • 第 19 至 22 行 :子任务  task 执行超时,重新计算下一次执行延迟  delay 。计算公式为  Math.min(maxDelay, currentDelay * 2) 。如果多次超时,超时时间不断乘以 2 ,不允许超过最大延迟时间(  maxDelay )。
        • 第 41 至 44 行 :强制取消未完成的子任务。
        • 第 46 至 49 行 :调度下一次 TimedSupervisorTask 。

        3. Eureka-Server 接收续租

        3.1 接收续租请求

        com.netflix.eureka.resources.InstanceResource,处理单个应用实例信息的请求操作的 Resource ( Controller )。

        续租应用实例信息的请求,映射  InstanceResource#renewLease() 方法,实现代码如下:

        
          1: @PUT
          2: public Response renewLease(
          3:         @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
          4:         @QueryParam("overriddenstatus") String overriddenStatus,
          5:         @QueryParam("status") String status,
          6:         @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
          7:     boolean isFromReplicaNode = "true".equals(isReplication);
          8:     // 续租
          9:     boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
         10: 
         11:     // 续租失败
         12:     // Not found in the registry, immediately ask for a register
         13:     if (!isSuccess) {
         14:         logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
         15:         return Response.status(Status.NOT_FOUND).build();
         16:     }
         17: 
         18:     // 比较 InstanceInfo 的 lastDirtyTimestamp 属性
         19:     // Check if we need to sync based on dirty time stamp, the client
         20:     // instance might have changed some value
         21:     Response response = null;
         22:     if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
         23:         response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
         24:         // Store the overridden status since the validation found out the node that replicates wins
         25:         if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
         26:                 && (overriddenStatus != null)
         27:                 && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
         28:                 && isFromReplicaNode) {
         29:             registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
         30:         }
         31:     } else { // 成功
         32:         response = Response.ok().build();
         33:     }
         34:     logger.debug("Found (Renew): {} - {}; reply status={}" + app.getName(), id, response.getStatus());
         35:     return response;
         36: }
        
      • 第 8 至 9 行 :调用 `PeerAwareInstanceRegistryImpl#renew(...)` 方法,续租。实现代码如下:
        // PeerAwareInstanceRegistryImpl.java
        public boolean renew(final String appName, final String id, final boolean isReplication) {
           if (super.renew(appName, id, isReplication)) { // 续租
               // Eureka-Server 复制
               replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
               return true;
           }
           return false;
        }
        
          - 调用父类 `AbstractInstanceRegistry#renew(...)` 方法,注册应用实例信息。

          第 11 至 16 行 :续租失败,返回 404 响应。当 Eureka-Client 收到 404 响应后,会重新发起 InstanceInfo 的注册。

          第 18 至 30 行 :比较请求的  lastDirtyTimestamp 和 Server 的 InstanceInfo 的  lastDirtyTimestamp 属性差异,需要配置  eureka.syncWhenTimestampDiffers = true ( 默认开启 )。

          • 第 7 至 11 行 :请求的  lastDirtyTimestamp 较大,意味着请求方( 可能是 Eureka-Client ,也可能是 Eureka-Server 集群内的其他 Server )存在 InstanceInfo 和 Eureka-Server 的 InstanceInfo 的数据不一致,返回 404 响应。请求方收到 404 响应后重新发起注册
          • 第 16 至 21 行 :《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。
          • 第 22 至 24 行 :Server 的  lastDirtyTimestamp 较大,并且请求方为 Eureka-Client,续租成功,返回 200 成功响应。
          • 第 29 行 : lastDirtyTimestamp 一致,返回 200 成功响应。
          • 第 23 行 :调用 `#validateDirtyTimestamp(...)` 方法,比较 `lastDirtyTimestamp` 的差异。实现代码如下:
            // InstanceResource.java
             1: private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
             2:     // 获取 InstanceInfo
             3:     InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
             4:     if (appInfo != null) {
             5:         if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
             6:             Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
             7:             // 请求 的 较大
             8:             if (lastDirtyTimestamp  appInfo.getLastDirtyTimestamp()) {
             9:                 logger.debug("Time to sync, since the last dirty timestamp differs -"
            10:                                 + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", args);
            11:                 return Response.status(Status.NOT_FOUND).build();
            12:             // Server 的 较大
            13:             } else if (appInfo.getLastDirtyTimestamp()  lastDirtyTimestamp) {
            14:                 // In the case of replication, send the current instance info in the registry for the
            15:                 // replicating node to sync itself with this one.
            16:                 if (isReplication) {
            17:                     logger.debug(
            18:                             "Time to sync, since the last dirty timestamp differs -"
            19:                                     + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
            20:                             args);
            21:                     return Response.status(Status.CONFLICT).entity(appInfo).build();
            22:                 } else {
            23:                     return Response.ok().build();
            24:                 }
            25:             }
            26:         }
            27: 
            28:     }
            29:     return Response.ok().build();
            30: }   
            

          第 24 至 30 行 :《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。

          第 31 至 33 行 :续租成功,返回 200 成功响应。

          3.2 续租应用实例信息

          调用  AbstractInstanceRegistry#renew(...) 方法,续租应用实例信息,实现代码如下:

          
            1: public boolean renew(String appName, String id, boolean isReplication) {
            2:     // 增加 续租次数 到 监控
            3:     RENEW.increment(isReplication);
            4:     // 获得 租约
            5:     MapString, LeaseInstanceInfo gMap = registry.get(appName);
            6:     LeaseInstanceInfo leaseToRenew = null;
            7:     if (gMap != null) {
            8:         leaseToRenew = gMap.get(id);
            9:     }
           10:     // 租约不存在
           11:     if (leaseToRenew == null) {
           12:         RENEW_NOT_FOUND.increment(isReplication);
           13:         logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
           14:         return false;
           15:     } else {
           16:         InstanceInfo instanceInfo = leaseToRenew.getHolder();
           17:         if (instanceInfo != null) {
           18:             // touchASGCache(instanceInfo.getASGName());
           19:             // override status
           20:             InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
           21:                     instanceInfo, leaseToRenew, isReplication);
           22:             if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
           23:                 logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
           24:                         + "; re-register required", instanceInfo.getId());
           25:                 RENEW_NOT_FOUND.increment(isReplication);
           26:                 return false;
           27:             }
           28:             if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
           29:                 Object[] args = {
           30:                         instanceInfo.getStatus().name(),
           31:                         instanceInfo.getOverriddenStatus().name(),
           32:                         instanceInfo.getId()
           33:                 };
           34:                 logger.info(
           35:                         "The instance status {} is different from overridden instance status {} for instance {}. "
           36:                                 + "Hence setting the status to overridden status", args);
           37:                 instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
           38:             }
           39:         }
           40:         // 新增 续租每分钟次数
           41:         renewsLastMin.increment();
           42:         // 设置 租约最后更新时间(续租)
           43:         leaseToRenew.renew();
           44:         return true;
           45:     }
           46: }
          
        • 第 2 至 3 行 :增加续租次数到监控。配合 Netflix Servo 实现监控信息采集。
        • 第 4 至 9 行 :获得租约( Lease )。
        • 第 10 至 14 行 :租约不存在,返回续租失败( `false` )。
        • 第 19 至 21 行 :获得应用实例的**最终状态**。在《应用实例注册发现 (八)之覆盖状态》详细解析。
        • 第 22 至 27 行 :应用实例的**最终状态**为 `UNKNOWN`,无法续约,返回 `false` 。在《应用实例注册发现 (八)之覆盖状态》详细解析。
        • 第 28 至 37 行 :应用实例的状态与**最终状态**不相等,使用**最终状态**覆盖应用实例的状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。
        • 第 40 至 41 行 :新增续租每分钟次数( `renewsLastMin` )。`com.netflix.eureka.util.MeasuredRate`,速度测量类,实现代码如下:
          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          36
          37
          38
          39
          40
          41
          42
          43
          44
          45
          46
          47
          48
          49
          50
          51
          52
          53
          54
          55
          56
          57
          58
          59
          60
          61
          62
          63
          64
          65
          66
          67
          68
          69
          70
          71
          72
          
          // AbstractInstanceRegistry.java
          /**
           * 续租每分钟次数
           */
          private final MeasuredRate renewsLastMin;
           
          // MeasuredRate.java
          public class MeasuredRate {
              /**
               * 上一个间隔次数
               */
              private final AtomicLong lastBucket = new AtomicLong(0);
              /**
               * 当前间隔次数
               */
              private final AtomicLong currentBucket = new AtomicLong(0);
              /**
               * 间隔
               */
              private final long sampleInterval;
              /**
               * 定时器
               */
              private final Timer timer;
          private volatile boolean isActive;
           
          public MeasuredRate(long sampleInterval) {
              this.sampleInterval = sampleInterval;
              this.timer = new Timer("Eureka-MeasureRateTimer", true);
              this.isActive = false;
          }
           
          public synchronized void start() {
              if (!isActive) {
                  timer.schedule(new TimerTask() {
           
                      @Override
                      public void run() {
                          try {
                              // Zero out the current bucket.
                              lastBucket.set(currentBucket.getAndSet(0));
                          } catch (Throwable e) {
                              logger.error("Cannot reset the Measured Rate", e);
                          }
                      }
                  }, sampleInterval, sampleInterval);
           
                  isActive = true;
              }
          }
           
          public synchronized void stop() {
              if (isActive) {
                  timer.cancel();
                  isActive = false;
              }
          }
           
          /**
           * Returns the count in the last sample interval.
           */
          public long getCount() {
              return lastBucket.get();
          }
           
          /**
           * Increments the count in the current sample interval.
           */
          public void increment() {
              currentBucket.incrementAndGet();
          }
          }  

          // MeasuredRate.java
          public class MeasuredRate {
             /**
              * 上一个间隔次数
              /
             private final AtomicLong lastBucket = new AtomicLong(0);
             /*

              * 当前间隔次数
              /
             private final AtomicLong currentBucket = new AtomicLong(0);
             /*

              * 间隔
              /
             private final long sampleInterval;
             /*

              * 定时器
              */
             private final Timer timer;
          private volatile boolean isActive;

          public MeasuredRate(long sampleInterval) {
             this.sampleInterval = sampleInterval;
             this.timer = new Timer(“Eureka-MeasureRateTimer”, true);
             this.isActive = false;
          }

          public synchronized void start() {
             if (!isActive) {
                 timer.schedule(new TimerTask() {

                     @Override
                     public void run() {
                         try {
                             // Zero out the current bucket.
                             lastBucket.set(currentBucket.getAndSet(0));
                         } catch (Throwable e) {
                             logger.error(“Cannot reset the Measured Rate”, e);
                         }
                     }
                 }, sampleInterval, sampleInterval);

                 isActive = true;
             }
          }

          public synchronized void stop() {
             if (isActive) {
                 timer.cancel();
                 isActive = false;
             }
          }

          /**

          • Returns the count in the last sample interval.
          • /
            public long getCount() {
               return lastBucket.get();
            }

          /**

          • Increments the count in the current sample interval.
          • /
            public void increment() {
               currentBucket.incrementAndGet();
            }
            }  

            - 配合 Netflix Servo 实现监控信息采集**续租每分钟次数**。 - Eureka-Server 运维界面的显示**续租每分钟次数**。 - 自我保护机制,在 《Eureka 源码解析 —— 应用实例注册发现 (四)之自我保护机制》 详细解析。 - `timer` ,定时器,负责每个 `sampleInterval` 间隔重置**当前次数**( `currentBucket` ),并将**原当前次数**设置到**上一个次数**( `lastBucket` )。 - `#increment()` 方法,返回**当前次数**( `currentBucket` )。 - `#getCount()` 方法,返回**上一个次数**( `lastBucket` )。 - `renewsLastMin` 有如下用途:

            第 4 至 9 行 :获得租约( Lease )。

            第 19 至 21 行 :获得应用实例的最终状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。

            第 28 至 37 行 :应用实例的状态与最终状态不相等,使用最终状态覆盖应用实例的状态。在《应用实例注册发现 (八)之覆盖状态》详细解析。

            // AbstractInstanceRegistry.java
            /**

            • 续租每分钟次数
            • /
              private final MeasuredRate renewsLastMin;

            // MeasuredRate.java
            public class MeasuredRate {
               /**
                * 上一个间隔次数
                /
               private final AtomicLong lastBucket = new AtomicLong(0);
               /*

                * 当前间隔次数
                /
               private final AtomicLong currentBucket = new AtomicLong(0);
               /*

                * 间隔
                /
               private final long sampleInterval;
               /*

                * 定时器
                */
               private final Timer timer;
            private volatile boolean isActive;

            public MeasuredRate(long sampleInterval) {
               this.sampleInterval = sampleInterval;
               this.timer = new Timer(“Eureka-MeasureRateTimer”, true);
               this.isActive = false;
            }

            public synchronized void start() {
               if (!isActive) {
                   timer.schedule(new TimerTask() {

                       @Override
                       public void run() {
                           try {
                               // Zero out the current bucket.
                               lastBucket.set(currentBucket.getAndSet(0));
                           } catch (Throwable e) {
                               logger.error(“Cannot reset the Measured Rate”, e);
                           }
                       }
                   }, sampleInterval, sampleInterval);

                   isActive = true;
               }
            }

            public synchronized void stop() {
               if (isActive) {
                   timer.cancel();
                   isActive = false;
               }
            }

            /**

            • Returns the count in the last sample interval.
            • /
              public long getCount() {
                 return lastBucket.get();
              }

            /**

            • Increments the count in the current sample interval.
            • /
              public void increment() {
                 currentBucket.incrementAndGet();
              }
              }  

            第 42 至 43 行 :调用  Lease#renew() 方法,设置租约最后更新时间( 续租 ),实现代码如下:

            123
            public void renew() {    lastUpdateTimestamp = System.currentTimeMillis() + duration;}

            public void renew() {
               lastUpdateTimestamp = System.currentTimeMillis() + duration;
            }

            第 44 行 :返回续租成功(  true )。

            整个过程修改的租约的过期时间,即使并发请求,也不会对数据的一致性产生不一致的影响,因此像注册操作一样加锁

            666. 彩蛋

            效率比想象的低一些,加油继续更新下一篇。

            胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

  • 本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    转载声明:转载请注明出处,本技术博客是本人原创文章

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 注册中心 Eureka 源码解析 —— 应用实例注册发现 (二)之续租