关键词:
普通消息为 RocketMQ 中最基础的消息,支持生产者和消费者的异步解耦通信。
一、普通消息的生命周期
1、初始化
消息被生产者构建并完成初始化,待发送到服务端的状态。
2、待消费
消息被发送到服务端,对消费者可见,等待消费者消费的状态。
3、消费中
消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ会对消息进行重试处理。
4、消费提交
消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
5、消息删除
RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
二、普通消息的发送方式
RocketMQ提供了三种发送消息的模式,分别为同步发送消息、异步发送消息、单向发送消息。
public enum CommunicationMode SYNC, // 同步 ASYNC, // 异步 ONEWAY; // 单向 private CommunicationMode()
1、同步发送
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。
2、异步发送
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
3、单向发送
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
单向发送方式发送消息的过程耗时非常短,一般在微秒级别。不需要关心发送结果的场景,例如日志发送。
三、普通消息的消费方式
// 消息消费模式 public enum MessageModel // 广播 BROADCASTING("BROADCASTING"), // 负载均衡 CLUSTERING("CLUSTERING"); private String modeCN; private MessageModel(String modeCN) this.modeCN = modeCN; public String getModeCN() return this.modeCN;
在实际的开发过程中,使用consumer的setMessageModel()方法,指定消费方式。
1、负载均衡消费模式 - CLUSTERING
消费者采用负载均衡方式消费消息,一个分组(Group)下的多个消费者共同消费队列消息,每个消费者处理的消息不同。
一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。
例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。
2、广播消费模式 - BROADCASTING
广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
3、消费模式特点
1、负载均衡消费模式
消费端集群化部署,每条消息只需要被处理一次;
每一条消息都只会被分发到一台机器上处理;
不保证每一次失败重投的消息路由到同一台机器上。
2、集群消费模式
每条消息都需要被相同逻辑的多台机器处理;
消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投;
客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过;
广播消费模式下不支持重置消费位点,广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能;
广播消费模式下不支持顺序消息。
四、普通消息的示例demo
工具类详见:RocketMQ笔记(六):示例代码工具类。
1、消息发送封装类 SDKSendMsg
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.RequestCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Objects; /** * @Description: 发送消息方式 */ public class SDKSendMsg // 默认组 private final static String DEFAULT_GROUP_NAME = "test-group"; // 测试地址 private final static String DEFAULT_NAMESRV_ADDR = "192.168.33.55:9876"; // 默认Topic public final static String DEFAULT_TOPIC = "TestTopic"; // 同步消息的标签与键 public final static String DEFAULT_SYNC_MSG_TAG = "SYNC_TAG"; public final static String DEFAULT_SYNC_MSG_KEY = "SYNC_KEY"; // 异步消息的标签与键 public final static String DEFAULT_ASYNC_MSG_TAG = "ASYNC_TAG"; public final static String DEFAULT_ASYNC_MSG_KEY = "ASYNC_KEY"; // 单向 public final static String DEFAULT_ONEWAY_MSG_TAG = "ONEWAY_TAG"; public final static String DEFAULT_ONEWAY_MSG_KEY = "ONEWAY_KEY"; // 声明生产者 private DefaultMQProducer producer; public SDKSendMsg() // 启动生产者实例 try // 实例化生产者组名称 DefaultMQProducer producer = new DefaultMQProducer(DEFAULT_GROUP_NAME); // 指定name server地址 producer.setNamesrvAddr(DEFAULT_NAMESRV_ADDR); this.producer = producer; this.producer.start(); catch (MQClientException e) e.printStackTrace(); /** * 关闭生产者实例 */ public void shutdownProducer() this.producer.shutdown(); /** * 同步发送消息, 使用默认超时时间 * @param topic 主题 * @param msgTag 消息标签 * @param msgKey 消息key * @param msgBody 消息内容 * @return */ public SendResult syncSendMsg(String topic, String msgTag, String msgKey, String msgBody) return syncSendMsg(topic, msgTag, msgKey, msgBody, null); /** * 同步发送消息, 使用指定的超时时间 * @param topic 主题 * @param msgTag 消息标签 * @param msgKey 消息key * @param msgBody 消息内容 * @param timeout 超时时间 * @return */ public SendResult syncSendMsg(String topic, String msgTag, String msgKey, String msgBody, Long timeout) System.out.println("发送同步消息: " + msgBody); try Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult send; if (Objects.isNull(timeout)) send = this.producer.send(msg); System.out.printf("消息发送结果:%s%n", send); return send; send = this.producer.send(msg, timeout); System.out.printf("消息发送结果:%s%n", send); return send; catch (Exception e) e.printStackTrace(); return null; /** * 异步发送消息,使用默认的回调处理 * @param topic 主题 * @param msgTag 消息标签 * @param msgKey 消息key * @param msgBody 消息内容 * @param timeout 超时时间 * @return */ public Message asynSendMsg(String topic, String msgTag, String msgKey, String msgBody, Long timeout) System.out.println("发送异步消息: " + msgBody); try Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); return this.producer.request(msg, timeout); catch (Exception e) e.printStackTrace(); return null; /** * 异步发送消息,自定义回调处理 * @param topic 主题 * @param msgTag 消息标签 * @param msgKey 消息key * @param msgBody 消息内容 * @param timeout 超时时间 * @param callback 回调处理 */ public void asynSendMsg(String topic, String msgTag, String msgKey, String msgBody, Long timeout, RequestCallback callback) System.out.println("发送异步消息: " + msgBody); try Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); this.producer.request(msg, callback, timeout); catch (Exception e) e.printStackTrace(); /** * 发送单向消息 * @param topic * @param msgTag * @param msgKey * @param msgBody */ public void sendOneWay(String topic, String msgTag, String msgKey, String msgBody) System.out.println("发送单向消息: " + msgBody); try Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET)); this.producer.sendOneway(msg); catch (Exception e) e.printStackTrace();
2、生产者 ProducerMsg
import org.apache.rocketmq.client.producer.RequestCallback; import org.apache.rocketmq.common.message.Message; /** * @Description: 发送消息 */ public class ProducerMsg public static void main(String[] args) // 创建消息发送实例 SDKSendMsg sdkSendMsg = new SDKSendMsg(); // 同步消息发送 String syncMsg = "同步消息 -- " + 0; sdkSendMsg.syncSendMsg(SDKSendMsg.DEFAULT_TOPIC, SDKSendMsg.DEFAULT_SYNC_MSG_TAG, SDKSendMsg.DEFAULT_SYNC_MSG_KEY, syncMsg); // 异步消息发送 String asynMsg = "异步消息 --" + 1; sdkSendMsg.asynSendMsg(SDKSendMsg.DEFAULT_TOPIC, SDKSendMsg.DEFAULT_ASYNC_MSG_TAG, SDKSendMsg.DEFAULT_ASYNC_MSG_KEY, asynMsg, 3000l, new RequestCallback() @Override public void onSuccess(Message message) System.out.println("异步消息发送成功"); // TODO 业务数据状态更新 @Override public void onException(Throwable throwable) System.out.println("异步消息发送失败"); // TODO 业务状态回滚 ); // 单向消息发送 String oneWayMsg = "单向消息 --" + 2; sdkSendMsg.sendOneWay(SDKSendMsg.DEFAULT_TOPIC, SDKSendMsg.DEFAULT_ONEWAY_MSG_TAG, SDKSendMsg.DEFAULT_ONEWAY_MSG_KEY, oneWayMsg); // 销毁生产者实例 sdkSendMsg.shutdownProducer();
3、消费者 ProducerMsg
import com.snails.rmq.common.RMQConstant; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.io.UnsupportedEncodingException; import java.util.List; /** * @Description: RocketMQ并发消费 */ public class ConsumerMsg public static void main(String[] args) throws MQClientException // 实例化消费者组名称 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RMQConstant.TEST_GROUP); // 指定name server地址 consumer.setNamesrvAddr(RMQConstant.NAEMSRV_ADDR); // 订阅至少一个主题以供消费 consumer.subscribe(RMQConstant.TEST_TOPIC, "*"); // 负载均衡消费模式 consumer.setMessageModel(MessageModel.CLUSTERING); // 广播消费模式 // consumer.setMessageModel(MessageModel.BROADCASTING); // 注册回调,处理从服务端获取的消息 consumer.registerMessageListener(new MessageListenerConcurrently() @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) for (MessageExt msg : msgs) System.out.println("当前消息的KEY: " + msg.getKeys()); try if (SDKSendMsg.DEFAULT_SYNC_MSG_KEY.equals(msg.getKeys())) System.out.println(String.format("线程%s,接收同步消息:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8"))); else if (SDKSendMsg.DEFAULT_ASYNC_MSG_KEY.equals(msg.getKeys())) System.out.println(String.format("线程%s,接收异步消息:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8"))); else if (SDKSendMsg.DEFAULT_ONEWAY_MSG_KEY.equals(msg.getKeys())) System.out.println(String.format("线程%s,接收单向消息:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8"))); catch (UnsupportedEncodingException e) // TODO 补偿机制 System.out.println(e.getMessage()); // 消费消息确认 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; ); // 启动消费者实例 consumer.start(); System.out.printf("消费者已启动.%n");
rocketmq(十三)普通消息(代码片段)
1、消息发送分类Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。1.1、同步发送消息同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息... 查看详情
深入理解rocketmq普通消息和顺序消息使用,原理,优化(代码片段)
...以这方面为主。这次打压的过程中收获比较的大的是,对RocketMq的一些优化。最开始我们公司使用的是RabbitMq,在一些流量高峰的场景下,发现队列堆积比较严重,导致RabbitMq挂了。为了应对这个场景,最终我们引入了阿里云的Rocke... 查看详情
springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)
原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的... 查看详情
rocketmq笔记:顺序消息(代码片段)
...是可以按照消息的发送顺序来消费(FIFO)。 顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消... 查看详情
rocketmq入门笔记(代码片段)
消息队列经典场景优点异步原来的下单场景只是用户支付即可结束,现在需要发送成功短信,给用户增加积分,订阅物流信息等等,这就使得用户的下单时间大大加长,这样就可以使用消息队列,把各个动作发到消息队列,每个服务再去... 查看详情
rocketmq广播消息(代码片段)
发布与模式实现。广播就是向一个主题的所有订阅者发送同一条消息。在发送消息的时候和普通的消息并与不同之处,只是在消费端做一些配置即可。Consumer消息消费publicclassBroadcastConsumerpublicstaticvoidmain(String[]args)throwsMQClientExcept... 查看详情
关于rocketmq的基础api操作——这一篇就够了(代码片段)
关于RocketMQ的基础操作一、基础API操作1、普通消息1.1、消息生产端1.2、消息消费端2、顺序消息2.1、消息生产端2.2、消息消费端3、广播消息3.1、消息生产端3.2、消息消费端4、延迟消息4.1、消息生产端4.2、消息消费端4.3、实现原理... 查看详情
springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)
原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的... 查看详情
rocketmq——ordermessage(顺序消息)(代码片段)
生产者端消费者端运行效果补充RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调... 查看详情
day368&369.rocketmq应用-rocketmq(代码片段)
RocketMQ应用一、普通消息1、消息发送分类Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。同步发送消息同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消... 查看详情
rocketmq使用(代码片段)
rocketmq 基本使用可以看官网和官网给的demo.https://github.com/apache/rocketmq/tree/master/example这里主要说明几个点:rocketmq 发送类型常用:1,普通消息.(可以获取发送结果,失败了重试)2,有序消息.(秒杀,等需要有序的消费场景)3,事... 查看详情
用rocketmq这么久,才知道消息可以这样玩(代码片段)
前言在上一章节中,我们讲解了RocketMQ的基本介绍,作为MQ最重要的就是消息的使用了,今天我们就来带大家如何玩转MQ的消息。消息中间件,英文MessageQueue,简称MQ。它没有标准定义,一般认为:消息... 查看详情
rocketmq学习笔记:消息发送模式(代码片段)
这是本人学习的总结,主要学习资料如下马士兵教育目录1、消息发送模式2、消息消费模式3、顺序消息的消费和发送3.1、全局顺序3.2、部分顺序3.3、部分顺序代码样例3.3.1、依赖3.3.2、发送信息3.3.3、接受信息4、延时消息的消... 查看详情
rocketmq支持事务消息机制(代码片段)
事务消费我们经常支付宝转账余额宝,这是日常生活的一件普通小事,但是我们思考支付宝扣除转账的钱之后,如果系统挂掉怎么办,这时余额宝账户并没有增加相应的金额,数据就会出现不一致状况了。上... 查看详情
rocketmq源码—十rocketmq顺序消息(代码片段)
RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情
rocketmq源码—十rocketmq顺序消息(代码片段)
RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情
rocketmq源码—八rocketmq消息重试(代码片段)
RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。producer发送消息重试producer在发送消息的时候如果发送失败了,RocketMQ会自动重试。privateSendResultsendDefaultImpl(Messagemsg,finalCommunicationModecommunicationMode,finalSendCal... 查看详情
rocketmq有序消息(代码片段)
RocketMQ提供的顺序消费消息实现是使用的FIFO先进先出算法Producer消息发送publicclassProducerpublicstaticvoidmain(String[]args)throwsUnsupportedEncodingExceptiontryMQProducerproducer=newDefaultMQProducer("please_rename_unique_ 查看详情