rocketmq实战

author author     2022-08-28     143

关键词:

在上一篇《RocketMQ实战(一)》中已经为大家初步介绍了下RocketMQ以及搭建了双Master环境,接下来继续为大家介绍!

Quick Start

写一个简单的生产者、消费者,带大家快速体验RocketMQ~


Maven配置:

技术分享


生产者:

技术分享

消费者:


技术分享

无论生产者、消费者都必须给出GroupName,而且具有唯一性!

生产到哪个Topic的哪个Tag下,消费者也是从Topic的哪个Tag进行消费,可见这个Tag有点类似于JMS Selector机制,即实现消息的过滤。

生产者、消费者需要设置NameServer地址。

这里,采用的是Consumer Push的方式,即设置Listener机制回调,相当于开启了一个线程。以后为大家介绍Consumer Pull的方式。


我们看一下运行结果:

技术分享

仔细看看生产者结果输出,你会发现,有的消息发往broker-a,有的在broker-b上,自动实现了消息的负载均衡!

技术分享


这里消费消息是没有什么顺序的,以后我们在来谈消息的顺序性。

我们再来看一看管控台:

技术分享

技术分享

在多Master模式中,如果某个Master进程挂了,显然这台broker将不可用,上面的消息也将无法消费,要知道开源版本的RocketMQ是没有提供切换程序,来自动恢复故障的,因此在实际开发中,我们一般提供一个监听程序,用于监控Master的状态。

在ActiveMQ中,生产消息的时候会提供是否持久化的选择,但是对于RocketMQ而言,消息是一定会被持久化的!

上面的消费者采用的是Push Consumer的方式,那么监听的Listener中的消息List到底是多少条呢?虽然提供了API,如consumer.setConsumeMessageBatchMaxSize(10),实际上即使设置了批量的条数,但是注意了,是最大是10,并不意味着每次batch的都是10,只有在消息有挤压的情况下才有可能。而且Push Consumer的最佳实践方式就是一条条的消费,如果需要batch,可以使用Pull Consumer。

务必保证先启动消费者进行Topic订阅,然后在启动生产者进行生产(否则极有可能导致消息的重复消费,重复消费,重复消费!重要的事情说三遍!关于消息的重复问题后续给大家介绍~)。而且在实际开发中,有时候不会批量的处理消息,而是原子性的,单线程的去一条一条的处理消息,这样就是实时的在处理消息。(批量的处理海量的消息,可以考虑Kafka)


初步了解消息失败重试机制

消息失败,无非涉及到2端:从生产者端发往MQ的失败;消费者端从MQ消费消息的失败;

生产者端的失败重试

技术分享


生产者端的消息失败:比如网络抖动导致生产者发送消息到MQ失败。

上图代码示例的处理手段是:如果该条消息在1S内没有发送成功,那么重试3次。


消费者端的失败重试

消费者端的失败,分为2种情况,一个是timeout,一个是exception

timeout,比如由于网络原因导致消息压根就没有从MQ到消费者上,在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(比如集群中一个broker失败,就尝试另一个broker)

exception,消息正常的到了消费者,结果消费者发生异常,处理失败了。这里涉及到一些问题,需要我们思考下,比如,消费者消费消息的状态有哪些定义?如果失败,MQ将采取什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,还是1条呢?而且在重试的过程中,需要保证不重复消费吗?

技术分享


消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)


技术分享


在启动broker的过程中,可以观察下日志,你会发现RECONSUME_LATER的策略。

如果消费失败,那么1S后再次消费,如果失败,那么5S后,再次消费,......直至2H后如果消费还失败,那么该条消息就会终止发送给消费者了!

RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要在重试呢,因为重试解决不了问题了!这该如何做呢?


我们先来看一下一条消息MessageExt对象的输出:

MessageExt [queueId=0, storeSize=137, queueOffset=0, sysFlag=0, bornTimestamp=1492213846916, bornHost=/192.168.99.219:50478, storeTimestamp=1492213846981, storeHost=/192.168.99.121:10911, msgId=C0A8637900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest2, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=16]]

注意到reconsumeTimes属性,这个属性就代表消息重试的次数!来看一段代码:

技术分享


注意了,对于消费消息而言,存在2种指定的状态(成功 OR 失败重试),如果一条消息在消费端处理没有返回这2个状态,那么相当于这条消息没有达到消费者,势必会再次发送给消费者!也即是消息的处理必须有返回值,否则就进行重发。


天然的消息负载均衡及高效的水平扩展机制

技术分享


对于RocketMQ而言,通过ConsumeGroup的机制,实现了天然的消息负载均衡!通俗点来说,RocketMQ中的消息通过ConsumeGroup实现了将消息分发到C1/C2/C3/......的机制,这意味着我们将非常方便的通过加机器来实现水平扩展!

我们考虑一下这种情况:比如C2发生了重启,一条消息发往C3进行消费,但是这条消息的处理需要0.1S,而此时C2刚好完成重启,那么C2是否可能会收到这条消息呢?答案是肯定的,也就是consume broker的重启,或者水平扩容,或者不遵守先订阅后生产消息,都可能导致消息的重复消费!关于去重的话题会在后续中予以介绍!

至于消息分发到C1/C2/C3,其实也是可以设置策略的。


技术分享


集群消费 AND 广播消费

RocketMQ的消费方式有2种,在默认情况下,就是集群消费,也就是上面提及的消息的负载均衡消费。另一种消费模式,是广播消费。广播消费,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。

技术分享


技术分享


OK,到这里,本期的RocketMQ就结束了,咱们下期见~



本文出自 “学海无涯 心境无限” 博客,请务必保留此出处http://zhangfengzhe.blog.51cto.com/8855103/1916811

深入理解rocketmq---实战(控制台搭建)(代码片段)

 rocketMQ控制台搭建(1)下载rocketmq-console代码:https://github.com/875279177/incubator-rocketmq-externals(2)修改配置application配置文件,主要修改端口号及rocketmq.config.dataPathserver.contextPath=server.port=8080#spring.ap 查看详情

rocketmq最佳实战

RocketMQ客户端最佳实践1.Producer最佳实践发送消息注意事项1).一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。2).消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。SEND_OK,消息... 查看详情

源码分析rocketmq与运维实战

 RocketMQ是笔者当前最突出的亮点,正是由于在CSDN中连载RocketMQ,最终促成了《RocketMQ技术内幕》一书的出版,也凭借此专栏的高质量,最终成为CSDN2020年年度博客之星TOP2。RocketMQ专栏目前已经输出48篇文章,并... 查看详情

mac下rocketmq各种集群模式搭建实战

...nc)4、双master异步模式(2m-2s-async)5、优化和监控首先去rocketmq官网下载安装包,地址为:http://mirrors.hust.edu.cn/apache/rocketmq/4.3.2/rocketmq-all-4.3.2 查看详情

深入理解rocketmq---实战(控制台搭建)(代码片段)

 rocketMQ控制台搭建(1)下载rocketmq-console代码:https://github.com/875279177/incubator-rocketmq-externals(2)修改配置application配置文件,主要修改端口号及rocketmq.config.dataPathserver.contextPath=server.port=8080#spring.application.index=truespring.application.n... 查看详情

(十九)atp应用测试平台——springboot集成rocketmq案例实战(代码片段)

前言本节内容是关于RocketMQ消息中间键的实战内容,主要介绍在springboot项目中如何集成使用RocketMQ消息中间键,包括消息的发送、消息的接收以及RocketMQ的一些配置说明,以及效果说明。话不多说,开始实战内容。... 查看详情

精华推荐|深入浅出rocketmq原理及实战「底层源码挖掘系列」透彻剖析贯穿rocketmq的消费者端的运行核心的流程(上篇)

精华推荐|【深入浅出RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程上篇:分析对应总体消费流程的判断和校验以及限流控制和回调等处理流程分析下篇:分析基于上篇的总体流程的... 查看详情

rocketmq事务消息实战(代码片段)

RocketMQ事务消息阅读目录指引:RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想RocketMQ源码分析之RocketMQ事务消息实现原理上篇RocketMQ源码分析之RocketMQ事务消息实现原理中篇----事务消息状态回查RocketMQ源码分析... 查看详情

一次rocketmq进程自动退出排查经验分享(实战篇)(代码片段)

1、背景公司一个RocketMQ集群由4主4从组成,突然其中3台服务器“竟然”在同一时间下线,其监控显示如下:依次查看三台机器的监控图形,时间戳几乎完美“吻合”,不可思议吧。2、故障分析出现问题,先二话不说,马上重启... 查看详情

精华推荐|深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的存储系统的实现原理和持久化机制

RocketMQ的发展历史RocketMQ是一个统一消息引擎、轻量级数据处理平台。RocketMQ是一款阿里巴巴开源的消息中间件。2016年11月28日,阿里巴巴向广西党性培训Apache软件基金会捐赠RocketMQ,成为Apache孵化项目。2017年9月25日,Apache宣布Rock... 查看详情

《rocketmq实战专栏》为什么是你学习rocketmq的最佳资料

《RocketMQ实战与原理》专栏简介简介RocketMQ业界主流的消息中间件之一,承载公司核心业务消息的流转。对RocketMQ核心原理的理解与最佳实践成了开发与运维同学的必备技能。听很多同学抱怨,在学习中苦于缺少参考资料... 查看详情

精华推荐|深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的存储系统的实现原理和持久化机制

RocketMQ的发展历史RocketMQ是一个统一消息引擎、轻量级数据处理平台。RocketMQ是一款阿里巴巴开源的消息中间件。2016年11月28日,阿里巴巴向广西党性培训Apache软件基金会捐赠RocketMQ,成为Apache孵化项目。2017年9月25日,A... 查看详情

springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)

原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的&#... 查看详情

rocketmq实战-搭建高可用的分布式消息队列集群(代码片段)

文章目录RocketMQ各部分角色RocketMQ多集群配置和部署实战集群查看方式Broker配置参数介绍RocketMQ各部分角色NameServer,Broker,Producer,Consumer类比邮政系统,Producer是发信人;Consumer是收信人;Broker是负责暂存、... 查看详情

rocketmq-spring:实战与源码解析一网打尽(代码片段)

RocketMQ是大家耳熟能详的消息队列,开源项目rocketmq-spring可以帮助开发者在SpringBoot项目中快速整合RocketMQ。这篇文章会介绍SpringBoot项目使用rocketmq-springSDK实现消息收发的操作流程,同时笔者会从开发者的角度解读SDK的设计逻辑。... 查看详情

精华推荐|深入浅出rocketmq原理及实战「性能原理挖掘系列」透彻剖析贯穿rocketmq的系统服务底层原理以及高性能存储设计挖掘深入

...本身的设计越复杂,采用文件系统作为消息存储的方式。RocketMQ存储机制消息中间件的存储一般都是利用磁盘,一般是使用机械硬盘,但机械硬盘的速度比访问内存慢了n个数量级,一款优秀的消息中间件必然会将硬件资源压榨到... 查看详情

阿里资深架构推荐:rocketmq实战与原理解析。

...的日志传输领域到后来阿里集团全维度在线业务的支撑,RocketMQ被广泛用于交易、数据同步、缓存同步、IM通讯、流计算、IoT等场景。在双11全球狂欢节中,RocketMQ以万亿级的消息总量支撑了全集团3000多个应用,为复杂的业务场景... 查看详情

深入浅出rocketmq原理及实战「底层原理挖掘系列」透彻剖析贯穿rocketmq的消息发送的全部流程和落盘原理分析指南(代码片段)

前言介绍RocketMQ目前在国内应该是比较流行的MQ了,目前本人也在公司的项目中进行使用和研究,借着这个机会,分析一下RocketMQ发送一条消息到存储一条消息的过程,这样会对以后大家分析和研究RocketMQ相关的问... 查看详情