关键词:
1 在resources目录下创建config目录,新建文件rocketmq.properties文件
# 指定namesrv地址 suning.rocketmq.namesrvAddr=localhost:9876 #生产者group名称 suning.rocketmq.producerGroupName=user_group #事务生产者group名称 suning.rocketmq.transactionProducerGroupName=order_transaction #消费者group名称 suning.rocketmq.consumerGroupName=user_consumer_group #生产者实例名称 suning.rocketmq.producerInstanceName=user_producer_instance #消费者实例名称 suning.rocketmq.consumerInstanceName=user_consumer_instance #事务生产者实例名称 suning.rocketmq.producerTranInstanceName=user_producer_transacition #一次最大消费多少数量消息 suning.rocketmq.consumerBatchMaxSize=1 #广播消费 suning.rocketmq.consumerBroadcasting=false #消费的topic:tag suning.rocketmq.subscribe[0]=user-topic:white #启动的时候是否消费历史记录 suning.rocketmq.enableHistoryConsumer=false #启动顺序消费 suning.rocketmq.enableOrderConsumer=false
2 新建properties文件读取类
package com.test.domi.config; import java.util.ArrayList; import java.util.List; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; /** * @Author 18011618 * @Date 19:31 2018/7/18 * @Function 读取配置文件信息 */ @PropertySource("classpath:config/rocketmq.properties") @ConfigurationProperties(prefix = "suning.rocketmq") @Configuration public class RocketMQProperties private String namesrvAddr; private String producerGroupName; private String transactionProducerGroupName; private String consumerGroupName; private String producerInstanceName; private String consumerInstanceName; private String producerTranInstanceName; private int consumerBatchMaxSize; private boolean consumerBroadcasting; private boolean enableHistoryConsumer; private boolean enableOrderConsumer; private List<String> subscribe = new ArrayList<String>(); public String getNamesrvAddr() return namesrvAddr; public void setNamesrvAddr(String namesrvAddr) this.namesrvAddr = namesrvAddr; public String getProducerGroupName() return producerGroupName; public void setProducerGroupName(String producerGroupName) this.producerGroupName = producerGroupName; public String getTransactionProducerGroupName() return transactionProducerGroupName; public void setTransactionProducerGroupName(String transactionProducerGroupName) this.transactionProducerGroupName = transactionProducerGroupName; public String getConsumerGroupName() return consumerGroupName; public void setConsumerGroupName(String consumerGroupName) this.consumerGroupName = consumerGroupName; public String getProducerInstanceName() return producerInstanceName; public void setProducerInstanceName(String producerInstanceName) this.producerInstanceName = producerInstanceName; public String getConsumerInstanceName() return consumerInstanceName; public void setConsumerInstanceName(String consumerInstanceName) this.consumerInstanceName = consumerInstanceName; public String getProducerTranInstanceName() return producerTranInstanceName; public void setProducerTranInstanceName(String producerTranInstanceName) this.producerTranInstanceName = producerTranInstanceName; public int getConsumerBatchMaxSize() return consumerBatchMaxSize; public void setConsumerBatchMaxSize(int consumerBatchMaxSize) this.consumerBatchMaxSize = consumerBatchMaxSize; public boolean isConsumerBroadcasting() return consumerBroadcasting; public void setConsumerBroadcasting(boolean consumerBroadcasting) this.consumerBroadcasting = consumerBroadcasting; public boolean isEnableHistoryConsumer() return enableHistoryConsumer; public void setEnableHistoryConsumer(boolean enableHistoryConsumer) this.enableHistoryConsumer = enableHistoryConsumer; public boolean isEnableOrderConsumer() return enableOrderConsumer; public void setEnableOrderConsumer(boolean enableOrderConsumer) this.enableOrderConsumer = enableOrderConsumer; public List<String> getSubscribe() return subscribe; public void setSubscribe(List<String> subscribe) this.subscribe = subscribe; @Override public String toString() return "RocketMQProperties" + "namesrvAddr=‘" + namesrvAddr + ‘‘‘ + ", producerGroupName=‘" + producerGroupName + ‘‘‘ + ", transactionProducerGroupName=‘" + transactionProducerGroupName + ‘‘‘ + ", consumerGroupName=‘" + consumerGroupName + ‘‘‘ + ", producerInstanceName=‘" + producerInstanceName + ‘‘‘ + ", consumerInstanceName=‘" + consumerInstanceName + ‘‘‘ + ", producerTranInstanceName=‘" + producerTranInstanceName + ‘‘‘ + ", consumerBatchMaxSize=" + consumerBatchMaxSize + ", consumerBroadcasting=" + consumerBroadcasting + ", enableHistoryConsumer=" + enableHistoryConsumer + ", enableOrderConsumer=" + enableOrderConsumer + ", subscribe=" + subscribe + ‘‘;
3.加载properties文件
package com.test.domi.config; import javax.annotation.PostConstruct; import groovy.util.logging.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.List; import java.util.stream.Collectors; /** * @Author 18011618 * @Date 19:36 2018/7/18 * @Function 通过使用指定的文件读取类 来加载配置文件到字段中 */ @Configuration @EnableConfigurationProperties(RocketMQProperties.class) @Slf4j public class RocketMQConfiguration @Autowired private RocketMQProperties rocketMQProperties; //事件监听 @Autowired private ApplicationEventPublisher publisher = null; private static boolean isFirstSub = true; private static long startTime = System.currentTimeMillis(); private static Logger log = LoggerFactory.getLogger(RocketMQConfiguration.class); /** * 容器初始化的时候 打印参数 */ @PostConstruct public void init() System.err.println(rocketMQProperties.getNamesrvAddr()); System.err.println(rocketMQProperties.getProducerGroupName()); System.err.println(rocketMQProperties.getConsumerBatchMaxSize()); System.err.println(rocketMQProperties.getConsumerGroupName()); System.err.println(rocketMQProperties.getConsumerInstanceName()); System.err.println(rocketMQProperties.getProducerInstanceName()); System.err.println(rocketMQProperties.getProducerTranInstanceName()); System.err.println(rocketMQProperties.getTransactionProducerGroupName()); System.err.println(rocketMQProperties.isConsumerBroadcasting()); System.err.println(rocketMQProperties.isEnableHistoryConsumer()); System.err.println(rocketMQProperties.isEnableOrderConsumer()); System.out.println(rocketMQProperties.getSubscribe().get(0)); /** * 创建普通消息发送者实例 * @return * @throws MQClientException */ @Bean public DefaultMQProducer defaultProducer() throws MQClientException DefaultMQProducer producer = new DefaultMQProducer( rocketMQProperties.getProducerGroupName()); producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr()); producer.setInstanceName(rocketMQProperties.getProducerInstanceName()); producer.setVipChannelEnabled(false); producer.setRetryTimesWhenSendAsyncFailed(10); producer.start(); log.info("rocketmq producer server is starting...."); return producer; /** * 创建支持消息事务发送的实例 * @return * @throws MQClientException */ @Bean public TransactionMQProducer transactionProducer() throws MQClientException TransactionMQProducer producer = new TransactionMQProducer( rocketMQProperties.getTransactionProducerGroupName()); producer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr()); producer.setInstanceName(rocketMQProperties .getProducerTranInstanceName()); producer.setRetryTimesWhenSendAsyncFailed(10); // 事务回查最小并发数 producer.setCheckThreadPoolMinSize(2); // 事务回查最大并发数 producer.setCheckThreadPoolMaxSize(2); // 队列数 producer.setCheckRequestHoldMax(2000); producer.start(); log.info("rocketmq transaction producer server is starting...."); return producer; /** * 创建消息消费的实例 * @return * @throws MQClientException */ @Bean public DefaultMQPushConsumer pushConsumer() throws MQClientException DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( rocketMQProperties.getConsumerGroupName()); consumer.setNamesrvAddr(rocketMQProperties.getNamesrvAddr()); consumer.setInstanceName(rocketMQProperties.getConsumerInstanceName()); //判断是否是广播模式 if (rocketMQProperties.isConsumerBroadcasting()) consumer.setMessageModel(MessageModel.BROADCASTING); //设置批量消费 consumer.setConsumeMessageBatchMaxSize(rocketMQProperties .getConsumerBatchMaxSize() == 0 ? 1 : rocketMQProperties .getConsumerBatchMaxSize()); //获取topic和tag List<String> subscribeList = rocketMQProperties.getSubscribe(); for (String sunscribe : subscribeList) consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]); // 顺序消费 if (rocketMQProperties.isEnableOrderConsumer()) consumer.registerMessageListener(new MessageListenerOrderly() @Override public ConsumeOrderlyStatus consumeMessage( List<MessageExt> msgs, ConsumeOrderlyContext context) try context.setAutoCommit(true); msgs = filterMessage(msgs); if (msgs.size() == 0) return ConsumeOrderlyStatus.SUCCESS; publisher.publishEvent(new MessageEvent(msgs, consumer)); catch (Exception e) e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; return ConsumeOrderlyStatus.SUCCESS; ); // 并发消费 else consumer.registerMessageListener(new MessageListenerConcurrently() @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) try //过滤消息 msgs = filterMessage(msgs); if (msgs.size() == 0) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; publisher.publishEvent(new MessageEvent(msgs, consumer)); catch (Exception e) e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; ); new Thread(new Runnable() @Override public void run() try Thread.sleep(5000); try consumer.start(); catch (Exception e) e.printStackTrace(); log.info("rocketmq consumer server is starting...."); catch (InterruptedException e) e.printStackTrace(); ).start(); return consumer; /** * 消息过滤 * @param msgs * @return */ private List<MessageExt> filterMessage(List<MessageExt> msgs) if (isFirstSub && !rocketMQProperties.isEnableHistoryConsumer()) msgs = msgs.stream() .filter(item -> startTime - item.getBornTimestamp() < 0) .collect(Collectors.toList()); if (isFirstSub && msgs.size() > 0) isFirstSub = false; return msgs;
4 创建生产者
package com.test.domi.controller; import com.test.domi.dto.User; import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import com.alibaba.fastjson.JSON; import java.util.List; @RestController public class ProducerController @Autowired private DefaultMQProducer defaultProducer; @Autowired private TransactionMQProducer transactionProducer; /** * 发送普通消息 */ @GetMapping("/sendMessage") public void sendMsg() for(int i=0;i<10;i++) User user = new User(); user.setId(String.valueOf(i)); user.setUsername("jhp"+i); String json = JSON.toJSONString(user); Message msg = new Message("user-topic","white",json.getBytes()); try SendResult result = defaultProducer.send(msg); System.out.println("消息id:"+result.getMsgId()+":"+","+"发送状态:"+result.getSendStatus()); catch (Exception e) e.printStackTrace(); System.out.println("消息发送失败"); /** * 发送事务消息 * @return */ @GetMapping("/sendTransactionMess") public String sendTransactionMsg() SendResult sendResult = null; try // a,b,c三个值对应三个不同的状态 String ms = "c"; Message msg = new Message("user-topic","white",ms.getBytes()); // 发送事务消息 sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1, Object arg) -> String value = ""; if (arg instanceof String) value = (String) arg; if (value == "") throw new RuntimeException("发送消息不能为空..."); else if (value =="a") return LocalTransactionState.ROLLBACK_MESSAGE; else if (value =="b") return LocalTransactionState.COMMIT_MESSAGE; return LocalTransactionState.ROLLBACK_MESSAGE; , 4); System.out.println(sendResult); catch (Exception e) e.printStackTrace(); return sendResult.toString(); /** * 支持顺序发送消息 */ @GetMapping("/sendMessOrder") public void sendMsgOrder() for(int i=0;i<100;i++) User user = new User(); user.setId(String.valueOf(i)); user.setUsername("jhp" + i); String json = JSON.toJSONString(user); Message msg = new Message("user-topic", "white", json.getBytes()); try defaultProducer.send(msg, new MessageQueueSelector() @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) int index = ((Integer) arg) % mqs.size(); return mqs.get(index); ,i); catch (Exception e) e.printStackTrace();
5.创建监听对象
package com.test.domi.config; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.context.ApplicationEvent; import java.util.List; /** * 监听对象 * @author 18011618 * */ public class MessageEvent extends ApplicationEvent private static final long serialVersionUID = -4468405250074063206L; private DefaultMQPushConsumer consumer; private List<MessageExt> msgs; public MessageEvent(List<MessageExt> msgs, DefaultMQPushConsumer consumer) throws Exception super(msgs); this.consumer = consumer; this.setMsgs(msgs); public DefaultMQPushConsumer getConsumer() return consumer; public void setConsumer(DefaultMQPushConsumer consumer) this.consumer = consumer; public List<MessageExt> getMsgs() return msgs; public void setMsgs(List<MessageExt> msgs) this.msgs = msgs;
6.监听消息进行消费
package com.test.domi.config; import java.util.List; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; /** * 监听消息进行消费 */ @Component public class ConsumerService @EventListener(condition = "#event.msgs[0].topic==‘user-topic‘ && #event.msgs[0].tags==‘white‘") public void rocketmqMsgListener(MessageEvent event) try List<MessageExt> msgs = event.getMsgs(); for (MessageExt msg : msgs) System.err.println("消费消息:"+new String(msg.getBody())); catch (Exception e) e.printStackTrace();
访问/sendMessage的url生产消息,控制台打印如下:
消息id:C0A801652B5C18B4AAC20F5FF9C60032:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFE20033:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFE50034:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFE80035:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFEB0036:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFEF0037:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFF30038:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFF90039:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFFC003A:,发送状态:SEND_OK 消息id:C0A801652B5C18B4AAC20F5FFFFF003B:,发送状态:SEND_OK 消费消息:"id":"9","username":"jhp9" 消费消息:"id":"6","username":"jhp6" 消费消息:"id":"0","username":"jhp0" 消费消息:"id":"5","username":"jhp5" 消费消息:"id":"1","username":"jhp1" 消费消息:"id":"2","username":"jhp2" 消费消息:"id":"7","username":"jhp7" 消费消息:"id":"3","username":"jhp3" 消费消息:"id":"4","username":"jhp4" 消费消息:"id":"8","username":"jhp8"
apacherocketmq:使用官方demo测试rocketmq(代码片段)
当前rocketmq版本4.91.声明当前内容主要为使用官方的demo测试之前的rocketmq是否正常,测试发送和消费消息,主要参考官方文档2.pom依赖<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</ar 查看详情
rocketmq事务消息篇之事务消息的使用(代码片段)
前言在RocketMQ事务消息篇(一)之事务消息的介绍里对RocketMQ的事务消息作了相关说明,本文提供一些基本的开发示例。java示例依赖<dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>ro 查看详情
你应该知道的rocketmq(代码片段)
...是使用的是Kafka,而现在换了公司之后,更多的使用的是Rocketmq,本篇文章会尽力全面的介绍RocketMQ和Kafka各个关键点的比较,希望大家读完能有所收获。RocketMQ前身叫做MetaQ,在MeataQ发布3.0版本的时候改名为RocketMQ,其本质上的设计... 查看详情
rocketmq使用顺序消息(代码片段)
目录说明生产端消费端总结说明RocketMQ与其它消息队列一样,一个Topic利用多个队列来存储数据,单个队列内的数据是顺序存储的,但队列间的数据无法保证顺序性。RocketMQ目前支持保证某类数据或部分数据的顺序性。... 查看详情
rocketmq使用(代码片段)
rocketmq 基本使用可以看官网和官网给的demo.https://github.com/apache/rocketmq/tree/master/example这里主要说明几个点:rocketmq 发送类型常用:1,普通消息.(可以获取发送结果,失败了重试)2,有序消息.(秒杀,等需要有序的消费场景)3,事... 查看详情
八.rocketmq极简入门-在springboot中使用rocketmq(代码片段)
...不得不考虑和SpringBoot集成,本篇文章为SpirngBoot整合RocketMQ案例SpringBoot集成RocketMQ导入依赖这里使用整合RocketMQ的基础依赖:rocketmq-spring-b 查看详情
rocketmq系列批量发送与过滤(代码片段)
今天我们再来看看RocketMQ的另外两个小功能,消息的批量发送和过滤。这两个小功能提升了我们使用RocketMQ的效率。批量发送以前我们发送消息的时候,都是一个一个的发送,这样效率比较低下。能不能一次发送多个消息呢?当... 查看详情
rocketmq(20)——整合springboot(代码片段)
整合SpringBootApacheRocketMQ提供了对SpringBoot的支持,目前最新的整合版本使用的是RocketMQ4.5.1版本,使用SpringBoot是2.0.5版本。整合SpringBoot需要引入rocketmq-spring-boot-starter依赖,目前最新版本是2.0.3。<dependency><groupId>org.apache.rock... 查看详情
深入理解rocketmq普通消息和顺序消息使用,原理,优化(代码片段)
...以这方面为主。这次打压的过程中收获比较的大的是,对RocketMq的一些优化。最开始我们公司使用的是RabbitMq,在一些流量高峰的场景下,发现队列堆积比较严重,导致RabbitMq挂了。为了应对这个场景,最终我们引入了阿里云的Rocke... 查看详情
rocketmq(20)——整合springboot(代码片段)
整合SpringBootApacheRocketMQ提供了对SpringBoot的支持,目前最新的整合版本使用的是RocketMQ4.5.1版本,使用SpringBoot是2.0.5版本。整合SpringBoot需要引入rocketmq-spring-boot-starter依赖,目前最新版本是2.0.3。<dependency><groupId>org.apache.ro 查看详情
rocketmq有序消息(代码片段)
RocketMQ提供的顺序消费消息实现是使用的FIFO先进先出算法Producer消息发送publicclassProducerpublicstaticvoidmain(String[]args)throwsUnsupportedEncodingExceptiontryMQProducerproducer=newDefaultMQProducer("please_rename_unique_ 查看详情
rocketmq(01)——简介(代码片段)
RocketMQ简介笔者使用的是ApacheRocketMQ,官网是http://rocketmq.apache.org/。RocketMQ是Alibaba开源的一个分布式消息队列,可以通过http://rocketmq.apache.org/dowloading/releases/下载当前最新的版本。下载后解压缩,然后通过bin/mqnamesrv启... 查看详情
rocketmq(15)——rocketmq控制台(代码片段)
RocketMQ控制台RocketMQ提供了一个管理控制台,可以查看RocketMQ的相关信息和进行一些管理操作。RocketMQ在Github上专门有一个仓库用来存放一些与RocketMQ相关的项目,地址是https://github.com/apache/rocketmq-externals,控制台就是其中一个。如... 查看详情
rocketmq(15)——rocketmq控制台(代码片段)
RocketMQ控制台RocketMQ提供了一个管理控制台,可以查看RocketMQ的相关信息和进行一些管理操作。RocketMQ在Github上专门有一个仓库用来存放一些与RocketMQ相关的项目,地址是https://github.com/apache/rocketmq-externals,控制台就是其中一个。如... 查看详情
五.rocketmq极简入门-rocketmq延迟消息(代码片段)
使用场景我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费... 查看详情
rocketmq使用批量消息(代码片段)
目录说明生产端消费端遗留问题说明批量发送消息条件:1、发送到同一个topic2、等待同一个发送结果3、不允许使用定时消息4、不支持半事务特性5、同一批消息大小不能超过1MB,否则需要自己进行切割发送批量消息,... 查看详情
使用springbootstarter开发rocketmq(代码片段)
RocketMQ最早是阿里巴巴开源的MQ中间件,在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。现已捐献给Apache,目前是Apache下的顶级项目。捐献后的最... 查看详情
rocketmq入门笔记(代码片段)
消息队列经典场景优点异步原来的下单场景只是用户支付即可结束,现在需要发送成功短信,给用户增加积分,订阅物流信息等等,这就使得用户的下单时间大大加长,这样就可以使用消息队列,把各个动作发到消息队列,每个服务再去... 查看详情