rocketmq笔记:普通消息(代码片段)

无虑的小猪 无虑的小猪     2023-05-05     164

关键词:

  普通消息为 RocketMQ 中最基础的消息,支持生产者和消费者的异步解耦通信。

一、普通消息的生命周期

 

1、初始化

  消息被生产者构建并完成初始化,待发送到服务端的状态。

2、待消费

  消息被发送到服务端,对消费者可见,等待消费者消费的状态。

3、消费中

  消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ会对消息进行重试处理。

4、消费提交

  消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

5、消息删除

  RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

二、普通消息的发送方式

  RocketMQ提供了三种发送消息的模式,分别为同步发送消息、异步发送消息、单向发送消息。

 public enum CommunicationMode 
     SYNC,  // 同步
     ASYNC,  // 异步
     ONEWAY;  // 单向
 
     private CommunicationMode() 
     
 

1、同步发送

  同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。

  

2、异步发送

  消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

  异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

  

3、单向发送

  单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。

  单向发送方式发送消息的过程耗时非常短,一般在微秒级别。不需要关心发送结果的场景,例如日志发送。

  

三、普通消息的消费方式

 // 消息消费模式
 public enum MessageModel 
     // 广播
     BROADCASTING("BROADCASTING"),
     // 负载均衡
     CLUSTERING("CLUSTERING");
     private String modeCN;
     private MessageModel(String modeCN) 
         this.modeCN = modeCN;
     
     public String getModeCN() 
         return this.modeCN;
     
 

  在实际的开发过程中,使用consumer的setMessageModel()方法,指定消费方式。

1、负载均衡消费模式 - CLUSTERING

  消费者采用负载均衡方式消费消息,一个分组(Group)下的多个消费者共同消费队列消息,每个消费者处理的消息不同。

  一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。

  

  例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。

2、广播消费模式 - BROADCASTING

  广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。

 

  

  实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

3、消费模式特点

1、负载均衡消费模式

  消费端集群化部署,每条消息只需要被处理一次;

  每一条消息都只会被分发到一台机器上处理;

  不保证每一次失败重投的消息路由到同一台机器上。

2、集群消费模式

  每条消息都需要被相同逻辑的多台机器处理;

  消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投;

  客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过;

  广播消费模式下不支持重置消费位点,广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能;

  广播消费模式下不支持顺序消息。

 

 

四、普通消息的示例demo

  工具类详见:RocketMQ笔记(六):示例代码工具类

1、消息发送封装类 SDKSendMsg

 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.RequestCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import java.util.Objects;
 
 /**
  * @Description: 发送消息方式
  */
 public class SDKSendMsg 
     // 默认组
     private final static String DEFAULT_GROUP_NAME = "test-group";
     // 测试地址
     private final static String DEFAULT_NAMESRV_ADDR = "192.168.33.55:9876";
     // 默认Topic
     public final static String DEFAULT_TOPIC = "TestTopic";
     // 同步消息的标签与键
     public final static String DEFAULT_SYNC_MSG_TAG = "SYNC_TAG";
     public final static String DEFAULT_SYNC_MSG_KEY = "SYNC_KEY";
     // 异步消息的标签与键
     public final static String DEFAULT_ASYNC_MSG_TAG = "ASYNC_TAG";
     public final static String DEFAULT_ASYNC_MSG_KEY = "ASYNC_KEY";
     // 单向
     public final static String DEFAULT_ONEWAY_MSG_TAG = "ONEWAY_TAG";
     public final static String DEFAULT_ONEWAY_MSG_KEY = "ONEWAY_KEY";
 
     // 声明生产者
     private DefaultMQProducer producer;
 
     public SDKSendMsg() 
         // 启动生产者实例
         try 
             // 实例化生产者组名称
             DefaultMQProducer producer = new DefaultMQProducer(DEFAULT_GROUP_NAME);
             // 指定name server地址
             producer.setNamesrvAddr(DEFAULT_NAMESRV_ADDR);
             this.producer = producer;
             this.producer.start();
          catch (MQClientException e) 
             e.printStackTrace();
         
     
 
     /**
      * 关闭生产者实例
      */
     public void shutdownProducer() 
         this.producer.shutdown();
     
 
 
     /**
      * 同步发送消息, 使用默认超时时间
      * @param topic  主题
      * @param msgTag 消息标签
      * @param msgKey 消息key
      * @param msgBody 消息内容
      * @return
      */
     public SendResult syncSendMsg(String topic, String msgTag, String msgKey, String msgBody) 
         return syncSendMsg(topic, msgTag, msgKey, msgBody, null);
     
 
     /**
      * 同步发送消息, 使用指定的超时时间
      * @param topic  主题
      * @param msgTag 消息标签
      * @param msgKey 消息key
      * @param msgBody 消息内容
      * @param timeout 超时时间
      * @return
      */
     public SendResult syncSendMsg(String topic, String msgTag, String msgKey, String msgBody, Long timeout) 
         System.out.println("发送同步消息: " + msgBody);
         try 
             Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
             SendResult send;
             if (Objects.isNull(timeout)) 
                 send = this.producer.send(msg);
                 System.out.printf("消息发送结果:%s%n", send);
                 return send;
             
             send = this.producer.send(msg, timeout);
             System.out.printf("消息发送结果:%s%n", send);
             return send;
          catch (Exception e) 
             e.printStackTrace();
         
         return null;
     
 
     /**
      * 异步发送消息,使用默认的回调处理
      * @param topic  主题
      * @param msgTag 消息标签
      * @param msgKey 消息key
      * @param msgBody 消息内容
      * @param timeout 超时时间
      * @return
      */
     public Message asynSendMsg(String topic, String msgTag, String msgKey, String msgBody, Long timeout) 
         System.out.println("发送异步消息: " + msgBody);
         try 
             Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
             return this.producer.request(msg, timeout);
          catch (Exception e) 
             e.printStackTrace();
         
         return null;
     
 
     /**
      * 异步发送消息,自定义回调处理
      * @param topic  主题
      * @param msgTag 消息标签
      * @param msgKey 消息key
      * @param msgBody 消息内容
      * @param timeout 超时时间
      * @param callback 回调处理
      */
     public void asynSendMsg(String topic, String msgTag, String msgKey, String msgBody, Long timeout, RequestCallback callback) 
         System.out.println("发送异步消息: " + msgBody);
         try 
             Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
             this.producer.request(msg, callback, timeout);
          catch (Exception e) 
             e.printStackTrace();
         
     
 
     /**
      * 发送单向消息
      * @param topic
      * @param msgTag
      * @param msgKey
      * @param msgBody
      */
     public void sendOneWay(String topic, String msgTag, String msgKey, String msgBody) 
         System.out.println("发送单向消息: " + msgBody);
         try 
             Message msg = new Message(topic, msgTag, msgKey, msgBody.getBytes(RemotingHelper.DEFAULT_CHARSET));
             this.producer.sendOneway(msg);
          catch (Exception e) 
             e.printStackTrace();
         
     
 

2、生产者 ProducerMsg

 import org.apache.rocketmq.client.producer.RequestCallback;
 import org.apache.rocketmq.common.message.Message;
 /**
  * @Description: 发送消息
  */
 public class ProducerMsg 
     public static void main(String[] args) 
         // 创建消息发送实例
         SDKSendMsg sdkSendMsg = new SDKSendMsg();
 
         // 同步消息发送
         String syncMsg = "同步消息 -- " + 0;
         sdkSendMsg.syncSendMsg(SDKSendMsg.DEFAULT_TOPIC, SDKSendMsg.DEFAULT_SYNC_MSG_TAG,
                 SDKSendMsg.DEFAULT_SYNC_MSG_KEY, syncMsg);
 
         // 异步消息发送
         String asynMsg = "异步消息 --" + 1;
         sdkSendMsg.asynSendMsg(SDKSendMsg.DEFAULT_TOPIC, SDKSendMsg.DEFAULT_ASYNC_MSG_TAG,
                 SDKSendMsg.DEFAULT_ASYNC_MSG_KEY, asynMsg, 3000l, new RequestCallback() 
                     @Override
                     public void onSuccess(Message message) 
                         System.out.println("异步消息发送成功");
                         // TODO 业务数据状态更新
                     
                     @Override
                     public void onException(Throwable throwable) 
                         System.out.println("异步消息发送失败");
                         // TODO 业务状态回滚
                     
                 );
 
         // 单向消息发送
         String oneWayMsg = "单向消息 --" + 2;
         sdkSendMsg.sendOneWay(SDKSendMsg.DEFAULT_TOPIC, SDKSendMsg.DEFAULT_ONEWAY_MSG_TAG,
                 SDKSendMsg.DEFAULT_ONEWAY_MSG_KEY, oneWayMsg);
 
         // 销毁生产者实例
         sdkSendMsg.shutdownProducer();
     
 

3、消费者 ProducerMsg

 import com.snails.rmq.common.RMQConstant;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 
 /**
  * @Description: RocketMQ并发消费
  */
 public class ConsumerMsg 
     public static void main(String[] args) throws MQClientException 
 
         // 实例化消费者组名称
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RMQConstant.TEST_GROUP);
         // 指定name server地址
         consumer.setNamesrvAddr(RMQConstant.NAEMSRV_ADDR);
         // 订阅至少一个主题以供消费
         consumer.subscribe(RMQConstant.TEST_TOPIC, "*");
         // 负载均衡消费模式
         consumer.setMessageModel(MessageModel.CLUSTERING);
         // 广播消费模式
 //        consumer.setMessageModel(MessageModel.BROADCASTING);
         //  注册回调,处理从服务端获取的消息
         consumer.registerMessageListener(new MessageListenerConcurrently() 
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                 for (MessageExt msg : msgs) 
                     System.out.println("当前消息的KEY: " + msg.getKeys());
                     try 
                         if (SDKSendMsg.DEFAULT_SYNC_MSG_KEY.equals(msg.getKeys())) 
                             System.out.println(String.format("线程%s,接收同步消息:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8")));
                          else if (SDKSendMsg.DEFAULT_ASYNC_MSG_KEY.equals(msg.getKeys())) 
                             System.out.println(String.format("线程%s,接收异步消息:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8")));
                         else if (SDKSendMsg.DEFAULT_ONEWAY_MSG_KEY.equals(msg.getKeys())) 
                             System.out.println(String.format("线程%s,接收单向消息:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8")));
                         
                      catch (UnsupportedEncodingException e) 
                         // TODO 补偿机制
                         System.out.println(e.getMessage());
                     
                 
                 // 消费消息确认
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             
 
         );
         // 启动消费者实例
         consumer.start();
         System.out.printf("消费者已启动.%n");
     
 

 

rocketmq(十三)普通消息(代码片段)

1、消息发送分类Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。1.1、同步发送消息同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息... 查看详情

深入理解rocketmq普通消息和顺序消息使用,原理,优化(代码片段)

...以这方面为主。这次打压的过程中收获比较的大的是,对RocketMq的一些优化。最开始我们公司使用的是RabbitMq,在一些流量高峰的场景下,发现队列堆积比较严重,导致RabbitMq挂了。为了应对这个场景,最终我们引入了阿里云的Rocke... 查看详情

springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)

原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的&#... 查看详情

rocketmq笔记:顺序消息(代码片段)

...是可以按照消息的发送顺序来消费(FIFO)。  顺序消息是RocketMQ提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ顺序消... 查看详情

rocketmq入门笔记(代码片段)

消息队列经典场景优点异步原来的下单场景只是用户支付即可结束,现在需要发送成功短信,给用户增加积分,订阅物流信息等等,这就使得用户的下单时间大大加长,这样就可以使用消息队列,把各个动作发到消息队列,每个服务再去... 查看详情

rocketmq广播消息(代码片段)

发布与模式实现。广播就是向一个主题的所有订阅者发送同一条消息。在发送消息的时候和普通的消息并与不同之处,只是在消费端做一些配置即可。Consumer消息消费publicclassBroadcastConsumerpublicstaticvoidmain(String[]args)throwsMQClientExcept... 查看详情

关于rocketmq的基础api操作——这一篇就够了(代码片段)

关于RocketMQ的基础操作一、基础API操作1、普通消息1.1、消息生产端1.2、消息消费端2、顺序消息2.1、消息生产端2.2、消息消费端3、广播消息3.1、消息生产端3.2、消息消费端4、延迟消息4.1、消息生产端4.2、消息消费端4.3、实现原理... 查看详情

springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)

原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的&#... 查看详情

rocketmq——ordermessage(顺序消息)(代码片段)

生产者端消费者端运行效果补充RocketMQ提供了3种模式的Producer:NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),对应的分别是普通消息、顺序消息和事务消息。在前面的博客当中,涉及的都是NormalProducer,调... 查看详情

day368&369.rocketmq应用-rocketmq(代码片段)

RocketMQ应用一、普通消息1、消息发送分类Producer对于消息的发送方式也有多种选择,不同的方式会产生不同的系统效果。同步发送消息同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消... 查看详情

rocketmq使用(代码片段)

rocketmq 基本使用可以看官网和官网给的demo.https://github.com/apache/rocketmq/tree/master/example这里主要说明几个点:rocketmq  发送类型常用:1,普通消息.(可以获取发送结果,失败了重试)2,有序消息.(秒杀,等需要有序的消费场景)3,事... 查看详情

用rocketmq这么久,才知道消息可以这样玩(代码片段)

前言在上一章节中,我们讲解了RocketMQ的基本介绍,作为MQ最重要的就是消息的使用了,今天我们就来带大家如何玩转MQ的消息。消息中间件,英文MessageQueue,简称MQ。它没有标准定义,一般认为:消息... 查看详情

rocketmq学习笔记:消息发送模式(代码片段)

这是本人学习的总结,主要学习资料如下马士兵教育目录1、消息发送模式2、消息消费模式3、顺序消息的消费和发送3.1、全局顺序3.2、部分顺序3.3、部分顺序代码样例3.3.1、依赖3.3.2、发送信息3.3.3、接受信息4、延时消息的消... 查看详情

rocketmq支持事务消息机制(代码片段)

事务消费我们经常支付宝转账余额宝,这是日常生活的一件普通小事,但是我们思考支付宝扣除转账的钱之后,如果系统挂掉怎么办,这时余额宝账户并没有增加相应的金额,数据就会出现不一致状况了。上... 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq源码—八rocketmq消息重试(代码片段)

RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。producer发送消息重试producer在发送消息的时候如果发送失败了,RocketMQ会自动重试。privateSendResultsendDefaultImpl(Messagemsg,finalCommunicationModecommunicationMode,finalSendCal... 查看详情

rocketmq有序消息(代码片段)

RocketMQ提供的顺序消费消息实现是使用的FIFO先进先出算法Producer消息发送publicclassProducerpublicstaticvoidmain(String[]args)throwsUnsupportedEncodingExceptiontryMQProducerproducer=newDefaultMQProducer("please_rename_unique_ 查看详情