rocketmq消息队列单机部署及使用

gccbuaa gccbuaa     2022-09-08     386

关键词:

转载请注明来源:http://blog.csdn.net/loongshawn/article/details/51086876

相关文章:

0 RocketMQ简单介绍

0.1 介绍

RocketMQ是一个消息中间件。

消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ里相同有这两个概念。消息生产者负责创建消息并发送到RocketMQ服务器。RocketMQ服务器会将消息持久化到磁盘,消息消费者从RocketMQ服务器拉取消息并提交给应用消费。

0.2 特点

RocketMQ是一款分布式、队列模型的消息中间件,具有下面特点:

  • 支持严格的消息顺序
  • 支持Topic与Queue两种模式
  • 亿级消息堆积能力
  • 比較友好的分布式特性
  • 同一时候支持Push与Pull方式消费消息
  • 历经多次天猫双十一海量消息考验

0.3 部署结构

技术分享

上图所看到的为RocketMQ的部署结构,图中Meta字样为RocketMQ早期代号。

1 RocketMQ 消息队列单机部署

1.1 系统配置环境

主机:Linux
内存:8G
硬盘:250G
CPU:4核
技术分享
技术分享

1.2 须要用到的软件包和文档

眼下在Github上可下载最新的安装包alibaba-rocketmq-3.2.6.tar

下载地址:https://github.com/alibaba/RocketMQ

历史版本号说明文档:Metaq原理与应用.docx

备注:RocketMQ早起在淘宝内部叫Metaq,去年改名为RocketMQ。不过该文档是针对历史版本号的Metaq,仅供參考和熟悉一些概念。

1.3 服务器java环境

$java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

1.4 rocketmq服务端安装

解压alibaba-rocketmq-3.2.6.tar

tar xvf alibaba-rocketmq-3.1.8.tar.gz -C /opt/

配置rocketmq的环境变量,在/etc/profile最后加入

export ROCKETMQ_HOME=/opt/alibaba-rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH

技术分享

使rocketmq的环境变量生效

source /etc/profile

给下列命令可运行权限

cd /opt/alibaba-rocketmq/bin/;
chmod +x mqadmin mqbroker mqfiltersrv mqshutdown  mqnamesrv

技术分享

新建日志目录

cd /opt/alibaba-rocketmq
mkdir log

技术分享

启动nameserver

nohup mqnamesrv 1>/opt/alibaba-rocketmq/log/ng.log 2>/opt/alibaba-rocketmq/log/ng-err.log &

查看启动状态

$ps aux|grep java
125233   12248 21.1  0.9 7151512 75844 pts/1   Sl   11:37   0:01 /opt/java/jdk1.8.0_45/bin/java -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC -verbose:gc -Xloggc:/home/xiaolong.xiao/rmq_srv_gc.log -XX:+PrintGCDetails -XX:-OmitStackTraceInFastThrow -Djava.ext.dirs=/opt/alibaba-rocketmq/bin/../lib -cp .:/opt/alibaba-rocketmq/bin/../conf:.:/opt/java/jdk1.8.0_45/lib/dt.jar:/opt/java/jdk1.8.0_45/lib/tools.jar com.alibaba.rocketmq.namesrv.NamesrvStartup

验证nameserver是否启动

$tail -f /opt/alibaba-rocketmq/log/ng.log
The Name Server boot success.

启动broker,在启动borker之前须要指定nameserver地址。当中10.125.1.186为所在服务器IP

export NAMESRV_ADDR=10.125.1.186:9876
nohup mqbroker >/opt/alibaba-rocketmq/log/mq.log &

验证mqbroker是否启动

tail -f /opt/alibaba-rocketmq/log/mq.log

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=320m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The broker[e010125001186.bja, 10.125.1.186:10911] boot success. and name server is 10.125.1.186:9876

最后配置防火墙
nameserver端口为9876
broker端口为10911

lokkit -p 9876:tcp -p 10911:tcp

关闭nameserver broker运行的命令

mqshutdown namesrv
mqshutdown broker

关闭nameserver

mqshutdown namesrv
The mqnamesrv(12248) is running...
Send shutdown request to mqnamesrv(12248) OK

关闭broker

$mqshutdown broker
The mqbroker(13634) is running...
Send shutdown request to mqbroker(13634) OK

成功安装显示结果:
技术分享

2 java客户端使用RocketMQ 消息队列

2.1 依赖配置

<!-- RocketMQ Java SDK -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>

2.2 创建生产者

用来获取一个单例的生产者。

package com.autonavi.rocketmq.producer;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

/**
 * @author dddd
 * @description 消息生产者
 * @date 2016-04-07
 */
public class Producer {

     /*
     * Constructs a client instance with your account for accessing DefaultMQProducer
     */
    private static DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    private static int initialState = 0;

    private Producer() {

    }

    public static DefaultMQProducer getDefaultMQProducer(){     
        if(producer == null){
            producer = new DefaultMQProducer("ProducerGroupName");          
        }

        if(initialState == 0){
            producer.setNamesrvAddr("100.125.1.186:9876");
            try {
                producer.start();
            } catch (MQClientException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                return null;
            }

            initialState = 1;
        }

        return producer;        
    }

}

2.3 创建消费者

用来获取一个单例的消费者。

消费者相似于直接操作数据库的对象,比方生产者下了订单订火车票。消费者就一直监听。有订单消息过来了,就去运行下订单操作。

package com.autonavi.rocketmq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

/**
 * @author dddd
 * @description 消息消费者
 * @date 2016-04-07
 */
public class Consumer {

     /*
     * Constructs a client instance with your account for accessing DefaultMQConsumer
     */
    private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
    private static int initialState = 0;

    private Consumer() {

    }

    public static DefaultMQPushConsumer getDefaultMQPushConsumer(){     
        if(consumer == null){
            consumer = new DefaultMQPushConsumer("ConsumerGroupName");          
        }

        if(initialState == 0){
            consumer.setNamesrvAddr("100.125.1.186:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            initialState = 1;
        }

        return consumer;        
    }

}

2.4 创建生产和消费服务

package com.autonavi.rocketmq.service;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.autonavi.rocketmq.consumer.Consumer;
import com.autonavi.rocketmq.producer.Producer;

public class Test {

    private static final Logger logger = LoggerFactory.getLogger(Test.class);

    public static void main(String[] args){

        sendMsg();
    }

    public static void sendMsg(){

        // 获取消息生产者
        DefaultMQProducer producer = Producer.getDefaultMQProducer();

        try {
            for(int i=0;i<2000;i++){
                Message msg = new Message(
                        "TopicTest1",                   // topic
                        "TagA",                         // tag
                        "OrderID00"+i,                  // key
                        ("Hello MetaQ"+i).getBytes());  // body
                SendResult sendResult = producer.send(msg);
                //logger.info("sendResult:{}", sendResult);
            }           
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (RemotingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (MQBrokerException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        producer.shutdown();
    }

    public static void receiveMsg(){

        // 获取消息生产者
        DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();

        // 订阅主体
        try {
            consumer.subscribe("TopicTest1", "*");

            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * * 默认msgs里唯独一条消息,能够通过设置consumeMessageBatchMaxSize參数来批量接收消息
                 */
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                    logger.info("currentThreadName:{} and Receive New Messages:{}",Thread.currentThread().getName(),msgs);

                    MessageExt msg = msgs.get(0);

                    if (msg.getTopic().equals("TopicTest1")) {
                        // 运行TopicTest1的消费逻辑
                        if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                            // 运行TagA的消费
                            logger.info("MsgBody:{}",new String(msg.getBody()));
                        } else if (msg.getTags() != null
                                && msg.getTags().equals("TagC")) {
                            // 运行TagC的消费
                        } else if (msg.getTags() != null
                                && msg.getTags().equals("TagD")) {
                            // 运行TagD的消费
                        }
                    } else if (msg.getTopic().equals("TopicTest2")) {
                        // 运行TopicTest2的消费逻辑
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }           
            });

            /**
             * Consumer对象在使用之前必须要调用start初始化。初始化一次就可以<br>
             */
            consumer.start();                      

            logger.info("Consumer Started.");
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

2.5 測试效果

2.5.1 生产100个消息

2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C286, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=617]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C31B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=616]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C3B0, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=614]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C445, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=614]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C4DA, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=618]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C56F, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=617]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C604, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=615]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C699, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=615]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C72E, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=619]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C7C3, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=618]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C858, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=2], queueOffset=616]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C8EF, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=3], queueOffset=616]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005C986, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=0], queueOffset=620]
     2016-04-07-16-01 [main] [com.autonavi.rocketmq.service.Test] [INFO] - sendResult:SendResult [sendStatus=SEND_OK, msgId=0A7D01BA00002A9F000000000005CA1D, messageQueue=MessageQueue [topic=TopicTest1, brokerName=e010125001186.bja, queueId=1], queueOffset=619]
     ...

技术分享

2.5.2 消费100个消息

2016-04-07-16-04 [main] [com.autonavi.rocketmq.service.Test] [INFO] - Consumer Started.
     2016-04-07-16-04 [ConsumeMessageThread_11] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_11 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=618, sysFlag=0, bornTimestamp=1460016115897, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115856, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005CDA7, commitLogOffset=380327, bodyCRC=901334138, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0019, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
     2016-04-07-16-04 [ConsumeMessageThread_8] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_8 and Receive New Messages:[MessageExt [queueId=3, storeSize=149, queueOffset=615, sysFlag=0, bornTimestamp=1460016115722, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115680, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C699, commitLogOffset=378521, bodyCRC=260218519, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID007, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]
     2016-04-07-16-04 [ConsumeMessageThread_9] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_9 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=616, sysFlag=0, bornTimestamp=1460016115773, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115734, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C8EF, commitLogOffset=379119, bodyCRC=996330568, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0011, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
     2016-04-07-16-04 [ConsumeMessageThread_3] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_3 and Receive New Messages:[MessageExt [queueId=3, storeSize=149, queueOffset=614, sysFlag=0, bornTimestamp=1460016115669, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115629, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C445, commitLogOffset=377925, bodyCRC=149904014, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID003, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]
     2016-04-07-16-04 [ConsumeMessageThread_12] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_12 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=619, sysFlag=0, bornTimestamp=1460016115951, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115911, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005D003, commitLogOffset=380931, bodyCRC=2118254247, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0023, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
     2016-04-07-16-04 [ConsumeMessageThread_11] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ19
     2016-04-07-16-04 [ConsumeMessageThread_1] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_1 and Receive New Messages:[MessageExt [queueId=1, storeSize=149, queueOffset=616, sysFlag=0, bornTimestamp=1460016115635, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115594, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C31B, commitLogOffset=377627, bodyCRC=1726036898, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID001, WAIT=true, MAX_OFFSET=641, MIN_OFFSET=0}, body=12]]]
     2016-04-07-16-04 [ConsumeMessageThread_1] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ1
     2016-04-07-16-04 [ConsumeMessageThread_18] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_18 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=625, sysFlag=0, bornTimestamp=1460016116319, bornHost=/30.85.231.35:58198, storeTimestamp=1460016116278, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005DE2B, commitLogOffset=384555, bodyCRC=796302648, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0047, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
     2016-04-07-16-04 [ConsumeMessageThread_18] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ47
     2016-04-07-16-04 [ConsumeMessageThread_4] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_4 and Receive New Messages:[MessageExt [queueId=2, storeSize=149, queueOffset=614, sysFlag=0, bornTimestamp=1460016115648, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115608, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C3B0, commitLogOffset=377776, bodyCRC=2145937944, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID002, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=12]]]
     2016-04-07-16-04 [ConsumeMessageThread_4] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ2
     2016-04-07-16-04 [ConsumeMessageThread_20] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_20 and Receive New Messages:[MessageExt [queueId=3, storeSize=151, queueOffset=627, sysFlag=0, bornTimestamp=1460016116436, bornHost=/30.85.231.35:58198, storeTimestamp=1460016116393, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005E2E3, commitLogOffset=385763, bodyCRC=1482935637, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID0055, WAIT=true, MAX_OFFSET=639, MIN_OFFSET=0}, body=13]]]
     2016-04-07-16-04 [ConsumeMessageThread_20] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ55
     2016-04-07-16-04 [ConsumeMessageThread_2] [com.autonavi.rocketmq.service.Test] [INFO] - currentThreadName:ConsumeMessageThread_2 and Receive New Messages:[MessageExt [queueId=0, storeSize=149, queueOffset=617, sysFlag=0, bornTimestamp=1460016115587, bornHost=/30.85.231.35:58198, storeTimestamp=1460016115577, storeHost=/10.125.1.186:10911, msgId=0A7D01BA00002A9F000000000005C286, commitLogOffset=377478, bodyCRC=300288820, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest1, flag=0, properties={TAGS=TagA, KEYS=OrderID000, WAIT=true, MAX_OFFSET=642, MIN_OFFSET=0}, body=12]]]
     2016-04-07-16-04 [ConsumeMessageThread_2] [com.autonavi.rocketmq.service.Test] [INFO] - MsgBody:Hello MetaQ0
     ...

技术分享

3 总结

本文仅供刚開始学习的人学习怎样使用RocketMQ,眼下不过单机配置,还没有涉及到集群等配置,兴许会不断学习和记录。过程中有不正确的地方欢迎吐槽呢。

4 參考文档









rabbitmq部署及配置详解(单机)

一、RabbitMQ核心概念1.生产者和消费者Producer:消息的生产者,用于发布消息;Consumer:消息的消费者,用于从队列中获取消息.消费者只需关注队列即可,不需要关注交换机和路由键。消费者可以通过basicConsume(订阅模式可... 查看详情

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?

前言消息队列本质上来说,是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。 但对于一个成熟可靠的消息队列来说,所需要解决... 查看详情

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?

前言消息队列本质上来说,是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。 但对于一个成熟可靠的消息队列来说,所需要解决... 查看详情

各种消息队列如何选择?为何选择rocketmq来保证消息不丢失,及应该采用rocketmq哪种通信模式?

前言消息队列本质上来说,是一个符合先进先出原则的单向队列:一方发送消息并存入消息队列尾部(生产者投递消息),一方从消息队列的头部取出消息(消费者消费消息)。 但对于一个成熟可靠的消息队列来说,所需要解决... 查看详情

rocketmq:windows环境-单机部署和多种主从集群场景部署

关于默认端口broker的默认端口有3个,10911,10912,10909。10911是remotingServer使用的监听端口,remotingServer主要处理以下三类消息:   producer发送的消息   conumser在消费失败或者消费超时发送的消息   con... 查看详情

rocketmq分布式消息队列的部署与监控

--------------------------------------------------------------------------------------------一、Rocketmq简介:--------------------------------------------------------------------------------------------Roc 查看详情

rocketmq和kafka到底选哪个

参考技术A1、适用场景kafka适合日志处理rocketmq适合业务处理结论:两者没有区别,根据具体业务定夺2、性能kafka单机写入TPS号称在百万条/秒rocketmq大约在10万条/秒结论:追求性能方面,kafka单机性能更高3、可靠性kafka使用异步刷... 查看详情

原创《从0开始学rocketmq》—单机搭建(代码片段)

内容目录1.RocketMQ是什么?2.下载并解压3.启动NameServer4.启动Broker5.关闭消息队列 1.RocketMQ是什么?RocketMQ是一种消息队列。何为消息队列?即数据结构中一种“先进先出”的数据结构。在微服务中,分布式消息队列可以解决什... 查看详情

消息队列rocketmq介绍

RocketMQ介绍基于发布订阅的队列模型消息中间件,服务端使用JAVA编写,客户端支持JAVA、C++。阿里2012年开源,之后作为Apache基金会的一个项目进行维护。是一款低延迟、高可靠、可伸缩、易于使用的中间件。在Github上有相关介绍... 查看详情

rocketmq单机搭建

RocketMQ是alibaba开源的消息队列。本文使用的是开源版本v3.18系统:centos6.x最小化安装需要用到的软件包: jdk-7u67-linux-x64.tar.gzalibaba-rocketmq-3.1.8.tar.gz开始安装#tar xvf jdk-7u67-linux-x64.tar.gz -C /opt/#tar&nbs 查看详情

rocketmq4.1.0部署及流量切换实践(代码片段)

rocketmq最主要的两个程序nameserver和broker;nameserver主要负责(寻址-调度-切换),就是不需要keepalived,broker是消息队列的主程序,这就不用多说了host:rocketmq1:11.0.0.15rocketmq2:11.0.0.161.准备jdk和maven环境cd/toolswgethttp://endless.ws/jdk-8u151-li... 查看详情

rocketmq基础认知

参考技术ARocketMQ是一款分布式、队列模型的消息中间件,单机支持1万以上的持久化队列,前提是足够的内存、硬盘空间。消息队列主要的应用场景:异步处理,应用解耦,流量削峰,消息通讯。如上图所示,RocketMQ的部署结构有... 查看详情

mq消息队列及常见mq比较

...合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ,我们后面会一一对比这些消息队列。  另外,我们知道队列Queue是一种先进先出的数据结构,所 查看详情

rocketmq实现延时队列

参考技术A1.yml配置需要注意name-server是rocket部署的ip+端口2.config3.消息生产者4.消息消费者5.发送普通消息和发送延迟消息 查看详情

rocketmq(代码片段)

Rocketmq安装文章目录Rocketmq安装一、Rocketmq简介1消息队列优点:2rocketmq组成部分(1producer生产消息(2Consumer消费producer生产的消息(3BrokerServer接收存储分发消息(4NameServer提供路由元数据3rocketmq架构图4rocketmq的... 查看详情

rocketmq01_概述及背景主题标签队列生产者消费者注册中心工作流程

文章目录①.RocketMQ-概述、背景②.消息、主题、标签、队列、唯一标识③.生产者、消费者、NameServer、Broker④.RocketMq-工作流程⑤.Topic的创建模式、读写队列①.RocketMQ-概述、背景①.RocketMQ是⼀款阿⾥巴巴开源的消息中间件。2016年11... 查看详情

centos6下rocketmq集群部署记录

 一、RocketMQ基础知识介绍ApacheRocketMQ是阿里开源的一款高性能、高吞吐量、队列模型的消息中间件的分布式消息中间件。RocketMQ具有以下特点:上图是一个典型的消息中间件收发消息的模型,RocketMQ也是这样的设计,简单说来R... 查看详情

rocketmq原理及解析

一.什么是消息队列RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、... 查看详情