您现在的位置是:首页 >技术杂谈 >基于ActiveMQ搭建MQTT服务备忘(二):webapp集成网站首页技术杂谈
基于ActiveMQ搭建MQTT服务备忘(二):webapp集成
(1)为什么写这个话题(Why)
读万卷书不如行千里路。这次搭建MQTT服务,遇到了一些误解,特此记录备忘。
主要包括:
(1)服务(Broker)的账户管理与网页管理平台的账户
(2)与web应用的集成(Spring系)
(2)ActiveMQ版本选择
因为JAVA环境是JDK 8,所以按兼容性考虑选择了ActiveMQ 5.15的最后版本5.15.15。
如果你是JDK 11则可考虑ActiveMQ的最新版本5.17或5.18。
ActiveMQ支持MQTT v3.1.1 and v3.1。
(3)ActiveMQ与web应用的集成
主要介绍与Spring系的webapp集成(SpringBoot 和 Spring MVC)。
SpringBoot 与 Spring MVC显著区别在于Bean生成方式:前者是代码化(通过@Configuration和@Bean注解),后者是配置化(通过xml文件标签<bean>定义)。
本文以 Spring MVC 为例,遇到的问题有:
(1)组件的选择
(2)publisher与subscriber的共存
(3.1)组件的选择
一般的介绍中,集成组件包含3个:spring-integration-core、spring-integration-stream和 spring-integration-mqtt。
而实际上,除了spring-integration-mqtt是必选,spring-integration-core可以依赖下载(但建议还是必选),而spring-integration-stream可能仅仅用于流式(字节流?)消息的场景下(对于文本消息则可选)。
那么【pom.xml】文件片段如下:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>${ver.mqtt}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>${ver.mqtt}</version>
<exclusions><!-- 排除旧版组件 -->
<exclusion>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</exclusion>
</exclusions>
</dependency>
......
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
其中Spring 与 mqtt集成组件的版本定义(【pom.xml】文件中)如下:
<ver.spring>4.3.30.RELEASE</ver.spring>
<ver.mqtt>4.3.24.RELEASE</ver.mqtt>
(3.2)publisher与subscriber的共存
实际上,publisher与subscriber的共存非常普遍:服务器端既会作为publisher发送数据请求,又作为subscriber接收终端发送的数据。
共存的关键是publisher不能与subscriber使用共同的clientId。
否则发布消息的时候会遇到异常:
ERROR MqttPahoMessageDrivenChannelAdapter:330 - Lost connection: 已断开连接; retrying...
ERROR MqttPahoMessageHandler:217 - Lost connection; will attempt reconnect on next request
解决方式就是设置不同的clientId(稍加区分)即可。【mqtt.xml】片段如下:
<!-- 消息处理者(publisher) -->
<bean id="mqttHandler" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler">
<constructor-arg name="clientId" value="${mqtt.clientId.publisher}"/>
...
</bean>
<!-- 消息适配器(subscriber) -->
<int-mqtt:message-driven-channel-adapter id="mqttInbound" client-id="${mqtt.clientId.subscriber}"
... />
注意:clientId的长度不能超过23个字符,这是MQTT协议约束。
(4)结语
使用ActiveMQ作为MQTT服务是一个不错的选择。
如何安全、合理地使用服务总觉得是一个有意义的话题。