登录/注册
小开开
2649
占位
2
占位
0
浏览量
占位
粉丝
占位
关注
冷饭新炒
小开开
2021-08-29 11:27:58 2021-08-29
134
0

目录

前言 相信大家对Quartz这框架并不陌生,日常工作经常会接触到,我们团队也在使用。但是我发现大家在工作中对其仅停留在简单配置使用层面,很多时候发生问题,并不知道它问题root cause是什么,配置参数也是随便在网上copy回来乱用,并不是基于项目实际情况。自从前几年开始做技术管理后,工作期间也没多少时间可以在一线撸码,刚好趁周末时间重新把源码看了一遍整理下,希望对大家有帮助!PS:本文基于Quartz2.3.0,不会介绍如何使用Quartz,完全没有接触过Quartz的朋友建议先阅读官方文档。

常见问题

  • Quartz的核心组件?
  • Quartz的核心运行机制?
  • Quartz的线程模型
  • Quartz集群进程间如何通信?
  • Quartz集群如何保证高并发下不重复跑?
  • Quartz如何保证不漏跑
  • Quartz默认任务锁机制?
  • Quartz常见问题

Quartz的核心组件

JobDetail

我们创建一个实现 Job 接口的类,使用 JobBuilder包装成 JobDetail,它可以携带 KV 的数据,方面用户可以扩展自己任务要用的参数。

Trigger

定义任务的触发规则,使用 TriggerBuilder 来构建。

为什么JobDetail和Trigger是一对多的关系

因为通常我们一个任务实际上是有多种触发规则的,例如:我想我的跑批任务周一9点跑一次,周三5点跑一起,它实际上是属于同一个Job,只是不同的触发规则,这时候我们就可以定义多个Trigger组合起来用。

Set<Trigger> triggersForJob = new HashSet();
triggersForJob.add(trigger);
triggersForJob.add(trigger1);
// 绑定关系是1:N
scheduler.scheduleJob(jobDetail, triggersForJob,true);

常见的Tigger类型

接口 描述 特点
SimpleTrigger 简单触发器 SimpleTrigger 可以定义固定时刻或者固定时间间隔的调度规则(精确到毫秒)

例如:每天 9 点钟运行;每隔 30 分钟运行一次
CalendarIntervalTrigger 基于日历的触发器 CalendarIntervalTrigger 可以定义更多时间单位的调度需求,精确到秒

好处是不需要去计算时间间隔,比如 1 个小时等于多少毫秒

例如每年、每个月、每周、每天、每小时、每分钟、每秒

每年的月数和每个月的天数不是固定的,这种情况也适用
DailyTimeIntervalTrigger 基于日期的触发器 每天的某个时间段

例如:每天早上 9 点到晚上 9 点,每隔半个小时执行一次,并且只在周一到周六执行。
CronTrigger 基于 Cron 表达式的触发器 可以支持任意时间(推荐) 如:0/10 * * * * ?

怎么排除掉一些日期不触发

比较常见的需求是周末不计息、节假日不触发邮件通知

如果要在触发器的基础上,排除一些时间区间不执行任务,就要用到 Quartz 的 Calendar 类(注意不是 JDK 的 Calendar)。可以按年、月、周、日、特定日期、Cron 表达式排除

使用方法

  • 调用调度器的 addCalendar()方法注册排除规则
  • 调用 Trigger的 modifiedByCalendar()添加到触发器中
//排除营业时间
scheduler.addCalendar("workingHours",new CronCalendar("* * 0-7,18-23?* *”"),false,false);
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger1", "group1")
.startNow()
.modifiedByCalendar("workingHours") //排除时间段
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(2)
.repeatForever())
.build();

Scheduler

调度器,是 Quartz 的指挥官,由 StdSchedulerFactory 产生,它是单例的,并且是 Quartz 中最重要的 API,默认是实现类是 StdScheduler,里面包含了一个 QuartzScheduler。QuartzScheduler 里面又包含了一个 QuartzSchedulerThread。

Scheduler 中的方法主要分为三大类:

  • 操作调度器本身,例如调度器的启动 start()、调度器的关闭 shutdown()。
  • 操作 Trigger,例如 pauseTriggers()、resumeTrigger()。
  • 操作 Job,例如 scheduleJob()、unscheduleJob()、rescheduleJob()

这些方法非常重要,可以实现任务的动态调度。

Listener

事件监听器。Quartz框架采用观察者模式设计,可以无入侵式地让用户可以收到对应的通知。提供三种类型监听器,分别是SchedulerListener(监听 Scheduler 的),TriggerListener(监听 Trigger 的),JobListener(监听 Job 的)

场景

  • 任务完成了,发邮件给对应的人。例如:跑批完成了,我想系统自动给我发一个邮件通知
  • 监控任务整个生命周期。例如:作为一个中央分布式调度器需要通过Webhook或者MQ触发多个服务,想监控每个任务的执行情况,是否有遗漏

工具类:ListenerManager,用于添加、获取、移除监听器

工具类:Matcher,主要是基于 groupName 和 keyName 进行匹配。

JobStore

Jobstore 用来存储任务和触发器相关的信息,例如所有任务的名称、数量、状态等等。Quartz 中有两种存储任务的方式,一种在在内存,一种是在数据库。

RAMJobStore

Quartz 默认的 JobStore 是 RAMJobstore,也就是把任务和触发器信息运行的信息存储在内存中,用到了 HashMap、TreeSet、HashSet等等数据结构。

如果程序崩溃或重启,所有存储在内存中的数据都会丢失。所以我们需要把这些数 据持久化到磁盘。

JDBCJobStore

JDBCJobStore 可以通过 JDBC 接口,将任务运行数据保存在数据库中。

JDBC 的实现方式有两种,JobStoreSupport 类的两个子类:

  • JobStoreTX:在独立的程序中使用,自己管理事务,不参与外部事务。
  • JobStoreCMT:(Container Managed Transactions (CMT),如果需要容器管理事 务时,使用它。

Quartz的核心运行机制

以上只是梳理了Quartz的核心流程,列举了一些核心组件,通过一下几个方法作为源码入口:

// Scheduler
Scheduler scheduler = factory.getScheduler();
// 绑定关系是1:N
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();

从上图可以看到,Quartz的核心流程大致分为三个阶段:

  • 获取调度实例阶段

  • 通过getScheduler方法根据配置文件加载配置和初始化,创建线程池 ThreadPool(默认是SimpleThreadPool,用来执行Quartz调度任务),创建调度器 QuartzScheduler,创建调度线程 QuartzSchedulerThread,并将调度线程初始状态设置为暂停状态。

  • 绑定JobDetail和Trigger阶段

  • Scheduler将任务添加到JobStore中,如果是使用数据库存储信息,这时候会把任务持久化到Quartz核心表中,同时也会对实现JobListener的监听者通知任务已添加

  • 启动调度器阶段

  • Scheduler会调用QuartzScheduler的Start()方法,这时候会把调度线程从暂停切为启动状态,通知QuartzSchedulerThread正式干活。QuartzSchedulerThread会从SimpleThreadPool查看下有多少可用工作线程,然后找JobStore去拿下一批符合条件的待触发的Trigger任务列表,包装成FiredTriggerBundle。通过JobRunShellFactory创建FiredTriggerBundle的执行线程实例JobRunShell,然后把JobRunShell实例交给SimpleThreadPool的工作线程去执行。SimpleThreadPool会从可用线程队列拿出对应数量的线程,去调用JobRunShell的run()方法,此时会执行任务类的execute方法 : job.execute(JobExecutionContext context)。

获取调度实例阶段

加载配置和初始化调度器

StdSchedulerFactory.getScheduler

public Scheduler getScheduler() throws SchedulerException {
if (cfg == null) {
//加载quartz.properties 配置文件
initialize();
}
//调度仓库里维护着一个HashMap<String, Scheduler>,这里使用单例是为了全局共享
SchedulerRepository schedRep = SchedulerRepository.getInstance();
//实际上是从HashMap<String, Scheduler>里查找Scheduler,保证了调度器名称必须是唯一
Scheduler sched = schedRep.lookup(getSchedulerName());
//如果调度器已经存在
if (sched != null) {
if (sched.isShutdown()) {
//假如调度器是关闭状态,则从调度仓库的HashMap移除
schedRep.remove(getSchedulerName());
} else {
return sched;
}
}
//调度器不存在则要进行初始化
sched = instantiate();
return sched;
}

StdSchedulerFactory.instantiate

对调度器进行初始化工作

private Scheduler instantiate() throws SchedulerException {
//...省略...
//存储任务信息的 JobStore
JobStore js = null;
//线程池,默认是SimpleThreadPool
ThreadPool tp = null;
//核心调度器
QuartzScheduler qs = null;
//数据库连接器
DBConnectionManager dbMgr = null;
//ID生成器,用来自动生成唯一的instance id
String instanceIdGeneratorClass = null;
//线程执行器,默认为 DefaultThreadExecutor
ThreadExecutor threadExecutor;
//...省略...

创建线程池(SimpleThreadPool)

StdSchedulerFactory.instantiate

这里创建了线程池,默认是配置文件指定的SimpleThreadPool

//从配置中获取线程池类名,如果没,默认选用SimpleThreadPool作为线程池
String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
if (tpClass == null) {
initException = new SchedulerException(
"ThreadPool class not specified. ");
throw initException;
}
try {
//反射创建线程池
tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
} catch (Exception e) {
initException = new SchedulerException("ThreadPool class '"
+ tpClass + "' could not be instantiated.", e);
throw initException;
}

SimpleThreadPool

此时SimpleThreadPool在创建过程中,会初始化三个列表:

  • workers(总工作线程队列):存放所有的工作线程
  • availWorkers(可用工作线程队列) :存放可用于做任务的工作线程
  • busyWorkers(繁忙工作线程队列):存放已经占用的工作线程
private List<WorkerThread> workers;
private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

初始化线程池

StdSchedulerFactory.instantiate

在该方法下面有一行对该线程池进行初始化

if(tp instanceof SimpleThreadPool) {
if(threadsInheritInitalizersClassLoader)
((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
}
//调用线程池初始化方法
tp.initialize();

SimpleThreadPool.initialize

在该方法里,会开始创建工作线程(WorkerThread),用于后面的任务执行,真正执行任务的是WorkerThread的run()方法

//根据用户配置文件设置的线程数,来创建对应数量的工作线程
Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
while(workerThreads.hasNext()) {
WorkerThread wt = workerThreads.next();
//激活每个工作线程
wt.start();
//放在可用线程队列等待被使用
availWorkers.add(wt);
}

创建核心调度器QuartzScheduler

StdSchedulerFactory.instantiate

这里创建核心调度器

//这里创建核心调度器,并且把QuartzSchedulerResources调度资源信息和idleWaitTime(调度器空闲等待的时间量)传进去,默认30秒
qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

QuartzScheduler.QuartzScheduler

创建调度器时,会对调度器的成员变量进行初始化,这里还会创建调度线程QuartzSchedulerThread,它会负责把任务分配给线程池里的工作线程执行

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
//...省略...
//创建调度线程,resouces 里面有线程名称
this.schedThread = new QuartzSchedulerThread(this, resources);
//创建线程执行器 ,默认是DefaultThreadExecutor
ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
//这里线程执行器会调用QuartzSchedulerThread的run()方法
schedThreadExecutor.execute(this.schedThread);
//...省略...
}

QuartzSchedulerThread.QuartzSchedulerThread

调度线程在实例化的时候,会把调度线程控制变量paused=ture,是把调度线程暂停处理任务,halted=false是要把调度线程开始监听调度器控制变量paused,就是让调度线程开始运行但是不处理任务,等待被唤醒,下一步会提到

QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
//...省略...
// start the underlying thread, but put this object into the 'paused'
// state
// so processing doesn't start yet...
paused = true;
halted = new AtomicBoolean(false);
}

QuartzSchedulerThread.run

上面提到,调度线程会被schedThreadExecutor执行,此时由于halted被设置为false,paused设置为true,此时调度线程run()方法并不会向下处理任务,等待被激活,这里会等到后面Scheduler调用start()才会真正被激活

public void run() {
int acquiresFailed = 0;
//这里!halted.get() = true,因此会向下执行
while (!halted.get()) {
try {
//sigLock是调度线程内的一个成员变量,用于控制线程并发
synchronized (sigLock) {
// 检查是否为暂停状态,此时paused && !halted.get() =false,会在这里循环等待,不会往下执行
while (paused && !halted.get()) {
try {
//暂停状态时,尝试去获得信号锁,使当前线程等待直到另一个线程调用,超时时间是1秒
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
// 暂停时重置失败计数器,这样我们就不会取消暂停后再次等待
acquiresFailed = 0;
}
//这里为false,因此会直接跳出循环,不会向后执行任务
if (halted.get()) {
break;
}
//...省略...
}

绑定JobDetail和Trigger阶段

执行作业调度

StdScheduler.scheduleJob

public Date scheduleJob(JobDetail jobDetail, Trigger trigger)
throws SchedulerException {
//这里实际调用的是QuartzScheduler
return sched.scheduleJob(jobDetail, trigger);
}

QuartzScheduler.scheduleJob

public Date scheduleJob(JobDetail jobDetail,
Trigger trigger) throws SchedulerException {
//...省略...
//持久化JobDetail和trigger
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
//通知scheduler监听者
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);
return ft;
}

启动调度器阶段

调用调度器启动方法

StdScheduler.start

StdScheduler只是代理类,实际上还是调用QuartzScheduler

public void start() throws SchedulerException {
//调用QuartzScheduler.start()方法
sched.start();
}

通知调度线程开始干活

QuartzScheduler.start

public void start() throws SchedulerException {
//...省略...
//通知Scheduler监听者任务开始启动
notifySchedulerListenersStarting();
//第一次启动,这里initialStart为空
if (initialStart == null) {
initialStart = new Date();
//这里将恢复任何失败或误触发的作业并根据需要清理数据存储,错过的任务会在这里重跑
this.resources.getJobStore().schedulerStarted();
startPlugins();
} else {
//如果initialStart不为空,意味着之前已经做过初始化,则把调度器状态恢复成运行中
resources.getJobStore().schedulerResumed();
}
//这里实际上让调度线程QuartzSchedulerThread开始执行任务,前面有提到调度线程虽然已经激活,但是由于Pause为true,因此它没办法处理任务,实际处于停止状态
schedThread.togglePause(false);
getLog().info(
"Scheduler " + resources.getUniqueIdentifier() + " started.");
//通知Scheduler监听者任务已经启动
notifySchedulerListenersStarted();
}

QuartzSchedulerThread.togglePause

//切换暂停状态    
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;
if (paused) {
//如果暂停,这里是要中断任何可能发生的睡眠,等待着被唤醒
signalSchedulingChange(0);
} else {
//唤醒在此对象监视器上等待的所有线程。
sigLock.notifyAll();
}
}
}

调度线程正式开始执行任务

QuartzSchedulerThread.run

这里由于上面一步已经把pause切换成false,因此调度线程的run()方法可以开始处理任务

//...省略...     
//由于pause已经被切换成flase,这里会跳出循环,线程会往下继续执行
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}
acquiresFailed = 0;
}
//...省略...
// 获取线程池可用线程数量
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
//可用线程数量>0才往下执行
if(availThreadCount > 0) {
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// 获取需要下次执行的 triggers
// idleWaitTime: 默认 30s
// availThreadCount:获取可用(空闲)的工作线程数量,总会大于 1,因为该方法会一直阻塞, 直到有工作线程空闲下来。
// maxBatchSize:一次拉取 trigger 的最大数量,默认是 1
// batchTimeWindow:时间窗口调节参数,默认是 0
// misfireThreshold: 超过这个时间还未触发的 trigger,被认为发生了 misfire,默认 60s
// 调度线程一次会拉取 NEXT_FIRETIME 小于(now + idleWaitTime +batchTimeWindow),大 于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)个 triggers,默认情况下,会拉取未来 30s、 过去 60s 之间还未 fire 的 1 个 trigger
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
//...省略...
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
// 触发 Trigger,把 ACQUIRED 状态改成 EXECUTING
// 如果这个 trigger 的 NEXTFIRETIME 为空,也就是未来不再触发,就将其状态改为 COMPLETE // 如果 trigger 不允许并发执行(即 Job 的实现类标注了@DisallowConcurrentExecution), 则将状态变为 BLOCKED,否则就将状态改为 WAITING
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
//...省略...
continue;
}
}
//循环处理trigger
for (int i = 0; i < bndles.size(); i++) {
//从trigger任务集合取出一个
TriggerFiredResult result = bndles.get(i);
//把trigger任务包装成TriggerFiredBundle
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
//...省略...
JobRunShell shell = null;
try {
// 根据 trigger 信息实例化 JobRunShell(implements Runnable),同时依据 JOB_CLASS_NAME 实例化 Job,随后我们将 JobRunShell 实例丢入工作线。
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
//调用线程池的runInThread方法,实际上是调用JobRunShell的run()方法
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
//...省略...

SimpleThreadPool.runInThread

这里线程池开始从可用线程队列分配工作线程去处理JobRunShell的run()方法

public boolean runInThread(Runnable runnable) {
//...省略...
//假如线程没有关闭
if (!isShutdown) {
//从可用工作线程队列移除一条工作线程
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
//把工作线程加入到繁忙工作线程队列
busyWorkers.add(wt);
//执行JobRunShell的run方法
wt.run(runnable);
} else {
//加入线程池准备要关闭,开启一个线程池里没有的新工作线程
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
//加入到繁忙工作线程队列
busyWorkers.add(wt);
//工作线程队列加入该新工作线程
workers.add(wt);
//执行JobRunShell的run方法
wt.start();
}
//...省略...
return true;
}

JobRunShell 用来为 Job 提供安全的运行环境的,执行 Job 中所有的作业,捕获运行中的异常,在任务执行完毕的

时候更新 Trigger 状态,等等。

JobRunShell 实例是用 JobRunShellFactory 为 QuartzSchedulerThread 创建的,在调度器决定一个 Job 被触发的时候,它从线程池中取出一个线程来执行任务。

Quartz线程模型

  • SimpleThreadPool:包工头,管理所有 WorkerThread
  • WorkerThread:工人,把 Job 包装成 JobRunShell执行
  • QuartSchedulerThread:项目经理,获取即将触发的 Trigger,从问包工头拿一个空闲的 worker,执行 Trigger 绑定的任务

Quartz集群进程间如何通信

Quartz集群之间是通过数据库几张核心的Quartz表进行通信

表名 作用
QRTZ_BLOB_TRIGGERS Trigger 作为 Blob 类型存储
QRTZ_CALENDARS 存储 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS 存储 CronTrigger,包括 Cron 表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的 Trigger 相关的状态信息,以及相关 Job 的执行信息
QRTZ_JOB_DETAILS 存储每一个已配置的 Job 的详细信息
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的 Trigger 组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例
QRTZ_SIMPLE_TRIGGERS 存储 SimpleTrigger 的信息,包括重复次数、间隔、以及已触的次数
QRTZ_SIMPROP_TRIGGERS 存储 CalendarIntervalTrigger 和 DailyTimeIntervalTrigger 两种类型的触发器
QRTZ_TRIGGERS 存储已配置的 Trigger 的信息

Quartz集群如何保证高并发下不重复跑

Quartz有多个节点同时在运行,而任务是共享的,这时候肯定存在资源竞争问题,容易造成并发问题,Quartz节点之间是否存在分布式锁去控制?

Quartz是通过数据库去作为分布式锁来控制多进程并发问题,Quartz加锁的地方很多,Quartz是使用悲观锁的方式进行加锁,让在各个instance操作Trigger任务期间串行,这里挑选核心的代码来看看它是符合利用数据库防止并发的。

使用数据库锁需要在quartz.properties中加以下配置,让集群生效Quartz才会对多个instance进行并发控制

org.quartz.jobStore.isClustered = true

QRTZ_LOCKS 表,它会为每个调度器创建两行数据,获取 Trigger 和触发 Trigger 是两把锁,加锁入口在JobStoreSupport类中,Quartz提供的锁表,为多个节点调度提供分布式锁,实现分布式调度,默认有2个锁

SCHED_NAME LOCK_NAME
Myscheduler STATE_ACCESS
Myscheduler TRIGGER_ACCESS

STATE_ACCESS主要用在scheduler定期检查是否失效的时候,保证只有一个节点去处理已经失效的scheduler;

TRIGGER_ACCESS主要用在TRIGGER被调度的时候,保证只有一个节点去执行调度

QuartzSchedulerThread.run

调度线程在获取下一个Trigger任务的时候,会在Quartz表加行级锁,入口在这

//...省略...
//
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
//...省略...

JobStoreSupport.acquireNextTriggers

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
//...省略...
//这里会进入加锁控制,lockName是锁的key
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
//...省略...

JobStoreSupport.executeInNonManagedTXLock

这里会进入非托管事务,加入lockName不为空,需要先获取锁才能执行事务回调方法和事务校验方法

protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
//只要作为锁的key不为空,在这里就会调用JobStoreTx获取数据库连接
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
//真正加锁的入口,通过LockHandler去调用DBSemaphore操作数据库获取锁
transOwner = getLockHandler().obtainLock(conn, lockName);
}
//...省略...

DBSemaphore.obtainLock

这里会通过执行两条SQL去向调用线程授予对已识别资源的锁定(阻塞)直到可用

public boolean obtainLock(Connection conn, String lockName)
throws LockException {
//...省略...
//判断当前调用线程是否对标识的资源持有锁,加入已经持有该锁,则直接跳过
if (!isLockOwner(lockName)) {
//通过调用StdRowLockSemaphore的executeSQL方法对expandedSQL, expandedInsertSQL对lockName进行加锁控制
executeSQL(conn, lockName, expandedSQL, expandedInsertSQL);
//...省略...
}

StdRowLockSemaphore.executeSQL

如果已经有lockName代表的行,直接加锁,如果没有插入。但是在加锁时或插入时有可能失败,失败则重试,重试如果超过一定次数就会直接抛出异常。这里是使用悲观锁的方式进行加锁

protected void executeSQL(Connection conn, final String lockName, final String expandedSQL, final String expandedInsertSQL) throws LockException {
//...省略...
ps = conn.prepareStatement(expandedSQL);
//...省略...
ps.setString(1, lockName);
//先执行查询,看看表里是否已经有该存在
rs = ps.executeQuery();
//...省略...
// 如果查询结果不为空
if (!rs.next()) {
ps.setString(1, lockName);
//
int res = ps.executeUpdate();
//...省略...
return; // obtained lock, go
}

这两条SQL是在DBSemaphore初始化的时候塞进来的

public DBSemaphore(String tablePrefix, String schedName, String defaultSQL, String defaultInsertSQL) {
this.tablePrefix = tablePrefix;
this.schedName = schedName;
setSQL(defaultSQL);
setInsertSQL(defaultInsertSQL);
}

再看看调用链会发现,这两条SQL是在StdRowLockSemaphore初始化的时候调用父类DBSemaphore构造方法传进来,分别是selectWithLockSQL和SELECT_FOR_LOCK

public StdRowLockSemaphore(String tablePrefix, String schedName, String selectWithLockSQL) {
super(tablePrefix, schedName, selectWithLockSQL != null ? selectWithLockSQL : SELECT_FOR_LOCK, INSERT_LOCK);
}

两条SQL分别是:

public static final String SELECT_FOR_LOCK = "SELECT * FROM "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
public static final String INSERT_LOCK = "INSERT INTO "
+ TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("
+ SCHED_NAME_SUBST + ", ?)";

把参数替换进去就比较清晰可以看到,Quartz通过在qrtz_LOCKS表对当前schedule job 加两个行级锁

expandedSQL:select * from QRTZ_LOCKS t where t.lock_name='TRIGGER_ACCESS' for update
expandedInsertSQL:INSERT INTO qrtz_LOCKS(SCHED_NAME, LOCK_NAME) VALUES ('MySchedule', 'TRIGGER_ACCESS')

Quartz集群如何保证高并发下不漏跑

有时候Quartz可能会错过我们的调度任务:

  • 服务重启,没能及时执行任务,就会misfire
  • 工作线程去运行优先级更高的任务,就会misfire
  • 任务的上一次运行还没结束,下一次触发时间到达,就会misfire

Quartz可提供了一些补偿机制应对misfire情况,用户可以根据需要选择对应的策略,这里挑选常用的cronTrigger作为示例

  • withMisfireHandlingInstructionDoNothing

  • 不触发立即执行

  • 等待下次Cron触发频率到达时刻开始按照Cron频率依次执行

  • withMisfireHandlingInstructionIgnoreMisfires

  • 以错过的第一个频率时间立刻开始执行

  • 重做错过的所有频率周期后当下一次触发频率发生时间大于当前时间后,再按照正常的Cron频率依次执行

  • withMisfireHandlingInstructionFireAndProceed(默认)

  • 以当前时间为触发频率立刻触发一次执行,然后按照Cron频率依次执行

假如用户没有设置Misfire指令,Quartz默认指定MISFIRE_INSTRUCTION_SMART_POLICY作为默认策略,在Trigger接口的getMisfireInstruction源码可以看到:

/**
* Get the instruction the <code>Scheduler</code> should be given for
* handling misfire situations for this <code>Trigger</code>- the
* concrete <code>Trigger</code> type that you are using will have
* defined a set of additional <code>MISFIRE_INSTRUCTION_XXX</code>
* constants that may be set as this property's value.
*
* <p>
* If not explicitly set, the default value is <code>MISFIRE_INSTRUCTION_SMART_POLICY</code>.
* </p>
*
* @see #MISFIRE_INSTRUCTION_SMART_POLICY
* @see SimpleTrigger
* @see CronTrigger
*/
public int getMisfireInstruction();

这里继续以CronTrigger举例,其他类型Trigger也类似 。如果是默认策略MISFIRE_INSTRUCTION_SMART_POLICY,在CronTrigger会选用MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,该策略的特点是立刻执行一次,然后后面的任务就按照正常的计划执行。

@Override
public void updateAfterMisfire(org.quartz.Calendar cal) {
int instr = getMisfireInstruction();
if(instr == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY)
return;
if (instr == MISFIRE_INSTRUCTION_SMART_POLICY) {
instr = MISFIRE_INSTRUCTION_FIRE_ONCE_NOW;
}
if (instr == MISFIRE_INSTRUCTION_DO_NOTHING) {
Date newFireTime = getFireTimeAfter(new Date());
while (newFireTime != null && cal != null
&& !cal.isTimeIncluded(newFireTime.getTime())) {
newFireTime = getFireTimeAfter(newFireTime);
}
setNextFireTime(newFireTime);
} else if (instr == MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {
setNextFireTime(new Date());
}
}

Quartz对于misfire任务大致处理流程

  • QuartzScheduler.start()启动调度
  • JobStoreSupport.schedulerStarted()执行启动调度方法
  • 创建和初始化misfireHandler
  • 异步执行misfireHandler.run方法处理misfire任务
  • MisfileHandler通过JobStoreSupport去查询有没有misfire的任务,查询条件是当前状态是waiting,下一次trigger时间< 当前时间-misfire预设阈值(默认1分钟)
int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
getDelegate().countMisfiredTriggersInState(
conn, STATE_WAITING, getMisfireTime()) :
Integer.MAX_VALUE;
String COUNT_MISFIRED_TRIGGERS_IN_STATE = "SELECT COUNT("
+ COL_TRIGGER_NAME + ") FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND NOT ("
+ COL_MISFIRE_INSTRUCTION + " = " + Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY + ") AND "
+ COL_NEXT_FIRE_TIME + " < ? "
+ "AND " + COL_TRIGGER_STATE + " = ?";
protected long getMisfireTime() {
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
//当前时间减去misfire预设阈值,阈值默认一分钟
misfireTime -= getMisfireThreshold();
}
return (misfireTime > 0) ? misfireTime : 0;
}
  • JobStoreSupport通过StdRowLockSemaphore 去获取TRIGGER_ACCESS锁
  • 查询所有misfire任务,查询条件:status=waiting,current_time-next_fire_time>misfireThreshold(可配置,默认1分钟)【即实际触发时间-预计触发时间大于容忍度时间】,获取misfired的trigger,maxToRecoverAtATime默认一个事务中只能最大有20个misfired trigger(可配置)
  • 通过updateAfterMisfired方法获取misfired的策略(默认是MISFIRE_INSTRUCTION_SMART_POLICY该策略在CronTrigger中为MISFIRE_INSTRUCTION_FIRE_ONCE_NOW),根据策略设置nexFireTime。
  • 将nextFireTime等更新或者插入到trigger表;
  • 提交事务,释放锁

Quartz默认任务锁机制

Quartz是否一定会加锁?什么情况下不会加锁?应该怎么避免并发问题?

什么情况下不会加锁?

回到JobStoreSupport 的 acquireNextTriggers()方法,可以看到当isAcquireTriggersWithinLock()为true或者maxCount>1才会加锁,否则lockName为空

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
new TransactionValidator<List<OperableTrigger>>() {
//..省略..
}
});
}
protected <T> T executeInNonManagedTXLock(
String lockName,
TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
boolean transOwner = false;
Connection conn = null;
try {
if (lockName != null) {
// If we aren't using db locks, then delay getting DB connection
// until after acquiring the lock since it isn't needed.
if (getLockHandler().requiresConnection()) {
conn = getNonManagedTXConnection();
}
transOwner = getLockHandler().obtainLock(conn, lockName);
}
//..省略...
}

Quartz 加锁的条件有以下两个:

  • 如 果 acquireTriggersWithinLock=true 或 者 batchTriggerAcquisitionMaxCount>1 时 , lockName 赋 值 为

LOCK_TRIGGER_ACCESS,此时获取 Trigger 会加锁。

  • 否则,如果 isAcquireTriggersWithinLock()值是 false 并且 maxCount=1 的话,lockName 赋值为 null,这种情况获取 Trigger下不加锁。

那这两个参数的默认值是什么?

acquireTriggersWithinLock 变量默认是 false

private boolean acquireTriggersWithinLock = false;

maxCount 来自 QuartzSchedulerThread

triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

getMaxBatchSize()来自 QuartzSchedulerResources,代表 Scheduler 一次拉取

trigger的最大数量,默认是 1

org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1

什么情况下需要加锁?

QuartzSchedulerThread 的 triggersFired()方法

List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

调用了 JobStoreSupport 的 triggersFired()方法,接着又调用了triggerFired(Connection conn, OperableTrigger trigger)方法:

public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
new TransactionCallback<List<TriggerFiredResult>>() {
public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();
TriggerFiredResult result;
for (OperableTrigger trigger : triggers) {
try {
//触发
TriggerFiredBundle bundle = triggerFired(conn, trigger);
result = new TriggerFiredResult(bundle);
//...省略...
protected TriggerFiredBundle triggerFired(Connection conn,
OperableTrigger trigger)
throws JobPersistenceException {
JobDetail job;
Calendar cal = null;
// Make sure trigger wasn't deleted, paused, or completed...
try { // if trigger was deleted, state will be STATE_DELETED
String state = getDelegate().selectTriggerState(conn,
trigger.getKey());
if (!state.equals(STATE_ACQUIRED)) {
return null;
}
//...省略...

如果 Trigger 的状态不是 ACQUIRED,也就是说被其他的线程 fire了,返回空。但是这种乐观锁的检查在高并发下难免会出现 ABA的问题,比如线程 A 拿到的时候还是 ACQUIRED 状态,但是刚准备执行的时候已经变成了 EXECUTING状态,这个时候就会 出现重复执行的问题。

把执行步骤拆解下,比较容易看到该问题:

推荐

如果设置的数量为 1(默认值),并且使用 JDBC JobStore(RAMJobStore 不支持 分 布 式 , 只 有 一 个 调 度 器 实 例 , 所 以 不 加 锁 ) , 则 属 性 org.quartz.jobStore.acquireTriggersWithinLock 应设置为 true。否则不加锁可能会导致任务重复执行。

org.quartz.scheduler.batchTriggerAcquisitionMaxCount=1 org.quartz.jobStore.acquireTriggersWithinLock=true

Quartz常见问题

服务器始终不一致问题

常见异常:

This scheduler instance (SchedulerName) is still active but was recovered by another instance in the cluster

解决:

同步所有集群节点的时间然后重启服务

Quartz集群负载不均衡

Quartz集群是采用抢占式加锁方式去处理任务,因此你会看到每个节点的任务处理日志并不是均衡分配的,很可能一个节点会抢占大量任务导致负载过重,但是这一点官方并没有解决。

错过预定触发时间

常见异常:

Handling 1 trigger(s) that missed their scheduled fire-time

解决:

很可能是你线程数设置太少,而任务执行时间太长,超过的misfire阈值,导致线程池没有可用线程而错过了触发事件。尝试把配置文件线程数调大org.quartz.threadPool.threadCount 或者把misfire阈值调大org.quartz.jobStore.misfireThreshold

原文: https://www.cnblogs.com/evan-liang/p/14939182.html

暂无评论