rocketmq(代码片段)

lowerma lowerma     2023-04-18     629

关键词:

RocketMQ整理

概念

  • broker MQ服务节点

    • topic 主题

    • message queue 消息队列,类似于kafka中的partition

  • producer

    生产者,先去连接name server,查询到对应的broker信息,再去连接broker

  • consumer

    消费者,先去连接name server,查询到对应的broker信息,再去连接broker

  • name server

    命名服务器,用于存储Broker信息

启动命令

先启动name server

./bin/mqnamesrv

然后再启动broker

 
./bin/mqbroker -n name.server.url 
./bin/mqbroker -c config  
#-n 指定nameserver的ip和端口
#-c 以指定配置文件为参数启动
#端口默认为 9876
 

这时候启动应该会报错

原因:

RocketMQ的默认配置是生产环境级别的,JVM的内存达到8G,如果机器配置不够就会报错

解决:

  1. 修改runbroker.sh脚本,将JVM参数 -Xms,-Xmx,设置为机器可以承受的大小

  2. 修改runserver.sh脚本,将JVM参数 -Xms,-Xmx,设置为机器可以承受的大小

Console端测试

RocketMQ提供了quick start的example

  1. 配置环境变量NAMESRV_ADDRname server的url

  2. 执行./bin/tools.sh org.apache.rocketmq.example.quickstart.Producer来启动一个生产者,自动发送测试消息

  3. 执行./bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer来启动一个消费者来接收消息

JAVA客户端测试

引入RocketMQ依赖

 
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.2</version>
</dependency>
 

Java示例代码

 
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
?
public class SyncProducer 
    public static void main(String[] args) throws Exception 
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("test-group");
        // Specify name server addresses.
        producer.setNamesrvAddr("172.16.55.185:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) 
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest11" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    
 

执行main方法会报错,提示timeout

原因:

因为RocketMQ默认启动Broker的时候,分配的ip只有服务器内网访问权限,外网无法访问

解决:

通过配置文件指定broker启动ip为服务器外网能够访问的ip

然后重启broker,使用-c参数指定配置文件路径

UI管理工具(了解)

rocketmq-console,可以同过github下载

具体使用方式上网搜索

Topic创建

 
DefaultMQProducer producer = new DefaultMQProducer("HAOKE_IM");
producer.setNamesrvAddr("172.16.55.185:9876");
producer.start();
/**
 * key:broker名称
 * newTopic:topic名称
 * queueNum:队列数(分区)默认4个
 */
producer.createTopic("broker_name", "topic", 8);
 

消息发送

同步模式

 
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("haoke_im_topic","SEND_MSG",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.println("消息状态:" + sendResult.getSendStatus());
System.out.println("消息id:" + sendResult.getMsgId());
System.out.println("消息queue:" + sendResult.getMessageQueue());
System.out.println("消息offset:" + sendResult.getQueueOffset());
 

异步模式

 
String msgStr = "用户A发送消息给用户B";
Message msg = new Message("broker_name","SEND_MSG",msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息
producer.send(msg, new SendCallback() 
    @Override
    public void onSuccess(SendResult sendResult) 
        System.out.println("消息状态:" + sendResult.getSendStatus());
        System.out.println("消息id:" + sendResult.getMsgId());
        System.out.println("消息queue:" + sendResult.getMessageQueue());
        System.out.println("消息offset:" + sendResult.getQueueOffset());
    
    @Override
    public void onException(Throwable e) 
        System.out.println("发送失败!" + e);
    
);
 

Message数据结构

字段名默认值说明
Topic null 必填,线下环境不需要申请,线上环境需要申请后才能使用
Body null 必填,二进制形式,序列化由应用决定,ProducerConsumer 要协商好序列化形式
Tags null 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个 tag,所以也可以类比为 NotifyMessageType 概念
Keys null 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后,可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等
Flag 0 选填,完全由应用来设置,RocketMQ 不做干预
DelayTimeLevel 0 选填,消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费
WaitStoreMsgOK TRUE 选填,表示消息是否在服务器落盘后才返回应答

消息接收

 
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("172.16.55.185:9876");
// 订阅topic,* 表示接收此Topic下的所有消息,相当于一个匹配符 
//如果为 "tag1"就表示只订阅tag为"tag1"的消息 还可以组合订阅 "tag1 || tag2 ||  tag3"
consumer.subscribe("topic_name", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    ConsumeConcurrentlyContext context) 
        for (MessageExt msg : msgs) 
            try 
                System.out.println(new String(msg.getBody(), "UTF-8"));
             catch (UnsupportedEncodingException e) 
                e.printStackTrace();
            
        
        System.out.println("收到消息->" + msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
);
consumer.start();
 

消息过滤器

RocketMQ支持根据用户自定义属性进行过滤

过滤表达式类似于SQL的where,如:a > 5 AND b = ‘abc‘

RocketMQ默认没有开启消息过滤器

所以需要先开启才能使用

开启方式:

在配置文件中新增enablePropertyFilter = true

生产者发送带有条件的消息

 
String msgStr = "大家好我叫小美,今年18岁";
Message msg = new Message("topic_name","tag1",
msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("age", "18");
msg.putUserProperty("sex", "女");
// 发送消息
SendResult sendResult = producer.send(msg);
 

消费者使用过滤器接收

 
// 订阅topic,接收此Topic下的所有消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("172.16.55.185:9876");
// 订阅topic,* 表示接收此Topic下的所有消息,相当于一个匹配符 
consumer.subscribe("topic_name", MessageSelector.bySql("age>=18 AND sex=‘女‘"));
consumer.registerMessageListener(new MessageListenerConcurrently() 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    ConsumeConcurrentlyContext context) 
        for (MessageExt msg : msgs) 
            try 
                System.out.println(new String(msg.getBody(), "UTF-8"));
             catch (UnsupportedEncodingException e) 
                e.printStackTrace();
            
        
        System.out.println("收到消息->" + msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
);
consumer.start();
 

消息顺序

在某些业务中,consumer在消费消息时,是需要按照生产者发送消息的顺序进行消费的

比如在电商系统中,订 单的消息,会有创建订单、订单支付、订单完成,如果消息的顺序发生改变,那么这样的消息就没有意义了

生产者代码实现

 
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("172.16.55.185:9876");
producer.start();
for (int i = 0; i < 100; i++) 
    String msgStr = "order --> " + i;
    int orderId = i % 10; // 模拟生成订单id
    Message message = new Message("topic","tag",
                                msgStr.getBytes(RemotingHelper.DEFAULT_CHARSET));
    //orderId实际传入到lambda表达式(MessageQueueSelector)第三个参数
    SendResult sendResult = producer.send(message, (mqs, msg, arg) -> 
        //orderId范围0-9之间
        Integer id = (Integer) arg;
        //topic默认queue个数为4 所以index范围0-4之间
        int index = id % mqs.size();
        //凡是orderId对4取余为0的选用第1个queue
        //凡是orderId对4取余为1的选用第2个queue
        //凡是orderId对4取余为2的选用第3个queue
        //凡是orderId对4取余为3的选用第4个queue
        //这样一来就保证了满足某一条件的消息能够总是发送到特定的queue中
        return mqs.get(index);
    , orderId);
    System.out.println(sendResult);

producer.shutdown();
 

消费者代码

 
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("172.16.55.185:9876");
consumer.subscribe("topic", "tag");
//使用MessageListenerOrderly这种顺序监听器,可以满足按顺序消费同一队列中的消息,不同的线程处理不同的队列
//比如thread-pool-1专门消费queue-0中的消息
//比如thread-pool-2专门消费queue-1中的消息
//比如thread-pool-3专门消费queue-2中的消息
//比如thread-pool-4专门消费queue-3中的消息
consumer.registerMessageListener(new MessageListenerOrderly() 
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeOrderlyContext context) 
        System.out.println(Thread.currentThread().getName() + " Receive New
        Messages: " + msgs);
        return ConsumeOrderlyStatus.SUCCESS;
    
);
consumer.start();
 

分布式事务

随着项目越来越复杂,越来越服务化,就会导致系统间的事务问题,这个就是分布式事务问题。 分布式事务分类有这几种:

  • 基于单个JVM,数据库分库分表了(跨多个数据库)。

  • 基于多JVM,服务拆分了(不跨数据库)。

  • 基于多JVM,服务拆分了 并且数据库分库分表了。

解决分布式事务问题的方案有很多,使用消息实现只是其中的一种

实现原理

  • 半消息

    生产者发送一条半消息

    MQ不会将半消息发送给消费者

    等生产者发送一条commit消息,MQ才会将半消息发送给消费者

    生产者发送一条rollback消息,MQ将半消息给回滚

  • 消息回查

    commit或者rollback消息在网络中丢失

    MQ会回调生产者提供的check方法,来确认消息的状态是commit或者rollback

生产者代码

 
TransactionMQProducer producer = new
TransactionMQProducer("producer_group");
producer.setNamesrvAddr("172.16.55.185:9876");
// 设置自定义事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
producer.start();
// 发送消息
Message message = new Message("pay_topic", "用户A给用户B转账500元".getBytes("UTF-8"));
producer.sendMessageInTransaction(message, null);
Thread.sleep(999999);
producer.shutdown();
 

 

TransactionListenerImpl.java

 
public class TransactionListenerImpl implements TransactionListener 
    private static Map<String, LocalTransactionState> STATE_MAP = new HashMap<>();
    /**
    * 执行具体的业务逻辑
    *
    * @param msg 发送的消息对象
    * @param arg
    * @return
    */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
        try 
            System.out.println("用户A账户减500元.");
            Thread.sleep(500); //模拟调用服务
            // System.out.println(1/0);
            System.out.println("用户B账户加500元.");
            Thread.sleep(800);
            STATE_MAP.put(msg.getTransactionId(),
            LocalTransactionState.COMMIT_MESSAGE);
            // 二次提交确认
            return LocalTransactionState.COMMIT_MESSAGE;
         catch (Exception e) 
            e.printStackTrace();
        
        STATE_MAP.put(msg.getTransactionId(),
        LocalTransactionState.ROLLBACK_MESSAGE);
        // 回滚
        return LocalTransactionState.ROLLBACK_MESSAGE;
    
    /**
    * 消息回查
    *
    * @param msg
    * @return
    */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) 
        return STATE_MAP.get(msg.getTransactionId());
    
 

 

消费者push和pull模式

  • push模式

    客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端

  • pull模式

    客户端不断的轮询请求服务端,来获取新的消息

但在具体实现时,PushPull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息

  • Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒 MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的

  • Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历 MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开 始offset,直到取完了,再换另一个MessageQueue

消息模式

  • 集群模式

    同一个Group下的消费者

    每个消费者消费一部分topic数据

    同一个Group下的消费者合起来消费的数据就是topic的所有数据

  • 广播模式

    同一个Group下的消费者

    每一个消费的数据就是Topic的所有数据

// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

重复消息

造成消息重复的根本原因是:网络不可达

只要通过网络交换数据,就无法避免这个问题

所以解决这个问题的办 法就是绕过这个问题

那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理

  1. 消费端处理消息的业务逻辑保持幂等性

  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

持久化

RocketMQ中的消息数据存储,采用了零拷贝技术(使用 mmap + write 方式)

文件系统采用 Linux Ext4 文件系 统进行存储

RocketMQ中,消息数据是保存在磁盘文件中,为了保证写入的性能,RocketMQ尽可能保证顺序写入,顺序写 入的效率比随机写入的效率高很多。 RocketMQ消息的存储是由ConsumeQueueCommitLog配合完成的

CommitLog是真正存储数据的文件

ConsumeQueue索引文件,存储数据指向到物理文件的配置。

 

Consume Queue相当于kafka中的partition,是一个逻辑队列,存储了这个Queue在CommiLog中的起始 offsetlog大小和MessageTaghashCode

每次读取消息队列先读取consumerQueue,然后再通过consumerQueuecommitLog中拿到消息主体

同步异步写入

RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写入

消息在通过 Producer 写入 RocketMQ 的时候,有两 种写磁盘方式

分别是同步写入异步写入

  • 同步

    在返回写成功状态时,消息已经被写入磁盘

    消息写入内存的缓冲区后,立刻通知写入线程写入

    等待写入完成,写入线程 执行完成后唤醒等待的线程,返回消息写成功的状态

  • 异步

    在返回写成功状态时,消息可能只是被写入了内存的 缓冲区

    写操作的返回快,吞吐量大

    当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入

配置指定写入方式

#-- 异步
flushDiskType = ASYNC_FLUSH 
#-- 同步
flushDiskType = SYNC_FLUSH 

 

rocketmq问题排查命令(代码片段)

修改rocketmq官方代码测试:packagecom.alibaba.middleware.race.rocketmq;importjava.util.Scanner;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.alibaba.rocketmq.client.producer.DefaultMQP 查看详情

rocketmq(15)——rocketmq控制台(代码片段)

RocketMQ控制台RocketMQ提供了一个管理控制台,可以查看RocketMQ的相关信息和进行一些管理操作。RocketMQ在Github上专门有一个仓库用来存放一些与RocketMQ相关的项目,地址是https://github.com/apache/rocketmq-externals,控制台就是其中一个。如... 查看详情

rocketmq(15)——rocketmq控制台(代码片段)

RocketMQ控制台RocketMQ提供了一个管理控制台,可以查看RocketMQ的相关信息和进行一些管理操作。RocketMQ在Github上专门有一个仓库用来存放一些与RocketMQ相关的项目,地址是https://github.com/apache/rocketmq-externals,控制台就是其中一个。如... 查看详情

rocketmq的使用(代码片段)

1在resources目录下创建config目录,新建文件rocketmq.properties文件#指定namesrv地址suning.rocketmq.namesrvAddr=localhost:9876#生产者group名称suning.rocketmq.producerGroupName=user_group#事务生产者group名称suning.rocketmq.transactionP 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq源码—十rocketmq顺序消息(代码片段)

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情

rocketmq源码—八rocketmq消息重试(代码片段)

RocketMQ的消息重试包含了producer发送消息的重试和consumer消息消费的重试。producer发送消息重试producer在发送消息的时候如果发送失败了,RocketMQ会自动重试。privateSendResultsendDefaultImpl(Messagemsg,finalCommunicationModecommunicationMode,finalSendCal... 查看详情

rocketmq(代码片段)

1.消费端集群消费(负载均衡) 示例代码:/***Producer,发送消息**/publicclassProducerpublicstaticvoidmain(String[]args)throwsMQClientException,InterruptedExceptionDefaultMQProducerproducer=newDefaultMQProducer("message_prod 查看详情

rocketmq(08)——日志输出到rocketmq(代码片段)

日志输出到RocketMQRocketMQ对常用的几种日志输出框架都定义了一个日志输出实现,使对应的日志信息作为一条消息发送到RocketMQ。要使日志输出信息能够发送到RocketMQ,需要添加rocketmq-logappender依赖。<dependency><groupId&g... 查看详情

rocketmq===》docker部署rocketmq(代码片段)

一、拉取镜像dockerpullrocketmqinc/rocketmq:4.4.0二、启动容器dockerrun-d-p9876:9876-v/data/ota_soft/mqtt/data/namesrv/logs:/root/logs-vRmHome/data/namesrv/store:/root/store--namermqnamesrv-e"MAX_POSSIBLE_HEAP& 查看详情

rocketmq之延时处理消息(代码片段)

1启动消费者等待传入的订阅消息importorg.apache.rocketmq.client.consumer.DefaultMQPushConsumer;importorg.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;importorg.apache.rocketmq.client.consumer.l 查看详情

windows下安装rocketmq(代码片段)

RocketMQ安装内容一、RocketMQ环境搭建1.下载RocketMQ2.window的安装配置2.1使用rocketmq-4.5.1.zip解压到指定目录2.2需要配置环境变量ROCKETMQ_HOME3.修改broker的配置文件3.1打开broker.conf文件3.2添加配置4.先启动mqnamesrv5.启动broker6.启动管理控制台... 查看详情

windows下安装rocketmq(代码片段)

RocketMQ安装内容一、RocketMQ环境搭建1.下载RocketMQ2.window的安装配置2.1使用rocketmq-4.5.1.zip解压到指定目录2.2需要配置环境变量ROCKETMQ_HOME3.修改broker的配置文件3.1打开broker.conf文件3.2添加配置4.先启动mqnamesrv5.启动broker6.启动管理控制台... 查看详情

rocketmq(01)——简介(代码片段)

RocketMQ简介笔者使用的是ApacheRocketMQ,官网是http://rocketmq.apache.org/。RocketMQ是Alibaba开源的一个分布式消息队列,可以通过http://rocketmq.apache.org/dowloading/releases/下载当前最新的版本。下载后解压缩,然后通过bin/mqnamesrv启... 查看详情

docker部署rocketmq集群(代码片段)

最近搭建RocketMQ踩了一些坑,更新下第一步,创建目录mkdir-p/opt/rocketmq/logs/nameserver-amkdir-p/opt/rocketmq/logs/nameserver-bmkdir-p/opt/rocketmq/store/nameserver-amkdir-p/opt/rocketmq/store/nameserver-bmkd 查看详情

rocketmq管理控制台(代码片段)

dockerpullstyletang/rocketmq-console-ng  dockerrun-e"JAVA_OPTS=-Drocketmq.namesrv.addr=10.78.16.70:9876-Dcom.rocketmq.sendMessageWithVIPChannel=false"-p8080:8080-tstyletang/rocketmq-console- 查看详情

安装配置rocketmq(代码片段)

文章目录安装与配置RocketMQgitCode分享RocketMQ源码版本下载地址:Rocketmq控制台下载地址:百度网盘分享官方下载地址安装配置查看linux版本创建一个操作用户启动NameServer启动Broker命令行快速验证关闭RocketMQ服务安装与配置Ro... 查看详情

安装配置rocketmq(代码片段)

文章目录安装与配置RocketMQgitCode分享RocketMQ源码版本下载地址:Rocketmq控制台下载地址:百度网盘分享官方下载地址安装配置查看linux版本创建一个操作用户启动NameServer启动Broker命令行快速验证关闭RocketMQ服务安装与配置Ro... 查看详情