注册中心 Eureka 源码解析 —— 任务批处理
1. 概述
本文主要分享 任务批处理。Eureka-Server 集群通过任务批处理同步应用实例注册实例,所以本文也是为 Eureka-Server 集群同步的分享做铺垫。
本文涉及类在 com.netflix.eureka.util.batcher
包下,涉及到主体类的类图如下( 打开大图 ):
紫色部分 —— 任务分发器
蓝色部分 —— 任务接收器
红色部分 —— 任务执行器
绿色部分 —— 任务处理器
黄色部分 —— 任务持有者( 任务 )
推荐 Spring Cloud 书籍:
请支持正版。下载盗版,等于主动编写低级 BUG 。
程序猿DD —— 《Spring Cloud微服务实战》
周立 —— 《Spring Cloud与Docker微服务架构实战》
两书齐买,京东包邮。
推荐 Spring Cloud 视频:
Java 微服务实践 - Spring Boot
Java 微服务实践 - Spring Cloud
Java 微服务实践 - Spring Boot / Spring Cloud
2. 整体流程
任务执行的整体流程如下( 打开大图 ):
细箭头 —— 任务执行经历的操作
粗箭头 —— 任务队列流转的方向
不同于一般情况下,任务提交了立即同步或异步执行,任务的执行拆分了三层队列:
蓝线:分发器在收到任务执行请求后,提交到接收队列,任务实际未执行。
黄线:执行器的工作线程处理任务失败,将符合条件( 见 「3. 任务处理器」 )的失败任务提交到重新执行队列。
第一层,接收队列(
acceptorQueue
),重新处理队列(reprocessQueue
)。
第二层,待执行队列(
processingOrder
)
* 粉线:接收线程( Runner )将重新执行队列,接收队列提交到待执行队列。第三层,工作队列(
workQueue
)
* 粉线:接收线程( Runner )将待执行队列的任务根据参数(maxBatchingSize
)将任务合并成批量任务,调度( 提交 )到工作队列。
* 黄线:执行器的工作线程池,一个工作线程可以拉取一个批量任务进行执行。三层队列的好处:
接收队列,避免处理任务的阻塞等待。
接收线程( Runner )合并任务,将相同任务编号( 是的,任务是带有编号的 )的任务合并,只执行一次。
Eureka-Server 为集群同步提供批量操作多个应用实例的接口,一个批量任务可以一次调度接口完成,避免多次调用的开销。当然,这样做的前提是合并任务,这也导致 Eureka-Server 集群之间对应用实例的注册和下线带来更大的延迟。毕竟,Eureka 是在 CAP 之间,选择了 AP。
3. 任务处理器
com.netflix.eureka.util.batcher.TaskProcessor
,任务处理器接口。接口代码如下:
// ... 省略代码,超过微信文章上限
ProcessingResult ,处理任务结果。
`Success` ,成功。
`Congestion` ,拥挤错误,任务将会被重试。例如,请求被限流。
`TransientError` ,瞬时错误,任务将会被重试。例如,网络请求超时。
`PermanentError` ,永久错误,任务将会被丢弃。例如,执行时发生程序异常。
#process(task)
方法,处理单任务。#process(tasks)
方法,处理批量任务。
4. 创建任务分发器
com.netflix.eureka.util.batcher.TaskDispatcher
,任务分发器接口。接口代码如下:
// ... 省略代码,超过微信文章上限
#process(…)
方法,提交任务编号,任务,任务过期时间给任务分发器处理。
com.netflix.eureka.util.batcher.TaskDispatchers
,任务分发器工厂类,用于创建任务分发器。其内部提供两种任务分发器的实现:
批量任务执行的分发器,用于 Eureka-Server 集群注册信息的同步任务。
单任务执行的分发器,用于 Eureka-Server 向亚马逊 AWS 的 ASG ( Autoscaling Group ) 同步状态。虽然本系列暂时对 AWS 相关的不做解析,从工具类的角度来说,本文会对该分发器进行分享。
com.netflix.eureka.cluster.ReplicationTaskProcessor
,实现 TaskDispatcher ,Eureka-Server 集群任务处理器。感兴趣的同学,可以点击链接自己研究,我们将在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。
4.1 批量任务执行分发器
调用 TaskDispatchers#createBatchingTaskDispatcher(...)
方法,创建批量任务执行的分发器,实现代码如下:
// TaskDispatchers.java
1: /**
2: * 创建批量任务执行的分发器
3: *
4: * @param id 任务执行器编号
5: * @param maxBufferSize 待执行队列最大数量
6: * @param workloadSize 单个批量任务包含任务最大数量
7: * @param workerCount 任务执行器工作线程数
8: * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
9: * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
10: * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
11: * @param taskProcessor 任务处理器
12: * @param <ID> 任务编号泛型
13: * @param <T> 任务泛型
14: * @return 批量任务执行的分发器
15: */
// ... 省略代码,超过微信文章上限
第 1 至 23 行 :方法参数。比较多哈,请耐心理解。
`workloadSize` 参数,单个批量任务包含任务最大数量。
`taskProcessor` 参数,自定义任务执行器实现。
第 24 至 27 行 :创建任务接收执行器。在 「5. 创建任务接收器」 详细解析。
第 28 至 29 行 :创建批量任务执行器。在 「6.1 创建批量任务执行器」 详细解析。
第 30 至 42 行 :创建批量任务分发器。
第 32 至 35 行 :`#process()` 方法的实现,调用 `AcceptorExecutor#process(…)` 方法,提交 [ 任务编号 , 任务 , 任务过期时间 ] 给任务分发器处理。
4.2 单任务执行分发器
调用 TaskDispatchers#createNonBatchingTaskDispatcher(...)
方法,创建单任务执行的分发器,实现代码如下:
1: /**
2: * 创建单任务执行的分发器
3: *
4: * @param id 任务执行器编号
5: * @param maxBufferSize 待执行队列最大数量
6: * @param workerCount 任务执行器工作线程数
7: * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
8: * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
9: * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
10: * @param taskProcessor 任务处理器
11: * @param <ID> 任务编号泛型
12: * @param <T> 任务泛型
13: * @return 单任务执行的分发器
14: */
15: public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
16: int maxBufferSize,
17: int workerCount,
18: long maxBatchingDelay,
19: long congestionRetryDelayMs,
20: long networkFailureRetryMs,
21: TaskProcessor<T> taskProcessor) {
22: // 创建 任务接收执行器
23: final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
24: id, maxBufferSize, /* workloadSize = 1 */1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
25: );
26: final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
27: return new TaskDispatcher<ID, T>() {
28: @Override
29: public void process(ID id, T task, long expiryTime) {
30: acceptorExecutor.process(id, task, expiryTime);
31: }
32:
33: @Override
34: public void shutdown() {
35: acceptorExecutor.shutdown();
36: taskExecutor.shutdown();
37: }
38: };
39: }
第 1 至 21 行 :方法参数。比较多哈,请耐心理解。
`workloadSize` 参数,相比 `#createBatchingTaskDispatcher(…)` 少这个参数。在第 24 行,你会发现该参数传递给 AcceptorExecutor 使用 1 噢。
`taskProcessor` 参数,自定义任务执行器实现。
第 21 至 25 行 :创建任务接收执行器。和
#createBatchingTaskDispatcher(…)
只差workloadSize = 1
参数。在 「5. 创建任务接收器」 详细解析。第 28 至 29 行 :创建单任务执行器。和 `#createBatchingTaskDispatcher(…)` 差别很大。「6.2 创建单任务执行器」 详细解析。
第 30 至 42 行 :创建单任务分发器。和
#createBatchingTaskDispatcher(…)
一样。
5. 创建任务接收执行器
com.netflix.eureka.util.batcher.AcceptorExecutor
,任务接收执行器。创建构造方法代码如下:
// ... 省略代码,超过微信文章上限
第 5 至 61 行 :属性。比较多哈,请耐心理解。
眼尖如你,会发现 AcceptorExecutor 即存在单任务工作队列( `singleItemWorkQueue` ),又存在批量任务工作队列( `batchWorkQueue` ) ,在 「9. 任务接收线程【调度任务】」 会解答这个疑惑。
第 78 至 79 行 :创建网络通信整形器。在 「7. 网络通信整形器」 详细解析。
第 81 至 85 行 :创建接收任务线程。
6. 创建任务执行器
com.netflix.eureka.util.batcher.TaskExecutors
,任务执行器。其内部提供创建单任务和批量任务执行器的两种方法。TaskExecutors 构造方法如下:
// ... 省略代码,超过微信文章上限
workerThreads
属性,工作线程池。工作任务队列会被工作线程池并发拉取,并发执行。com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnableFactory
,创建工作线程工厂接口。单任务和批量任务执行器的工作线程实现不同,通过自定义工厂实现类创建。
6.1 创建批量任务执行器
调用 TaskExecutors#batchExecutors(...)
方法,创建批量任务执行器。实现代码如下:
/**
* 创建批量任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param <ID> 任务编号泛型
* @param <T> 任务泛型
* @return 批量任务执行器
*/
// ... 省略代码,超过微信文章上限
com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.BatchWorkerRunnable
,批量任务工作线程。
6.2 创建单任务执行器
调用 TaskExecutors#singleItemExecutors(...)
方法,创建批量任务执行器。实现代码如下:
/**
* 创建单任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param <ID> 任务编号泛型
* @param <T> 任务泛型
* @return 单任务执行器
*/
// ... 省略代码,超过微信文章上限
com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.SingleTaskWorkerRunnable
,单任务工作线程。
6.3 工作线程抽象类
com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable
,任务工作线程抽象类。BatchWorkerRunnable 和 SingleTaskWorkerRunnable 都实现该类,差异在 #run()
的自定义实现。WorkerRunnable 实现代码如下:
// ... 省略代码,超过微信文章上限
7. 网络通信整形器
com.netflix.eureka.util.batcher.TrafficShaper
,网络通信整形器。当任务执行发生请求限流,或是请求网络失败的情况,则延时 AcceptorRunner 将任务提交到工作任务队列,从而避免任务很快去执行,再次发生上述情况。TrafficShaper 实现代码如下:
// ... 省略代码,超过微信文章上限
#registerFailure(…)
,在任务执行失败时,提交任务结果给 TrafficShaper ,记录发生时间。在 「10. 任务执行器【执行任务】」 会看到调用该方法。#transmissionDelay(…)
,计算提交延迟,单位:毫秒。「9. 任务接收线程【调度任务】」 会看到调用该方法。
8. 任务接收执行器【处理任务】
调用 AcceptorExecutor#process(...)
方法,添加任务到接收任务队列。实现代码如下:
// AcceptorExecutor.java
// ... 省略代码,超过微信文章上限
com.netflix.eureka.util.batcher.TaskHolder
,任务持有者,实现代码如下:// ... 省略代码,超过微信文章上限
9. 任务接收线程【调度任务】
后台线程执行 AcceptorRunner#run(...)
方法,调度任务。实现代码如下:
// ... 省略代码,超过微信文章上限
第 4 行 :无限循环执行调度,直到关闭。
第 6 至 7 行 :调用
#drainInputQueues()
方法,循环处理完输入队列( 接收队列 + 重新执行队列 ),直到有待执行的任务。实现代码如下:// ... 省略代码,超过微信文章上限
第 4 行 :优先从重新执行任务的队尾拿较新的任务,从而实现保留更新的任务在待执行任务映射(
pendingTasks
) 里。第 12 行 :添加任务编号到待执行队列(
processingOrder
) 的头部。效果如下图:第 15 至 18 行 :如果待执行队列(
pendingTasks
)已满,清空重新执行队列(processingOrder
),放弃较早的任务。重新执行队列(
reprocessQueue
) 和接收队列(acceptorQueue
)为空待执行任务映射(
pendingTasks
)不为空第 2 行 && 第 18 行 :循环,直到同时满足如下全部条件:
第 3 至 4 行 :处理完重新执行队列(
reprocessQueue
)。实现代码如下:// ... 省略代码,超过微信文章上限
第 5 至 6 行 :处理完接收队列(
acceptorQueue
),实现代码如下:// ... 省略代码,超过微信文章上限
第 8 至 17 行 :当所有队列为空,阻塞从接收队列(
acceptorQueue
) 拉取任务 10 ms。若拉取到,添加到待执行队列(processingOrder
)。
第 12 至 16 行 :计算可调度任务的最小时间(
scheduleTime
)。当
scheduleTime
小于当前时间,不重新计算,即此时需要延迟等待调度。当
scheduleTime
大于等于当前时间,配合TrafficShaper#transmissionDelay(…)
重新计算。
第 19 行 :当
scheduleTime
小于当前时间,执行任务的调度。第 21 行 :调用
#assignBatchWork()
方法,调度批量任务。实现代码如下:// ... 省略代码,超过微信文章上限
x
注意,批量任务工作队列(
batchWorkQueue
) 和单任务工作队列(singleItemWorkQueue
) 是不同的队列。第 3 行 :调用
TaskDispatcher#requestWorkItems()
方法,发起请求信号量,并获得批量任务的工作队列。实现代码如下:// TaskDispatcher.java
// ... 省略代码,超过微信文章上限第 5 至 8 行 :循环获取一个批量任务,直到成功。
x
你会发现,本文说了半天的批量任务,实际是
List<taskholder></taskholder
哈。第 4 行 :获取批量任务工作请求信号量(
batchWorkRequests
) 。在任务执行器的批量任务执行器,每次执行时,发出batchWorkRequests
。每一个信号量需要保证获取到一个批量任务。第 19 至 20 行 :未调度到批量任务,释放请求信号量,代表请求实际未完成,每一个信号量需要保证获取到一个批量任务。
第 21 至 24 行 :添加批量任务到批量任务工作队列。
第 23 行 :调用
#assignSingleItemWork()
方法,调度单任务。x
第 2 行 :调用
#hasEnoughTasksForNextBatch()
方法,判断是否有足够任务进行下一次批量任务调度:1)待执行任务(processingOrder
)映射已满;或者 2)到达批量任务处理最大等待延迟。实现代码如下:// ... 省略代码,超过微信文章上限
第 5 至 17 行 :获取批量任务(
holders
)。第 23 行 :调用
#assignSingleItemWork()
方法,调度单任务,和#assignBatchWork()
方法类似。实现代码如下:// ... 省略代码,超过微信文章上限
第 26 至 31 行 :当调度任务前的待执行任务数(
totalItems
)等于当前待执行队列(processingOrder
)的任务数,意味着:1)任务执行器无任务请求,正在忙碌处理之前的任务;或者 2)任务延迟调度。睡眠 10 秒,避免资源浪费。10. 任务执行器【执行任务】
10.1 批量任务工作线程
批量任务工作后台线程( BatchWorkerRunnable )执行
#run(...)
方法,调度任务。实现代码如下://
// ... 省略代码,超过微信文章上限第 4 行 :无限循环执行调度,直到关闭。
第 6 行 :调用
getWork()
方法,获取一个批量任务直到成功。实现代码如下:// ... 省略代码,超过微信文章上限
第 12 行 :调用
#getTasksOf(...)
方法,获得实际批量任务。实现代码如下:// ... 省略代码,超过微信文章上限
第 14 至 24 行 :调用处理器( TaskProcessor ) 执行任务。当任务执行结果为
Congestion
或TransientError
,调用AcceptorExecutor#reprocess(...)
提交整个批量任务重新处理,实现代码如下:// AcceptorExecutor.java
// ... 省略代码,超过微信文章上限10.2 单任务工作线程
单任务工作后台线程( SingleTaskWorkerRunnable )执行
#run(...)
方法,调度任务,和BatchWorkerRunnable#run(...)
基本类似,就不啰嗦了。实现代码如下:@Override
// SingleTaskWorkerRunnable.java
// ... 省略代码,超过微信文章上限666. 彩蛋
又是一篇长文。建议边看代码,边对照着整体流程图,理解实际不难。
当然,欢迎你有任何疑问,在我的公众号( 芋道源码 ) 留言。
胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?
更多相关文章
- 分布式消息队列 RocketMQ源码解析:事务消息
- 分布式消息队列 RocketMQ源码解析:Filtersrv
- 分布式消息队列 RocketMQ 源码分析 —— 定时消息与消息重试
- 分布式消息队列 RocketMQ 源码分析 —— 高可用
- 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(上)
- 分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)
- 分布式爬虫的部署之Scrapyd批量部署
- Python小技巧:如何批量更新已安装的库?
- Fabric 源码学习:如何实现批量管理远程服务器?