rocketmq之消息的生产与消费(代码片段)

一只猪的思考 一只猪的思考     2022-12-04     447

关键词:

基本示例中提供了以下两个功能

  • RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。
  • RocketMQ可以用来消费消息。

1 添加依赖

maven:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.3.0</version>
</dependency>

gradle:

compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

2 发送消息

2.1 使用Producer发送同步消息

可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

public class SyncProducer 
  public static void main(String[] args) throws Exception 
    // Instantiate with a producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses
    producer.setNamesrvAddr("localhost:9876");
    // Launch the producer instance
    producer.start();
    for (int i = 0; i < 100; i++) 
      // Create a message instance with specifying topic, tag and message body
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
      // Send message to one of brokers
      SendResult sendResult = producer.send(msg);
      // Check whether the message has been delivered by the callback of sendResult
      System.out.printf("%s%n", sendResult);
    
    // Shut down once the producer instance is not longer in use
    producer.shutdown();
  

2.2 发送异步消息

异步传输通常用于响应时间敏感的业务场景。这意味着发送方无法等待代理的响应太长时间。

public class AsyncProducer 
  public static void main(String[] args) throws Exception 
    // Instantiate with a producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses
    producer.setNamesrvAddr("localhost:9876");
    // Launch the producer instance
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);
    for (int i = 0; i < 100; i++) 
      final int index = i;
      // Create a message instance with specifying topic, tag and message body
      Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
      // SendCallback: receive the callback of the asynchronous return result.
      producer.send(msg, new SendCallback() 
        @Override
        public void onSuccess(SendResult sendResult) 
          System.out.printf("%-10d OK %s %n", index,
            sendResult.getMsgId());
        
        @Override
        public void onException(Throwable e) 
          System.out.printf("%-10d Exception %s %n", index, e);
          e.printStackTrace();
        
      );
    
    // Shut down once the producer instance is not longer in use
    producer.shutdown();
  

2.3 以单向模式发送消息

单向传输用于需要中等可靠性的情况,如日志收集。

public class OnewayProducer 
  public static void main(String[] args) throws Exception
    // Instantiate with a producer group name
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Specify name server addresses
    producer.setNamesrvAddr("localhost:9876");
    // Launch the producer instance
    producer.start();
    for (int i = 0; i < 100; i++) 
      // Create a message instance with specifying topic, tag and message body
      Message msg = new Message("TopicTest" /* Topic */,
        "TagA" /* Tag */,
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
      );
      // Send in one-way mode, no return result
      producer.sendOneway(msg);
    
    // Shut down once the producer instance is not longer in use
     producer.shutdown();
  

3 消费消息

public class Consumer 
  public static void main(String[] args) throws InterruptedException, MQClientException 
    // Instantiate with specified consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
    
    // Specify name server addresses
    consumer.setNamesrvAddr("localhost:9876");

    // Subscribe one or more topics and tags for finding those messages need to be consumed
    consumer.subscribe("TopicTest", "*");
    // Register callback to execute on arrival of messages fetched from brokers
    consumer.registerMessageListener(new MessageListenerConcurrently() 
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // Mark the message that have been consumed successfully
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      
    );
    // Launch the consumer instance
    consumer.start();
    System.out.printf("Consumer Started.%n");
  

rocketmq之消息的生产与消费(代码片段)

基本示例中提供了以下两个功能RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。RocketMQ可以用来消费消息。1添加依赖maven:<depen... 查看详情

rocketmq使用延迟消息(代码片段)

目录说明生产端消费端运行结果示例延迟级别与延迟时间对应关系说明1、延时消息不是延迟发送,消息是实时发送的,只是消费者延迟消费2、延迟消息主要通过对Message设置延迟级别实现,生产者和消费者按照正常逻... 查看详情

rocketmq(代码片段)

RocketMQ生产者和消费者  注:生产者在生产数据时,指定数据的key,然后消费者进行数据消费时,获取到key,与redis中保存的key做判断  如果不相同代表之前没有人进行消费,处理消费,保存到redis当中  当有第二个消费者... 查看详情

springboot整合rocketmq的各种消息类型,生产者,消费者(代码片段)

...顺序消息消费者sql92过滤消息消费者事物消息消费者@RocketMQMessageListener参数解释我的rocketmq各种集群方案安装Springboot整合使用pom依赖<dependency><groupId>org.apache.roc 查看详情

rocketmq专题2:三种常用生产消费方式(顺序广播定时)以及顺序消费源码探究(代码片段)

...行常用的三种消息类型例子展示的时候,我们先来说一说RocketMQ的几个重要概念:PullConsumer与PushConsumer:主要区别在于Pull与Push的区别。对于PullConsumer,消费者会主动从broker中拉取消息进行消费。而对于PushConsumer,会封装包含消... 查看详情

关于rocketmq的基础api操作——这一篇就够了(代码片段)

关于RocketMQ的基础操作一、基础API操作1、普通消息1.1、消息生产端1.2、消息消费端2、顺序消息2.1、消息生产端2.2、消息消费端3、广播消息3.1、消息生产端3.2、消息消费端4、延迟消息4.1、消息生产端4.2、消息消费端4.3、实现原理... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq广播消息(代码片段)

发布与模式实现。广播就是向一个主题的所有订阅者发送同一条消息。在发送消息的时候和普通的消息并与不同之处,只是在消费端做一些配置即可。Consumer消息消费publicclassBroadcastConsumerpublicstaticvoidmain(String[]args)throwsMQClientExcept... 查看详情

rocketmq之顺序消费(代码片段)

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。顺序消费的原理解析,在默认的情况下消息发送会采取RoundRobin轮询方式把消息发送到不同的queue(... 查看详情

rocketmq使用延迟消息(代码片段)

...blicvoidsendMessage()throwsExceptionDefaultMQProducerdefaultMQProducer=RocketMqUtil.getDefaultMQProducer();Messagemessage=newMessage(RocketMqUtil.TOPIC,"schedule","schedule-message".getBytes(Charset.forName("UTF-8")));//设置延迟级别,延迟级别≠延迟... 查看详情

消息队列之rabbitmq(代码片段)

...用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。"""生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完... 查看详情

rocketmq笔记:普通消息(代码片段)

  普通消息为RocketMQ中最基础的消息,支持生产者和消费者的异步解耦通信。一、普通消息的生命周期 1、初始化  消息被生产者构建并完成初始化,待发送到服务端的状态。2、待消费  消息被发送到服务端,对消费... 查看详情

rocketmq简单的消费者和生产者(示例代码)(代码片段)

一、生产者  使用RocketMQ以三种方式发送消息:可靠的同步,可靠的异步和单向传输。  (1)同步发送消息(可靠的同步传输,适用于重要的短信通知等)publicclassSyncProducerpublicstaticvoidmain(String[]args)throwsException//Instantiatewitha... 查看详情

rocketmq的顺序消息(顺序消费)(代码片段)

简单介绍了消息有序性的概念,以及RocketMQ如何实现消息的顺序消费。文章目录1消息的有序性2生产者有序发送3消费者有序消费1消息的有序性消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订... 查看详情

rocketmq消息过滤(代码片段)

目录说明生产端消费端说明消息过滤的两种方式:Tag和SQL表达式。生产者:对Message设置Tag、用户属性消费者:subscribe时指定Tag、SQL表达式生产端@TestpublicvoidsendMessage()throwsExceptionDefaultMQProducerdefaultMQProducer=RocketMqU 查看详情

rocketmq使用广播消息(代码片段)

目录说明生产端消费端总结说明RocketMQ消息模式主要有两种:(1)、MessageModel.CLUSTERING:集群模式。同一消费者组内的每个消费者,只消费到Topic的一部分消息,所有消费者消费的消息加起来就是Topic的所有... 查看详情

rocketmq使用广播消息(代码片段)

目录说明生产端消费端总结说明RocketMQ消息模式主要有两种:(1)、MessageModel.CLUSTERING:集群模式。同一消费者组内的每个消费者,只消费到Topic的一部分消息,所有消费者消费的消息加起来就是Topic的所有... 查看详情