rocketmq生产者和消费者

园芳宝贝 园芳宝贝     2022-08-09     309

关键词:

1、生产者:

package com.ebways.mq.test.mq;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

import java.util.concurrent.TimeUnit;

/**
 * Created by gmq on 2016/10/13 0013.
 */
public class NewRocketProductor {

    public static void main(String[] args) throws MQClientException, Exception {

        /**
         * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
         * 注意:ProducerGroupName需要由应用来保证唯一<br>
         * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
         * 因为服务器会回查这个Group下的任意一个Producer
         */
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("192.168.100.190:9876");
        producer.setInstanceName("Producer");

        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         * 注意:切记不可以在每次发送消息时,都调用start方法
         */
        producer.start();

        /**
         * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
         * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
         * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
         * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
         */
        for (int i = 0; i < 10; i++) {
            try {
                {
                    Message msg = new Message("TopicTest1",// topic
                            "TagA",// tag
                            "OrderID001",// key
                            ("Hello MetaQ").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
                {
                    Message msg = new Message("TopicTest2",// topic
                            "TagB",// tag
                            "OrderID0034",// key
                            ("Hello MetaQ").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }

                {
                    Message msg = new Message("TopicTest3",// topic
                            "TagC",// tag
                            "OrderID061",// key
                            ("Hello MetaQ").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            TimeUnit.MILLISECONDS.sleep(1000);
        }

        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
         */
        producer.shutdown();
    }
}

2、消费者

package com.ebways.mq.test.mq;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by Administrator on 2016/10/14 0014.
 */
public class NewRocketConsumer {
    public static void main(String[] args) throws Exception {
        /**
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
         * 注意:ConsumerGroupName需要由应用来保证唯一
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        consumer.setNamesrvAddr("192.168.100.190:9876");
        consumer.setInstanceName("Consumber");

        /**
         * 订阅指定topic下tags分别等于TagA或TagC或TagD
         */
        consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
        /**
         * 订阅指定topic下所有消息<br>
         * 注意:一个consumer对象可以订阅多个topic
         */
        consumer.subscribe("TopicTest2", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            /**
             * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.err.println(Thread.currentThread().getName()
                        + " Receive New Messages: " + msgs.size());

                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("TopicTest1")) {
                    // 执行TopicTest1的消费逻辑
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                        // 执行TagA的消费
                        System.err.println(new String(msg.getBody()));
                    } else if (msg.getTags() != null
                            && msg.getTags().equals("TagC")) {
                        // 执行TagC的消费
                        System.err.println("TagC:=====");
                        System.err.println(new String(msg.getBody()));
                    } else if (msg.getTags() != null
                            && msg.getTags().equals("TagD")) {
                        // 执行TagD的消费
                        System.err.println("TagD:=====");
                        System.err.println(new String(msg.getBody()));
                    }
                } else if (msg.getTopic().equals("TopicTest2")) {
                    System.err.println("TopicTest2:=====");
                    System.err.println(new String(msg.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         */
        consumer.start();

        System.err.println("Consumer Started.");
    }
}

 

rocketmq(18)——高可用配置(代码片段)

...概念中有NameServer和Broker,它们充当服务端,作为客户端生产者和消费者都需要和它们不断的进行交互。如果你的NameServer和Broker只有一台机器,并且挂了,那么与它们交互的这条路就断了。而如果作为客户端的生产者和消费者只... 查看详情

rocketmq(12)——生产者介绍(代码片段)

生产者介绍RocketMQ的消费者有基于拉模式的DefaultMQPullConsumer和基于推模式的DefaultMQPushConsumer。而对于生产者而言基本上只有一个DefaultMQProducer,和一个支持事务的TransactionMQProducer。TransactionMQProducer继承自DefaultMQProducer,所以DefaultMQ... 查看详情

rocketmq(12)——生产者介绍(代码片段)

生产者介绍RocketMQ的消费者有基于拉模式的DefaultMQPullConsumer和基于推模式的DefaultMQPushConsumer。而对于生产者而言基本上只有一个DefaultMQProducer,和一个支持事务的TransactionMQProducer。TransactionMQProducer继承自DefaultMQProducer,所以DefaultMQ... 查看详情

rocketmq(18)——高可用配置(代码片段)

...概念中有NameServer和Broker,它们充当服务端,作为客户端生产者和消费者都需要和它们不断的进行交互。如果你的NameServer和Broker只有一台机器,并且挂了,那么与它们交互的这条路就断了。而如果作为客户端的生产者和消费者只... 查看详情

rocketmq(18)——高可用配置(代码片段)

...概念中有NameServer和Broker,它们充当服务端,作为客户端生产者和消费者都需要和它们不断的进行交互。如果你的NameServer和Broker只有一台机器,并且挂了,那么与它们交互的这条路就断了。而如果作为客户端的生产者和消费者只... 查看详情

rocketmq使用延迟消息(代码片段)

...费2、延迟消息主要通过对Message设置延迟级别实现,生产者和消费者按照正常逻辑进行生产和消费。生产端@TestpublicvoidsendMessage()throwsExceptionDefaultMQProducerde 查看详情

rocketmq详解(代码片段)

...xff0c;RocketMQ的特点是纯JAVA实现1.基础概念Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息ProducerGroup:消息生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者Consumer:... 查看详情

rocketmq(三)——系统架构

参考技术ARocketMQ架构上主要分为四部分构成:消息生产者,负责生产消息。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟RocketMQ中的消息生产者都是以生产者组(ProducerGro... 查看详情

rocketmq笔记:普通消息(代码片段)

  普通消息为RocketMQ中最基础的消息,支持生产者和消费者的异步解耦通信。一、普通消息的生命周期 1、初始化  消息被生产者构建并完成初始化,待发送到服务端的状态。2、待消费  消息被发送到服务端,对消费... 查看详情

rocketmq01_概述及背景主题标签队列生产者消费者注册中心工作流程

...tMQ-概述、背景②.消息、主题、标签、队列、唯一标识③.生产者、消费者、NameServer、Broker④.RocketMq-工作流程⑤.Topic的创建模式、读写队列①.RocketMQ-概述、背景①.RocketMQ是⼀款阿⾥巴巴开源的消息中间件。2016年11⽉28⽇,阿⾥巴巴... 查看详情

rocketmq01_概述及背景主题标签队列生产者消费者注册中心工作流程

...tMQ-概述、背景②.消息、主题、标签、队列、唯一标识③.生产者、消费者、NameServer、Broker④.RocketMq-工作流程⑤.Topic的创建模式、读写队列①.RocketMQ-概述、背景①.RocketMQ是⼀款阿⾥巴巴开源的消息中间件。2016年11⽉28⽇,阿⾥巴巴... 查看详情

rocketmq(代码片段)

RocketMQ生产者和消费者  注:生产者在生产数据时,指定数据的key,然后消费者进行数据消费时,获取到key,与redis中保存的key做判断  如果不相同代表之前没有人进行消费,处理消费,保存到redis当中  当有第二个消费者... 查看详情

rocketmq

...均衡(策略可以配置)地消费Topic中的消息,假如有一个生产者(Producer)发送了120条消息,其所属的Topic有3个消费者(Consumer)组,每个消费者组设置为集群消费,分别有2个消费者实例,如图所示。ConsumerGroupA的两个实例ConsumerI... 查看详情

rocketmq控制台跳过堆积是啥意思

...询、消息堆积报警和订阅关系查询功能。MQ消息堆积是指生产者发送的消息短时间内在Broker端大量堆积,无法被消费者及时消费,从而导致业务功能无法正常使用。消息堆积常见于以下几种情况:1、新上线的消费者功能有BUG,消... 查看详情

rocketmq入门到精通—rocketmq学习入门指南|rocketmq服务发现(nameserver)精讲

...包括。Brokers定期向每个NameServer注册路由数据(topic以及生产者信息\\消费者\\等其他信息)。NameServer为客户端,包括生产者,消费者和命令行客 查看详情

springboot整合rocketmq的各种消息类型,生产者,消费者(代码片段)

文章目录Springboot整合使用pom依赖yml整合使用同步消息消费者异步消息消费者单向消息消费者延时消息消费者顺序消息消费者sql92过滤消息消费者事物消息消费者@RocketMQMessageListener参数解释我的rocketmq各种集群方案安装Springboot... 查看详情

rocketmq的消息重试(消息重投)

...了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分。文章目录1生产者重试2消费者重试2.1异常重试2.2超时重试1生产者重试生产者在发送消息时,同步消息失败会重投,异步消息有... 查看详情

rocketmq的消息可靠性(防止消息丢失)

...,以及解决办法。消息的丢失问题,可能出现在生产者、MQ、消费者中。文章目录1生产者消息丢失2MQ消息丢失3消费者消息丢失1生产者消息丢失RocketMQ提供了三种方式发送消息:同步、异步和单向。同步发送:同步... 查看详情