1 分钟阅读

简介

从使用开始讲起

简单使用

如何使用分为两个部分:quartz独立使用,和Spring结合使用。之所以分开,是因为spring的存在,虽然增加了易用性,但掩盖了大量细节,影响了我们对程序的直观感觉。quartz独立使用的例子可以参见 [深入解读Quartz的原理][],基本流程就是

  1. 创建Job
  2. 创建Trigger
  3. 创建Scheduler(工厂模式),scheduleJob(jobDetail, strigger)
  4. 最后,Scheduler.start() scheduler.shutdown(true),quartz就开始工作了。
SchedulerFactory schedulerFactory = new StdSchedulerFactory();
Scheduler sched = schedulerFactory.getScheduler();
sched.start();

JobDetail jobDetail=  JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
CronTrigger trigger =  TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroupName).withSchedule(CronScheduleBuilder.cronSchedule(cron)).build();
sched.scheduleJob(jobDetail, trigger);

手动代码 可以用于 job 数量不确定(即用户手动提交一个定时任务)的场景。

与spring整合

quartz和spring的结合也非常的简单,上述第一步到第三步可由配置文件代替,第4步中的Scheduler则随着spring容器的启动而启动,停止而停止。Spring对程序的介入几乎没有,开发人员只要告诉配置文件什么时间运行哪个类的哪个方法即可。

<bean id="myJob" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
    <property name="targetObject" ref="myJob" />                    //执行类的实例
    <property name="targetMethod" value="run" />                    //执行方法
</bean> 
<bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> 
    <property name="jobDetail" ref="myJob" />                           //上面任务的Task配置bean
    <property name="cronExpression" value="0 */1 * * * ?" />            //触发时机表达式  cron表达式在文章的最末尾会说
</bean> 
    <bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" autowire="no">
    <property name="triggers">
        <list>
            <ref bean="cronTrigger" />                           //上面配置的触发器
        </list>
    </property>
</bean> 

MethodInvokingJobDetailFactoryBean 是一个JobDetail 的FactoryBean

Spring与其它框架的结合,往往从代码上改变了框架的使用“感觉”。其实,spring的本质是ioc(及其基础上的aop),spring为框架提供的“方便”主要是ioc提供的,包括bean的生成,生命周期的管理(比如quartz的scheduler随着ioc容器的启动而启动,shutdown而shutdown)等,并不会改变框架(所提供类的)的使用方式(即一些接口方法的调用)。

源码分析

Date scheduleJob(JobDetail jobDetail,Trigger trigger){
    // 数据判空等
    // 关键操作
    resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
    notifySchedulerListenersJobAdded(jobDetail);
    notifySchedulerThread(trigger.getNextFireTime().getTime());
    notifySchedulerListenersSchduled(trigger);
    // 返回值
}
protected void notifySchedulerThread(long candidateNewNextFireTime) {
    if (isSignalOnSchedulingChange()) {
        signaler.signalSchedulingChange(candidateNewNextFireTime);
    }
}

整体逻辑

  1. 数据容器/队列:QuartzSchedulerResources基本封装了quartz运行的基本数据
  2. 生产者
    1. Scheduler本身不执行任务,只是将job和trigger存入到QuartzSchedulerResources中,并向QuartzSchedulerThread发送信号
    2. 一个QuartzSchedulerThread不断的检查 job 状态,触发最近的下一个任务(立即)执行。一些高级策略 任务调度:时间轮算法经典案例解析及应用实现
  3. 消费者: ThreadPool 执行 job

signaler是一个SchedulerSignaler接口,其实现类SchedulerSignalerImpl有一个构造方法SchedulerSignalerImpl(QuartzScheduler sched, QuartzSchedulerThread schedThread),它保有了一个QuartzSchedulerThread引用。这里的线程通信,只是一个线程保有了另一个线程的引用。QuartzScheduler.scheduleJob ==> notifySchedulerThread ==> signaler.signalSchedulingChange ==> schedThread.signalSchedulingChange(candidateNewNextFireTime)

public class QuartzSchedulerThread extends Thread {
    private QuartzSchedulerResources qsRsrcs;
    public void run() {
        while (!halted.get()) {
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0){
                triggers = qsRsrcs.getJobStore().acquireNextTriggers(...)
                // if triggers is not empty
                now = System.currentTimeMillis();
                long triggerTime = triggers.get(0).getNextFireTime().getTime();
                long timeUntilTrigger = triggerTime - now;
                while(timeUntilTrigger > 2) {
                    // 停一下   sigLock.wait(timeUntilTrigger);
                }
                bndles = qsRsrcs.getJobStore().triggersFired(triggers);
                for (int i = 0; i < bndles.size(); i++) {
                    TriggerFiredBundle bndle = ...
                    shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                    shell.initialize(qs);
                    qsRsrcs.getThreadPool().runInThread(shell)
                }
            }else{
                continue;
            }
        }
    }
}

QuartzSchedulerThread 主要逻辑

  1. 找下一个要触发的trigger
  2. 等着这个trigger 时间到了 可以运行
  3. 根据trigger拿到相应的组件TriggerFiredBundle,触发ThreadPool 执行相应任务(使用JobRunShell实际执行),QuartzSchedulerThread本身不管。

分布式定时(未完成)

问题描述:一个数据库表记录有不同的状态值,定时从中拉取符合条件的状态值的记录,处理(调用其它业务的rpc,进行数据的增删改),然后更新数据库。

对于一个定时任务,单机环境存在负载有限及可靠性问题。

在集群环境中,同样的定时任务,在集群中的每台机器都会执行,这样定时任务就会重复执行,不但会增加服务器的负担,还会因为定时任务重复执行造成额外的不可预期的错误(对同一个增加rpc操作进行多次调用)

基于spring+quartz的分布式定时任务框架

Quartz应用与集群原理分析

在分布式定时任务中(或者集群),同一时刻只会有一个定时任务运行。

那如何做到,一会儿定时任务这个机器上运行,一会儿在那个机器上运行呢?

标签:

分类:

更新时间:

留下评论