ruoyi学习Quartz

依旧是从ruoyi学习,这次是任务调度框架Quartz

参考

Quartz官方文档

quartz (从原理到应用)详解篇

Quartz 是什么?一文带你入坑 - 知乎

任务调度,也可说是定时任务,其实还是很常见的在我们的生活中,最常见的莫过于闹钟了,这就是一种定时任务,其他还有提醒事项,消息订阅等等

入门

关于Quartz,首先要知道JobJobDetailTriggerScheduler

我简单的用非官方的语言,我的理解,讲一下

  • Job:具体的调用接口,关键是要实现其中的execute方法

  • JobDetail:区别不同类型的任务,还包括Job实例的属性

  • Trigger:触发器,配置不同任务的执行策略

  • Scheduler:调度器,调度任务

1
2
3
4
5
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>x.x.x</version>
</dependency>

Scheduler

Scheduler作为真正的任务调度器,肯定是需要配置各种参数的,拿ruoyi配置来看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 定时任务配置
*
* @author ruoyi
*/
@Configuration
public class ScheduleConfig {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setDataSource(dataSource);

// quartz参数
Properties prop = new Properties();
prop.put("org.quartz.scheduler.instanceName", "RuoyiScheduler");
prop.put("org.quartz.scheduler.instanceId", "AUTO");
// 线程池配置
prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
prop.put("org.quartz.threadPool.threadCount", "20");
prop.put("org.quartz.threadPool.threadPriority", "5");
// JobStore配置
prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
// 集群配置
prop.put("org.quartz.jobStore.isClustered", "true");
prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");

// sqlserver 启用
// prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?");
prop.put("org.quartz.jobStore.misfireThreshold", "12000");
prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
factory.setQuartzProperties(prop);

factory.setSchedulerName("RuoyiScheduler");
// 延时启动
factory.setStartupDelay(1);
factory.setApplicationContextSchedulerContextKey("applicationContextKey");
// 可选,QuartzScheduler
// 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
factory.setOverwriteExistingJobs(true);
// 设置自动启动,默认为true
factory.setAutoStartup(true);

return factory;
}
}

ruoyi的配置是一个很好的实战参考,其上的注解也写的非常明白了

主参数配置

Quartz主参数配置,可参考Quartz主配置ruoyi配置只有两项

  • org.quartz.scheduler.instanceName:本身对于调度程序没有意义,如是集群模式需要同一逻辑调度程序使用相同名称

  • org.quartz.scheduler.instanceId:调度程序id方法,配置为Auto则自动生成)

线程池配置

ThreadPool配置,可参考Quartz配置ThreadPool设置

  • org.quartz.threadPool.classThreadPool的实现类,Quartz自带的org.quartz.simpl.SimpleThreadPool已经可以了

  • org.quartz.threadPool.threadCount:线程数,根据场景需要而配置

  • org.quartz.threadPool.threadPriority:线程优先级

JobStore配置

顾名思义,就是说Job这些任务的存储方式,一共有三种方式

  • RAMJobStore:内存存储,快速方便,易丢失

  • JDBC-JobStoreTXJDBC-JobStoreCMT:这两个对比着说最好,他们都是通过JDBC来存储,因此不易失,两者主要区别是事务由谁管理,TX表示自己来管理事务,CMT表示加入全局事务管理,因为使用Spring事务管理的原因,应该大多数都是用前者吧

参考:

Job Stores

Quartz配置RAMJobStore

Quartz配置JDBC-JobStoreTX

Quartz配置JDBC-JobStoreCMT

数据源配置

详细请参考,Quartz配置DataSources

一旦前面使用了JDBC的存储方式,就一定要配置数据源的,ruoyi使用的是配置注入的方式,就是将ruoyi-framework配置的数据源以注入的方式进行配置,这里不细说了,有机会写ruoyi数据源设计再讲吧

Quartz提供了数据库文件,quartz.sql下载

集群配置

参考使用JDBC-JobStore配置群集

  • org.quartz.jobStore.isClustered:设置为“true”打开集群功能

  • org.quartz.jobStore.clusterCheckinInterval:检测集群间频率,单位毫秒

  • org.quartz.jobStore.maxMisfiresToHandleAtATime:在给定的通行证中,工作区将处理的最大错误次数触发。

  • org.quartz.jobStore.txIsolationLevelSerializable:“true”表示Quartz(使用JobStoreTXCMT)在JDBC连接上调用setTransactionIsolationConnection.TRANSACTION_SERIALIZABLE)。这可以有助于防止在高负载下的某些数据库的锁定超时以及“持久”事务。

  • org.quartz.jobStore.misfireThreshold:在被认为“失火”之前,调度程序将“容忍”一个Triggers将其下一个启动时间通过的毫秒数。

  • org.quartz.jobStore.tablePrefix:表前缀

其他

ruoyi剩下的配置就不多讲了,看注释,查源码即可

Job

关于Job,前面已经说过,需要实现Job接口,实现execute方法,看看ruoyi是怎么做的吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* 抽象quartz调用
*
* @author ruoyi
*/
public abstract class AbstractQuartzJob implements Job {
private static final Logger log = LoggerFactory.getLogger(AbstractQuartzJob.class);

/**
* 线程本地变量
*/
private static ThreadLocal<Date> threadLocal = new ThreadLocal<>();

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
SysJob sysJob = new SysJob();
BeanUtils.copyBeanProp(sysJob, context.getMergedJobDataMap().get(ScheduleConstants.TASK_PROPERTIES));
try {
before(context, sysJob);
if (sysJob != null) {
doExecute(context, sysJob);
}
after(context, sysJob, null);
} catch (Exception e) {
log.error("任务执行异常 - :", e);
after(context, sysJob, e);
}
}

/**
* 执行前
*
* @param context 工作执行上下文对象
* @param sysJob 系统计划任务
*/
protected void before(JobExecutionContext context, SysJob sysJob) {
threadLocal.set(new Date());
}

/**
* 执行后
*
* @param context 工作执行上下文对象
* @param sysJob 系统计划任务
*/
protected void after(JobExecutionContext context, SysJob sysJob, Exception e) {
Date startTime = threadLocal.get();
threadLocal.remove();

final SysJobLog sysJobLog = new SysJobLog();
sysJobLog.setJobName(sysJob.getJobName());
sysJobLog.setJobGroup(sysJob.getJobGroup());
sysJobLog.setInvokeTarget(sysJob.getInvokeTarget());
sysJobLog.setStartTime(startTime);
sysJobLog.setStopTime(new Date());
long runMs = sysJobLog.getStopTime().getTime() - sysJobLog.getStartTime().getTime();
sysJobLog.setJobMessage(sysJobLog.getJobName() + " 总共耗时:" + runMs + "毫秒");
if (e != null) {
sysJobLog.setStatus(Constants.FAIL);
String errorMsg = StringUtils.substring(ExceptionUtil.getExceptionMessage(e), 0, 2000);
sysJobLog.setExceptionInfo(errorMsg);
} else {
sysJobLog.setStatus(Constants.SUCCESS);
}

// 写入数据库当中
SpringUtils.getBean(ISysJobLogService.class).addJobLog(sysJobLog);
}

/**
* 执行方法,由子类重载
*
* @param context 工作执行上下文对象
* @param sysJob 系统计划任务
* @throws Exception 执行过程中的异常
*/
protected abstract void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception;
}

这里定义的是抽象类,抽象方法为doExecute,从@Overrideexecute可知,doExecute方法作为execute的中间部分被执行,夹在beforeafter方法之中。说到这个必须要提一下这个线程本地变量ThreadLocal,之前只是觉得面试题中见得多,现在看的代码多了,就知道了实际生产中用的真不少。这里ThreadLocal就不必过多介绍了,保存线程级变量,互不干扰。这里保存的是Date数据,在before方法中设置,在after中获取,并记录job日志,写入数据库。

抽象类看完了,接下来就是具体实现类了,这里有两个

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 定时任务处理(禁止并发执行)
*
* @author ruoyi
*/
@DisallowConcurrentExecution
public class QuartzDisallowConcurrentExecution extends AbstractQuartzJob {
@Override
protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception {
JobInvokeUtil.invokeMethod(sysJob);
}
}
1
2
3
4
5
6
7
8
9
10
11
/**
* 定时任务处理(允许并发执行)
*
* @author ruoyi
*/
public class QuartzJobExecution extends AbstractQuartzJob {
@Override
protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception {
JobInvokeUtil.invokeMethod(sysJob);
}
}

从命名和有无@DisallowConcurrentExecution注解就可区分,关键是是否允许并发执行!?

可以看到它们都是调用了JobInvokeUtil工具,如下(因为代码过多,有些方法省略掉了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
* 任务执行工具
*
* @author ruoyi
*/
public class JobInvokeUtil {
/**
* 执行方法
*
* @param sysJob 系统任务
*/
public static void invokeMethod(SysJob sysJob) throws Exception {
String invokeTarget = sysJob.getInvokeTarget();
String beanName = getBeanName(invokeTarget);
String methodName = getMethodName(invokeTarget);
List<Object[]> methodParams = getMethodParams(invokeTarget);

if (!isValidClassName(beanName)) {
Object bean = SpringUtils.getBean(beanName);
invokeMethod(bean, methodName, methodParams);
} else {
Object bean = Class.forName(beanName).newInstance();
invokeMethod(bean, methodName, methodParams);
}
}

/**
* 调用任务方法
*
* @param bean 目标对象
* @param methodName 方法名称
* @param methodParams 方法参数
*/
private static void invokeMethod(Object bean, String methodName, List<Object[]> methodParams)
throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException,
InvocationTargetException {
if (StringUtils.isNotNull(methodParams) && methodParams.size() > 0) {
Method method = bean.getClass().getDeclaredMethod(methodName, getMethodParamsType(methodParams));
method.invoke(bean, getMethodParamsValue(methodParams));
} else {
Method method = bean.getClass().getDeclaredMethod(methodName);
method.invoke(bean);
}
}

/**
* 校验是否为为class包名
*
* @param str 名称
* @return true是 false否
*/
public static boolean isValidClassName(String invokeTarget) {
return StringUtils.countMatches(invokeTarget, ".") > 1;
}

/**
* 获取bean名称
*
* @param invokeTarget 目标字符串
* @return bean名称
*/
public static String getBeanName(String invokeTarget) {
String beanName = StringUtils.substringBefore(invokeTarget, "(");
return StringUtils.substringBeforeLast(beanName, ".");
}

/**
* 获取bean方法
*
* @param invokeTarget 目标字符串
* @return method方法
*/
public static String getMethodName(String invokeTarget) {
String methodName = StringUtils.substringBefore(invokeTarget, "(");
return StringUtils.substringAfterLast(methodName, ".");
}

/**
* 获取method方法参数相关列表
*
* @param invokeTarget 目标字符串
* @return method方法相关参数列表
*/
public static List<Object[]> getMethodParams(String invokeTarget) {
...
}

/**
* 获取参数类型
*
* @param methodParams 参数相关列表
* @return 参数类型列表
*/
public static Class<?>[] getMethodParamsType(List<Object[]> methodParams) {
...
}

/**
* 获取参数值
*
* @param methodParams 参数相关列表
* @return 参数值列表
*/
public static Object[] getMethodParamsValue(List<Object[]> methodParams) {
...
}
}

invokeMethod(SysJob sysJob)方法来看,先从SysJob获取目标字符串(大概就是“类名.方法名(参数...)”这样的);然后分别获取类名、方法名、方法参数;在Spring容器中有的交予bean执行,没有的用反射的方式实例化一个。最后在invokeMethod(Object bean, String methodName, List<Object[]> methodParams)的方法中实现。本类中其他的方法都是辅助做验证,解析的,不在多提。

Trigger

前面也提到了,Trigger触发器,关系到Job的执行策略,关联SchedulerJob。下面便是关键的创建Scheduler代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* 定时任务工具类
*
* @author ruoyi
*/
public class ScheduleUtils {
/**
* 得到quartz任务类
*
* @param sysJob 执行计划
* @return 具体执行任务类
*/
private static Class<? extends Job> getQuartzJobClass(SysJob sysJob) {
boolean isConcurrent = "0".equals(sysJob.getConcurrent());
return isConcurrent ? QuartzJobExecution.class : QuartzDisallowConcurrentExecution.class;
}

/**
* 构建任务触发对象
*/
public static TriggerKey getTriggerKey(Long jobId, String jobGroup) {
return TriggerKey.triggerKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup);
}

/**
* 构建任务键对象
*/
public static JobKey getJobKey(Long jobId, String jobGroup) {
return JobKey.jobKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup);
}

/**
* 创建定时任务
*/
public static void createScheduleJob(Scheduler scheduler, SysJob job) throws SchedulerException, TaskException {
Class<? extends Job> jobClass = getQuartzJobClass(job);
// 构建job信息
Long jobId = job.getJobId();
String jobGroup = job.getJobGroup();
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build();

// 表达式调度构建器
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression());
cronScheduleBuilder = handleCronScheduleMisfirePolicy(job, cronScheduleBuilder);

// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup))
.withSchedule(cronScheduleBuilder).build();

// 放入参数,运行时的方法可以获取
jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);

// 判断是否存在
if (scheduler.checkExists(getJobKey(jobId, jobGroup))) {
// 防止创建时存在数据问题 先移除,然后在执行创建操作
scheduler.deleteJob(getJobKey(jobId, jobGroup));
}

scheduler.scheduleJob(jobDetail, trigger);

// 暂停任务
if (job.getStatus().equals(ScheduleConstants.Status.PAUSE.getValue())) {
scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, jobGroup));
}
}

/**
* 设置定时任务策略
*/
public static CronScheduleBuilder handleCronScheduleMisfirePolicy(SysJob job, CronScheduleBuilder cb)
throws TaskException {
switch (job.getMisfirePolicy()) {
case ScheduleConstants.MISFIRE_DEFAULT:
return cb;
case ScheduleConstants.MISFIRE_IGNORE_MISFIRES:
return cb.withMisfireHandlingInstructionIgnoreMisfires();
case ScheduleConstants.MISFIRE_FIRE_AND_PROCEED:
return cb.withMisfireHandlingInstructionFireAndProceed();
case ScheduleConstants.MISFIRE_DO_NOTHING:
return cb.withMisfireHandlingInstructionDoNothing();
default:
throw new TaskException("The task misfire policy '" + job.getMisfirePolicy()
+ "' cannot be used in cron schedule tasks", Code.CONFIG_ERROR);
}
}
}

ruoyi代码写的很漂亮的,注释也非常清楚。直接从createScheduleJob方法来看,首先根据job是否支持并发确定class信息;然后根据Job创建JobDetail,用getJobKey(Long jobId, String jobGroup)区别标识;然后创建表达式构建器,ruoyi只使用cron表达式的方式(本身cron表达式就很有优势),同时根据MisfirePolicy使用不同策略;然后根据表达式构建器创建Trigger,类似于上的getTriggerKey(Long jobId, String jobGroup)方法区别标识;JobDetail放入Job对象,方便随时获取;最后就是将对应的JobDetailTrigger加入任务调度器中。

最后后,通过IDE找到调用createScheduleJob方法的地方,就三个,一个SysJobServiceImplinit方法,另外就是新增和修改Job方法中了。

至此

至此,ruoyi有关于quartz的使用几乎就讲完了,剩下的就是接口和业务设计的问题了,还有一个很重要的点,应该启动体验一下ruoyi设计这个过程,这个留到下次想起来再补充吧😂😂😂

思考

是否可以复用job,多个trigger共同调度?

就是说如果同一个job,它的执行策略比较复杂,一个cron表达式不够用,怎样能在复用这个job的情况下,将它设置多个Trigger呢?

还有。。。