关键词:
前言
在某些业务场景下是需要消息按照顺序进行消费,比如一个账户的加钱,减钱的动作必须按照时间先后去执行,否则就会发生金额不够导致操作失败。
顺序消息
故名知意就是消息按照发送的顺序进行消费,队列本身是一种先进先出的数据结构,而RocketMQ理论上说也遵循这种机制。但是默认生产者以Round Robin轮询方式把消息发送到不同的Queue分区队列;消费者从多个队列中消费消息,这种情况没法保证顺序。所以在RocketMQ中如何保证消息顺序呢?
全局有序消息
在RocketMQ中消息分为全局有序和局部有序消息,全局有序是一个topic下的所有消息都要保证顺序,如果要保证消息全局顺序消费,就需要保证使用一个队列存放消息,一个消费者从这一个队列消费消息
就能保证顺序,即:单线程执行,可以通过 producer.setDefaultTopicQueueNums(1);来指定队列数量。
下面我们使用一个订单来模拟顺序消息,订单状态有:创建 ,支付,发货。需要按照顺序发送和消费消息
订单实体
public class Order
private Long id;
private String name;
private String status;
public Order()
public Order(Long id, String name, String status)
this.id = id;
this.name = name;
this.status = status;
public Long getId()
return id;
public void setId(Long id)
this.id = id;
public String getName()
return name;
public void setName(String name)
this.name = name;
public String getStatus()
return status;
public void setStatus(String status)
this.status = status;
发送者
生产者通过 producer.setDefaultTopicQueueNums(1);
把队列数量设置成1,然后正常发送消息
public class Producer
//演示消息同步发送
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException
//生产者
DefaultMQProducer producer = new DefaultMQProducer("order-producer");
//设置name server地址
producer.setNamesrvAddr("127.0.0.1:9876");
//队列数量,1个
producer.setDefaultTopicQueueNums(1);
//启动
producer.start();
for (long i = 0 ; i < 4 ; i++)
Order order = new Order(i,"订单"+i,"创建");
//添加内容
byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
Message message = new Message("order-topic","order",bytes);
message.setKeys("key-"+i);
//执行发送第一个消息
SendResult result = producer.send(message);
System.out.println(result);
//====================================================================
order.setStatus("支付");
//添加内容
bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
message = new Message("order-topic","order",bytes);
message.setKeys("key-"+i);
//执行发送
result = producer.send(message);
System.out.println(result);
//====================================================================
order.setStatus("发货");
//添加内容
bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
message = new Message("order-topic","order",bytes);
message.setKeys("key-"+i);
//执行发送
result = producer.send(message);
//打印结果
System.out.println(result);
producer.shutdown();
发送结果如下
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC450000, offsetMsgId=AC1028C700002A9F0000000000638E1D, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=48]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4E0001, offsetMsgId=AC1028C700002A9F0000000000638EF8, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=49]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC4F0002, offsetMsgId=AC1028C700002A9F0000000000638FD3, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC510003, offsetMsgId=AC1028C700002A9F00000000006390AE, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=51]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC550004, offsetMsgId=AC1028C700002A9F0000000000639189, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=52]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC560005, offsetMsgId=AC1028C700002A9F0000000000639264, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=53]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC580006, offsetMsgId=AC1028C700002A9F000000000063933F, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=54]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC590007, offsetMsgId=AC1028C700002A9F000000000063941A, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=55]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5B0008, offsetMsgId=AC1028C700002A9F00000000006394F5, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=56]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5D0009, offsetMsgId=AC1028C700002A9F00000000006395D0, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=57]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC5F000A, offsetMsgId=AC1028C700002A9F00000000006396AB, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=58]
SendResult [sendStatus=SEND_OK, msgId=7F000001244418B4AAC25E78BC62000B, offsetMsgId=AC1028C700002A9F0000000000639786, messageQueue=MessageQueue [topic=order-topic, brokerName=LAPTOP-20VLGCRC, queueId=0], queueOffset=59]
消费者
消费者设置 MessageListenerOrderly 进行顺序消费
public class Consumer
public static void main(String[] args) throws MQClientException
//创建消费者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("order-consumer");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//从开始位置消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//广播模式
//最大线程1个
//defaultMQPushConsumer.setConsumeThreadMax(1);
//defaultMQPushConsumer.setConsumeThreadMin(1);
//同时只拉取一个消息
//defaultMQPushConsumer.setPullBatchSize(1);
//订阅
defaultMQPushConsumer.subscribe("order-topic","order");
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly()
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context)
msgs.forEach(message->
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
);
return ConsumeOrderlyStatus.SUCCESS;
);
defaultMQPushConsumer.start();
消费结果如下
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=48, sysFlag=0, bornTimestamp=1632010570822, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570826, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638E1D, commitLogOffset=6524445, bodyCRC=543694636, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=49, KEYS=key-0, CONSUME_START_TIME=1632010570828, UNIQ_KEY=7F000001244418B4AAC25E78BC450000, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'] ; "id":0,"name":"订单0","status":"创建"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=49, sysFlag=0, bornTimestamp=1632010570830, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570830, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638EF8, commitLogOffset=6524664, bodyCRC=400232688, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=50, KEYS=key-0, CONSUME_START_TIME=1632010570832, UNIQ_KEY=7F000001244418B4AAC25E78BC4E0001, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'] ; "id":0,"name":"订单0","status":"支付"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=50, sysFlag=0, bornTimestamp=1632010570831, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570832, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000638FD3, commitLogOffset=6524883, bodyCRC=1884939776, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=51, KEYS=key-0, CONSUME_START_TIME=1632010570835, UNIQ_KEY=7F000001244418B4AAC25E78BC4F0002, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 48, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 48, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'] ; "id":0,"name":"订单0","status":"发货"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=51, sysFlag=0, bornTimestamp=1632010570833, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570836, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006390AE, commitLogOffset=6525102, bodyCRC=1061325741, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570839, UNIQ_KEY=7F000001244418B4AAC25E78BC510003, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'] ; "id":1,"name":"订单1","status":"创建"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=52, sysFlag=0, bornTimestamp=1632010570837, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570837, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639189, commitLogOffset=6525321, bodyCRC=150045809, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=53, KEYS=key-1, CONSUME_START_TIME=1632010570841, UNIQ_KEY=7F000001244418B4AAC25E78BC550004, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'] ; "id":1,"name":"订单1","status":"支付"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=53, sysFlag=0, bornTimestamp=1632010570838, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570839, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639264, commitLogOffset=6525540, bodyCRC=1869836929, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=55, KEYS=key-1, CONSUME_START_TIME=1632010570844, UNIQ_KEY=7F000001244418B4AAC25E78BC560005, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 49, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 49, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'] ; "id":1,"name":"订单1","status":"发货"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=54, sysFlag=0, bornTimestamp=1632010570840, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570840, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063933F, commitLogOffset=6525759, bodyCRC=507328046, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=56, KEYS=key-2, CONSUME_START_TIME=1632010570845, UNIQ_KEY=7F000001244418B4AAC25E78BC580006, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'] ; "id":2,"name":"订单2","status":"创建"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=55, sysFlag=0, bornTimestamp=1632010570841, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570842, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F000000000063941A, commitLogOffset=6525978, bodyCRC=697186802, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=57, KEYS=key-2, CONSUME_START_TIME=1632010570847, UNIQ_KEY=7F000001244418B4AAC25E78BC590007, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'] ; "id":2,"name":"订单2","status":"支付"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=56, sysFlag=0, bornTimestamp=1632010570843, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570844, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006394F5, commitLogOffset=6526197, bodyCRC=1309462274, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=58, KEYS=key-2, CONSUME_START_TIME=1632010570850, UNIQ_KEY=7F000001244418B4AAC25E78BC5B0008, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 50, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 50, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'] ; "id":2,"name":"订单2","status":"发货"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=57, sysFlag=0, bornTimestamp=1632010570845, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570846, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006395D0, commitLogOffset=6526416, bodyCRC=18326191, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=59, KEYS=key-3, CONSUME_START_TIME=1632010570851, UNIQ_KEY=7F000001244418B4AAC25E78BC5D0009, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -120, -101, -27, -69, -70, 34, 125], transactionId='null'] ; "id":3,"name":"订单3","status":"创建"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=58, sysFlag=0, bornTimestamp=1632010570847, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570848, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F00000000006396AB, commitLogOffset=6526635, bodyCRC=916761971, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570853, UNIQ_KEY=7F000001244418B4AAC25E78BC5F000A, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -26, -108, -81, -28, -69, -104, 34, 125], transactionId='null'] ; "id":3,"name":"订单3","status":"支付"
MessageExt [brokerName=LAPTOP-20VLGCRC, queueId=0, storeSize=219, queueOffset=59, sysFlag=0, bornTimestamp=1632010570850, bornHost=/172.16.40.199:1056, storeTimestamp=1632010570850, storeHost=/172.16.40.199:10911, msgId=AC1028C700002A9F0000000000639786, commitLogOffset=6526854, bodyCRC=1361468291, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Messagetopic='order-topic', flag=0, properties=MIN_OFFSET=0, MAX_OFFSET=60, KEYS=key-3, CONSUME_START_TIME=1632010570855, UNIQ_KEY=7F000001244418B4AAC25E78BC62000B, WAIT=true, TAGS=order, body=[123, 34, 105, 100, 34, 58, 51, 44, 34, 110, 97, 109, 101, 34, 58, 34, -24, -82, -94, -27, -115, -107, 51, 34, 44, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, -27, -113, -111, -24, -76, -89, 34, 125], transactionId='null'] ; "id":3,"name":"订单3","status":"发货"
局部有序消息
还有一种就是分区有序或者部分有序,部分顺序消息只要保证某一组消息被顺序消费,即:只需要保证一个队列中的消息有序消费即可。
比如:保证同一个订单ID的生成、付款、发货消息按照顺序消费即可实现原理:
- 把同一个订单ID的消息放入同一个MessageQueue
- 保证这个MessageQueue只有一个消费者不被并发处理 ,这个需要使用到 MessageQueueSelector 来保证同一个订单的消息在同一个队列
发送者
使用 MessageQueueSelector 消息队列选择器来把消息路由到不同的队列,下面案例就是把同一个订单的消息:创建,支付,发货 路由到同一个队列,达到局部消费的目的。
//演示消息同步发送
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException
//生产者
DefaultMQProducer producer = new DefaultMQProducer("order-producer2");
//设置name server地址
producer.setNamesrvAddr("127.0.0.1:9876");
//发送消息超时时间
producer.setSendMsgTimeout(1000);
//启动
producer.start();
for (long i = 0 ; i < 4 ; i++)
Order order = new Order(i,"订单"+i,"创建");
//添加内容
byte[] bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
Message message = new Message("order-topic","order2",bytes);
message.setKeys("key-"+i);
//执行发送
SendResult result = producer.send(message, new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
Long id = (Long) arg;
//使用取模算法确定id存放到哪个队列
int index =(int) (id % mqs.size());
//index就是要存放的队列的索引
return mqs.get(index);
//把订单ID作为参数,作为选择器的基础数据
,order.getId());
//打印结果
System.out.println(result);
//====================================================================
order.setStatus("支付");
//添加内容
bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
message = new Message("order-topic","order2",bytes);
message.setKeys("key-"+i);
//执行发送
result = producer.send(message,new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
Long id = (Long) arg;
//使用取模算法确定id存放到哪个队列
int index =(int) (id % mqs.size());
//index就是要存放的队列的索引
return mqs.get(index);
,order.getId());
//打印结果
System.out.println(result);
//====================================================================
order.setStatus("发货");
//添加内容
bytes = (JSON.toJSONString(order)).getBytes(CharsetUtil.UTF_8);
message = new Message("order-topic","order2",bytes);
message.setKeys("key-"+i);
//执行发送
result = producer.send(message,new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
Long id = (Long) arg;
//使用取模算法确定id存放到哪个队列
int index =(int) (id % mqs.size());
//index就是要存放的队列的索引
return mqs.get(index);
,order.getId());
//打印结果
System.out.println(result);
producer.shutdown();
消费者
消费者一样可通过 MessageListenerOrderly 进行顺序消费
public class Consumer
public static void main(String[] args) throws MQClientException
//创建消费者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("order-consumer");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
//从开始位置消费
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//广播模式
//最大线程1个
//defaultMQPushConsumer.setConsumeThreadMax(1);
//defaultMQPushConsumer.setConsumeThreadMin(1);
//同时只拉取一个消息
//defaultMQPushConsumer.setPullBatchSize(1);
//订阅
defaultMQPushConsumer.subscribe("order-topic","order");
defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly()
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context)
msgs.forEach(message->
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.八.rocketmq极简入门-在springboot中使用rocketmq(代码片段)
...不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例SpringBoot集成RocketMQ导入依赖这里使用整合RocketMQ的基础依赖:rocketmq-spring-b 查看详情
五.rocketmq极简入门-rocketmq延迟消息(代码片段)
使用场景我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费... 查看详情
七.rocketmq极简入门-rocketmq事务消息(代码片段)
概述如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交... 查看详情
六.rocketmq极简入门-rocketmq消息批量发送(代码片段)
使用场景如果消息过多,每次发送消息都和MQ建立连接,无疑是一种性能开销,批量消息可以把消息打包批量发送,批量发送消息能显著提高传递小消息的性能。批量消息概述批量发送消息能显著提高传递小消息... 查看详情
一.rocketmq极简入门-mq概述&rocketmq安装(代码片段)
前言最近挺忙的,好久没更新文章了,最近在搞RocketMQ,那就先发点这个,Netty的文章等我空了再继续更。一.MQ概述1.MQ是什么MQ全称为MessageQueue,即消息队列,是一种提供消息队列服务的中间件,也称为... 查看详情
rocketmq入门到精通—rocketmq中级特性能力|解释一下顺序消息原理
...【经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】】什么是消息的顺序性消息的顺序性指的是在消息消费时,能按照发送的顺序来消费。例如:针对于商城服务的的下单到付款的流程中,会产生三条... 查看详情
rocketmq入门简介
...流量削峰、消息分发、保证最终一致性、方便动态扩容。rocketmq历史:Notify(2007)->Napoli(2010)->MetaQ(2011)->RocketMQ(2012)->开源(2016)第一代的Notify主要使用了推模型,解决了事务消;第二代的MetaQ主要使用了拉模型,解决了顺序... 查看详情
rocketmq入门到入土事务消息&顺序消息(代码片段)
接上一篇:RocketMQ入门到入土(一)新手也能看懂的原理和实战!一、事务消息的由来1、案例引用官方的购物案例:小明购买一个100元的东西,账户扣款100元的同时需要保证在下游的积分系统给小明这个账号增加100积分。账号系... 查看详情
rocketmq入门到精通—rocketmq学习入门指南|精讲rocketmq是什么
...源经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】前提概要RocketMQ是一个统一消息引擎、轻量级数据处理平台。RocketMQ是一款阿里巴巴开源的消息中间件。2016年11月28日,阿里巴巴向广西党性培训Apache... 查看详情
rocketmq入门到精通—rocketmq学习入门指南|rocketmq服务发现(nameserver)精讲
...源经历了6个月的失踪,我将带着干货终究归来!【RocketMQ入门到精通】NameServer前提概要RocketMQ中,NameServers被设计用来做简单的路由管理。其职责包括。Brokers定期向每个NameServer注册路由数据(topic以及生产者信息\\消费... 查看详情
rocketmq源码—十rocketmq顺序消息(代码片段)
RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情
rocketmq源码—十rocketmq顺序消息(代码片段)
RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别发送顺序消息SendResultsendResult=producer.send(msg,newMessageQueueSelector()@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg)Integeri 查看详情
rocketmq快速入门
前面几篇文章介绍了为什么选择RocketMQ,以及与kafka的一些对比:阿里RocketMQ优势对比,方便大家对于RocketMQ有一个简单的整体了解,之后介绍了:MQ应用场景,让我们知道MQ在什么时候可以使用,可以解决什么问题,之后介绍了:... 查看详情
rocketmq入门之安装运行
一、几个重要的地址Git地址:https://github.com/apache/incubator-rocketmq编译好的文件:https://rocketmq.incubator.apache.org/dowloading/releases/快速入门:https://rocketmq.incubator.apache.org/docs/quick-start/二、下载及编译(以下内容基本出自 查看详情
rocketmq专题1:入门(代码片段)
RocketMQ入门源码和应用下载?这里以RocketMQ的4.3.0版本为例,本地环境为windows10,jdk1.8,maven3.2.1.源码下载地址:http://mirrors.hust.edu.cn/apache/rocketmq/4.3.0/rocketmq-all-4.3.0-source-release.zip应用下载地址:https://www.apache.org/dyn/ 查看详情
rocketmq事务消息入门介绍(代码片段)
说明周五的时候发了篇:Rocketmq4.3支持事务啦!!!,趁着周末的时候把相关内容看了下,下面的主要内容就是关于RocketMQ事务相关内容介绍了。说明:今天这篇仅仅是入门介绍,并没有涉及到很多细节,先把大概流程说明白,... 查看详情
rocketmq
转:windows版下载:RocketMQ在windows环境下的搭建安装 windows版安装: windows版安装:RocketMQ在windows环境下的安装 入门demo:RocketMQ入门教程 查看详情
rocketmq-消息消费模式顺序消费(代码片段)
RocketMQ-消息消费模式顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式集群模式在消费模式为集群的情况下,如果机器是集群的,消息只会... 查看详情