您现在的位置是:首页 >技术杂谈 >【开发经验】spring事件监听机制关心的同步、异步、事务问题网站首页技术杂谈

【开发经验】spring事件监听机制关心的同步、异步、事务问题

叁滴水 2023-06-03 12:00:03
简介【开发经验】spring事件监听机制关心的同步、异步、事务问题


        观察者模式又称为发布订阅模式,定义为:对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖它的对象都得到通知并被自动更新。

        如下图所示,一个场景中,主要流程是用户下单,并且返回操作,但是下单之后,要做很多与主干业务无关的流程,如发送短信、发送邮件、积分增加等等。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FPfVdOQt-1682166965766)(C:UserslikuoAppDataRoamingTypora	ypora-user-imagesimage-20230422203315173.png)]

        这种模式想必大家多多少少都了解过,但是大家讨论最多的就是通过使用mq中间件实现发布订阅模式,但是如果没有中间件呢?本文重点阐述,在没有中间件的情况下,如何做到使用观察者模式解耦。

spring发布订阅示例

        spring和guava都实现了比较优雅的发布订阅的框架,但是默认开发都会在spring的框架下开发。因此通过spring来实现发布订阅会更加方便一点。

主要业务流程代码,用户完成下单并且返回,并且发布订单下单事件。

    @Autowired
    private ApplicationContext applicationContext;
    public String saveOrder(Order order){
        this.orderMapper.save(order);
        //创建下单事件
        this.applicationContext.publishEvent(new SaveOrderEvent(order));
        return "下单成功";
    }

创建下单事件类,描述该事件信息和当事件发生时可以添加一些通用代码。

// 下单事件
public class SaveOrderEvent  extends ApplicationEvent {

    public SaveOrderEvent(Object source) {
        //编写事件后的代码
        super(source);
    }
}

        事件监听,这里的写法示例两种(我也没研究过其他的写法)。一种是作用在类上,一个类一个监听事件,另一种可以总用在方法上,通过@EventListener来实现监听。

@Component
public class SaveOrderEventListener implements ApplicationListener<SaveOrderEvent> {
    @Override
    public void onApplicationEvent(SaveOrderEvent event) {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("线程name:"+Thread.currentThread().getName()+"类打印信息"+event);
    }
}
@Service
public class OrderService{

  @EventListener(SaveOrderEvent.class)
    public void methodTest(SaveOrderEvent event){
        System.out.println("线程name:"+Thread.currentThread().getName()+"方法中打印"+event);
    }
}

执行代码后,打印日志如下:

线程name:http-nio-6902-exec-3方法中打印com.example.demo.task.SaveOrderEvent[source=Order{orderId=‘12312312’}]
线程name:http-nio-6902-exec-3类打印信息com.example.demo.task.SaveOrderEvent[source=Order{orderId=‘12312312’}]

同步核心源码分析

        我原本以为spring的发布订阅模式是异步的,这可能是我用mq中间件用的太多了缘故吧。实际上,默认情况下spring的发布订阅是同步的(可以通过配置实现异步)。为了更进一步测试他是异步还是同步, 我在一个订阅事件中设置了休眠,然后就因为这个休眠,影响了主流程的响应时间。为此我还去研究了它的源码。最终找到核心源码为下org.springframework.context.event.SimpleApplicationEventMulticaster#multicastEvent

public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
  		ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
  		for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            //如果有配置执行器,则为异步,如果没有配置,则为同步。
  			Executor executor = getTaskExecutor();
  			if (executor != null) {
  				executor.execute(() -> invokeListener(listener, event));
  			}
  			else {
  				invokeListener(listener, event);
  			}
  		}
  	}

如何配置异步

        我在寻找了很多资料,大部分博客是使用@Async注解实现,其实仔细观察其源码可发现,官方这种写法,肯定是可以通过配置实现异步。因此,我在此示例两种写法。

方法一:

代码示例如下,自定义一个线程池,并且把线程池设置上即可。个人认为是相对符合官方的配置思路。

 @Bean("taskExecutor")
    public Executor getExecutor() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
                20,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10000));
        return executor;
    }

    @Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
    public ApplicationEventMulticaster initEventMulticaster(@Qualifier("taskExecutor") Executor taskExecutor) {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor);
        return simpleApplicationEventMulticaster;
    }

方法2:

        通过@Async注解实现,在使用@Async的时候一般都会自定义线程池,因为@Async的默认线程池为SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。

Application上添加@EnableAsync注解。然后在方法上添加注解即可@Async即可

@Component
public class SaveOrderEventListener implements ApplicationListener<SaveOrderEvent> {
    @Override
    @Async("taskExecutor")
    public void onApplicationEvent(SaveOrderEvent event) {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println("线程name:"+Thread.currentThread().getName()+"类打印信息"+event);
    }
}

事务问题

        异步、解耦和事务在一起本身是一个伪命题,至少我并为发现一套完美的异步事务解决方案。但是,为了追求数据一致性,开发者也是在一直努力着。如果要是想使用spring的异步发布订阅的时候实现数据一致性。也许可以尝试下@TransactionalEventListener。从命名上来看,即可得出,他是一个事件监听加上了事务的扩展。只不过加入了回调的方式来解决,这样就能够在事务进行CommitedRollback…等的时候才会去进行Event的处理,达到事务同步的目的。

   //配置监听,事务commit之后执行。
   @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void methodTest(SaveOrderEvent event){
        System.out.println("线程name:"+Thread.currentThread().getName()+"方法中打印"+event);
    }

        如上,配置了事务执行之后执行。并且它还有很多其他的监听方式,如:BEFORE_COMMIT、AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION。这种的相对于@EventListener功能更加多了一些。毕竟在真实的场景中,经常是有事务存在的,并且为了减小事务的执行时间,要求第三方的接口调用不在事务中执行。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。