rocketmq(代码片段)

whtt whtt     2023-04-17     768

关键词:

RocketMQ生产者和消费者

  注:生产者在生产数据时,指定数据的key,然后消费者进行数据消费时,获取到key,与redis中保存的key做判断

  如果不相同代表之前没有人进行消费,处理消费,保存到redis当中

  当有第二个消费者时,如果拿到的消息与redis中相同代表之前已已经有人消费

  就进行数据签收,防止后续消费者同样拿到重复消费数据

 

  :消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。

  消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

  每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。

 

生产者

 

private static Map<String,Object> map=new HashMap<>();
    public static void main(String[] args) throws Exception 
        //创建消费者
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("mckz-group");
        //设置NameServer地址
        consumer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
        //设置实例名称
        consumer.setInstanceName("mckz");
        //订阅Topic
        consumer.subscribe("mckz_topic","TagA");
        //监听消息
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
                //获取消息
                for(MessageExt ext:msgs)
                    //判断redis中有没有当前消息key
                    if(map.containsKey(ext.getKeys()))
                        System.out.println("已经消费.......");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    
                    //RocketMQ由于是集群环境,所以产生的消息ID可能会重复
                    System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody()));
                    //将当前Key保存到redis当中
                    map.put(ext.getKeys(),ext);
                
          /*如果出现异常可进行try进行捕获*/
/*try 
                    int a=5/0;
                catch (Exception e)
                    //接受消息状态 
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                */

                /*try 
                    Thread.sleep(60000);
                catch (Exception e)
                    e.printStackTrace();
                */

                //接受消息状态 
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        //启动消费者
        consumer.start();
    

 

 

消费者

public static void main(String[] args) throws Exception 
        //创建一个生产者
        DefaultMQProducer producer=new DefaultMQProducer("mckz_group");
        //设置NameServer地址
        producer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
        //设置生产者实例名称
        producer.setInstanceName("mckz");
        //启动生产者
        producer.start();
        //发送消息
        for (int i = 1; i <=10 ; i++) 
            Thread.sleep(1000); //模拟网络延迟
            //创建消息  topic代表主题名称     tags代表小分类     body代表消息体
            Message message=new Message("mckz_topic","TagA",("wdksoft-"+i).getBytes());
            //消息的唯一标识
            message.setKeys("订单编号"+i);
            //发送消息
            SendResult send = producer.send(message);
            System.out.println(send.toString());
        
    

rocketmq问题排查命令(代码片段)

修改rocketmq官方代码测试:packagecom.alibaba.middleware.race.rocketmq;importjava.util.Scanner;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQP 查看详情

rocketmq(15)——rocketmq控制台(代码片段)

RocketMQ控制台RocketMQ提供了一个管理控制台,可以查看RocketMQ的相关信息和进行一些管理操作。RocketMQ在Github上专门有一个仓库用来存放一些与RocketMQ相关的项目,地址是https://github.com/apache/rocketmq-externals,控制台就是其中一个。如... 查看详情

rocketmq(15)——rocketmq控制台(代码片段)

RocketMQ控制台RocketMQ提供了一个管理控制台,可以查看RocketMQ的相关信息和进行一些管理操作。RocketMQ在Github上专门有一个仓库用来存放一些与RocketMQ相关的项目,地址是https://github.com/apache/rocketmq-externals,控制台就是其中一个。如... 查看详情

rocketmq的使用(代码片段)

1在resources目录下创建config目录,新建文件rocketmq.properties文件#指定namesrv地址suning.rocketmq.namesrvAddr=localhost:9876#生产者group名称suning.rocketmq.producerGroupName=user_group#事务生产者group名称suning.rocketmq.transactionP 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq源码—八rocketmq消息重试(代码片段)

RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。producer发送消息重试producer在发送消息的时候如果发送失败了,RocketMQ会自动重试。privateSendResultsendDefaultImpl(Messagemsg,finalCommunicationModecommunicationMode,finalSendCal... 查看详情

rocketmq(代码片段)

1.消费端集群消费(负载均衡) 示例代码:/***Producer,发送消息**/publicclassProducerpublicstaticvoidmain(String[]args)throwsMQClientException,InterruptedExceptionDefaultMQProducerproducer=newDefaultMQProducer("message_prod 查看详情

rocketmq(08)——日志输出到rocketmq(代码片段)

日志输出到RocketMQRocketMQ对常用的几种日志输出框架都定义了一个日志输出实现,使对应的日志信息作为一条消息发送到RocketMQ。要使日志输出信息能够发送到RocketMQ,需要添加rocketmq-logappender依赖。<dependency><groupId&g... 查看详情

rocketmq===》docker部署rocketmq(代码片段)

一、拉取镜像dockerpullrocketmqinc/rocketmq:4.4.0二、启动容器dockerrun-d-p9876:9876-v/data/ota_soft/mqtt/data/namesrv/logs:/root/logs-vRmHome/data/namesrv/store:/root/store--namermqnamesrv-e"MAX_POSSIBLE_HEAP& 查看详情

rocketmq之延时处理消息(代码片段)

1启动消费者等待传入的订阅消息importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;importorg.apache.rocketmq.client.consumer.l 查看详情

windows下安装rocketmq(代码片段)

RocketMQ安装内容一、RocketMQ环境搭建1.下载RocketMQ2.window的安装配置2.1使用rocketmq-4.5.1.zip解压到指定目录2.2需要配置环境变量ROCKETMQ_HOME3.修改broker的配置文件3.1打开broker.conf文件3.2添加配置4.先启动mqnamesrv5.启动broker6.启动管理控制台... 查看详情

windows下安装rocketmq(代码片段)

RocketMQ安装内容一、RocketMQ环境搭建1.下载RocketMQ2.window的安装配置2.1使用rocketmq-4.5.1.zip解压到指定目录2.2需要配置环境变量ROCKETMQ_HOME3.修改broker的配置文件3.1打开broker.conf文件3.2添加配置4.先启动mqnamesrv5.启动broker6.启动管理控制台... 查看详情

rocketmq(01)——简介(代码片段)

RocketMQ简介笔者使用的是ApacheRocketMQ,官网是http://rocketmq.apache.org/。RocketMQ是Alibaba开源的一个分布式消息队列,可以通过http://rocketmq.apache.org/dowloading/releases/下载当前最新的版本。下载后解压缩,然后通过bin/mqnamesrv启... 查看详情

docker部署rocketmq集群(代码片段)

最近搭建RocketMQ踩了一些坑,更新下第一步,创建目录mkdir-p/opt/rocketmq/logs/nameserver-amkdir-p/opt/rocketmq/logs/nameserver-bmkdir-p/opt/rocketmq/store/nameserver-amkdir-p/opt/rocketmq/store/nameserver-bmkd 查看详情

rocketmq管理控制台(代码片段)

dockerpullstyletang/rocketmq-console-ng  dockerrun-e"JAVA_OPTS=-Drocketmq.namesrv.addr=10.78.16.70:9876-Dcom.rocketmq.sendMessageWithVIPChannel=false"-p8080:8080-tstyletang/rocketmq-console- 查看详情

安装配置rocketmq(代码片段)

文章目录安装与配置RocketMQgitCode分享RocketMQ源码版本下载地址:Rocketmq控制台下载地址:百度网盘分享官方下载地址安装配置查看linux版本创建一个操作用户启动NameServer启动Broker命令行快速验证关闭RocketMQ服务安装与配置Ro... 查看详情

安装配置rocketmq(代码片段)

文章目录安装与配置RocketMQgitCode分享RocketMQ源码版本下载地址:Rocketmq控制台下载地址:百度网盘分享官方下载地址安装配置查看linux版本创建一个操作用户启动NameServer启动Broker命令行快速验证关闭RocketMQ服务安装与配置Ro... 查看详情