您现在的位置是:首页 >学无止境 >RocketMQ的学习历程(4)----消息处理 (1)网站首页学无止境
RocketMQ的学习历程(4)----消息处理 (1)
简介RocketMQ的学习历程(4)----消息处理 (1)
1.消费者消费模式:
1.1:push模式:
push模式是指消息中间件主动地将消息推送给消费者,可以实现较高的实时性,但是在消费者处理能力较弱时,可能会导致消息堆积和缓冲区溢出。
1.2:pull模式:
pull模式是指消费者主动向消息中间件拉取消息,需要自己维护偏移量和拉取频率,具有较高的灵活性,但是实时性较低,可能会产生消息延迟和网络开销。
1.3:长轮询模式:
RocketMQ的长轮询是一种实现消息实时推送的技术,它的原理是消费者向Broker发送拉取请求,如果Broker有数据则返回,消费者端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(默认5s),直到有新消息或者超时才返回响应信息并关闭连接。RocketMQ使用了长轮询机制来优化消费者端的拉取效率和实时性。
2.生产者消息生产(发送):
下面代码仅写了生产者,消费者同理
2.1.同步消息:
- 同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
- 同步发送的优点是可靠性高,适合重要且对响应时间不敏感的场景,如金融、电商等。
- 同步发送的缺点是效率低,因为每次发送都要等待服务端的确认。
实例代码
@Test
public void SyncMessage() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("SyncMessage");
//指定NameServer地址
producer.setNamesrvAddr(ip);
//启动生产者
producer.start();
//创建消息对象,指定主题、标签和消息体
Message msg = new Message("RMQ_SYS_TRANS_HALF_TOPIC", "Tag1", ("RMQ_SYS_TRANS_HALF_TOPIC").getBytes());
//发送同步消息
SendResult result = producer.send(msg);
//打印发送结果
System.out.println(result);
//关闭生产者
producer.shutdown();
}
2.2.异步消息:
- 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
- 生产者发送消息时指定消息发送成功及发送异常的回调方法,调用 API 后立即返回,生产者发送消息线程不阻塞。
- 异步消息的发送速度比同步消息快,但是由于异步消息不会等待Broker的响应,所以可能会出现消息丢失的情况。
@Test
public void AsyncMessage() throws MQClientException, RemotingException, InterruptedException {
//创建生产者对象,指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//指定NameServer地址
producer.setNamesrvAddr(ip);
//启动生产者
producer.start();
//创建消息对象,指定主题、标签和消息体
Message msg = new Message("RMQ_SYS_TRANS_HALF_TOPIC", "Tag2", ("Hello RocketMQ").getBytes());
//发送异步消息,指定回调函数
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("这是回调的函数:" + sendResult);
}
public void onException(Throwable e) {
System.out.println("发送异常:" + e);
}
});
//关闭生产者
producer.shutdown();
}
2.3.单项消息:
- 无需等待响应:发送方在发送单项消息后,不需要等待消息服务器的响应,即可继续执行后续逻辑。发送方无法获取消息的发送结果。
- 低延迟:由于无需等待响应,单项消息的发送过程相对较快,可以实现低延迟的消息发送。
- 不保证可靠性:单项消息发送模式不保证消息的可靠性投递,即消息可能会在发送过程中丢失。这是因为单项消息发送模式下,消息发送方无法获得消息的发送结果,也无法得知消息是否成功到达消息服务器。
- 单项消息通常用于对消息的可靠性要求较低的场景,例如日志记录、统计数据上报等。
@Test
public void OnewayMessage() throws MQClientException, RemotingException, InterruptedException {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置NameServer地址(多个地址用分号隔开)
producer.setNamesrvAddr(ip);
// 启动生产者实例
producer.start();
// 创建消息实例,指定Topic、Tag和消息内容
Message message = new Message("RMQ_SYS_TRANS_HALF_TOPIC", "tag3", "Hello, RocketMQ!".getBytes());
// 发送单向消息,不等待服务器的响应
producer.sendOneway(message);
System.out.println("消息发送成功");
// 关闭生产者实例
producer.shutdown();
}
2.4.延迟消息:
- 支持定时消息的状态持久化存储,即使系统由于故障重启后,仍支持按照原来设置的定时时间触发消息投递。但是,若存储系统异常重启,可能会导致定时消息投递出现一定延迟。
- 延迟消息相比普通消息只不过是在broker多了一层消息topic的转换,对于消息的发送和消费和普通消息没有什么差异。
- 使用场景:订单支付提醒:用户下了一个订单之后,需要在指定时间内(例如30分钟)进行支付,在到期之前可以发送一个消息提醒用户进行支付。电商秒杀:在电商秒杀活动中,为了防止瞬间大量请求导致系统崩溃,可以使用延迟消息来控制请求的流量。消息重试:在消息发送失败后,可以使用延迟消息来进行重试。
@Test
public void delayMessage() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("group");
// 设置NameServer地址
producer.setNamesrvAddr(ip);
// 启动生产者
producer.start();
// 创建消息实例
Message message = new Message("RMQ_SYS_TRANS_HALF_TOPIC", "Hello, RocketMQ!".getBytes());
// 设置消息的延迟级别,单位为秒,这里设置为5秒
message.setDelayTimeLevel(5);
// 发送消息
producer.send(message);
// 关闭生产者
producer.shutdown();
}
2.5.顺序消息:
- 顺序消息是指按照发送顺序和消费顺序保证消息的有序性的消息类型。在分布式系统中,通常会有多个消息生产者并发发送消息,多个消息消费者并发接收和处理消息。在某些应用场景中,保证消息按照特定的顺序被消费是非常重要的,例如订单处理、交易流程等。
消费需要得到两个保证
- 发送顺序保证:消息生产者按照一定的顺序发送消息,确保后续消息的发送顺序符合预期。通常,消息生产者会根据某种关键字或者标识来对消息进行排序,然后按照顺序发送到消息队列中。
- 消费顺序保证:消息消费者按照与发送顺序相同的顺序接收和处理消息,保证消息的有序性。这意味着,同一个消费者线程会依次处理消息,而不会跳跃或并发处理不同顺序的消息。
消息中间件提供的支持:
- 生产者发送消息时,将消息按照顺序发送到同一个消息队列或者同一个主题的特定分区中。
- 消费者按照顺序从消息队列中读取消息,并确保消息按照顺序进行处理。
- 消息队列或消息中间件需要提供支持,确保消息在发送和消费过程中的有序性。
@Test
public void OrderedMessageProducer() throws MQClientException {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("example_group");
// 设置NameServer地址
producer.setNamesrvAddr(ip);
// 启动生产者
producer.start();
try {
// 创建主题
String topic = "OrderedMessage";
// 创建主题时指定有序消息的选项
producer.createTopic(topic, "OrderedMessageTopic", 8);
// 发送顺序消息
for (int i = 0; i < 10; i++) {
// 创建消息实例
Message message = new Message(topic, "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置消息的顺序关键字,可以是订单ID等标识
message.setKeys(String.valueOf(i % 5));
// 发送消息
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int index = (int) arg % mqs.size();
return mqs.get(index);
}
}, i);
// 判断消息发送结果
if (result.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("Message sent successfully. OrderKey: " + message.getKeys());
} else {
System.out.println("Failed to send message. OrderKey: " + message.getKeys());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭生产者
producer.shutdown();
}
}
3.消息控制:
3.1.消息重复消费:
重复消费问题可能出现的原因:
- 当RocketMQ成功接收到消息,并将消息交给消费者处理,如果消费者消费完成后还没来得及提交offset给RocketMQ。
- 自己宕机或者重启了,那么RocketMQ没有接收到offset,就会认为消费失败了,会重发消息给消费者再次消费。
解决思路:通过设置唯一字段,自行排除是否进行过消费,可以采用redis,或者mysql进行冲突判断,若冲突则跳过,反之可以消费。
注意:不要使用msgid进行唯一消息判断,即便同一内容的消息,也很有可能msgid不同(官方建议自行判断)
@Test
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
// 检查消息日志中是否存在相同ID的消息
if (checkMessageLog(msgId)) {
// 如果存在,则证明这条消息已经被处理过了
continue;
}
// 处理消息
// ...
// 保存消息日志
saveMessageLog(msgId);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
3.2.失败消息重试:
当消息发送失败时,可以配置RocketMQ自动进行消息重试,以提高消息发送的可靠性。
在发送失败的情况下,可以启用RocketMQ的重试机制。重试机制可以根据预先配置的参数进行消息重试,例如设置重试次数、重试间隔等。
public void retryMessage(Message message, int currentRetryCount) {
if (currentRetryCount >= maxRetryCount) {
// 达到最大重试次数,将消息标记为失败或发送到死信队列
markMessageAsFailed(message);
return;
}
// 设置下次重试的时间
long nextRetryTime = System.currentTimeMillis() + retryInterval;
// 使用定时任务或延迟队列,在下次重试时间时重新发送消息
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
sendMessage(message);
} catch (Exception e) {
// 发送失败,进行下一次重试
retryMessage(message, currentRetryCount + 1);
}
}
}, nextRetryTime);
}
public void markMessageAsFailed(Message message) {
// 在消息的属性中记录重试次数或使用专门的记录表来跟踪消息的重试情况
int retryCount = message.getProperty("retryCount", 0);
retryCount++;
message.putProperty("retryCount", retryCount);
// 可以在这里将消息标记为失败,例如记录日志或发送到死信队列
System.out.println("Message failed to process after " + retryCount + " retries: " + message);
}
3.3.死信消息:
死信消息(Dead Letter Message),简称DLQ消息,是指由于某些原因无法被正确处理或消费的消息。当消息无法被正常消费时,通常会被发送到死信队列(DLQ)中进行处理。
以下是一些可能导致消息成为死信消息的情况:
- 消息消费失败:当消息被消费者处理时发生异常或错误,消费者无法正常处理该消息。例如,消费者在处理消息时发生了错误,抛出了异常,导致消息消费失败。
- 消息超时:如果消息在一定的时间内无法被消费者处理,就可能被认为是超时消息,被发送到死信队列。超时时间可以根据具体的业务需求和配置进行设置。
- 消息重试次数超限:在消息传递的过程中,如果消息发送失败或消费失败,可以进行消息重试。然而,如果消息在达到预设的重试次数后仍然无法被成功消费,就会被认为是死信消息。
死信消息的基本处理思路:
- 日志记录:可以将死信消息的相关信息记录在日志中,以便后续的故障排查和分析。
- 人工处理:针对死信消息,可以通过人工干预的方式进行处理。例如,通过监控系统或管理界面,手动重新发送消息或进行其他处理。
- 分析原因:通过分析死信消息的产生原因,可以找出导致消息失败的根本问题,并进行相应的修复。
- 转发或延迟处理:对于某些特殊情况下的死信消息,可以将其转发给其他的消费者进行处理,或者延迟一段时间后再次尝试消费。
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。