Java 事件实现
大约 2 分钟
事件/消息
事件驱动模型:生产者、消费者
观察者模式
工具: JDK Observable
解耦: 生产和消费的方法解耦
被观察者 - 观察者
发布订阅模式
工具: Guava EventBus
解耦: 生产和消费的关系解耦
发布者 - 消息总线 - 订阅者
框架:Guava EventBus
框架:Spring Event
特性:
- 支持异步 ——
@Async
- 支持条件配置 ——
@EventListener(condition = "...")
基本使用
Event
package org.example.event;
import lombok.*;
import org.springframework.context.ApplicationEvent;
/**
 * Minimal application event that carries a single {@code name} attribute.
 *
 * <p>Lombok generates {@code toString}, {@code equals} and {@code hashCode}; each of them
 * also delegates to the superclass because of {@code callSuper = true}.
 */
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class SimpleEvent extends ApplicationEvent {

    /** Name attached to this event; Lombok generates its getter and setter. */
    @Getter
    @Setter
    private String name;

    /**
     * Creates an event for the given source.
     *
     * @param source the object on which the event initially occurred (never {@code null})
     */
    public SimpleEvent(Object source) {
        super(source);
    }
}
Publisher
package org.example.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
public class SimpleEventPublisher implements ApplicationRunner {
@Resource
ApplicationContext applicationContext; // 实现了 applicationEventPublisher 接口
@Resource
private ApplicationEventPublisher applicationEventPublisher; // 接口
@Override
public void run(ApplicationArguments args) throws Exception {
log.debug("context == publisher: {}", applicationEventPublisher == applicationContext);
AtomicInteger counter = new AtomicInteger(0);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
SimpleEvent event = new SimpleEvent("hello world!");
event.setName("EID" + counter.getAndIncrement());
applicationContext.publishEvent(event);
}, 0, 1, TimeUnit.SECONDS);
}
}
相关信息
按 Spring 的设计,发布事件时可以注入两个接口之一:ApplicationEventPublisher 或者 ApplicationContext。
Publisher
/**
 * Service that publishes events through the {@link ApplicationEventPublisher} it receives
 * via the {@link ApplicationEventPublisherAware} callback.
 */
@Service
public class XxxEventService implements ApplicationEventPublisherAware {

    /** Publisher supplied through {@link #setApplicationEventPublisher}. */
    private ApplicationEventPublisher publisher;

    /**
     * Stores the publisher for later use by {@link #publishEvent()}.
     *
     * @param applicationEventPublisher the publisher to keep
     */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        publisher = applicationEventPublisher;
    }

    /** Publishes a new {@code XxxEvent} whose source is the string {@code "xxx"}. */
    public void publishEvent() {
        XxxEvent event = new XxxEvent("xxx");
        publisher.publishEvent(event);
    }
}
Context
Context 实现了 Publisher 接口,把 Context 当作 Publisher 用即可。
/**
 * Service that publishes events through the {@link ApplicationContext} it receives via the
 * {@link ApplicationContextAware} callback, using the context as the publisher.
 */
@Service
public class XxxEventService implements ApplicationContextAware {

    /** Context supplied through {@link #setApplicationContext}. */
    private ApplicationContext context;

    /**
     * Stores the context for later use by {@link #publishEvent()}.
     *
     * @param applicationContext the context to keep
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        context = applicationContext;
    }

    /** Publishes a new {@code XxxEvent} whose source is the string {@code "xxx"}. */
    public void publishEvent() {
        XxxEvent event = new XxxEvent("xxx");
        context.publishEvent(event);
    }
}
:::
::::
Listener
package org.example.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Optional;
/**
 * Interface-based listener for {@link SimpleEvent}: logs every event and shuts the
 * application down once an event with a name longer than four characters arrives.
 */
@Slf4j
@Component
public class SimpleEventListener implements ApplicationListener<SimpleEvent> {

    /** Context passed to {@link SpringApplication#exit} when shutting down. */
    @Resource
    private ApplicationContext applicationContext;

    /**
     * Logs the event and its timestamp; exits the JVM when the event name has more than
     * four characters. Events without a name are only logged.
     *
     * @param event the received event
     */
    @Override
    public void onApplicationEvent(SimpleEvent event) {
        log.info("receive event:{}, stamp:{}", event, event.getTimestamp());

        String name = event.getName();
        boolean triggersShutdown = name != null && name.length() > 4;
        if (!triggersShutdown) {
            return;
        }

        // Shut down: close the Spring context first, then terminate the JVM.
        SpringApplication.exit(applicationContext);
        System.exit(0);
    }
}
AnnotationListener
package org.example.event;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * Annotation-based listeners for {@link SimpleEvent}, showing conditional handling,
 * asynchronous handling and re-publication of a follow-up event.
 */
@Slf4j
@Component
public class SimpleEventAnnotationListener {

    /**
     * Logs the event, but only when its name equals {@code EID7} (see the SpEL condition).
     *
     * <p>NOTE(review): {@code @Async} presumably requires async support to be enabled
     * elsewhere (e.g. {@code @EnableAsync}) — not visible here, confirm.
     *
     * @param event the received event
     */
    @Order(0) // Priority: the lower the value, the higher the precedence. Only compared among listeners of the same kind.
    @Async
    @EventListener(condition = "#event.name == 'EID7'")
    // @EventListener({SimpleEvent.class}) // alternative form: lists the event classes to listen for
    public void listener001(SimpleEvent event) {
        log.info("receive event:{}", event);
    }

    /**
     * Event re-publication: for the event named {@code EID8} a follow-up event named
     * {@code RID} is returned; for every other event {@code null} is returned.
     *
     * <p>The SpEL condition skips events named {@code RID}, so the follow-up event is not
     * handled by this method again.
     *
     * @param event the received event
     * @return the follow-up event, or {@code null} when there is nothing to re-publish
     */
    @EventListener(classes = SimpleEvent.class, condition = "#event.name != 'RID'")
    public SimpleEvent listener002(SimpleEvent event) {
        if (!"EID8".equals(event.getName())) {
            return null;
        }
        log.info("recall event:{}, stamp:{}", event, event.getTimestamp());
        SimpleEvent followUp = new SimpleEvent("OK");
        followUp.setName("RID");
        return followUp;
    }
}
自定义事件,封装实现按标识处理
特性:
- 监听事件可按 Topic 区分处理
- 监听器统一配置
Event
package org.example.eventTpoic;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
/**
 * Application event tagged with a {@code Topic} so that listeners can be selected per topic.
 */
@Getter
public class TopicEvent extends ApplicationEvent {

    /** Topic of this event; assigned once in the constructor and never changed afterwards. */
    private final Topic topic;

    /**
     * Creates a new event for the given topic and source.
     *
     * @param topic  the topic this event belongs to
     * @param source the object on which the event initially occurred (never {@code null})
     */
    public TopicEvent(Topic topic, Object source) {
        super(source);
        this.topic = topic;
    }
}
Publisher
package org.example.eventTpoic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class TopicEventPublisher implements ApplicationRunner, ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void run(ApplicationArguments args) throws Exception {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
TopicEvent event = new TopicEvent(Topic.JOIN_MEMBERSHIP_GROUP, "Hello world!");
log.info("Publish Event: {}", event);
applicationEventPublisher.publishEvent(event);
}, 0, 1, TimeUnit.SECONDS);
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}
ListenerEngine
package org.example.eventTpoic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
/**
 * Dispatches each {@link TopicEvent} to the listeners registered for the event's topic,
 * running every listener as a separate task on the supplied executor.
 */
@Slf4j
public class TopicListenerEngine {

    /** Executor on which the listeners are run. */
    private final Executor executor;

    /** Registered listeners, keyed by topic. */
    private final Map<Topic, List<TopicListener>> topicListMap;

    /**
     * Creates an engine with the given executor and listener registry.
     *
     * @param executor     executor used to run the listeners
     * @param topicListMap listeners to invoke, keyed by topic
     */
    public TopicListenerEngine(Executor executor, Map<Topic, List<TopicListener>> topicListMap) {
        this.executor = executor;
        this.topicListMap = topicListMap;
    }

    /**
     * Submits one task per listener registered for the event's topic. Events whose topic has
     * no entry in the registry are ignored.
     *
     * @param event the received event
     */
    // todo @EventListener(condition = "") // SpEL
    @EventListener
    public void listen(TopicEvent event) {
        List<TopicListener> topicListeners = topicListMap.get(event.getTopic());
        if (topicListeners == null) {
            // No entry for this topic: previously this caused a NullPointerException.
            log.debug("No listener registered for topic:{}", event.getTopic());
            return;
        }
        for (TopicListener listener : topicListeners) {
            executor.execute(() -> {
                log.debug("Event Topic:{}, Handler:{}", event, listener);
                listener.onEvent(event);
            });
        }
    }
}
ListenerEngineConfig
package org.example.eventTpoic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
 * Central configuration that wires the {@link TopicListenerEngine} bean together with its
 * executor and the per-topic listener registry.
 */
@Slf4j
@Configuration
public class TopicListenerEngineConfig {

    /**
     * Thread pool used to handle events asynchronously: 20 core threads, at most 50 threads,
     * 10 minutes keep-alive, and a bounded queue of 500 tasks.
     */
    private static final Executor EXECUTOR = new ThreadPoolExecutor(
            20,
            50,
            10,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue<>(500),
            new CustomizableThreadFactory("EVENT_ENGINE_POOL"));

    /**
     * Builds the engine with one logging listener for {@code JOIN_MEMBERSHIP_GROUP} and empty
     * listener lists for {@code ISSUE_COUPONS} and {@code SEND_WELCOME_MSG}.
     *
     * @return the engine bean registered as {@code eventListenerEngine}
     */
    @Bean("eventListenerEngine")
    public TopicListenerEngine initEventListenerEngine() {
        // Listener that only logs the event it receives.
        TopicListener loggingListener = new TopicListener() {
            @Override
            public void onEvent(TopicEvent event) {
                log.info("Receive Event:{}", event);
            }
        };

        Map<Topic, List<TopicListener>> listenersByTopic = new HashMap<>();
        // TODO
        listenersByTopic.put(Topic.JOIN_MEMBERSHIP_GROUP, Arrays.asList(loggingListener));
        listenersByTopic.put(Topic.ISSUE_COUPONS, Arrays.asList());
        listenersByTopic.put(Topic.SEND_WELCOME_MSG, Arrays.asList());

        return new TopicListenerEngine(EXECUTOR, listenersByTopic);
    }
}
内置事件
事件 | 描述 |
---|---|
ContextRefreshedEvent | 容器初始化完成或被刷新(调用 refresh() 方法)时触发事件。 |
ContextStartedEvent | 容器启动时(触发 start() 方法)触发事件。 |
ContextStoppedEvent | 容器停止时(触发 stop() 方法)触发事件。此时所有 bean 收到 stop 信号。可通过 start() 方法重启容器。 |
ContextClosedEvent | 容器关闭时(调用 close() 方法)触发事件。关闭意味着所有单例 bean 将被销毁,容器无法再刷新或重启。 |
RequestHandledEvent | Spring Web 中 DispatcherServlet 处理完一个请求后触发事件。 |