您现在的位置是:首页 >其他 >XXL-JOB中间件【实现分布式任务调度】网站首页其他
XXL-JOB中间件【实现分布式任务调度】
目录
1:XXL-JOB介绍
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
官网:https://www.xuxueli.com/xxl-job/
文档:https://www.xuxueli.com/xxl-job
XXL-JOB主要有调度中心、执行器、任务:
调度中心:
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;
主要职责为执行器管理、任务管理、监控运维、日志管理等
任务执行器:
负责接收调度请求并执行任务逻辑;
只要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
任务:负责执行具体的业务处理。
调度中心与执行器之间的工作流程如下:
执行流程:
1.任务执行器根据配置的调度中心的地址,自动注册到调度中心
2.达到任务触发条件,调度中心下发任务
3.执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
4.执行器消费内存队列中的执行结果,主动上报给调度中心
5.当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
2:搭建XXL-JOB
2.1:调度中心
首先下载XXL-JOB
GitHub:GitHub - xuxueli/xxl-job: A distributed task scheduling framework.(分布式任务调度平台XXL-JOB)
码云:xxl-job: 一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
项目使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
也可从课程资料目录获取,解压xxl-job-2.3.1.zip
使用IDEA打开解压后的目录
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
doc :文档资料,包含数据库脚本
在下发的虚拟机的MySQL中已经创建了xxl_job_2.3.1数据库
如下图:
账号和密码:admin/123456
如果无法使用虚拟机运行xxl-job可以在本机idea运行xxl-job调度中心。
2.2:执行器
下边配置执行器,执行器负责与调度中心通信接收调度中心发起的任务调度请求。
1、下边进入调度中心添加执行器
点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。
添加成功:2: 在你项目中需要使用任务调度的模块中添加相关依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>
3: 在yaml配置文件中添加相关配置
xxl:
job:
admin:
addresses: http://192.168.101.65:8088/xxl-job-admin
executor:
appname: media-process-service
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token
注意配置中的appname这是执行器的应用名,port是执行器启动的端口,如果本地启动多个执行器注意端口不能重复。
4、配置xxl-job的执行器
将xxl-job示例工程下配置类拷贝到媒资管理的service工程下
拷贝至:自己项目需要用到任务调度的模块
到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器,下边准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动媒资管理模块的接口工程。
启动后观察日志,出现下边的日志表示执行器在调度中心注册成功
同时观察调度中心中的执行器界面
在线机器地址处已显示1个执行器。
2.3:执行任务
下边编写任务,参考示例工程中任务类的编写方法,如下图:
下边在调度中心添加任务,进入任务管理
点击新增,填写任务信息
注意红色标记处:
调度类型:
固定速度指按固定的间隔定时调度。
Cron,通过Cron表达式实现更丰富的定时调度策略。
Cron表达式是一个字符串,通过它可以定义调度策略,格式如下:
{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
xxl-job提供图形界面去配置:
一些例子如下:
30 10 1 * * ? 每天1点10分30秒触发
0/30 * * * * ? 每30秒触发一次
* 0/10 * * * ? 每10分钟触发一次
运行模式有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。
JobHandler即任务方法名,填写任务方法上边@XxlJob注解中的名称。
路由策略:当执行器集群部署时,调度中心向哪个执行器下发任务,这里选择第一个表示只向第一个执行器下发任务,路由策略的其它选项稍后在分片广播章节详细解释。
高级配置的其它配置项稍后在分片广播章节详细解释。
添加成功,启动任务
通过调度日志查看任务执行情况
下边启动媒资管理的service工程,启动执行器。
观察执行器方法的执行。
如果要停止任务需要在调度中心操作
任务跑一段时间注意清理日志
3:分片广播
掌握了xxl-job的基本使用,下边思考如何进行分布式任务处理呢?如下图,我们会启动多个执行器组成一个集群,去执行任务。
执行器在集群部署下调度中心有哪些路由策略呢?
查看xxl-job官方文档,阅读高级配置相关的内容:
高级配置:
- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
- 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。
- 调度过期策略:
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
- 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
- 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
下边要重点说的是分片广播策略,分片是指是调度中心以执行器为维度进行分片,将集群中的执行器标上序号:0,1,2,3...,广播是指每次调度会向集群中的所有执行器发送任务调度,请求中携带分片参数。
如下图:
每个执行器收到调度请求同时接收分片参数。
xxl-job支持动态扩容执行器集群从而动态增加分片数量,当有任务量增加可以部署更多的执行器到集群中,调度中心会动态修改分片的数量。
作业分片适用哪些场景呢?
- 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
- 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
使用说明:
"分片广播" 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
Java语言任务获取分片参数方式:
BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务"ShardingJobHandler":
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// 分片序号,从0开始
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
....
下边测试作业分片:
1、定义作业分片的任务方法
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("开始执行第"+shardIndex+"批任务");
}
2、在调度中心添加任务
添加成功:
启动任务,观察日志
下边启动两个执行器实例,观察每个实例的执行情况
首先在nacos中配置media-service的本地优先配置:
#配置本地优先
spring:
cloud:
config:
override-none: true
将media-service启动两个实例
两个实例的在启动时注意端口不能冲突:
实例1 在VM options处添加:-Dserver.port=63051 -Dxxl.job.executor.port=9998
实例2 在VM options处添加:-Dserver.port=63050 -Dxxl.job.executor.port=9999
例如:
启动两个实例
观察任务调度中心,稍等片刻执行器有两个
观察两个执行实例的日志:
另一实例的日志如下:
从日志可以看每个实例的分片序号不同。
如果其中一个执行器挂掉,只剩下一个执行器在工作,稍等片刻调用中心发现少了一个执行器将动态调整总分片数为1。
到此作业分片任务调试完成