跳至主要內容

Quartz 使用

Steven大约 5 分钟javaschedule

Quartz 由 OpenSymphony 开源组织发起的项目,该项目 2009 年被 Terracotta 收购。 Quartz 完全由 Java 编写, 专注于 “任务调度库”(job scheduling) 功能搭建,可以用来管理上万个 jobs 复杂程序。

官网: http://www.quartz-scheduler.org/open in new window
文档: https://www.quartz-scheduler.org/documentation/open in new window
API: https://www.quartz-scheduler.org/api/2.3.0/index.htmlopen in new window

特点:

  • 支持多种任务调度方法
  • 支持多种调度数据存储方式
  • 支持分布式和集群能力
  • 易于与 Spring 集成(Quartz 是 Spring 默认的调度框架)

概念:

  • 任务详情(JobDetail)

    • Job 实现: 开发者根据业务需要,实现 org.quartz.job 接口的类
    • Job 相关类:
      • JobBuilder
      • JobDataMap

    相关信息

    为什么设计成 JobDetail + Job,不直接使用 Job? Sheduler 每次执行,都会根据 JobDetail 创建一个新的 Job 实例,这样就可以规避 “并发访问” 的问题。

  • 触发器(Trigger) —— 触发规则 💡 任务:触发器=1:n

    • Trigger 实现包括:
      • SimpleTrigger: 延时任务/定时任务
      • CronTrigger: 使用 cron 表达式定义触发任务的时间规则
      • DateIntervalTrigger: 日期周期的规则触发器
      • NthIncludedDateTrigger: 排除指定时间周期和日期的触发器
      • DailyTimeIntervalTrigger
      • CalendarIntervalTrigger
    • Trigger 相关类:
      • JobDataMap
      • TriggerBuilder
      • ScheduleBuilder
  • 调度器(Scheduler) —— 将 “任务” 和 “触发器” 关联

    • Scheduler 实现包括:
      • RemoteScheduler
      • StdSchduler
    • Scheduler 状态:
      • start
      • stop
      • pause
      • resume
    • Scheduler 相关类:
      • SchedulerFactory: 用于创建 Scheduler
        • StdSchedulerFactory: properties 配置
        • DirecttSchedulerFactory: 通过代码配置
      • JobStore: 存储运行时信息,包括 Trigger/Scheduler/JobDetail/业务锁等 | 默认只在 Job 被添加到调度程序(Scheduler,任务执行计划表)的时候,存储一次关于该任务的状态信息数据
        • RAMJobStore: 内存存储任务调度状态
        • JDBCJobStore: 通过数据库持久化任务调度状态 【持久化、集群】
          • JobStoreTX: JDBC 事务由 Quartz 管理
          • JobStoreCMT: JDBC 使用容器实务
          • ClusteredJobStore: 集群实现
          • TerracottaJobStore: Terracotta 中间件
      • ThreadPool
        • SimpleThreadPool
        • 自定义线程池
  • 监听器(Listener)

任务调用:间隔 SimpleTrigger

SimpleTrigger 是比较简单的一类触发器,支持场景:

  • 在指定时间内执行一次任务 —— 设定开始时间,不设定循环,(设定结束时间
  • 在指定时间内执行多次任务(可指定任务执行次数/任务执行时间段) —— 设定开始时间,设定循环(间隔/次数),(设定结束时间
测试类
package org.example.job;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.quartz.*;

import java.util.concurrent.TimeUnit;

@Slf4j
public class SimpleJobTest extends AbstractSimpleJobTest {
    final static String KEY_LOGO = "logo";
    final static String MSG_LOGO_01 = "hello~";
    final static String MSG_LOGO_02 = "trigger-hello~";
    final static String MSG_TRIGGER_NAME = "triggerName01";
    final static String MSG_TRIGGER_GROUP = "triggerGroup01";
    // 计数器
    private static volatile int COUNT = 0;
    // 计数器跨步
    private static final int STEP = 2;

    @Test
    void test() throws SchedulerException {
        startSchedule(SimpleJobTest.HelloJob.class,
                jobBuilder -> {
                    jobBuilder.withIdentity(getJobName(), MSG_JOB_GROUP);
                    jobBuilder.usingJobData(KEY_LOGO, MSG_LOGO_01);// 向任务传递参数
                },
                triggerBuilder -> {
                    triggerBuilder.withIdentity(MSG_TRIGGER_NAME, MSG_TRIGGER_GROUP);
                    triggerBuilder.usingJobData(KEY_LOGO, MSG_LOGO_02); // 向任务传递参数 | 若 jobDetail 中也有定义,优先使用 trigger 定义
                    // 计数
                    triggerBuilder.usingJobData("step", STEP); // 向任务传递参数
                    triggerBuilder.startNow(); // 马上调用一次,这次不计入 repeatCount 重复次数里
                    triggerBuilder.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(INTERVAL_SECOND).withRepeatCount(REPEAT));
                });
        // 校验运行结果
        Assertions.assertEquals(STEP * (REPEAT+1), COUNT);
    }

    private static String getJobName() {
        return MSG_JOB_NAME + "-" + SimpleJobTest.class.getName();
    }

    /**
     * 编写任务执行逻辑
     */
    // @DisallowConcurrentExecution
    public static class HelloJob implements Job {
        private static Integer prevHashCode = null;
        /**
         * trigger 传值或者 jobDetail 传值
         */
        @Setter
        private String logo = "HelloJob";

        /**
         * @param jobExecutionContext 获取调度器调用 job 时传递的参数,如 JobDetail
         */
        @Override
        public void execute(JobExecutionContext jobExecutionContext) {
            // job detail
            JobDetail jobDetail = jobExecutionContext.getJobDetail();
            Assertions.assertEquals(getJobName(), jobDetail.getKey().getName());
            Assertions.assertEquals(MSG_JOB_GROUP, jobDetail.getKey().getGroup());

            // trigger
            Trigger trigger = jobExecutionContext.getTrigger();
            Assertions.assertEquals(MSG_TRIGGER_NAME, trigger.getKey().getName());
            Assertions.assertEquals(MSG_TRIGGER_GROUP, trigger.getKey().getGroup());

            // data map 优先级
            Assertions.assertEquals(MSG_LOGO_01, jobDetail.getJobDataMap().getString("logo"));
            Assertions.assertEquals(MSG_LOGO_02, trigger.getJobDataMap().getString("logo"));
            Assertions.assertEquals(MSG_LOGO_02, jobExecutionContext.getMergedJobDataMap().getString("logo"));
            Assertions.assertEquals(MSG_LOGO_02, this.logo); // 优先使用 trigger 传入数值

            // count
            int triggerStep = trigger.getJobDataMap().getInt("step");
            log.info("{} {} context: count={}, 本次执行={}, 下次执行={}",
                    this.logo,
                    this.hashCode() + // 不同
                            "/" + jobExecutionContext.hashCode() + // 不同
                            "/" + jobExecutionContext.getJobDetail().hashCode() + // ❗相同
                            "/" + jobExecutionContext.getTrigger().hashCode() + // ❗相同
                            "/" + jobExecutionContext.getJobDetail().getJobDataMap().hashCode() + // ❗相同
                            "/" + jobExecutionContext.getTrigger().getJobDataMap().hashCode() // ❗相同
                    ,
                    COUNT+=triggerStep,
                    jobExecutionContext.getFireTime(),
                    jobExecutionContext.getNextFireTime());

            // hashcode
            Assertions.assertNotEquals(prevHashCode, this.hashCode()); // 每次创建新的对象
            prevHashCode = this.hashCode();

            // 制造阻塞,观察异步情况 —— 默认异步执行,下一个任务不会被当前阻塞影响
            try {
                TimeUnit.SECONDS.sleep(2*INTERVAL_SECOND);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

任务调用:日期 CronSchedule

package org.example.job;

import lombok.extern.slf4j.Slf4j;
import org.example.listener.MyJobListener;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.quartz.*;

import java.util.Calendar;
import java.util.function.Consumer;

@Slf4j
public class CronJobTest extends AbstractSimpleJobTest {
    @Test
    void test() throws SchedulerException {
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule("0/1 * * * * ?");
        MyJobListener myJobListener = new MyJobListener();
        startSchedule(HelloJob.class,
                jobBuilder -> {},
                triggerTriggerBuilder -> {
                    triggerTriggerBuilder.withSchedule(cronScheduleBuilder);
                    Calendar calender = Calendar.getInstance();
                    calender.add(Calendar.SECOND, REPEAT);
                    triggerTriggerBuilder.endAt(calender.getTime());
                },
                schedulerContext -> {
                    try {
                        // add listen
                        schedulerContext.getScheduler().getListenerManager().addJobListener(myJobListener);
                    } catch (SchedulerException e) {
                        throw new RuntimeException(e);
                    }
                    schedulerContext.startJobSchedule();
                });
        Assertions.assertEquals(2 * (REPEAT + 1), myJobListener.getCounter().get());
    }

    public static class HelloJob implements Job {
        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            log.info("hello~");
        }
    }
}

任务错过处理 misfire

任务调度错过情况:

  • 调度器(scheduler)设置任务/触发器 pause
  • 线程池资源资源

Misfire 策略:

  • SimpleTrigger:间隔
    • MISFIRE_INSTRUCTION_SMART_POLICY (默认)
    • MISFIRE_INSTRUCTION_FIRE_NOW
    • MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY —— 立即执行(错过的)所有任务
    • MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT —— (如果有错过任务)立即执行一次,按原来间隔执行下次,执行总数不变(即:将所有任务推迟到恢复后执行)
    • MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT —— (如果有错过任务)立即执行一次,按原来间隔执行下次,忽略错过的执行次数(即:将所有错过的任务丢弃)
    • MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT
    • MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT
  • CronTrigger
    • MISFIRE_INSTRUCTION_DO_NOTHING —— 忽略错过任务
    • MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY —— 立即执行(错过的)所有任务,其他任务按计执行
    • MISFIRE_INSTRUCTION_ONE_NOW —— 立即执行一个任务,其他任务按计划执行

任务线程异步

Job 间掉调度,默认情况是异步执行的 —— 即前一个任务的线程状态不影响下一个任务的调用。 但我们也可以通过在类上添加 @DisallowConcurrentExecution 注解关闭 Job 异步调用。

todo demo

任务状态持久化

注意

(默认)无状态任务: 每次调度器执行 job 时,会创建新的 job 实例,然后调用 execute 方法,然后关联的 job 对象实例释放(可被 gc)。 💡 每次创建新实例能避免并发问题

状态任务: 通过添加 @PersistJobDataAfterExecution 注解,让 job 的 data 持久化,下一个 job 依然能获取之前持久化的数据 (持久化 JobDetail 数据,但没有持久化 Trigger 数据)

package org.example.job;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.quartz.*;

import java.util.HashMap;
import java.util.Map;

@Slf4j
public class SimpleJobStateTest extends AbstractSimpleJobTest {
    private static final String KEY_COUNTER = "counter";
    @Test
    void testPersist() throws SchedulerException {
        Map<String, Object> map = new HashMap<>(); // 存储数据
        startSchedule(PersistJob.class, map);
        Assertions.assertEquals(REPEAT+1, map.get(KEY_COUNTER));
    }

    @Test
    void testTransient() throws SchedulerException {
        Map<String, Object> map = new HashMap<>(); // 存储数据
        startSchedule(TransientJob.class, map);
        Assertions.assertEquals(1, map.get(KEY_COUNTER));
    }

    private void startSchedule(Class<? extends Job> clazz, Map<String, Object> map) throws SchedulerException {
        startSchedule(clazz,
                jobBuilder -> {
                    JobDataMap jobDataMap = new JobDataMap();
                    jobDataMap.put(KEY_COUNTER, map);
                    jobBuilder.usingJobData(jobDataMap);
                },
                triggerTriggerBuilder -> {
                    triggerTriggerBuilder.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).withRepeatCount(REPEAT));
                }
        );
    }

    /**
     * 有状态
     */
    @PersistJobDataAfterExecution // 持久化 JobDataMap 数据
    @DisallowConcurrentExecution // 通常同 @PersistJobDataAfterExecution 一起使用,避免当同一个 job(JobDetail) 的两个实例被并发执行时,由于竞争导致 JobDataMap 中存储的数据不确定的
    public static class PersistJob extends TransientJob {}

    /**
     * 无状态
     */
    @Getter
    @Setter
    public static class TransientJob implements Job {
        private String logo = "TransientJob";
        private int count = 0;
        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            log.info("{} {} context: count={}, 本次执行={}, 下次执行={}",
                    this.logo,
                    this.hashCode() + // 不同
                            "/" + context.hashCode() + // 不同
                            "/" + context.getJobDetail().hashCode() + // ❗相同
                            "/" + context.getTrigger().hashCode() + // ❗相同
                            "/" + context.getJobDetail().getJobDataMap().hashCode() + // ❗相同
                            "/" + context.getTrigger().getJobDataMap().hashCode() // ❗相同
                    ,
                    ++count,
                    context.getFireTime(),
                    context.getNextFireTime());
            context.getJobDetail().getJobDataMap().put("count", count); // 💡光改属性值还不行,还要手动插入到 Map 中,否则不被持久化
            Map<String, Object> map = (Map<String, Object>) context.getJobDetail().getJobDataMap().get(KEY_COUNTER);
            map.put(KEY_COUNTER, count);
        }
    }
}

监听器 listener

接口:

  • JobListener —— job 开始前/job 完成后/job 失效
  • TriggerListener ——
  • SchedulerListener

JobListener

scheduler.getListenerManager().addJobListener(new MyJobListener());
scheduler.getListenerManager().addJobListener(new MyJobListener(), GroupMatcher.jobGroupEquals("group1"));
scheduler.getListenerManager().addJobListener(new MyJobListener(), KeyMatcher.keyEquals(new JobKey("job1", "group1")));
package org.example.listener;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;

import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Getter
public class MyJobListener implements JobListener {
    private final AtomicInteger counter = new AtomicInteger(0);
    @Override
    public String getName() {
        return getClass().getName();
    }

    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        log.info("监听:任务开始前 {}", counter.incrementAndGet());
    }

    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        log.info("监听:任务被取消 {}", counter.incrementAndGet());
    }

    @Override
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        log.info("监听:任务执行后 {}", counter.incrementAndGet());
    }
}

调度持久化 JobStore

JobStore 负责存储调度器的 “工作数据”: 任务(Job)、触发器(Trigger)、数据(JobDataMap)

jdbc

todo

配置 prperties

https://www.quartz-scheduler.org/documentation/quartz-2.3.0/configurationopen in new window

org.quartz.scheduler.instanceName: CustomizeQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.rmi.wrapJobExecutionInUserTransaction: false

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 3
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadInheritContextClassLoaderOfInitializingThread: true

org.quartz.jobStore.misfireThreadhold: 6000

org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore

插件

# ===================
# Configure Plugins
# ===================

org.quartz.plugin.triggHistory.class: org.quartz.plugins.history.LoggingJobHistoryPlugin
org.quartz.plugin.jobInitializer.class: org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin
org.quartz.plugins.jobInitializer.fileNames: quartz_data.xml
org.quartz.plugins.jobInitializer.failOnFileNoFound: true
org.quartz.plugins.jobInitializer.scanInterval: 120
org.quartz.plugins.jobInitializer.wrapInUserTransaction: false

集群

Quartz 可单独使用,可在集成在服务器内参数实务,可集成在集群中负责平衡和故障转移。

todo

集成 spring

从 springboot 2.4.10 开始,添加 quartz 的 maven 依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.example</groupId>
        <artifactId>demo-java-schedule</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>demo-quartz-02-spring</artifactId>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.3.12.RELEASE</version>
                <scope>import</scope>
                <type>pom</type>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <!-- fix: SLF4J: Class path contains multiple SLF4J bindings. -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
    </dependencies>
</project>

配置类
package org.example.config;

import lombok.extern.slf4j.Slf4j;
import org.example.job.HelloJob;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Slf4j
@Configuration
public class QuartzConfiguration {
    @Autowired
    private Scheduler scheduler;

    @PostConstruct
    public void helloJob() throws SchedulerException {
        JobDetail job = JobBuilder.newJob(HelloJob.class)
                .build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withSchedule(SimpleScheduleBuilder.simpleSchedule().withRepeatCount(3).withIntervalInSeconds(1))
                .build();
        scheduler.scheduleJob(job, trigger);
    }

    @PostConstruct
    public void shutdownScheduler() {
        new Thread(() -> {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            try {
                scheduler.shutdown(); // 在 spring 中,默认 scheduler 不会自动关闭
            } catch (SchedulerException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }

}

持久化 with spring

参考: https://zhuanlan.zhihu.com/p/522284183

初始化 sql 脚本: https://gitee.com/qianwei4712/code-of-shiva/blob/master/quartz/quartz.sql

# 开发环境配置
server:
  # 服务器的HTTP端口
  port: 80
  servlet:
    # 应用的访问路径
    context-path: /
  tomcat:
    # tomcat的URI编码
    uri-encoding: UTF-8

spring:
  datasource:
    username: root
    password: root
    url: jdbc:mysql://127.0.0.1:3306/quartz?useUnicode=true&characterEncoding=utf-8&useSSL=true
    driver-class-name: com.mysql.cj.jdbc.Driver

    # HikariPool 较佳配置
    hikari:
      # 客户端(即您)等待来自池的连接的最大毫秒数
      connection-timeout: 60000
      # 控制将测试连接的活动性的最长时间
      validation-timeout: 3000
      # 控制允许连接在池中保持空闲状态的最长时间
      idle-timeout: 60000

      login-timeout: 5
      # 控制池中连接的最大生存期
      max-lifetime: 60000
      # 控制允许池达到的最大大小,包括空闲和使用中的连接
      maximum-pool-size: 10
      # 控制HikariCP尝试在池中维护的最小空闲连接数
      minimum-idle: 10
      # 控制默认情况下从池获得的连接是否处于只读模式
      read-only: false

配置类

@Configuration
public class ScheduleConfig {

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setDataSource(dataSource);

        // quartz参数
        Properties prop = new Properties();
        prop.put("org.quartz.scheduler.instanceName", "shivaScheduler");
        prop.put("org.quartz.scheduler.instanceId", "AUTO");
        // 线程池配置
        prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        prop.put("org.quartz.threadPool.threadCount", "20");
        prop.put("org.quartz.threadPool.threadPriority", "5");
        // JobStore配置
        prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        // 集群配置
        prop.put("org.quartz.jobStore.isClustered", "true");
        prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
        prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");

        // sqlserver 启用
        // prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?");
        prop.put("org.quartz.jobStore.misfireThreshold", "12000");
        prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
        factory.setQuartzProperties(prop);

        factory.setSchedulerName("shivaScheduler");
        // 延时启动
        factory.setStartupDelay(1);
        factory.setApplicationContextSchedulerContextKey("applicationContextKey");
        // 可选,QuartzScheduler
        // 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
        factory.setOverwriteExistingJobs(true);
        // 设置自动启动,默认为true
        factory.setAutoStartup(true);

        return factory;
    }
}