rocketmq(十七)批量消息(代码片段)

爱是与世界平行 爱是与世界平行     2022-12-08     357

关键词:

1、批量发送消息

1.1、发送限制

生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以下几点:

  • 批量发送的消息必须具有相同的Topic
  • 批量发送的消息必须具有相同的刷盘策略
  • 批量发送的消息不能是延时消息与事务消息

1.2、批量发送大小

默认情况下,一批发送的消息总大小不能超过4MB字节。如果想超出该值,有两种解决方案:

  • 方案一:将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
  • 方案二:在Producer端与Broker端修改属性
    • Producer端需要在发送之前设置Producer的maxMessageSize属性
    • Broker端需要修改其加载的配置文件中的maxMessageSize属性

1.3、生产者发送的消息大小

生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志(占20字节),及用于描述消息的一堆属性key-value。这些属性中包含例如生产者地址、生产时间、要发送的QueueId等。最终写入到Broker中消息单元中的数据都是来自于这些属性。

2、批量消费消息

2.1、修改批量属性

Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改Consumer的consumeMessageBatchMaxSize属性来指定。不过,该值不能超过32。因为默认情况下消费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改Consumer的pullBatchSize属性来指定。

2.2、存在的问题

Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性是否设置的越大越好?当然不是。

  • pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉取。
  • consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果。因为consumeMessageBatchMaxSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。

3、代码举例

该批量发送的需求是,不修改最大发送4M的默认值,但要防止发送的批量消息超出4M的限制。

3.1、定义消息列表分割器

import org.apache.rocketmq.common.message.Message;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
// 消息列表分割器:其只会处理每条消息的大小不超4M的情况。
// 若存在某条消息,其本身大小大于4M,这个分割器无法处理,
// 其直接将这条消息构成一个子列表返回。并没有再进行分割
public class MessageListSplitter implements Iterator<List<Message>> 
    // 指定极限值为4M
    private final int SIZE_LIMIT = 4 * 1024 * 1024;
    // 存放所有要发送的消息
    private final List<Message> messages;
    // 要进行批量发送消息的小集合起始索引
    private int currIndex;
    public MessageListSplitter(List<Message> messages) 
        this.messages = messages;
    
    @Override
    public boolean hasNext() 
        // 判断当前开始遍历的消息索引要小于消息总数
        return currIndex < messages.size();
    
    @Override
    public List<Message> next() 
        int nextIndex = currIndex;
        // 记录当前要发送的这一小批次消息列表的大小
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) 
            // 获取当前遍历的消息
            Message message = messages.get(nextIndex);
            // 统计当前遍历的message的大小
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry :
                    properties.entrySet()) 
                tmpSize += entry.getKey().length() + entry.getValue().length();
            
            tmpSize = tmpSize + 20;
            // 判断当前消息本身是否大于4M
            if (tmpSize > SIZE_LIMIT) 
                if (nextIndex - currIndex == 0) 
                    nextIndex++;
                
                break;
            
            if (tmpSize + totalSize > SIZE_LIMIT) 
                break;
             else 
                totalSize += tmpSize;
            
         // end-for
        // 获取当前messages列表的子集合[currIndex, nextIndex)
        List<Message> subList = messages.subList(currIndex, nextIndex);
        // 下次遍历的开始索引
        currIndex = nextIndex;
        return subList;
    

3.2、定义批量消息生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer 
    public static void main(String[] args) throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        // 指定要发送的消息的最大大小,默认是4M
        // 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的
        // maxMessageSize属性
        // producer.setMaxMessageSize(8 * 1024 * 1024);
        producer.start();
        // 定义要发送的消息集合
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) 
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("someTopic", "someTag", body);
            messages.add(msg);
        
        // 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
        MessageListSplitter splitter = new MessageListSplitter(messages);
        while (splitter.hasNext()) 
            try 
                List<Message> listItem = splitter.next();
                producer.send(listItem);
             catch (Exception e) 
                e.printStackTrace();
            
        
        producer.shutdown();
    

3.3、定义批量消息消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class BatchConsumer 
    public static void main(String[] args) throws MQClientException 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("rocketmqOS:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
        );
        consumer.subscribe("someTopicA", "*");
        // 指定每次可以消费10条消息,默认为1
        consumer.setConsumeMessageBatchMaxSize(10);
        // 指定每次可以从Broker拉取40条消息,默认为32
        consumer.setPullBatchSize(40);
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                for (MessageExt msg : msgs) 
                    System.out.println(msg);
                
                // 消费成功的返回结果
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                // 消费异常时的返回结果
                // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            
        );
        consumer.start();
        System.out.println("Consumer Started");
    

六.rocketmq极简入门-rocketmq消息批量发送(代码片段)

使用场景如果消息过多,每次发送消息都和MQ建立连接,无疑是一种性能开销,批量消息可以把消息打包批量发送,批量发送消息能显著提高传递小消息的性能。批量消息概述批量发送消息能显著提高传递小消息... 查看详情

rocketmq系列批量发送与过滤(代码片段)

今天我们再来看看RocketMQ的另外两个小功能,消息的批量发送和过滤。这两个小功能提升了我们使用RocketMQ的效率。批量发送以前我们发送消息的时候,都是一个一个的发送,这样效率比较低下。能不能一次发送多个消息呢?当... 查看详情

rocketmq使用批量消息(代码片段)

目录说明生产端消费端遗留问题说明批量发送消息条件:1、发送到同一个topic2、等待同一个发送结果3、不允许使用定时消息4、不支持半事务特性5、同一批消息大小不能超过1MB,否则需要自己进行切割发送批量消息,... 查看详情

rocketmq(06)——消息的批量发送和消费(代码片段)

...的body的总体积不能超过4MB,否则会得到异常——org.apache.rocketmq.client.exce 查看详情

rocketmq之批处理消息(代码片段)

批量消息发送批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送4MiB的消息,但是如果需要发送更大... 查看详情

rocketmq之批处理消息(代码片段)

批量消息发送批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送4MiB的消息,但是如果需要发送更大... 查看详情

rocketmq(06)——消息的批量发送和消费(代码片段)

消息的批量发送和消费发送生产者进行消息发送时可以一次发送多条消息,这对于体积比较小的消息而言会大大改善性能,可以想象原本发送10条消息需要传递10次,现在只需要传递一次。这可以通过调用send()时传递... 查看详情

rocketmq-quickstart(批量消费)(代码片段)

一、专业术语Producer    消费生产者,负责产生消息,一般由业务系统负责产生消息Consumer    消息消费者,负责消费消息,一般是后台系统负责异步消费PushConsumer    Consumer的一种,应用通常向Consumer对象注册一个L... 查看详情

rocketmq使用批量消息(代码片段)

...blicvoidsendMessage()throwsExceptionDefaultMQProducerdefaultMQProducer=RocketMqUtil.getDefaultMQProducer();Messagemessage1=newMessage(RocketMqUtil.TOPIC,"batch-1","batch-message-1".getBytes(Charset.forName("UTF-8")));Messagemessage2=newMessage(RocketMqUtil.TOPIC,&... 查看详情

rocketmq之批处理消息(代码片段)

批量消息发送批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送4MiB的消息,但是如果需要发送更大... 查看详情

rocketmq详解系列(代码片段)

什么是RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、Ro... 查看详情

rocketmq详解系列(代码片段)

什么是RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、Ro... 查看详情

rocketmq详解系列(代码片段)

什么是RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、Ro... 查看详情

三.rocketmq极简入门-rocketmq普通消息发送(代码片段)

前言RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消... 查看详情

rocketmq使用事务消息(代码片段)

目录说明原理事务消息处理流程生产端消费端说明事务消息:1、不支持延时消息和批量消息2、如果消息没有及时提交,默认check15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果... 查看详情

关于rocketmq的基础api操作——这一篇就够了(代码片段)

关于RocketMQ的基础操作一、基础API操作1、普通消息1.1、消息生产端1.2、消息消费端2、顺序消息2.1、消息生产端2.2、消息消费端3、广播消息3.1、消息生产端3.2、消息消费端4、延迟消息4.1、消息生产端4.2、消息消费端4.3、实现原理... 查看详情

消息中间件-rocketmq详解(从软件安装到案例实现)(代码片段)

RocketMQ内容一、RocketMQ安装二、RocketMQ作用和结构1.RocketMQ特点2RocketMQ执行流程3.RocketMQ作用3.1消息中间件结构图3.2应用解耦3.3削峰填谷4.rocketmq组成部分5.rocketmq基本概念模型三、生产消息的类型有三种四、消费模式有两种五、延时... 查看详情

rocketmq—生产者客户端详解(代码片段)

...ACK与重试机制,消息的顺序生产,批量发送等。RocketMQ在具备这些特性的同时,有自己独有的特性。下面我们对RocketMQ的生产者开展讲解。一、消息发送1.同步发送消息同步发送是指消息发送方发出数据后,同步等... 查看详情