rocketmq使用批量消息(代码片段)

乐观男孩 乐观男孩     2022-12-11     177

关键词:

目录

说明

批量发送消息条件:
1、发送到同一个topic
2、等待同一个发送结果
3、不允许使用定时消息
4、不支持半事务特性
5、同一批消息大小不能超过1MB,否则需要自己进行切割

发送批量消息,最主要的区别是在发送消息的send方法入参一个List。

生产端

@Test
    public void sendMessage() throws Exception 
        DefaultMQProducer defaultMQProducer = RocketMqUtil.getDefaultMQProducer();
        Message message1 = new Message(RocketMqUtil.TOPIC, "batch-1", "batch-message-1".getBytes(Charset.forName("UTF-8")));
        Message message2 = new Message(RocketMqUtil.TOPIC, "batch-2", "batch-message-2".getBytes(Charset.forName("UTF-8")));
        Message message3 = new Message(RocketMqUtil.TOPIC, "batch-3", "batch-message-3".getBytes(Charset.forName("UTF-8")));
        //批量发送消息
        List<Message> messages = Lists.newArrayList(message1, message2, message3);
        SendResult sendResult = defaultMQProducer.send(messages);
        log.info("发送消息结果:", sendResult.getSendStatus().name());
        defaultMQProducer.shutdown();
    

消费端

@Test
    public void consumer() throws Exception 
        DefaultMQPushConsumer defaultMQPushConsumer = RocketMqUtil.getDefaultMQPushConsumer();
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        defaultMQPushConsumer.subscribe(RocketMqUtil.TOPIC, "*");
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                msgs.stream().map(MessageExt::getBody).map(String::new).forEach(System.out::println);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        defaultMQPushConsumer.start();
        Thread.sleep(5000L);
        defaultMQPushConsumer.shutdown();
    

遗留问题

本地测试发送批量消息后,在rocketmq-console上看不到消息的Tag,在命令行看到的Tag为:[tagCLUSTERDefaultCluster],其中的原因暂未深入研究,后期有时间再看吧,如果哪位兄弟知道原因也请留言哦~

六.rocketmq极简入门-rocketmq消息批量发送(代码片段)

使用场景如果消息过多,每次发送消息都和MQ建立连接,无疑是一种性能开销,批量消息可以把消息打包批量发送,批量发送消息能显著提高传递小消息的性能。批量消息概述批量发送消息能显著提高传递小消息... 查看详情

rocketmq使用批量消息(代码片段)

...blicvoidsendMessage()throwsExceptionDefaultMQProducerdefaultMQProducer=RocketMqUtil.getDefaultMQProducer();Messagemessage1=newMessage(RocketMqUtil.TOPIC,"batch-1","batch-message-1".getBytes(Charset.forName("UTF-8")));Messagemessage2=newMessage(RocketMqUtil.TOPIC,&... 查看详情

rocketmq批量消息投递(代码片段)

批量发送消息可提高传递小消息的性能。同时也需要满足以下特征批量消息要求必要具有同一topic、相同消息配置不支持延时消息建议一个批量消息最好不要超过1MB大小示例小于1MBStringtopic="BatchTest";List<Message>messages=newArrayList... 查看详情

rocketmq(十七)批量消息(代码片段)

1、批量发送消息1.1、发送限制生产者进行消息发送时可以一次发送多条消息,这可以大大提升Producer的发送效率。不过需要注意以下几点:批量发送的消息必须具有相同的Topic批量发送的消息必须具有相同的刷盘策略批量... 查看详情

rocketmq(06)——消息的批量发送和消费(代码片段)

...的body的总体积不能超过4MB,否则会得到异常——org.apache.rocketmq.client.exce 查看详情

rocketmq之批处理消息(代码片段)

批量消息发送批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送4MiB的消息,但是如果需要发送更大... 查看详情

rocketmq之批处理消息(代码片段)

批量消息发送批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送4MiB的消息,但是如果需要发送更大... 查看详情

rocketmq使用事务消息(代码片段)

目录说明原理事务消息处理流程生产端消费端说明事务消息:1、不支持延时消息和批量消息2、如果消息没有及时提交,默认check15次,可以通过Broker的transactionCheckMax参数配置次数。如果超时15次依然没有得到明确结果... 查看详情

rocketmq(06)——消息的批量发送和消费(代码片段)

消息的批量发送和消费发送生产者进行消息发送时可以一次发送多条消息,这对于体积比较小的消息而言会大大改善性能,可以想象原本发送10条消息需要传递10次,现在只需要传递一次。这可以通过调用send()时传递... 查看详情

rocketmq-quickstart(批量消费)(代码片段)

一、专业术语Producer    消费生产者,负责产生消息,一般由业务系统负责产生消息Consumer    消息消费者,负责消费消息,一般是后台系统负责异步消费PushConsumer    Consumer的一种,应用通常向Consumer对象注册一个L... 查看详情

rocketmq之批处理消息(代码片段)

批量消息发送批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送4MiB的消息,但是如果需要发送更大... 查看详情

三.rocketmq极简入门-rocketmq普通消息发送(代码片段)

前言RocketMQ已经写了两章了,一章是RocketMQ认识和安装,一章是RocketMQ的工作流程和核心概念,本章我们开始使用RocketMQ来发送和接收消息。RocketMQ的消息种类非常多,比如:普通消息,顺序消息,延迟消... 查看详情

rocketmq详解系列(代码片段)

什么是RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、Ro... 查看详情

rocketmq详解系列(代码片段)

什么是RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、Ro... 查看详情

rocketmq详解系列(代码片段)

什么是RocketMQRocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。常见的MQ主要有:ActiveMQ、RabbitMQ、Kafka、Ro... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

rocketmq使用事务消息(代码片段)

...ate.ROLLBACK_MESSAGE、LocalTransactionState.UNKNOW。原理事务消息是RocketMQ的一大特性,其保证发送消息和执行本地逻辑在同一个事务内。实现的思路借鉴了两阶段提交协议:第一阶段:发送半事务消息,消息发送后,消... 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情