您现在的位置是:首页 >技术杂谈 >【开发经验】spring事件监听机制关心的同步、异步、事务问题网站首页技术杂谈
【开发经验】spring事件监听机制关心的同步、异步、事务问题
观察者模式又称为发布订阅模式,定义为:对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖它的对象都得到通知并被自动更新。
如下图所示,一个场景中,主要流程是用户下单,并且返回操作,但是下单之后,要做很多与主干业务无关的流程,如发送短信、发送邮件、积分增加等等。
这种模式想必大家多多少少都了解过,但是大家讨论最多的就是通过使用mq中间件实现发布订阅模式,但是如果没有中间件呢?本文重点阐述,在没有中间件的情况下,如何做到使用观察者模式解耦。
spring发布订阅示例
spring和guava都实现了比较优雅的发布订阅的框架,但是默认开发都会在spring的框架下开发。因此通过spring来实现发布订阅会更加方便一点。
主要业务流程代码,用户完成下单并且返回,并且发布订单下单事件。
@Autowired
private ApplicationContext applicationContext;
public String saveOrder(Order order){
this.orderMapper.save(order);
//创建下单事件
this.applicationContext.publishEvent(new SaveOrderEvent(order));
return "下单成功";
}
创建下单事件类,描述该事件信息和当事件发生时可以添加一些通用代码。
// 下单事件
public class SaveOrderEvent extends ApplicationEvent {
public SaveOrderEvent(Object source) {
//编写事件后的代码
super(source);
}
}
事件监听,这里的写法示例两种(我也没研究过其他的写法)。一种是作用在类上,一个类一个监听事件,另一种可以总用在方法上,通过@EventListener
来实现监听。
@Component
public class SaveOrderEventListener implements ApplicationListener<SaveOrderEvent> {
@Override
public void onApplicationEvent(SaveOrderEvent event) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("线程name:"+Thread.currentThread().getName()+"类打印信息"+event);
}
}
@Service
public class OrderService{
@EventListener(SaveOrderEvent.class)
public void methodTest(SaveOrderEvent event){
System.out.println("线程name:"+Thread.currentThread().getName()+"方法中打印"+event);
}
}
执行代码后,打印日志如下:
线程name:http-nio-6902-exec-3方法中打印com.example.demo.task.SaveOrderEvent[source=Order{orderId=‘12312312’}]
线程name:http-nio-6902-exec-3类打印信息com.example.demo.task.SaveOrderEvent[source=Order{orderId=‘12312312’}]
同步核心源码分析
我原本以为spring的发布订阅模式是异步的,这可能是我用mq中间件用的太多了缘故吧。实际上,默认情况下spring的发布订阅是同步的(可以通过配置实现异步)。为了更进一步测试他是异步还是同步, 我在一个订阅事件中设置了休眠,然后就因为这个休眠,影响了主流程的响应时间。为此我还去研究了它的源码。最终找到核心源码为下org.springframework.context.event.SimpleApplicationEventMulticaster#multicastEvent
:
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
//如果有配置执行器,则为异步,如果没有配置,则为同步。
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
如何配置异步
我在寻找了很多资料,大部分博客是使用@Async注解实现,其实仔细观察其源码可发现,官方这种写法,肯定是可以通过配置实现异步。因此,我在此示例两种写法。
方法一:
代码示例如下,自定义一个线程池,并且把线程池设置上即可。个人认为是相对符合官方的配置思路。
@Bean("taskExecutor")
public Executor getExecutor() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
20,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10000));
return executor;
}
@Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
public ApplicationEventMulticaster initEventMulticaster(@Qualifier("taskExecutor") Executor taskExecutor) {
SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor);
return simpleApplicationEventMulticaster;
}
方法2:
通过@Async
注解实现,在使用@Async
的时候一般都会自定义线程池,因为@Async
的默认线程池为SimpleAsyncTaskExecutor
,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
在Application
上添加@EnableAsync
注解。然后在方法上添加注解即可@Async
即可
@Component
public class SaveOrderEventListener implements ApplicationListener<SaveOrderEvent> {
@Override
@Async("taskExecutor")
public void onApplicationEvent(SaveOrderEvent event) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("线程name:"+Thread.currentThread().getName()+"类打印信息"+event);
}
}
事务问题
异步、解耦和事务在一起本身是一个伪命题,至少我并为发现一套完美的异步事务解决方案。但是,为了追求数据一致性,开发者也是在一直努力着。如果要是想使用spring的异步发布订阅的时候实现数据一致性。也许可以尝试下@TransactionalEventListener
。从命名上来看,即可得出,他是一个事件监听加上了事务的扩展。只不过加入了回调
的方式来解决,这样就能够在事务进行Commited
,Rollback
…等的时候才会去进行
Event的处理,达到事务同步的目的。
//配置监听,事务commit之后执行。
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void methodTest(SaveOrderEvent event){
System.out.println("线程name:"+Thread.currentThread().getName()+"方法中打印"+event);
}
如上,配置了事务执行之后执行。并且它还有很多其他的监听方式,如:BEFORE_COMMIT、AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION。这种的相对于@EventListener
功能更加多了一些。毕竟在真实的场景中,经常是有事务存在的,并且为了减小事务的执行时间,要求第三方的接口调用不在事务中执行。