Timer 使用
大约 2 分钟
JDK 提供的定时任务工具
Demo: /code/demo-java-schedule/demo-timer-01-simple.html
简单使用
提示
Timer 使用 “小顶堆” 方式管理任务调度
package org.example.task;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
public class HelloTaskTest {
private static final int NUM = 2;
private static final CountDownLatch countDownLatch = new CountDownLatch(NUM*2);
private static final List<String> expectArray = Collections.synchronizedList(new ArrayList<>());
@Test
void test() {
Timer timer = new Timer(); // 底层使用小顶堆实现任务轮询
for (int i = 0; i < 2; i++) {
log.info("启动 task {}", i);
HelloTask task = HelloTask.builder()
.num(i)
.build();
Calendar cal = Calendar.getInstance();
Date firstTime = cal.getTime(); // 开始时间
long period = 2000L; // 时间间隔 ❗计划的时间间隔,但实际执行时间会受到上次执行时间影响。比如上次任务未执行结束,则下次任务即便到达执行时间,但仍然等待上次任务执行后才执行
timer.schedule(task, firstTime, period);
}
try {
log.info("主进程等待");
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("主进程结束");
Assertions.assertArrayEquals(new String[] {"0", "1", "0", "1"}, expectArray.toArray());
}
@Builder
private static class HelloTask extends TimerTask {
private static final String PREFIX = "taskHello";
private int num;
private String logo() {
return PREFIX + "-" + num;
}
@Override
public void run() {
int i = 0;
while (i++<3) {
log.info("{} {} {} start", logo(), i, Thread.currentThread().hashCode());
try {
TimeUnit.MILLISECONDS.sleep(1000); // 💡任务间隔 2s 执行一次,但这里等待 1*3s,这会导致下次任务执行延时(观察 log 打印的时间)
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("{} {} {} end", logo(), i, Thread.currentThread().hashCode());
}
expectArray.add(String.valueOf(num));
countDownLatch.countDown();
}
}
}
任务丢失问题
虽然任务添加进队列,但由于上一个任务未执行完成,导致后面的任务延时完成,最终在程序关闭前依然未执行任务。
package org.example.task;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
public class MissTaskTest {
private static final int NUM = 10;
private static final CountDownLatch countDownLatch = new CountDownLatch(NUM);
@Test
void test() {
Timer timer = new Timer();
for (int i = 0; i < NUM; i++) {
log.info("主进程开始任务 {}", i);
HelloTask task = HelloTask.builder().num(i).build();
Date firstTime = Calendar.getInstance().getTime();
long period = 100L;
timer.schedule(task, firstTime, period);
}
try {
log.info("主进程等待");
countDownLatch.await(2L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("主进程结束 {}", countDownLatch.getCount());
Assertions.assertTrue(countDownLatch.getCount()>0); // 给了足够时间,任务依然未全部执行 —— 任务丢失
}
@Builder
private static class HelloTask extends TimerTask {
private static final String PREFIX = "helloTask";
private int num;
private String logo() {
return PREFIX + "-" + num;
}
@Override
public void run() {
log.info("{} 开始", logo());
try {
TimeUnit.MILLISECONDS.sleep(300L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("{} 结束", logo());
countDownLatch.countDown();
}
}
}
线程池执行
上面任务丢失问题是因为 Timer 对 schedule 中的任务是单线程执行的,所以有线程阻塞问题。 解决方法就是添加线程池,避免线程阻塞问题。
FixPool 方案
package org.example.task;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Calendar;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;
@Slf4j
public class FixPoolTaskTest {
private static final int NUM = 10;
private static final CountDownLatch countDownLatch = new CountDownLatch(NUM);
@Test
void test() {
Timer timer = new Timer();
for (int i = 0; i < NUM; i++) {
log.info("主进程开始任务 {}", i);
HelloTask task = HelloTask.builder().num(i).build();
Date firstTime = Calendar.getInstance().getTime();
long period = 100L;
timer.schedule(task, firstTime, period);
}
try {
log.info("主进程等待");
countDownLatch.await(2L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("主进程结束 {}", countDownLatch.getCount());
Assertions.assertEquals(0, countDownLatch.getCount());
}
@Builder
private static class HelloTask extends TimerTask {
private static final ExecutorService THREADPOOL = Executors.newFixedThreadPool(NUM);
private static final String PREFIX = "helloTask";
private int num;
private String logo() {
return PREFIX + "-" + num;
}
@Override
public void run() {
THREADPOOL.execute(() -> {
log.info("{} 开始", logo());
try {
TimeUnit.MILLISECONDS.sleep(300L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("{} 结束", logo());
countDownLatch.countDown();
});
}
}
}
SchedulePool 方案
SchedulePool 相关方法
// 延时执行一次某个任务
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); // 会有返回值
// 延时,并周期性执行某个任务
// 间隔时间是固定的,无论上一个任务是否执行完成
public ScheduledFuture<?> scheduleWithFixedDalay(Runnable command, long initialDelay, long delay, TimeUnit unit)
// 间隔是不固定的,会在周期任务的上一个任务执行完成之后才开始计时,并在指定时间间隔之后才开始执行任务
public ScheduledFuture<?> scheduleAtFixeRate(Runnable command, long initialDelay, long period, TimeUnit unit)
相关信息
Leader-Follower 模式 在 SchedulePool 中,所有工作线程只会有一个 leader 线程,其他线程都是 follower 线程。 只有 leader 线程能执行任务,而 follower 线程则不会执行任务(它们会处于 “休眠” 状态)。 当 leader 线程拿到任务后执行任务前,leader 线程会变成 follower 线程,并选出一个新的 leader 线程,然后才去执行任务。 然后新的 leader 等待下一个可执行任务的到来,循环上面过程。 这种方法保证了线程池中的线程有序执行任务。
代码:
package org.example.task;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Slf4j
public class SchedulePoolTaskTest {
private static final int NUM = 10;
private static final CountDownLatch countDownLatch = new CountDownLatch(NUM);
private static final ScheduledExecutorService THREADPOOL = Executors.newScheduledThreadPool(NUM);
@Test
void test() {
for (int i = 0; i < NUM; i++) {
log.info("主进程开始任务 {}", i);
HelloTask task = HelloTask.builder().num(i).build();
long period = 100L;
THREADPOOL.scheduleAtFixedRate(task, 0L, period, TimeUnit.MILLISECONDS);
}
try {
log.info("主进程等待");
countDownLatch.await(2L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("主进程结束 {}", countDownLatch.getCount());
Assertions.assertEquals(0, countDownLatch.getCount());
}
@Builder
private static class HelloTask extends TimerTask {
private static final String PREFIX = "helloTask";
private int num;
private String logo() {
return PREFIX + "-" + num;
}
@Override
public void run() {
THREADPOOL.execute(() -> {
log.info("{} 开始", logo());
try {
TimeUnit.MILLISECONDS.sleep(300L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("{} 结束", logo());
countDownLatch.countDown();
});
}
}
}
::: tips 使用 SchedulePool 就不需要自己创建 Timer 了。 :::