依旧是从ruoyi
学习,这次是任务调度框架Quartz
参考
Quartz官方文档
quartz
(从原理到应用)详解篇
Quartz
是什么?一文带你入坑 - 知乎
任务调度,也可说是定时任务,其实还是很常见的在我们的生活中,最常见的莫过于闹钟了,这就是一种定时任务,其他还有提醒事项,消息订阅等等
入门
关于Quartz
,首先要知道Job
、JobDetail
、Trigger
、Scheduler
我简单的用非官方的语言,我的理解,讲一下
1 2 3 4 5
| <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>x.x.x</version> </dependency>
|
Scheduler
Scheduler
作为真正的任务调度器,肯定是需要配置各种参数的,拿ruoyi
配置来看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
|
@Configuration public class ScheduleConfig { @Bean public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setDataSource(dataSource);
Properties prop = new Properties(); prop.put("org.quartz.scheduler.instanceName", "RuoyiScheduler"); prop.put("org.quartz.scheduler.instanceId", "AUTO"); prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); prop.put("org.quartz.threadPool.threadCount", "20"); prop.put("org.quartz.threadPool.threadPriority", "5"); prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX"); prop.put("org.quartz.jobStore.isClustered", "true"); prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000"); prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1"); prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");
prop.put("org.quartz.jobStore.misfireThreshold", "12000"); prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_"); factory.setQuartzProperties(prop);
factory.setSchedulerName("RuoyiScheduler"); factory.setStartupDelay(1); factory.setApplicationContextSchedulerContextKey("applicationContextKey"); factory.setOverwriteExistingJobs(true); factory.setAutoStartup(true);
return factory; } }
|
ruoyi
的配置是一个很好的实战参考,其上的注解也写的非常明白了
主参数配置
Quartz
主参数配置,可参考Quartz主配置,ruoyi
配置只有两项
线程池配置
ThreadPool
配置,可参考Quartz配置ThreadPool设置
org.quartz.threadPool.class
:ThreadPool
的实现类,Quartz
自带的org.quartz.simpl.SimpleThreadPool
已经可以了
org.quartz.threadPool.threadCount
:线程数,根据场景需要而配置
org.quartz.threadPool.threadPriority
:线程优先级
JobStore配置
顾名思义,就是说Job
这些任务的存储方式,一共有三种方式
参考:
Job
Stores
Quartz配置RAMJobStore
Quartz配置JDBC-JobStoreTX
Quartz配置JDBC-JobStoreCMT
数据源配置
详细请参考,Quartz配置DataSources
一旦前面使用了JDBC
的存储方式,就一定要配置数据源的,ruoyi
使用的是配置注入的方式,就是将ruoyi-framework
配置的数据源以注入的方式进行配置,这里不细说了,有机会写ruoyi
数据源设计再讲吧
Quartz提供了数据库文件,quartz.sql下载
集群配置
参考使用JDBC-JobStore配置群集
org.quartz.jobStore.isClustered
:设置为“true
”打开集群功能
org.quartz.jobStore.clusterCheckinInterval
:检测集群间频率,单位毫秒
org.quartz.jobStore.maxMisfiresToHandleAtATime
:在给定的通行证中,工作区将处理的最大错误次数触发。
org.quartz.jobStore.txIsolationLevelSerializable
:“true
”表示Quartz
(使用JobStoreTX
或CMT
)在JDBC
连接上调用setTransactionIsolation
(Connection.TRANSACTION_SERIALIZABLE
)。这可以有助于防止在高负载下的某些数据库的锁定超时以及“持久”事务。
org.quartz.jobStore.misfireThreshold
:在被认为“失火”之前,调度程序将“容忍”一个Triggers
将其下一个启动时间通过的毫秒数。
org.quartz.jobStore.tablePrefix
:表前缀
其他
ruoyi剩下的配置就不多讲了,看注释,查源码即可
Job
关于Job
,前面已经说过,需要实现Job
接口,实现execute
方法,看看ruoyi
是怎么做的吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
|
public abstract class AbstractQuartzJob implements Job { private static final Logger log = LoggerFactory.getLogger(AbstractQuartzJob.class);
private static ThreadLocal<Date> threadLocal = new ThreadLocal<>();
@Override public void execute(JobExecutionContext context) throws JobExecutionException { SysJob sysJob = new SysJob(); BeanUtils.copyBeanProp(sysJob, context.getMergedJobDataMap().get(ScheduleConstants.TASK_PROPERTIES)); try { before(context, sysJob); if (sysJob != null) { doExecute(context, sysJob); } after(context, sysJob, null); } catch (Exception e) { log.error("任务执行异常 - :", e); after(context, sysJob, e); } }
protected void before(JobExecutionContext context, SysJob sysJob) { threadLocal.set(new Date()); }
protected void after(JobExecutionContext context, SysJob sysJob, Exception e) { Date startTime = threadLocal.get(); threadLocal.remove();
final SysJobLog sysJobLog = new SysJobLog(); sysJobLog.setJobName(sysJob.getJobName()); sysJobLog.setJobGroup(sysJob.getJobGroup()); sysJobLog.setInvokeTarget(sysJob.getInvokeTarget()); sysJobLog.setStartTime(startTime); sysJobLog.setStopTime(new Date()); long runMs = sysJobLog.getStopTime().getTime() - sysJobLog.getStartTime().getTime(); sysJobLog.setJobMessage(sysJobLog.getJobName() + " 总共耗时:" + runMs + "毫秒"); if (e != null) { sysJobLog.setStatus(Constants.FAIL); String errorMsg = StringUtils.substring(ExceptionUtil.getExceptionMessage(e), 0, 2000); sysJobLog.setExceptionInfo(errorMsg); } else { sysJobLog.setStatus(Constants.SUCCESS); }
SpringUtils.getBean(ISysJobLogService.class).addJobLog(sysJobLog); }
protected abstract void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception; }
|
这里定义的是抽象类,抽象方法为doExecute
,从@Override
的execute
可知,doExecute
方法作为execute
的中间部分被执行,夹在before
和after
方法之中。说到这个必须要提一下这个线程本地变量ThreadLocal
,之前只是觉得面试题中见得多,现在看的代码多了,就知道了实际生产中用的真不少。这里ThreadLocal
就不必过多介绍了,保存线程级变量,互不干扰。这里保存的是Date
数据,在before
方法中设置,在after
中获取,并记录job
日志,写入数据库。
抽象类看完了,接下来就是具体实现类了,这里有两个
1 2 3 4 5 6 7 8 9 10 11 12
|
@DisallowConcurrentExecution public class QuartzDisallowConcurrentExecution extends AbstractQuartzJob { @Override protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception { JobInvokeUtil.invokeMethod(sysJob); } }
|
1 2 3 4 5 6 7 8 9 10 11
|
public class QuartzJobExecution extends AbstractQuartzJob { @Override protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception { JobInvokeUtil.invokeMethod(sysJob); } }
|
从命名和有无@DisallowConcurrentExecution
注解就可区分,关键是是否允许并发执行!?
可以看到它们都是调用了JobInvokeUtil
工具,如下(因为代码过多,有些方法省略掉了)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
public class JobInvokeUtil {
public static void invokeMethod(SysJob sysJob) throws Exception { String invokeTarget = sysJob.getInvokeTarget(); String beanName = getBeanName(invokeTarget); String methodName = getMethodName(invokeTarget); List<Object[]> methodParams = getMethodParams(invokeTarget);
if (!isValidClassName(beanName)) { Object bean = SpringUtils.getBean(beanName); invokeMethod(bean, methodName, methodParams); } else { Object bean = Class.forName(beanName).newInstance(); invokeMethod(bean, methodName, methodParams); } }
private static void invokeMethod(Object bean, String methodName, List<Object[]> methodParams) throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException { if (StringUtils.isNotNull(methodParams) && methodParams.size() > 0) { Method method = bean.getClass().getDeclaredMethod(methodName, getMethodParamsType(methodParams)); method.invoke(bean, getMethodParamsValue(methodParams)); } else { Method method = bean.getClass().getDeclaredMethod(methodName); method.invoke(bean); } }
public static boolean isValidClassName(String invokeTarget) { return StringUtils.countMatches(invokeTarget, ".") > 1; }
public static String getBeanName(String invokeTarget) { String beanName = StringUtils.substringBefore(invokeTarget, "("); return StringUtils.substringBeforeLast(beanName, "."); }
public static String getMethodName(String invokeTarget) { String methodName = StringUtils.substringBefore(invokeTarget, "("); return StringUtils.substringAfterLast(methodName, "."); }
public static List<Object[]> getMethodParams(String invokeTarget) { ... }
public static Class<?>[] getMethodParamsType(List<Object[]> methodParams) { ... }
public static Object[] getMethodParamsValue(List<Object[]> methodParams) { ... } }
|
从invokeMethod(SysJob sysJob)
方法来看,先从SysJob
获取目标字符串(大概就是“类名.方法名(参数...)”这样的);然后分别获取类名、方法名、方法参数;在Spring容器中有的交予bean执行,没有的用反射的方式实例化一个。最后在invokeMethod(Object bean, String methodName, List<Object[]> methodParams)
的方法中实现。本类中其他的方法都是辅助做验证,解析的,不在多提。
Trigger
前面也提到了,Trigger
触发器,关系到Job
的执行策略,关联Scheduler
和Job
。下面便是关键的创建Scheduler
代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
public class ScheduleUtils {
private static Class<? extends Job> getQuartzJobClass(SysJob sysJob) { boolean isConcurrent = "0".equals(sysJob.getConcurrent()); return isConcurrent ? QuartzJobExecution.class : QuartzDisallowConcurrentExecution.class; }
public static TriggerKey getTriggerKey(Long jobId, String jobGroup) { return TriggerKey.triggerKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup); }
public static JobKey getJobKey(Long jobId, String jobGroup) { return JobKey.jobKey(ScheduleConstants.TASK_CLASS_NAME + jobId, jobGroup); }
public static void createScheduleJob(Scheduler scheduler, SysJob job) throws SchedulerException, TaskException { Class<? extends Job> jobClass = getQuartzJobClass(job); Long jobId = job.getJobId(); String jobGroup = job.getJobGroup(); JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(getJobKey(jobId, jobGroup)).build();
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()); cronScheduleBuilder = handleCronScheduleMisfirePolicy(job, cronScheduleBuilder);
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(jobId, jobGroup)) .withSchedule(cronScheduleBuilder).build();
jobDetail.getJobDataMap().put(ScheduleConstants.TASK_PROPERTIES, job);
if (scheduler.checkExists(getJobKey(jobId, jobGroup))) { scheduler.deleteJob(getJobKey(jobId, jobGroup)); }
scheduler.scheduleJob(jobDetail, trigger);
if (job.getStatus().equals(ScheduleConstants.Status.PAUSE.getValue())) { scheduler.pauseJob(ScheduleUtils.getJobKey(jobId, jobGroup)); } }
public static CronScheduleBuilder handleCronScheduleMisfirePolicy(SysJob job, CronScheduleBuilder cb) throws TaskException { switch (job.getMisfirePolicy()) { case ScheduleConstants.MISFIRE_DEFAULT: return cb; case ScheduleConstants.MISFIRE_IGNORE_MISFIRES: return cb.withMisfireHandlingInstructionIgnoreMisfires(); case ScheduleConstants.MISFIRE_FIRE_AND_PROCEED: return cb.withMisfireHandlingInstructionFireAndProceed(); case ScheduleConstants.MISFIRE_DO_NOTHING: return cb.withMisfireHandlingInstructionDoNothing(); default: throw new TaskException("The task misfire policy '" + job.getMisfirePolicy() + "' cannot be used in cron schedule tasks", Code.CONFIG_ERROR); } } }
|
ruoyi
代码写的很漂亮的,注释也非常清楚。直接从createScheduleJob
方法来看,首先根据job
是否支持并发确定class
信息;然后根据Job
创建JobDetail
,用getJobKey(Long jobId, String jobGroup)
区别标识;然后创建表达式构建器,ruoyi
只使用cron
表达式的方式(本身cron
表达式就很有优势),同时根据MisfirePolicy
使用不同策略;然后根据表达式构建器创建Trigger
,类似于上的getTriggerKey(Long jobId, String jobGroup)
方法区别标识;JobDetail
放入Job
对象,方便随时获取;最后就是将对应的JobDetail
和Trigger
加入任务调度器中。
最后后,通过IDE
找到调用createScheduleJob
方法的地方,就三个,一个SysJobServiceImpl
的init
方法,另外就是新增和修改Job
方法中了。
至此
至此,ruoyi
有关于quartz
的使用几乎就讲完了,剩下的就是接口和业务设计的问题了,还有一个很重要的点,应该启动体验一下ruoyi
设计这个过程,这个留到下次想起来再补充吧😂😂😂
思考
是否可以复用job
,多个trigger
共同调度?
就是说如果同一个job
,它的执行策略比较复杂,一个cron
表达式不够用,怎样能在复用这个job
的情况下,将它设置多个Trigger
呢?
还有。。。