rocketmq-尝试理解

bad_boy_f bad_boy_f     2022-08-15     533

关键词:

普通的信息发送和消费

首先要启动nameserver和broker,nameserver是一个几乎无状态节点。broker分为master和slave,master和slave的对应关系通过指定相同的BrokerName,不同的BorkerId来定义,BrokerId为0表示Master,其他为Slave。每个Broker和Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

Producer在发送消息前,获取DefaultMQProducer,指定groupName,设置namesrvAddr。开始启动producer,用本身实例defaultMQProducer生成MQClinetManager的实例mQClinetFactory,向mQClinetFactory注册producer也就是向ConcurrentHashMap类型的producerTable中put。然后mQClientFactory.start()。
  获取并设置NameServer Addr。
  启动通讯服务。
  启动定时任务(1:定时获取NameServerAddr,2:定时更新路由信息(也就是更新DefaultMQProducerImpl中的topicPushlishInfoTable)。3:持久化消费记录,4:动态调整线程池)。
  pullMessageService.start()。
  负载均衡启动
  再次调用DefaultMQProducerImp.start(false)这次不走mQClientFactory.start()。
向所有Broker发送心跳,还有一个方法uploadFilterClassSource()(应该是加载过滤器的)
获取完producer(DefaultMQProducer)后,初始化要发送的消息,消息中设置topic,tag,key,消息体(body)。
发送消息。producer.send()
获取路由信息(从topicPublishInfoTable获取topic的TopicPushlishInfo,没有路由信息或 !topicPublishInfo.ok()从NameServer获取,构建请求头,remoteClinic调nameServer获取,看本地是否需要更新,如果需要,更新到本地,获取过来的TopicRouteData转成TopicPublishInfo(producer用的)跟新到本地,也就是向topicPublishInfoTable中put。转成subscribeInfo(consumer用的)向RebalanceImpl.topicSubscribeInfoTable中put)
获取重试次数(timesTotal),循环这么多次发送消息。获取上一次发送失败的lastBrokerName,第一次发送时,mq为空,如果是轮训,依次轮训topic下的MessageQueue,跳过上次失败的,选取一个mq。如果是顺序消息,就是自己根据算法选择一个MessageQueue列表的中的一个mq。
开始发送。根据上面获取的brokername从MQClientInstance.brokerAddrTable获取brokerAddr,如果为空重复上面的获取路由信息。继续获取。构建SendMessageRequestHeader发送。
=========================================================================================
broker端接受到消息,将消息写入到CommitLog中。
=========================================================================================
Consumer启动前,获取Consumer对象(这里是DefaultMQPushConsumer),指定groupName,设置NamesrvAddr,指定ConsumeFromWhere,指定订阅topic,push的方法指定MessageListener。consumer.start()启动
复制订阅关系。defaultMQPushConsumer.subscription复制到RebalanceImpl.subscriptionInner。初始化rebalanceImpl对象。构建offsetStore消费对象。consumeMessageService.start()启动消费消息服务。mQClientFactory注册这个消费者(也就是MQClientInstance.consumerTable中put)。mQClientFactory.start()和上面producer启动的过程一样。
在rebalanceService.start()中,进行负载均衡。大体是遍历所有的topic,构建PullRequest,经过多步调用,放在PullMessageService的pullRequestQueue队列中,让pullMessageService线程去获取。
pullMessageService.start()这个是在rebalanceService.start()上面一行启动的。而pullMessageService.start()才是正真的消费。

简单理解rocketmq---入门

简介:    RocketMQ是阿里巴巴出品的一款低延迟,可靠,可扩展,易于使用的面向消息的中间件,支撑着阿里巴巴集团庞大的消息业务增长。RocketMQ是基于MetaQ的一个开源分支,几乎重写了MetaQ所有的核心组件,可以... 查看详情

深入理解rocketmq---实战(控制台搭建)(代码片段)

 rocketMQ控制台搭建(1)下载rocketmq-console代码:https://github.com/875279177/incubator-rocketmq-externals(2)修改配置application配置文件,主要修改端口号及rocketmq.config.dataPathserver.contextPath=server.port=8080#spring.application.index=truespring.application.n... 查看详情

一文带你理解rocketmq广播模式实现机制

RocketMQ有两种消费模式,集群模式和广播模式。集群模式是指RocketMQ中的一条消息只能被同一个消费者组中的一个消费者消费。如下图,Producer向TopicTest这个Topic并发写入3条新消息,分别被分配到了MessageQueue1~MessageQueue3这3个队列... 查看详情

深入理解rocketmq是如何做到高性能的?

1、RocketMQ的核心Broker对rocketmq稍有了解的同学,都知道它主要由4部分组成,Producer、Consumer、Broker、NameServer。Broker作为RocketMQ的核心,提供了强大的数据存储能力,可以把亿万级的消息存储在服务器磁盘上。它决定... 查看详情

深入理解rocketmq是如何做到高性能的?

1、RocketMQ的核心Broker对rocketmq稍有了解的同学,都知道它主要由4部分组成,Producer、Consumer、Broker、NameServer。Broker作为RocketMQ的核心,提供了强大的数据存储能力,可以把亿万级的消息存储在服务器磁盘上。它决定... 查看详情

深入理解rocketmq普通消息和顺序消息使用,原理,优化(代码片段)

...以这方面为主。这次打压的过程中收获比较的大的是,对RocketMq的一些优化。最开始我们公司使用的是RabbitMq,在一些流量高峰的场景下,发现队列堆积比较严重,导致RabbitMq挂了。为了应对这个场景,最终我们引入了阿里云的Rocke... 查看详情

一文带你理解rocketmq广播模式实现机制(代码片段)

RocketMQ有两种消费模式,集群模式和广播模式。集群模式是指RocketMQ中的一条消息只能被同一个消费者组中的一个消费者消费。如下图,Producer向TopicTest这个Topic并发写入3条新消息,分别被分配到了MessageQueue1~MessageQueue3... 查看详情

深入理解rocketmq--消息存储

一、MQ存储分类文件系统:RocketMQ/Kafka/RabbitMQ关系型数据库DB:ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化分布式KV存储:ZeroMQ对比:存储效率, 文件系统>分布式KV存储>关系型数据库DB易于实现... 查看详情

必须先理解的rocketmq入门手册,才能再次深入解读(代码片段)

RocketMQ入门手册RocketMQ是一个分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点,同时,广... 查看详情

rocketmq——角色与术语详解

原文地址:http://jaskey.github.io/blog/2016/12/15/rocketmq-concept/RocketMQ——角色与术语详解2016-12-15THU 15:48RocketMQ中有很多概念,其中包括一些术语和角色。理清楚基本的概念能有效的帮助理解RocketMQ的原理以及排查问题。角色:Producer... 查看详情

rocketmq-rebalance介绍

参考技术ArocketMq概念介绍rocketMq-namesrv介绍rocketMq-Topic创建过程rocketMq-producer介绍rocketMq-consumer介绍rocketMq-rebalance介绍rocketMq-并发消费过程rocketMq-串行消费过程rocketMq-broker介绍rocketMq-broker消息存储介绍rocketMq-commitLogrocketMq-index介绍roc... 查看详情

rocketmq-架构原理(代码片段)

RocketMQ是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ的特点是纯JAVA实现;集群和HA实现相对简单;在发生宕机和其它故障时消息丢失率更低。一、RocketMQ专业术语先讲专业术语的含义,后面会画流程图来更好的去理... 查看详情

中间件系列activemq,rocketmq,rabbitmq,kafka,mycat让你深入理解学习中间件

以前的网络主要是客户端与服务器(C/S)结构或浏览器/服务器(B/S)  形式的两层结构,随着企业信息的不断扩大,企业级应用不再满足于简单的两层系统,而是向着三层和多层体系结构发展。中间件就是在其中加入一... 查看详情

rocketmq-topic创建过程

参考技术ArocketMq概念介绍rocketMq-namesrv介绍rocketMq-Topic创建过程rocketMq-producer介绍rocketMq-consumer介绍rocketMq-rebalance介绍rocketMq-并发消费过程rocketMq-串行消费过程rocketMq-broker介绍rocketMq-broker消息存储介绍rocketMq-commitLogrocketMq-index介绍roc... 查看详情

rocketmq架构原理及名词概念

这节主要讲述RocketMQ的整体架构,和常用术语解释。当我们接触一个新东西的时候,一定要知道他的原理,只有知道原理之后,才会产生问题。只有带着问题去读源码才会事半功倍。首先盗用官方的一张图片:(官方地址:http://... 查看详情

rocketmq(代码片段)

应用场景 主要作用解耦、滑峰填谷 异构系统的整合,这个问题比较容易理解,在原阿里SOAESB比较火的年代,很多异构系统需要进行互联互通。应用和应用之间的松耦合,这个在阿里巴巴内部很多的同步链路到异步链路里... 查看详情

分布式事务之深入理解什么是2pc3pc及tcc协议?

导读 在上一篇文章《【分布式事务】基于RocketMQ搭建生产级消息集群?》中给大家介绍了基于RocketMQ如何搭建生产级消息集群。因为本系列文章最终的目的是介绍基于RocketMQ的事物消息来解决分布式系统中的数据一致性问题,... 查看详情

rocketmq最佳实战

RocketMQ客户端最佳实践1.Producer最佳实践发送消息注意事项1).一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。2).消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。SEND_OK,消息... 查看详情