您现在的位置是:首页 >技术交流 >Idea+maven+springboot项目搭建系列--1 整合Rocketmq网站首页技术交流

Idea+maven+springboot项目搭建系列--1 整合Rocketmq

拽着尾巴的鱼儿 2024-07-04 12:01:02
简介Idea+maven+springboot项目搭建系列--1 整合Rocketmq

前言:本文以maven+springboot 整合Rocketmq 完成消息的发送和接收。

1 Rocketmq 介绍:

1.1 Rocketmq 特性:
Apache RocketMQ是一款快速、可靠的分布式消息传递和流处理平台,具有可扩展性和高性能。它是一个分布式的、去中心化的消息队列,具有以下特性:

  • 分布式:RocketMQ允许将消息存储在多个Broker上并支持水平扩展,可以通过增加更多的Broker来扩展存储能力和吞吐量。

  • 异步传输:RocketMQ采用异步传输方式来提高性能,它的异步传输机制利用了Linux内核底层的零拷贝技术,从而实现了高吞吐量和低延迟。

  • 可靠性:RocketMQ采用了复制和故障转移机制来保证消息的可靠性。它可以配置多副本(通常是3个副本)来存储消息,当有一个Broker宕机时,系统可以自动将消息路由到其他副本上。

  • 灵活性:RocketMQ支持多种消息模式,包括点对点模式、发布/订阅模式和事务消息模式。它还支持多种消息协议,包括JMS、OpenMessaging和MQTT等。

  • 易于使用:RocketMQ使用简单,提供了丰富的客户端API和管理工具,使得开发人员可以快速地集成和使用它。

RocketMQ是一个非常优秀的分布式消息传递平台,能够帮助开发人员实现高性能、可靠的消息传递和流处理。它在互联网公司、金融机构和其他大型企业中广泛使用。

1.2 Rocketmq 主要组件:
Rocketmq 是一种基于发布-订阅(Pub/Sub)消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer)。而RocketMQ的基础消息模型就是一个简单的Pub/Sub模型。
在这里插入图片描述
RocketMQ主要由以下几个组件组成:

  • Nameserver:Nameserver是RocketMQ中的重要组件之一,它充当了命名服务和路由服务的角色。当Producer和Consumer要发送或者接收消息时,它们需要向NameServer请求获取Broker的信息,然后才能和Broker进行通信。Nameserver的作用类似于DNS服务器,用来维护RocketMQ中各个Broker的地址信息。
  • Broker:Broker是RocketMQ中的消息存储和传输核心组件。所有的消息都存储在Broker中,Producer向Broker发送消息,Consumer从Broker中订阅和接收消息。Broker的作用是接收、存储和转发消息,确保消息的可靠性和可扩展性。
  • Producer:Producer是创造和发送消息的客户端应用程序,它通过调用API将消息发送到Broker中。Producer可以按照不同的消息模式发送消息,包括点对点模式、发布/订阅模式和事务消息模式等。
  • Consumer:Consumer是接收和处理消息的客户端应用程序。它通过从Broker中订阅和消费消息来实现消息的处理。Consumer可以按照不同的消息模式消费消息,包括点对点模式、发布/订阅模式和事务消息模式等。
  • Message:Message是RocketMQ中最基本的消息单元,它包含了消息的内容和一些元数据,例如消息ID、消息主题、消息标签等。Producer将消息发送到Broker中,Consumer从Broker中订阅和接收消息。

Producer (生产者)和Consumer(消费者),一个向topic 发送消息,一个向topic 读取消息,消息的基本单元由Message 承接;
一般的消息组件对于消息的存储分发都只有一个组件处理,RocketMQ 中却使用了Nameserver和Broker 两个组件,那么这两个组件的关系是什么呢:
为了方便理解,这里使用图书馆进行类比:

  • 首先图书馆里存储了海量的图书,这些图书并不是杂乱无章的进行堆叠,而是按照一定的类型完成了分类存放;比如新闻类,医学类,生物类,文学类 等等,每种不同的分类下都有海量的图书;如果把每本图书看做是具体的一个个消息,那么图书的分类就是不同的topic;
  • 对于每种分类,为了统计的方便有可能需要为其在划分小类,如生物类,可以被划分为 植物类,动物类 等等,对于每个大类如果可以看做是topic ,那么大类下划分的小类就可以看做是 不同的 tag分类;
  • 显然每一种topic/tag分类的图书并不是杂乱无章的存放,而是会被整齐的放入到一排排的书架上,一排排的书架就可以看做是分区下的队列;
  • 显然书架作为了书籍最终的存放位置,那么可以将图书馆的书架看做是Broker,用户来借书和还书,最终都要来到书架上拿书和放书;
  • 显然图书馆里的书籍不仅需要分类存放,每层的图书管理人员,还需要熟悉自己负责楼层的书籍的位置信息,以及需要对书籍的维护;如果来借书的人需要的图书不在本楼层,图书管理人员也需要为其提供书籍正确的楼层位置信息,显然每层的图书管理人员,都需要掌握每层楼的图书信息,并且必要情况下,需要有可以顶替其他楼层管理员的能力;
  • 显然在rocketmq 中 ,Nameserver 的角色就和 每层的图书管理员相似;当每个用户来到本楼层还书(生产消息),楼层管理人员,需要告知还书的用户这本书,需要被正确归还的位置(消息的路由),从而帮助用户更好的还书;
  • 当用户来借书(消费消息),楼层管理人员,需要告知用户,想要的书籍正确楼层及详细位置信息(消息的路由);
  • 图书管理员怎么知道各个图书的分类以及位置信息,就需要不时的在自己的系统里动态维护数据的信息,以便于更好的服务借书的还书的人;
  • 显然rocketmq 中 最终存放数据的broker 组件需要和Nameserver 进行不时的交互,这样Nameserver 就可以实时的知晓数据的信息,当生产者投递消息时,先向Nameserver询问自己要投递的位置信息,然后在将数据进行投递到broker;当消费者消费消息时,也先向Nameserver询问自己想要消费数据的位置信息,然后在向具体的broker 获取消息;

2 springboot 整合:

2.1 引入jar:

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

2.2 配置rocketmq:

# name-server地址
rocketmq.name-server=localhost:9876
# 配置消费组
rocketmq.producer.group=test-group
rocketmq.producer.send-message-timeout=30000
# 设置日志级别
logging.level.root=debug

2.3 生产者 消息发送工具类:



import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Collectors;

/**
 * 生产者
 */
@Slf4j
@Component
public class RocketMQProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer messageTimeOut;


    /**
     * 发送普通消息
     *
     * @param topic
     * @param tag
     * @param msgBody
     */
    public void sendMsg(String topic, String tag, Object msgBody) {
        if (StringUtils.isNotBlank(tag)) {
            topic = topic.concat(":") + tag;
        }
        rocketMQTemplate.convertAndSend(topic, msgBody);
    }
    /**
     * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
     * sendResult为返回的发送结果
     */
    public <T> SendResult sendMsg(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        SendResult sendResult = rocketMQTemplate.syncSend(topic, message);
        log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
        return sendResult;
    }


    /**
     * 发送异步消息
     *
     * @param topic
     * @param tag
     * @param msgBody
     * @param callback
     */
    public void sendAsyncMsg(String topic, String tag, Object msgBody, SendCallback callback) {
        if (StringUtils.isNotBlank(tag)) {
            topic = topic.concat(":") + tag;
        }
        rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), callback);
    }
    /**
     * 发送异步消息
     *
     * @param topic         消息Topic
     * @param message       消息实体
     * @param sendCallback  回调函数
     * @param timeout       超时时间
     */
    public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
    }

    /**
     * 发送延时消息
     * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     *
     * @param topic
     * @param tag
     * @param msgBody
     * @param timeout
     * @param delayLevel 值的有效范围1至18
     */
    public void sendDelayMsg(String topic, String tag, Object msgBody, Long timeout, Integer delayLevel) {
        if (StringUtils.isNotBlank(tag)) {
            topic = topic.concat(":") + tag;
        }
        if (timeout != null) {
            messageTimeOut = timeout.intValue();
        }
        rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
    }

    /**
     * 发送异步延迟消息
     *
     * @param topic        消息Topic
     * @param message      消息实体
     * @param sendCallback 回调函数
     * @param timeout      超时时间
     * @param delayLevel   延迟消息的级别
     */
    public void asyncSendDelay(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
    }


    /**
     * 发送异步延迟消息
     *
     * @param topic      消息Topic
     * @param message    消息实体
     * @param timeout    超时时间
     * @param delayLevel 延迟消息的级别
     */
    public void asyncSendDelay(String topic, Message<?> message, long timeout, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("topic:{}消息---发送MQ成功---", topic);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("topic:{}消息---发送MQ失败 ex:{}---", topic, throwable.getMessage());
            }
        }, timeout, delayLevel);
    }

    /**
     * 单向消息
     * 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
     * 此方式发送消息的过程耗时非常短,一般在微秒级别
     * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
     *
     * @param topic 消息主题
     * @param msg   消息体
     * @param <T>   消息泛型
     */
    public <T> void sendOneWayMsg(String topic, T msg) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.sendOneWay(topic, message);
    }

    /**
     * 发送批量消息
     *
     * @param topic   消息主题
     * @param msgList 消息体集合
     * @param <T>     消息泛型
     * @return
     */
    public <T> SendResult asyncSendBatch(String topic, List<T> msgList) {
        List<Message<T>> messageList = msgList.stream()
                .map(msg -> MessageBuilder.withPayload(msg).build()).collect(Collectors.toList());
        return rocketMQTemplate.syncSend(topic, messageList);
    }
    /**
     * 发送顺序消息
     *
     * @param topic     消息主题
     * @param msg       消息体
     * @param hashKey   确定消息发送到哪个队列中
     * @param <T>       消息泛型
     */
    public <T> void syncSendOrderly(String topic, T msg, String hashKey) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        log.info("发送顺序消息,topic:{}, hashKey:{}", topic, hashKey);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
    }


    /**
     * 发送顺序消息
     *
     * @param topic     消息主题
     * @param msg       消息体
     * @param hashKey   确定消息发送到哪个队列中
     * @param timeout   超时时间
     */
    public <T> void syncSendOrderly(String topic, T msg, String hashKey, long timeout) {
        Message<T> message = MessageBuilder.withPayload(msg).build();
        log.info("发送顺序消息,topic:{}, hashKey:{}, timeout:{}", topic, hashKey, timeout);
        rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
    }

}

2.4 消费者:


import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "consumer_test",
        topic = "test_topic",
        selectorExpression = "*")
public class RocketMqConsumerTest implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String msg = new String(body);
        log.debug("监听到消息:message:{}", msg);
    }
}

2.5 测试消息发送:


import com.example.springrocket.config.RocketMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringRocketApplicationTests {
    @Autowired
    private RocketMQProducer rocketMQProducer;

    @Test
    void contextLoads() {
    }
    @Test
    void sendMQMessage(){
        SendResult sendResult = rocketMQProducer.sendMsg("test_topic","hello test 123");
        System.out.println(sendResult);
    }


}

消息获取:
在这里插入图片描述

3 整合遇到的问题参考:

3.1 提示RocketMQTemplate bean 没有被找到:

  • 检查nameServer 和Broker 服务,是否正常启动;
  • 检查10911,10909,10912 端口是否正常暴露;
  • 检查生产者的group 分组是否配置:rocketmq.producer.group
  • 如果springboot 的版本为3.x 则可以降低2.x 的版本,因为3.x 的版本不会进行rocketmq 的自动装配;

3.2 如果提示xxx.xx.xx.xx:10911 连接失败或者决绝:

  • 检查broker 的启动配置文件broker.conf 的brokerIP1 是否为公网ip 如果不是,则需要修改为公网ip;

4 参考:

4.1 Apache RocketMQ

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。