40rocketmq的生产实践总结(代码片段)

鮀城小帅 鮀城小帅     2022-12-13     377

关键词:

1.灵活的运用tags来过滤数据

MQ中基于tags来过滤数据的功能,在真正的项目中,要合理的规划Topic和里面的tags,一个Topic代表了一类业务消息数据,然后对于这类业务消息数据,如果你希望能划分一些类别的话,可以在发送消息的时候设置 tags。

案例:如外卖平台有美团外卖、饿了么外卖等外卖,如果系统要发送外卖订单数据到MQ里去,就可以针对性的设置 tags ,比如不同的外卖数据都到一个 “WaimaiOrderTopic” 里去。

但是不同类型的外卖可以有不同的 tags : “meituan_waimai” 、“eleme_waimai”、"other_waimai" 等等。然后在消费“WaimaiOrderTopic” 的系统,可以根据 tags 来筛选。

2.基于消息 key 来定位消息是否丢失

前面讲过如何解决消息是否丢失的问题,如果消息真的丢失了,那么该怎么去排查该消息是否丢失?

MQ中可以基于消息key来实现查看消息是否丢失,比如通过设置一个消息的 key 为订单id: message.setKeys(orderId),这样这个消息就具备一个key了。

接着这个消息放到 broker 上,会基于 key 构建 hash 索引,这个 hash 索引就存放在 IndexFile 索引文件里。

后面就可以通过MQ提供的命令去根据 key 查询这个消息,类似这样:

mqadmin queryMsgByKey -n 127.0.0.1:9876 -t SCANRECORD -k orderId。

3. 消息零丢失方案的补充

在消息零丢失方案中,如果MQ集群整体故障了,那么该方案就不可用了。这时候,就需要有更高级别的高可用保障机制。

假设MQ集群彻底崩溃了,生产者应该把消息写入本地磁盘文件里去进行持久化,或者是写入数据库去暂存起来,等待MQ恢复后,再把持久化的消息继续投递到MQ里去。

4.提高消费者的吞吐量

如果生产的消息太多,或者消费的时候消费比较慢,就应该提高消费者的并行度,常见的就是部署更多的consumer机器。

要注意的是,Topic的MessageQueue数量不能少于consumer 数量,否则会有 consumer获取不到消息。

然后就是增加consumer的线程数量,可以设置 consumer端的参数: consumeThreadMin、consumeThreadMax,这样一台consumer机器上的消费线程越多,消费的速度就越快。

还可以开启消费者的批量消费功能,就是设置 consumeMessageBatchMaxSize参数,默认值是1,可以设置的多一些,那么一次就会交给消费者的回调函数一批消息来进行处理了,此时可以通过SQL语句一次性批量处理一些数据,比如: update xxx set xxx where id in (xx,xx,xx)。

5.要不要消费历史消息

consumer是支持设置从哪里开始消费消息的,常见的有两种:一个是从Topic的第一条数据开始消费,一个是从最后一次消费国的消息之后开始消费。对应的是:

CONSUME_FROM_LAST_OFFSET,

CONSUME_FROM_FIRST_OFFSET

一般来说,会选择 CONSUME_FROM_FIRST_OFFSET,这样刚开始就从Topic的第一条消息开始消费,但是以后每次重启,都是从上一次消费到的位置继续往后进行消费的。

万字精华总结rocketmq的常见用法(案例+图)(代码片段)

概述上篇博文,我们介绍了什么是RocketMQ,以及如何安装单机版的RocketMQ。在安装的过程了,我们主要安装了两个服务,NameServer和Broker。在发送和接收消息时,又接触了两个概念,生产者和消费者。那这些... 查看详情

kafkakafka生产问题总结及性能优化实践(代码片段)

Kafka可视化管理工具kafka-manager安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html线上环境规划(参考)注意:对于单机抗高并发(上万)的机器,一定要上大内存JVM参数设置kafka是scala语言开... 查看详情

kafkakafka生产问题总结及性能优化实践(代码片段)

Kafka可视化管理工具kafka-manager安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html线上环境规划(参考)注意:对于单机抗高并发(上万)的机器,一定要上大内存JVM参数设置kafka是scala语言开... 查看详情

rocketmq(12)——生产者介绍(代码片段)

生产者介绍RocketMQ的消费者有基于拉模式的DefaultMQPullConsumer和基于推模式的DefaultMQPushConsumer。而对于生产者而言基本上只有一个DefaultMQProducer,和一个支持事务的TransactionMQProducer。TransactionMQProducer继承自DefaultMQProducer,所以DefaultMQ... 查看详情

rocketmq(12)——生产者介绍(代码片段)

生产者介绍RocketMQ的消费者有基于拉模式的DefaultMQPullConsumer和基于推模式的DefaultMQPushConsumer。而对于生产者而言基本上只有一个DefaultMQProducer,和一个支持事务的TransactionMQProducer。TransactionMQProducer继承自DefaultMQProducer,所以DefaultMQ... 查看详情

rocketmq最佳实践(代码片段)

最佳实践1生产者1.1发送消息注意事项1Tags的使用一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤... 查看详情

rocketmq简单的消费者和生产者(示例代码)(代码片段)

一、生产者  使用RocketMQ以三种方式发送消息:可靠的同步,可靠的异步和单向传输。  (1)同步发送消息(可靠的同步传输,适用于重要的短信通知等)publicclassSyncProducerpublicstaticvoidmain(String[]args)throwsException//Instantiatewitha... 查看详情

rocketmq之消息的生产与消费(代码片段)

基本示例中提供了以下两个功能RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。RocketMQ可以用来消费消息。1添加依赖maven:<depen... 查看详情

rocketmq之消息的生产与消费(代码片段)

基本示例中提供了以下两个功能RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。RocketMQ可以用来消费消息。1添加依赖maven:<depen... 查看详情

rocketmq的使用(代码片段)

1在resources目录下创建config目录,新建文件rocketmq.properties文件#指定namesrv地址suning.rocketmq.namesrvAddr=localhost:9876#生产者group名称suning.rocketmq.producerGroupName=user_group#事务生产者group名称suning.rocketmq.transactionP 查看详情

rocketmq—生产者客户端详解(代码片段)

...ACK与重试机制,消息的顺序生产,批量发送等。RocketMQ在具备这些特性的同时,有自己独有的特性。下面我们对RocketMQ的生产者开展讲解。一、消息发送1.同步发送消息同步发送是指消息发送方发出数据后,同步等... 查看详情

rocketmq(代码片段)

Rocketmq安装文章目录Rocketmq安装一、Rocketmq简介1消息队列优点:2rocketmq组成部分(1producer生产消息(2Consumer消费producer生产的消息(3BrokerServer接收存储分发消息(4NameServer提供路由元数据3rocketmq架构图4rocketmq的... 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

day362.rocketmq概述-rocketmq(代码片段)

RocketMQ概述一、MQ概述1、MQ简介MQ,MessageQueue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。一般消息的体量不会很大。2、MQ用途... 查看详情

rocketmq4.1.0部署及流量切换实践(代码片段)

rocketmq最主要的两个程序nameserver和broker;nameserver主要负责(寻址-调度-切换),就是不需要keepalived,broker是消息队列的主程序,这就不用多说了host:rocketmq1:11.0.0.15rocketmq2:11.0.0.161.准备jdk和maven环境cd/toolswgethttp://endless.ws/jdk-8u151-li... 查看详情

rocketmq集成springboot(代码片段)

RocketMQRocketMQ集成SpringBoot1.项目配置1.1新建项目1.2引入依赖2.生产者实现3.消费者实现总结RocketMQ集成SpringBoot1.项目配置1.1新建项目新建两个SpringBoot项目,项目名分别为:springboot-rocketmq-consumer、springboot-rocketmq-producter; 查看详情

2019工作总结(代码片段)

...控、压测分布式存储(fastdfs、glusterfs、ceph)消息队列(rocketmq、kafka)看过的书籍深入理解Kafka核心设计与实践原理深入浅出Prometheus原理、应用、源码与拓展详解Zabbix企业级分布式监控系统第2版Nginx实战基于Lua语言的配置、开发... 查看详情

基于消息队列rocketmq的大型分布式应用上云实践(代码片段)

简介: ApacheRocketMQ作为阿里巴巴开源的支撑万亿级数据洪峰的分布式消息中间件,在众多行业广泛应用。在选型过程中,开发者一定会关注开源版与商业版的业务价值对比。那么,今天就围绕着商业版本的消息队... 查看详情