本文基于 Elastic-Job V2.1.5 版本分享

  • 1. 概述

  • 2. ReconcileService


1. 概述

本文主要分享 Elastic-Job-Lite 自诊断修复

在分布式的场景下由于网络、时钟等原因,可能导致 Zookeeper 的数据与真实运行的作业产生不一致,这种不一致通过正向的校验无法完全避免。需要另外启动一个线程定时校验注册中心数据与真实作业状态的一致性,即维持 Elastic-Job 的最终一致性

涉及到主要类的类图如下( 打开大图 ):

  • 在 Elastic-Job-lite 里,调解分布式作业不一致状态服务( ReconcileService ) 实现了自诊断修复功能。

你行好事会因为得到赞赏而愉悦 
同理,开源项目贡献者会因为 Star 而更加有动力 
为 Elastic-Job 点赞!传送门

2. ReconcileService

ReconcileService,调解分布式作业不一致状态服务。

ReconcileService 继承 Google Guava AbstractScheduledService 抽象类,实现 #scheduler()#runOneIteration() 方法,达到周期性校验注册中心数据与真实作业状态的一致性。

#scheduler() 方法实现如下

// ReconcileService.java
@Override
protected Scheduler scheduler() {
   return Scheduler.newFixedDelaySchedule(01, TimeUnit.MINUTES);
}
  • 每 1 分钟会调用一次 #runOneIteration() 方法进行校验。

  • Google Guava AbstractScheduledService 相关的知识,有兴趣的同学可以自己 Google 学习哟。

#runOneIteration() 方法实现如下

// ReconcileService.java
@Override
protected void runOneIteration() throws Exception {
   LiteJobConfiguration config = configService.load(true);
   int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
   if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) { // 校验是否达到校验周期
       // 设置最后校验时间
       lastReconcileTime = System.currentTimeMillis();
       if (leaderService.isLeaderUntilBlock() // 主作业节点才可以执行
               && !shardingService.isNeedSharding() // 当前作业不需要重新分片
               && shardingService.hasShardingInfoInOfflineServers()) { // 查询是包含有分片节点的不在线服务器
           log.warn("Elastic Job: job status node has inconsistent value,start reconciling...");
           // 设置需要重新分片的标记
           shardingService.setReshardingFlag();
       }
   }
}
  • 通过作业配置,设置修复作业服务器不一致状态服务调度间隔时间属性( LiteJobConfiguration.reconcileIntervalMinutes )。

  • 调用 ShardingService#setReshardingFlag() 方法,设置需要重新分片的标记。这个也是 ReconcileService 最本质的行为,有了这个标记后,作业会重新进行分片,达到作业节点本地分片数据与 Zookeeper 数据一致。作业分片逻辑,在《Elastic-Job-Lite 源码分析 —— 作业分片》有详细解析。

  • 调解分布式作业不一致状态服务一共有三个条件:

    • 调用 LeaderService#isLeaderUntilBlock() 方法,判断当前作业节点是否为主节点。在《Elastic-Job-Lite 源码分析 —— 主节点选举》有详细解析。

    • 调用 ShardingService#isNeedSharding() 方法,判断当前作业是否需要重分片。如果需要重新分片,就不要重复设置当前作业需要重新分片的标记。

    • 调用 ShardingService#hasShardingInfoInOfflineServers() 方法,查询是否包含有分片节点的不在线服务器。永久数据节点 /${JOB_NAME}/sharding/${ITEM_INDEX}/instance存储分配的作业节点主键( ${JOB_INSTANCE_ID} ), 不会随着作业节点因为各种原因断开后会话超时移除,而临时数据节点/${JOB_NAME}/instances/${JOB_INSTANCE_ID} 随着作业节点因为各种原因断开后超时会话超时移除。当查询到包含有分片节点的不在线的作业节点,设置需要重新分片的标记后进行重新分片,将其持有的作业分片分配给其它在线的作业节点。

      // ShardingService.java
       /**
       * 查询是包含有分片节点的不在线服务器.
       * 
       * @return 是包含有分片节点的不在线服务器
       */

      public boolean hasShardingInfoInOfflineServers() {
          List<String> onlineInstances = jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT); // "/¨E123EJOB¨E95ENAME¨E125E/instances/<annotation encoding="application style="color: rgb(128, 128, 128);overflow-wrap: inherit !important;word-break: inherit !important;" span="" class="hljs-comment" encoding=""application"><span class="katex-html" aria-hidden="true" style="overflow-wrap: inherit !important;word-break: inherit !important;"><span class="strut" style="height:1em;vertical-align:-0.25em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E123<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.09618em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">J<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">O<span class="mord mathit" style="margin-right:0.05017em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">B¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E95<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">N<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">A<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">M<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E125<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E/<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">t<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">c<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">e<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">s/{JOB_INSTANCE_ID}"
          int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
          for (int i = 0; i < shardingTotalCount; i++) {
              if (!onlineInstances.contains(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) { // "/¨E123EJOB¨E95ENAME¨E125E/sharding/<annotation encoding="application style="color: rgb(128, 128, 128);overflow-wrap: inherit !important;word-break: inherit !important;" span="" class="hljs-comment" encoding=""application"><span class="katex-html" aria-hidden="true" style="overflow-wrap: inherit !important;word-break: inherit !important;"><span class="strut" style="height:1em;vertical-align:-0.25em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E123<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.09618em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">J<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">O<span class="mord mathit" style="margin-right:0.05017em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">B¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E95<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">N<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">A<span class="mord mathit" style="margin-right:0.10903em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">M<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E¨<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E125<span class="mord mathit" style="margin-right:0.05764em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">E/<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">s<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">h<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">a<span class="mord mathit" style="margin-right:0.02778em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">r<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">d<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">i<span class="mord mathit" style="overflow-wrap: inherit !important;word-break: inherit !important;">n<span class="mord mathit" style="margin-right:0.03588em;" style="overflow-wrap: inherit !important;word-break: inherit !important;">g/{ITEM_INDEX}/instance"
                  return true;
              }
          }
          return false;
      }</span class="mord mathit" style="margin-right:0.03588em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05017em;"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit" style="margin-right:0.09618em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="strut" style="height:1em;vertical-align:-0.25em;"></span class="katex-html" aria-hidden="true"></annotation encoding="application></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit"></span class="mord mathit" style="margin-right:0.10903em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05017em;"></span class="mord mathit" style="margin-right:0.02778em;"></span class="mord mathit" style="margin-right:0.09618em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="mord mathit" style="margin-right:0.05764em;"></span class="strut" style="height:1em;vertical-align:-0.25em;"></span class="katex-html" aria-hidden="true"></annotation encoding="application>




©著作权归作者所有:来自51CTO博客作者mb5ff80520dfa04的原创作品,如需转载,请注明出处,否则将追究法律责任

更多相关文章

  1. LeetCode 图解 | 237.删除链表中的节点
  2. 动画:面试必刷之二叉树搜索第 K 大节点
  3. SSH AKS集群节点的几种方法(一)
  4. 动画:面试算法之求二叉树的下一节点
  5. php+nodeJs+thrift协议,实现zookeeper节点数据自动发现
  6. jQuery的DOM操作实例(3)——创建节点&&编写一个弹窗
  7. jQuery编程基础精华02(属性、表单过滤器,元素的each,表单选择器,子元
  8. 如何在java脚本中获取节点内部文本?
  9. Study JQuery《zTree自动点击第一个节点》

随机推荐

  1. 正式开源的优雅测试框架 PestPHP
  2. PHP魔术方法之__call和__callStatic详解(
  3. PHP面向对象之3种数据访问方式详解(代码实
  4. PHP时间戳和日期格式相互转换
  5. php面向对象之析构函数和对象引用
  6. PHP中面向对象之继承
  7. EasySwoole 基础入门
  8. php实现cookie即时生效
  9. CGI,FastCGI,PHP-CGI,PHP-FPM 简单了解
  10. php session垃圾回收机制