您现在的位置是:首页 >技术杂谈 >设计模式-观察者模式网站首页技术杂谈

设计模式-观察者模式

码农界的菜鸟 2024-05-31 12:00:03
简介设计模式-观察者模式

观察者模式



什么是观察者模式

  在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。
  一般情况下,被依赖的对象叫作被观察者(Observable),依赖的对象叫作观察者(Observer)。不过,在实际的项目开发中,这两种对象的称呼是比较灵活的,有各种不同的叫法,比如:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener。不管怎么称呼,只要应用场景符合刚刚给出的定义,都可以看作观察者模式。

为什么要用观察者模式

  观察者模式最大的作用就是解耦,为了更符合开闭原则,使用观察者模式可以将观察者和被观察者代码解耦。借助设计模式,我们利用更好的代码结构,将一大坨代码拆分成职责更单一的小类,让其满足开闭原则、高内聚松耦合等特性,以此来控制和应对代码的复杂性,提高代码的可扩展性。
  比如我们现在有这样一个需求,我们有一个可以动态记录线程池核心参数的Starter,将核心参数定时写入本地文件。代码示例如下:

// 信息采集运行器
public class ZfDtpMonitorRunner implements ApplicationRunner {
    private ZfDtpRunInfoCollectHandler zfDtpRunInfoCollectHandler;

    @Override
    public void run(ApplicationArguments args) {
        // 开启信息采集定时任务
        this.getZfDtpRunInfoCollectHandler().startCollectSchedule();
    }
}

// 信息采集处理器(执行具体的采集任务)
public class ZfDtpRunInfoCollectHandler {
	public void startCollectSchedule() {
        ZfDtpProperties.Monitor monitor = this.getZfDtpProperties().getMonitor();
        Long initialDelayMs = monitor.getInitialDelayMs();
        Long collectIntervalMs = monitor.getCollectIntervalMs();

        // 开启定时任务
        this.getCollectExecutor().scheduleWithFixedDelay(
                this::doCollect,
                initialDelayMs,
                collectIntervalMs,
                TimeUnit.MILLISECONDS);
    }

	// 采集信息
	private void doCollect() {
		// 信息采集时间
        DateTime currentDateTime = DateUtil.dateSecond();
        String collectDay = DateUtil.format(currentDateTime, ZfDtpConstants.DAY_FORMAT_STR);
        String collectTime = DateUtil.format(currentDateTime, ZfDtpConstants.DATETIME_FORMAT_STR);

        // 本次信息采集线程池运行状态集合
        List<ZfDtpRunInfo> zfDtpRunInfoList = new ArrayList<>();

        // 采集动态线程池
        List<String> dtpNames = ZfDtpFactory.listAllZfDtpNames();
        dtpNames.forEach(threadPoolName -> {
            ZfDtpExecutor zfDtpExecutor = ZfDtpFactory.getZfDtpExecutor(threadPoolName);
            ZfDtpRunInfo zfDtpRunInfo = ZfDtpConverter.convertToRunInfo(zfDtpExecutor);
            zfDtpRunInfo.setCurrentTime(collectTime);
            zfDtpRunInfoList.add(zfDtpRunInfo);
        });

        // 本次采集信息
        ZfDtpCollectTimeInfo zfDtpCollectTimeInfo = new ZfDtpCollectTimeInfo();
        zfDtpCollectTimeInfo.setCollectDay(collectDay);
        zfDtpCollectTimeInfo.setCollectTime(collectTime);
        zfDtpCollectTimeInfo.setZfDtpRunInfoList(zfDtpRunInfoList);

        // 保存线程池状态
        saveZfDtpRunInfo(collectDay, zfDtpCollectTimeInfo);
    }
	
	// 保存运行状态
	public void saveZfDtpRunInfo(String collectDay, ZfDtpCollectTimeInfo zfDtpCollectTimeInfo) {
        // 保存至文件
        String localFilePath = CommonUtil.getFilePath(this.getLocalFilePathPrefix(), collectDay);
        ArrayList<String> appendLines = new ArrayList<>();
        appendLines.add(JSONUtil.toJsonStr(zfDtpCollectTimeInfo));
        FileUtil.appendLines(appendLines, localFilePath, StandardCharsets.UTF_8);
    }
}

  上面的代码看起来似乎没什么问题,但是现在又有新的需求,需要将采集的信息保存到ES中,并且后续有规划,将采集的信息保存到时序数据库中。那我们一直频繁的修改ZfDtpRunInfoCollectHandler显然是不合适的,不符合开闭原则。那么此时我们就可以用到观察者模式,将采集信息与记录信息两个动作解耦。接下来我们介绍如何使用观察者模式。

如何使用观察者模式

在解决上面的问题之前,我们先介绍下实现观察者模式的几种方式:

  • JDK自带的Observer/Observable。这是最基本的选择,但是API较为简陋,不太方便使用。
  • Spring的ApplicationEventPublisher和ApplicationListener。这是在Spring环境下一个很好的选择,可以很方便的进行事件发布和监听。
  • Guava的EventBus。这也是一个不错的选型,Guava提供的EventBus API非常简洁易用。
  • RxJava。RxJava是一套polyglot的响应式扩展库,它不仅支持观察者模式,还提供了丰富的操作符来compose异步和事件驱动的程序。这也是当下一个优秀的选择。
  • javax.jms提供的JMS(Java消息服务)API,更广泛用于异步消息和事件处理。
  • RabbitMQ/ActiveMQ/Kafka等消息中间件产品。当应用复杂时,可以使用消息队列来实现事件驱动和解耦。
  • 自己实现一个轻量级的事件总线。实际上,像Guava EventBus等都只是对观察者模式的轻量级实现,我们也可以自己实现一个。

我们罗列了很多种实现观察者模式的方式,但是实际上的技术选型需要结合自己的项目以及实际情况来进行选择,我们这里使用Spring的ApplicationEventPublisher和ApplicationListener来解决上面的问题。代码示例如下:

public class ZfDtpRunInfoCollectRunner implements ApplicationRunner {
    private ZfDtpProperties zfDtpProperties;
    private ScheduledThreadPoolExecutor collectExecutor;

    @Override
    public void run(ApplicationArguments args) {
        ZfDtpProperties.Monitor monitor = this.getZfDtpProperties().getMonitor();
        Long initialDelayMs = monitor.getInitialDelayMs();
        Long collectIntervalMs = monitor.getCollectIntervalMs();
        // 开启定时任务
        this.getCollectExecutor().scheduleWithFixedDelay(
                this::collectDtpRunInfoAndPublishEvent,
                initialDelayMs,
                collectIntervalMs,
                TimeUnit.MILLISECONDS);
    }

    /**
     * 采集信息并发布事件
     */
    public void collectDtpRunInfoAndPublishEvent(){
        // 信息采集时间
        DateTime currentDateTime = DateUtil.dateSecond();
        String collectDay = DateUtil.format(currentDateTime, ZfDtpConstants.DAY_FORMAT_STR);
        String collectTime = DateUtil.format(currentDateTime, ZfDtpConstants.DATETIME_FORMAT_STR);

        // 本次信息采集线程池运行状态集合
        List<ZfDtpRunInfo> zfDtpRunInfoList = new ArrayList<>();

        // 采集普通线程池
        List<String> commonNames = ZfDtpFactory.listAllCtpNames();
        commonNames.forEach(threadPoolName -> {
            ExecutorWrapper executorWrapper = ZfDtpFactory.getCtpExecutor(threadPoolName);
            ZfDtpRunInfo zfDtpRunInfo = ZfDtpConverter.convertToRunInfo(executorWrapper);
            zfDtpRunInfo.setCurrentTime(collectTime);
            zfDtpRunInfoList.add(zfDtpRunInfo);
        });

        // 采集动态线程池
        List<String> dtpNames = ZfDtpFactory.listAllZfDtpNames();
        dtpNames.forEach(threadPoolName -> {
            ZfDtpExecutor zfDtpExecutor = ZfDtpFactory.getZfDtpExecutor(threadPoolName);
            ZfDtpRunInfo zfDtpRunInfo = ZfDtpConverter.convertToRunInfo(zfDtpExecutor);
            zfDtpRunInfo.setCurrentTime(collectTime);
            zfDtpRunInfoList.add(zfDtpRunInfo);
        });

        // 本次采集信息
        ZfDtpCollectTimeInfo zfDtpCollectTimeInfo = new ZfDtpCollectTimeInfo();
        zfDtpCollectTimeInfo.setCollectDay(collectDay);
        zfDtpCollectTimeInfo.setCollectTime(collectTime);
        zfDtpCollectTimeInfo.setZfDtpRunInfoList(zfDtpRunInfoList);

        // 发布事件
        ZfDtpRunInfoCollectEvent zfDtpRunInfoCollectEvent = new ZfDtpRunInfoCollectEvent(zfDtpCollectTimeInfo);
        ZfDtpApplicationContext.publishEvent(zfDtpRunInfoCollectEvent);
    }
}

// 自定义事件类
public class ZfDtpRunInfoCollectEvent extends ApplicationEvent {
    public ZfDtpRunInfoCollectEvent(ZfDtpCollectTimeInfo zfDtpCollectTimeInfo) {
        super(zfDtpCollectTimeInfo);
    }
}

// 自定义抽象类,复用代码
public abstract class ZfDtpAbstractRunInfoCollectEventListener implements ApplicationListener<ZfDtpRunInfoCollectEvent> {
    @Override
    public void onApplicationEvent(ZfDtpRunInfoCollectEvent event) {
        ZfDtpCollectTimeInfo zfDtpCollectTimeInfo = (ZfDtpCollectTimeInfo) event.getSource();
        doCollect(zfDtpCollectTimeInfo);
    }
    abstract void doCollect(ZfDtpCollectTimeInfo zfDtpCollectTimeInfo);
}


// 自定义监听器
public class ZfDtpRunInfoCollectEventLocalFileListener extends ZfDtpAbstractRunInfoCollectEventListener {
    private ZfDtpProperties zfDtpProperties;
    private String localFilePathPrefix;

    /**
     * 采集信息
     */
    @Override
    public void doCollect(ZfDtpCollectTimeInfo zfDtpCollectTimeInfo) {
        try {
            // 获取采集信息
            String collectDay = zfDtpCollectTimeInfo.getCollectDay();

            // 保存线程池状态
            String localFilePath = CommonUtil.getFilePath(this.getLocalFilePathPrefix(), collectDay);
            ArrayList<String> appendLines = new ArrayList<>();
            appendLines.add(JSONUtil.toJsonStr(zfDtpCollectTimeInfo));
            FileUtil.appendLines(appendLines, localFilePath, StandardCharsets.UTF_8);
        } catch (Exception e) {
            log.error("监听到线程池运行状态信息,保存到本地失败", e);
        }
    }
}

// 在Configuration类中将监听器注册到Spring容器中
    @Bean
    @DependsOn("zfDtpRunInfoCollectRunner")
    @ConditionalOnProperty(name = ZfDtpConstants.COLLECT_TYPE, matchIfMissing = true, havingValue = "localFile")
    public ZfDtpRunInfoCollectEventLocalFileListener zfDtpRunInfoCollectEventLocalFileListener(ZfDtpProperties zfDtpProperties) {
        ZfDtpProperties.Monitor monitorConfig = zfDtpProperties.getMonitor();
        String localFilePathPrefix = CommonUtil.getLocalFilePathPrefix(monitorConfig.getLocalFilePath());
        return new ZfDtpRunInfoCollectEventLocalFileListener(zfDtpProperties, localFilePathPrefix);
    }

这样,我们就完成了整个观察者模式。

总结

优点:

    1. 解耦:观察者模式可以解耦观察者和被观察者之间的约束关系。观察者不需要知道有多少观察者对象以及它们的细节。
    1. 增加灵活性:可以在程序运行时增加观察者,也可以删除观察者。
    1. 支持广播通信:被观察者可以向多个观察者对象广播通知。

缺点:

    1. 过度通知:如果观察者过多,被观察者的一个变化会引起大量观察者对象的更新,可能会产生过度通知的问题。
    1. 时序问题:当一个观察者触发另一个观察者的更新时,会产生时序问题。
    1. 增加系统复杂性:观察者模式会引入较多的抽象对象和关系,增加系统的复杂性。

  对于第一点缺点,可以采用推送模型和拉取模型进行优化。对于第二点缺点,可以对观察者进行分类,分级触发。

  总体来说,观察者模式的优点远远大于缺点。它实现了低耦合,高内聚的设计原则,有效地解决了主题对象与观察者对象之间的通信问题,提高了系统的灵活性和扩展性。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。