关键词:
文章目录
Springboot整合使用
pom依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
yml
group: producer-demo1指定生产者组名
rocketmq:
name-server: 192.168.126.100:9876;192.168.64.152:9876
producer:
#生产者组名,规定在一个应用里面必须唯一
group: group-1
#消息发送的超时时间,默认为3000ms
send-message-timeout: 3000
#消息达到4096字节的时候,消息就会被压缩。默认4096
compress-message-body-threshold: 4096
#最大的消息限制 默认为128K
max-message-size: 4194304
#同步消息发送失败重试次数
retry-times-when-send-failed: 3
#在内部发送失败时是否重试其他代理,这个参数在有多个broker才生效。
retry-next-server: true
#异步消息发送失败重试的次数
retry-times-when-send-async-failed: 3
整合使用
消息类型不同比如string,pojo实体类型需要分不同的topic,同consumer-group需要对应一种topic,consumer不管配置什么topic,tag,最后都是以group为准
(如果某个consumer的group名字与group-name1相同,但这个consumer的topic信息与group-name1这个组的topic不同,这会导致group-name1这个组的consumer不能接收全部消息,也就是会导致部分消息无法被消费,所以不要乱操作)
(如果group不同但是topic和tag都相同,那么所有group都会消费同样的消息,有多少个group就消费多少次)
(一台机器可部署多个consumer,需要保证他们的group-name不同,通常topic与group一一对应,所以topic也不同)
为方便查看,示例关键代码
某个service层的实现类MQServiceImpl
package com.ro.service;
import com.ro.pojo.User;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Service
public class MQServiceImpl implements MQService
Logger log= LoggerFactory.getLogger(MQServiceImpl.class);
@Autowired
private RocketMQTemplate mqTemplate;
/**
* 同步消息
* 消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式
*/
@Override
public void syncMQMessageSend()
mqTemplate.syncSend("topic1:tag1", "hello1");
mqTemplate.syncSend("topic1:tag1", "hello2");
SendResult s2=mqTemplate.syncSend("topic1:tag1", "hello3");
log.info("3条同步消息String类型已发送:topic:topic1,tag:tag1:",s2);
User user=new User("tom",100);
SendResult result=mqTemplate.syncSend("topic2:tag1", MessageBuilder.withPayload(user).build());
//可以简写成以下,直接传入pojo对象
SendResult result2=mqTemplate.syncSend("topic2:tag1", user.setName("lily").setAge(200));
log.info("object类型同步消息发送结果:,",result,result2);
/**
* 异步消息
* 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包
* 关键实现异步发送回调接口(SendCallback)
* 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
* 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞
*/
@Override
public void asyncMQMessageSend()
User user=new User("tom",100);
mqTemplate.asyncSend("topic3:tag1", user, new SendCallback()
@Override
public void onSuccess(SendResult sendResult)
log.info("异步消息发送成功:",sendResult);
@Override
public void onException(Throwable throwable)
log.info("异步消息发送失败:",throwable.getMessage());
);
/**
* 单向消息
* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
* 此方式发送消息的过程耗时非常短,一般在微秒级别
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
*/
@Override
public void oneWaySendMQMessageSend()
User user=new User("tom",100);
mqTemplate.sendOneWay("topic4:tag1", user);
log.info("单向消息已发送");
/**
* 延迟消息
* rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
* RocketMQ 目前只支持固定精度的定时消息。
* 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 延迟的底层方法是用定时任务实现的。
*/
@Override
public void delayedSendMQMessageSend()
User user=new User("tom",100);
SendCallback sc=new SendCallback()
@Override
public void onSuccess(SendResult sendResult)
log.info("发送异步延时消息成功");
@Override
public void onException(Throwable throwable)
log.info("发送异步延时消息失败:",throwable.getMessage());
;
Message<User> message=MessageBuilder.withPayload(user).build();
//异步延时
mqTemplate.asyncSend("topic5:tag1", message, sc, 3000, 3);
//同步延时(少一个sendCallback)
SendResult result=mqTemplate.syncSend("topic5:tag1", message, 3000, 3);
log.info("发送同步延时消息成功:",result);
/**
* 顺序消息
*使用hashcode对topic队列数量取模得到对应队列
* 使消息按照顺序被消费,顺序与生产出来的顺序一致
* 比如同一个订单生成,付费顺序需要一致,可以按照订单id来当作hashkey
*/
@Override
public void orderlyMQMessageSend()
String s1[]="tom","1";
String s2[]="klee我和tom在同一个消费者被消费,而且在tom之后","1";
String s3[]="lily我可能不会和tom在同一个消费者被消费","2";
//同步顺序,也可以是其他类型比如异步顺序,单向顺序
mqTemplate.syncSendOrderly("topic6:tag1", s1[0],s1[1]);
mqTemplate.syncSendOrderly("topic6:tag1", s2[0],s2[1]);
mqTemplate.syncSendOrderly("topic6:tag1", s3[0],s3[1]);
log.info("单向消息已发送");
/**
* 过滤消息
* Tag 过滤
* Sql 过滤
* Sql类型语法:
* 数值比较,比如:>,>=,<,<=,BETWEEN,=;
* 字符比较,比如:=,<>,IN;
* IS NULL 或者 IS NOT NULL;
* 逻辑符号 AND,OR,NOT;
*/
@Override
public void selectorMQSend()
//Tag过滤就是在发送参数上指定,比如topic1:tag1就指定了tag1,这种光topic1不指定就是所有tag
//这里使用sql92
User user=new User("tom",16);
User user2=new User("klee",9);
Message<User> message=MessageBuilder
.withPayload(user)
.setHeader("age", user.getAge())
.build();
Message<User> message2=MessageBuilder
.withPayload(user)
.setHeader("age", user2.getAge())
.build();
mqTemplate.syncSend("topic10", message);//age=16,消费者设置sql92过滤(header)头数据age=9
mqTemplate.syncSend("topic10", message2);//age=9
log.info("添加age头信息的过滤消息发送完毕");
/**
* 分布式事物消息
*生产者需要一个监听自己的类
*/
@Override
public void transactionMQSend()
User user=new User("klee",9);
Message<User> message=MessageBuilder
.withPayload(user)
.setHeader(RocketMQHeaders.TRANSACTION_ID, UUID.randomUUID())
.build();
TransactionSendResult result=mqTemplate.sendMessageInTransaction("topic15:tag1", message, null);
if(result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)
&&
result.getSendStatus().equals(SendStatus.SEND_OK))
log.info("事物消息发送成功");
log.info("事物消息发送结果:",result);
同步消息消费者
package com.ro.configure;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-1",topic = "topic1",selectorExpression = "tag1")
public class MQConsumer implements RocketMQListener<String>
Logger log= LoggerFactory.getLogger(MQConsumer.class);
@Override
public void onMessage(String s)
log.info("consumer-1 收到string类型消息:",s);
package com.ro.configure;
import com.ro.pojo.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group-2",topic = "topic2",selectorExpression = "tag1")
public class MQConsumer implements RocketMQListener<User>
Logger log= LoggerFactory.getLogger(MQConsumer.class);
@Override
public void onMessage(User u)
log.info("consumer-2 收到user类型消息:",u);
异步消息消费者
代码与同步消费者相同
单向消息消费者
代码与同步消费者相同
延时消息消费者
代码与同步消费者相同
顺序消息消费者
只需要在之前的基础上指定consumeMode = ConsumeMode.ORDERLY
package com.ro.configure;
import com.ro.pojo.User;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY,consumerGroup = "consumer-orderly",topic = "topic6",selectorExpression = "tag1")
public class MQConsumerOrderly implements RocketMQListener<String>
Logger log= LoggerFactory.getLogger(MQConsumerOrderly.class);
@Override
public void onMessage(String s)
log.info("接收到顺序消息了:",s);
sql92过滤消息消费者
更多rocketmq集群方案安装配置请查看我的其他文章
https://blog.csdn.net/UnicornRe/article/details/117745226
需要在安装的配置文件xxx.conf加入
#支持sql92
enablePropertyFilter=true
sql语法
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
package com.ro.configure;
import com.ro.pojo.User;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
//selectorExpression指明了只能接收消息属性(header)中age的消息。
@RocketMQMessageListener(
selectorType = SelectorType.SQL92,
consumerGroup = "consumer-selector",
topic = "topic10",
selectorExpression = "age = 9")
public class MQConsumer implements RocketMQListener<User>
Logger log= LoggerFactory.getLogger(MQConsumer.class);
@Override
public void onMessage(User u)
log.info("接收到sql过滤消息了:",u);
事物消息消费者
发送方向 MQ 服务端发送消息。
MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半 消息,订阅方将不会接受该消息。
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
producer需要一个监听类
package com.ro.configure;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@RocketMQTransactionListener
public class TransactionListner implements RocketMQLocalTransactionListener
Logger log= LoggerFactory.getLogger(TransactionListner.class);
//保障线程安全且保证高性能的hashmap,用来记录执行结果
private static Map<String,RocketMQLocalTransactionState> transStateMap=new ConcurrentHashMap<>();
//执行本地事物
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o)
//方法里的object o这个对象是生产者发送消息方法最后一个参数的值
String transId=(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
log.info("事物id:",transId);
try
//模拟执行任务
Thread.sleep查看详情
[原创]springboot整合rocketmq消息队列
参考技术A什么情况下的异步操作需要使用消息队列而不是多线程?StartNameServerStartBrokerWindows下安装RocketMq:https://www.jianshu.com/p/4a275e779afaRocketMq名词解释:https://my.oschina.net/javamaster/blog/2051703解释Push与Pull区别:https://www.jianshu.com/p/f0... 查看详情
4springboot整合rocketmq实现消息发送和接收(代码片段)
我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷;最终项目结构如下:具体步骤如下:第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置<dependency> <groupId>org.apac... 查看详情
6springboot整合rocketmq发送异步消息(代码片段)
发送异步消息是指producer向broker发送消息时指定消息发送成功及发送异常的回调方法,调用API后立即返回,producer发送消息线程不阻塞,消息发送成功或失败的回调任务在一个新的线程中执行。相对发送同步消息,... 查看详情
10springboot整合rocketmq实现延迟消息(代码片段)
延迟消息对于消息中间件来说,producer将消息发送到mq的服务器上,但并不希望这条消息马上被消费,而是推迟到当前时间节点之后的某个时间点,再将消息投递到queue中让consumer进行消费。延迟消息的使用场景... 查看详情
9springboot整合rocketmq实现顺序消息(代码片段)
rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完... 查看详情
7springboot整合rocketmq发送单向消息(代码片段)
发送单向消息是指producer向broker发送消息,执行API时直接返回,不等待broker服务器的结果。这种方式主要用在不特别关心发送结果的场景,举例:日志发送;RocketMQTemplate给我们提供了sendOneWay方法(有多个重载)... 查看详情
springboot实战项目整合阿里云rocketmq消息队列实现发送普通消息,延时消息(代码片段)
原文地址:Springboot实战项目整合阿里云RocketMQ消息队列实现发送普通消息,延时消息--附代码-学不会丶-博客园一.为什么选择RocketMQ消息队列?(可跳过看三的整合代码实例)首先RocketMQ是阿里巴巴自研出来的... 查看详情
13springboot整合rocketmq实现过滤消息-根据sql表达式过滤消息(代码片段)
SQL表达式方式可以根据发送消息时输入的属性进行一些计算。RocketMQ的SQL表达式语法只定义了一些基本的语法功能。数字比较,如>,>=,<,<=,BETWEEN,=;字符比较,如:=... 查看详情
springboot整合rocketmq
参考技术AApacheRocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件。支持Broker和Consumer端消息过滤,支持发布订阅模型和点对点,支持拉pull和推push两种消息模式,单一队列百万消息、亿级消息堆积,支持单master节... 查看详情
12springboot整合rocketmq实现过滤消息-根据tag方式过滤消息(代码片段)
消息发送端只能设置一个tag,消息接收端可以设置多个tag。接收消息端通过‘||’设置多个tag,如下:tag1||tag2||tag3||…上实例,生产端发送三个消息,TAG分别是TAG1,TAG2,TAG3/***发送带Tag消息,测试根据Tag过滤消息*/... 查看详情
5springboot整合rocketmq发送同步消息(代码片段)
发送同步消息是指producer向broker发送消息,执行API时同步等待,直到broker服务器返回发送结果;相对异步发送消息,同步会阻塞线程,性能相对差点,但是可靠性高,这种方式得到广泛使用,比如... 查看详情
springboot系列-springboot整合rabbitmq
一RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,现已经转让给apache). 消息中间件的工作过程可... 查看详情
rocketmq(20)——整合springboot(代码片段)
整合SpringBootApacheRocketMQ提供了对SpringBoot的支持,目前最新的整合版本使用的是RocketMQ4.5.1版本,使用SpringBoot是2.0.5版本。整合SpringBoot需要引入rocketmq-spring-boot-starter依赖,目前最新版本是2.0.3。<dependency><groupId>org.apache.rock... 查看详情
rocketmq(20)——整合springboot(代码片段)
整合SpringBootApacheRocketMQ提供了对SpringBoot的支持,目前最新的整合版本使用的是RocketMQ4.5.1版本,使用SpringBoot是2.0.5版本。整合SpringBoot需要引入rocketmq-spring-boot-starter依赖,目前最新版本是2.0.3。<dependency><groupId>org.apache.ro 查看详情
springboot整合rocketmq
...tMQ(2)---Docker部署RocketMQ集群这篇在上篇搭建好的基础上,将SpringBoot整合RocketMQ实现生产消费。GitHub地址: https://github.com/yudiandemingzi/spring-boot-study一、搭建步骤先说下技术大致架构SpringBoot2.1.6+Mav 查看详情
rocketmq(二十四)整合springboot(代码片段)
1、官网RocketMQ为SpringBoot提供了整合方案,官网地址如下,上面提供了详细的整合步骤及案例。https://github.com/apache/rocketmq-spring官方详细文档(可以切换不同的版本)https://github.com/apache/rocketmq-spring/blob/release-2.0.1 查看详情
springboot整合rocketmq实现入门案例
...习了Spring整合RocketMQ的第一个案例!现在我们来学习SpringBoot如何整合RocketMQ实现更加简单的使用!文章目录1创建maven项目2配置文件3生产者4消费者5测试1创建maven项目创建一个maven项目。引入sp 查看详情