rocketmq使用

奔跑在梦想的道路上 奔跑在梦想的道路上     2022-11-09     346

关键词:

  RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,并于2016年11月成为 Apache 孵化项目。  

中间件是一类连接软件组件和应用的计算机软件,它包括一组服务。以便于运行在一台或多台机器上的多个软件通过网络进行交互。
中间件技术所提供的互操作性,推动了分布式体系架构的演进,该架构通常用于支持并简化那些复杂的分布式应用程序,它包括web服务器、事务监控器和消息队列软件。
中间件(middleware)是基础软件的一大类,属于可复用软件的范畴。顾名思义,中间件处于操作系统软件与用户的应用软件的中间。
中间件在操作系统、网络和数据库之上,应用软件的下层,总的作用是为处于自己上层的应用软件提供运行与开发的环境,帮助用户灵活、高效地开发和集成复杂的应用软件。

    中间件是位于平台(硬件和操作系统)和应用之间的通用服务,这些服务具有标准的程序接口和协议。针对不同的操作系统和硬件平台,中间件可以有符合接口和协议规范的多种实现:

  一.理论部分

  RocketMQ就是一款分布式消息中间件。那么,RocketMQ主要为了解决哪些问题呢?

  (1)Publish/Subscribe
  发布与订阅是消息中间件的最基本功能,也是相对于传统RPC通信而言。

  (2)Message Priority
  规范中描述的优先级是指在一个消息队列中,每条消息都有不同的优先级,一般用整数来描述,优先级高的消息先投递,如果消息完全在一个内存队列中,那么在投递前可以按照优先级排序,令优先级高的先投递。
  由于RocketMQ所有消息都是持久化的,所以如果按照优先级来排序,开销会非常大,因此RocketMQ没有特意支持消息优先级,但是可以通过变通的方式实现类似功能,即单独配置一个优先级高的队列,和一个普通优先级的队列, 将不同优先级发送到不同队列即可。

  (3)Message Order
  消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了3条消息,分别是订单创建,订单付款,订单完成。消费时,要按照这个顺序消费才能有意义。但是同时订单之间是可以并行消费的。
  RocketMQ可以严格的保证消息有序。

  (4)Message Filter
  ①Broker端消息过滤  
  在Broker中,按照Consumer的要求做过滤,优点是减少了对于Consumer无用消息的网络传输。缺点是增加了Broker的负担,实现相对复杂。
  ②Consumer端消息过滤
  这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到Consumer端。

  (5)Message Persistence
  消息中间件通常采用的几种持久化方式:
  ①持久化到数据库,例如Mysql。
     ②持久化到KV存储,例如levelDB、伯克利DB等KV存储系统。
     ③文件记录形式持久化,例如Kafka,RocketMQ
     ④对内存数据做一个持久化镜像,例如beanstalkd,VisiNotify
     ⑤前三种持久化方式都具有将内存队列Buffer进行扩展的能力,第四种方式只是一个内存的镜像,作用是当Broker挂掉重启后仍然能将之前内存的数据恢复出来。

  RocketMQ充分利用Linux文件系统内存cache来提高性能。

  (6)Message Reliablity
  影响消息可靠性的几种情况:
  ①Broker正常关闭;
     ②Broker异常Crash;
     ③OS Crash;
     ④机器掉电,但是能立即恢复供电情况。
     ⑤机器无法开机(可能是cpu、主板、内存等关键设备损坏)
     ⑥磁盘设备损坏。
  前四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
  后两种情况属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。
  RocketMQ从3.0版本开始支持同步双写。

  (7)Low Latency Messaging
  在消息不堆积情况下,消息到达Broker后,能立刻到达Consumer。RocketMQ使用长轮询Pull方式,可保证消息非常实时,消息实时性不低于Push。
  (8)At least Once
  是指每个消息必须投递一次。RocketMQ Consumer先pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
  (9)Exactly Only Once
     ①发送消息阶段,不允许发送重复的消息。
     ②消费消息阶段,不允许消费重复的消息。
  只有以上两个条件都满足情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复。

  (10)Broker的Buffer问题

  Broker的Buffer通常指的是Broker中一个队列的内存Buffer大小,这类Buffer通常大小有限。
  另外,RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据定期清除。RocketMQ同其他MQ有非常显著的区别,RocketMQ的内存Buffer抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker会定期删除过期的数据,例如Broker只保存3天的消息,那么这个Buffer虽然长度无限,但是3天前的数据会被从队尾删除。
  (11)回溯消费
  回溯消费是指Consumer已经消费成功的消息,由于业务上的需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。
  RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。
  (12)消息堆积
  消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:
     ①消息堆积在内存Buffer,一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。
     ②消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。 当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。
  评估消息堆积能力主要有以下四点:
  消息能堆积多少条,多少字节?即消息的堆积容量。
     消息堆积后,发消息的吞吐量大小,是否会受堆积影响?
     消息堆积后,正常消费的Consumer是否会受影响?
     消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?
  (13)分布式事务
  已知的几个分布式事务规范,如XA,JTA等。其中XA规范被各大数据库厂商广泛支持,如Oracle,Mysql等。其中XA的TM实现佼佼者如Oracle Tuxedo,在金融、电信等领域被广泛应用。
  分布式事务涉及到两阶段提交问题,在数据存储方面的方面必然需要KV存储的支持,因为第二阶段的提交回滚需要修改消息状态,一定涉及到根据Key去查找Message的动作。RocketMQ在第二阶段绕过了根据Key去查找Message的问题,采用第一阶段发送Prepared消息时,拿到了消息的Offset,第二阶段通过Offset去访问消息,并修改状态,Offset就是数据的地址。
  RocketMQ这种实现事务的方式,没有通过KV存储做,而是通过Offset方式,存在一个显著缺陷,即通过Offset更改数据,会令系统的脏页过多,需要特别关注。
  (14)定时消息
  定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。
  如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。
  RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。
  (15)消息重试
  Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
  由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其他消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10s秒后再重试。
     由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
  RocketMQ的设计模型:

  简单说来,RocketMQ具有以下特点:
  ①是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
     ②Producer、Consumer、队列都可以分布式。
     ③Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
     ④能够保证严格的消息顺序。
     ⑤提供丰富的消息拉取模式。
     ⑥高效的订阅者水平扩展能力。
     ⑦实时的消息订阅机制。
     ⑧亿级消息堆积能力。
     ⑨较少的依赖。

  RocketMQ 物理部署结构:

  RocketMQ的部署结构有以下特点:

  ①Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
     ②Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
     ③Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
     ④Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

  RocketMQ 逻辑部署结构:

  RocketMQ的逻辑部署结构有Producer和Consumer两个特点。
  (1)Producer Group
  用来表示一个发送消息应用,一个Producer Group下包含多个Producer实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象。一个Producer Group可以发送多个Topic消息,Producer Group作用如下:
     ①标识一类Producer;
     ②可以通过运维工具查询这个发送消息应用下有多个Producer实例;
    ③发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
  (2)Consumer Group
  用来表示一个消费消息应用,一个Consumer Group下包含多个Consumer实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象。一个Consumer Group下的多个Consumer以均摊方式消费消息,如果设置为广播方式,那么这个Consumer Group下的每个实例都消费全量数据。
  RocketMQ 数据存储结构:

  

  RocketMQ采取了一种数据与索引分离的存储方法。有效降低文件资源、IO资源,内存资源的损耗。即便是阿里这种海量数据,高并发场景也能够有效降低端到端延迟,并具备较强的横向扩展能力。

  二.实践部分

  1.在服务器上安装RocketMQ

   此处略。

  2.程序中使用RocketMQ

   创建一个maven项目,在pom文件中添加RocketMQ客户端jar包的依赖。

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
</dependency>

   创建生产者:

//Producer.java
package itszt; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; /** * 生产者 */ public class Producer public static void main(String[] args) DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("127.0.0.1:9876"); try producer.start(); Message msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes()); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "2", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); catch (Exception e) e.printStackTrace(); finally producer.shutdown();

   创建消费者:

//Consumer.java
package itszt;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 消费者
 */
public class Consumer 
    public static void main(String[] args) 
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        try 
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("PushTopic", "push");

            //程序第一次启动从消息队列头取数据
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() 
                                                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext Context) 
                                                     Message msg = list.get(0);
//                            System.out.println(msg.toString());

                                                     String topic = msg.getTopic();
                                                     System.out.println("topic = " + topic);
                                                     byte[] body = msg.getBody();
                                                     System.out.println("body:  " + new String(body));
                                                     String keys = msg.getKeys();
                                                     System.out.println("keys = " + keys);
                                                     String tags = msg.getTags();
                                                     System.out.println("tags = " + tags);
                                                     System.out.println("-----------------------------------------------");

                                                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                                 
                                             
            );
            consumer.start();
         catch (Exception e) 
            e.printStackTrace();
        
    

rocketmq---核心概念特性使用等

对于RocketMQ而言,感觉官方提供的东西还是可以的:https://github.com/apache/rocketmq/tree/master/docs/cn  查看详情

rocketmq消息队列单机部署及使用

...http://blog.csdn.net/loongshawn/article/details/51086876相关文章:《RocketMQ消息队列单机部署及使用》《java编写简单消息队列。实现高德坐标变形服务》0RocketMQ简单介绍0.1介绍RocketMQ是一个消息中间件。消息中间件中有两个角色:消息生产... 查看详情

rocketmq使用(代码片段)

rocketmq 基本使用可以看官网和官网给的demo.https://github.com/apache/rocketmq/tree/master/example这里主要说明几个点:rocketmq  发送类型常用:1,普通消息.(可以获取发送结果,失败了重试)2,有序消息.(秒杀,等需要有序的消费场景)3,事... 查看详情

apacherocketmq:使用官方demo测试rocketmq(代码片段)

当前rocketmq版本4.91.声明当前内容主要为使用官方的demo测试之前的rocketmq是否正常,测试发送和消费消息,主要参考官方文档2.pom依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</ar 查看详情

rocketmq快速入门

前面几篇文章介绍了为什么选择RocketMQ,以及与kafka的一些对比:阿里RocketMQ优势对比,方便大家对于RocketMQ有一个简单的整体了解,之后介绍了:MQ应用场景,让我们知道MQ在什么时候可以使用,可以解决什么问题,之后介绍了:... 查看详情

搞懂分布式技术19:使用rocketmq事务消息解决分布式事务

搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务初步认识RocketMQ的核心模块rocketmq模块rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。rocketmq-client:提供发送、接受消息的客... 查看详情

转发rocketmq集群监控(代码片段)

RocketMQ目前有两个版本 alibaba版本和apache版本一、alibaba版本使用rocketmq命令查看集群状态,查看topic信息时比较麻烦,而且不直观,这个时候可以使用一些web页面来管理rocketmq。以前曾使用过一个老版本的工具,适用于alibaba版... 查看详情

rocketmq入门简介

...流量削峰、消息分发、保证最终一致性、方便动态扩容。rocketmq历史:Notify(2007)->Napoli(2010)->MetaQ(2011)->RocketMQ(2012)->开源(2016)第一代的Notify主要使用了推模型,解决了事务消;第二代的MetaQ主要使用了拉模型,解决了顺序... 查看详情

rocketmq使用

  RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,并于2016年11月成为Apache孵化项目。  中间件是一类连接软件组件和应用的计算机软件,它包括一组服务。以便于运行在一台或多台机器上的... 查看详情

m1使用docker部署rocketmq单机版

...以下为本人部署时的环境介绍由于M1是arm架构,dockerhub中的rocketmq和rocketmq-console-ng控制台都需要自己编译镜像我已编译好一版,可以直接拿来用[candice0630/rocketmq-console-ng:2.0]!!重点强调:正常情况下只要执行(mvncleanpackage-Dmaven.tes... 查看详情

在ubuntu虚拟机安装rocketmq并使用

参考技术A配置好jdk、maven,详情:Linux-Ubuntu安装:JDK&Tomcat&Maven-简单教程现在网上挺多帖子还是以前的,rocketMQ现在已经迁移到apache我这里使用的是OracleVMVirtualBox在虚拟机的网络里面设置端口转发,不然可能无法跟物理主机沟通 查看详情

rocketmq使用顺序消息(代码片段)

目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情

rocketmq不消费问题

参考技术A最近在一项目中使用RocketMQ,开始配置好服务都正常使用。突然有一天项目启动MQ不消费,最开始以为是MQ出问题并把服务重启问题还是没有解决,后面经过两小时的问题排查发现配置中心的配置被人修改,把name-server后... 查看详情

rocketmq事务消息篇之事务消息的使用(代码片段)

前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情

3rocketmq可视化控制台安装与使用(代码片段)

RocketMQ提供了一些扩展项目支持,地址:https://github.com/apache/rocketmq-externals其中一个rocketmq-connect-console项目,就是我们需要的可视化控制台;我们把整个项目下载下来,打开rocketmq-connect-console项目;项目是Sp 查看详情

五.rocketmq极简入门-rocketmq延迟消息(代码片段)

使用场景我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费... 查看详情

rocketmq核心技术精讲与高并发抗压实战

第1章课程介绍为什么掌握RocketMQ消息中间件技术对于跳槽,晋级如此重要?学习RocketMQ技术,为什么首选这门课程?电商平台双11高并发场景下是如何抗压的?MQ部分的落地是如何做的?这章讲重点为你解答这些疑惑1-1课前必读(... 查看详情

rocketmq核心技术精讲与高并发抗压实战

第1章课程介绍为什么掌握RocketMQ消息中间件技术对于跳槽,晋级如此重要?学习RocketMQ技术,为什么首选这门课程?电商平台双11高并发场景下是如何抗压的?MQ部分的落地是如何做的?这章讲重点为你解答这些疑惑1-1课前必读(... 查看详情