关键词:
一、环境搭建
- 采用maven多module模式,共计创建三个子module
- common:通用实体信息
- rabbitmq-publisher:消息发布者,基于SpringBoot
- rabbitmq-subscriber:消息订阅者,基于SpringBoot
- 在消息发布者和订阅者两个项目中加入rabbitmq maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
在两个项目中加入rabbitmq的配置信息
spring:
rabbitmq:
host: xxx.xxx.xxx.xxx
port: 5672
username: username
password: password
# 虚拟主机,需要后台先配置
# virtual-host: springboot
复制代码
上述三步完成后,rabbitmq的基础环境搭建完成
rabbitmq配置属性类
- org.springframework.boot.autoconfigure.amqp.RabbitProperties
二、四大交换器
2.1 direct - 直连交换器
2.1.1 消息发送者
在消息发布者中新建配置类,声明交换器信息
- 只用声明交换器,队列和交换器绑定是订阅者操作
- 不同的类型提供不同的交换器
- 如果只声明交换器并不会创建交换器,而是绑定时或者发送消息时才创建
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AmqpPublisherConfig
@Bean
public DirectExchange emailDirectExchange()
// 声明方式一
// return new DirectExchange("exchange.direct.springboot.email");
// 声明方式二
return ExchangeBuilder.directExchange("exchange.direct.springboot.email").build();
复制代码
发送消息时,使用的是RabbitTemplate,为SpringBoot提供的RabbitMQ消息发送器
- org.springframework.amqp.rabbit.core.RabbitTemplate
- 发送消息示例
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class PublishController
@Resource
private RabbitTemplate rabbitTemplate;
@RequestMapping("/direct")
public Object direct(String message)
try
rabbitTemplate.convertAndSend("交换器", "路由键", message);
return message;
catch (AmqpException e)
System.out.println(e.getMessage());
return "网络中断,请稍后再试~";
复制代码
2.2.2 消息接收者
接收者需要配置以下内容
- 交换器:直接new对应的交换器类型
- 队列:只有Queue类型,通过名称区分
- 交换器和队列的绑定:通过BindingBuilder.bind(队列).to(交换器).with(路由键);
- 只声明交换器和队列绑定,并不会马上创建,而是在发送消息或者监听队列时才会创建
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AmqpSubscriberConfig
/**
* 直连交换器
*/
@Bean
public DirectExchange emailDirectExchange()
// 声明方式一
// return new DirectExchange("exchange.direct.springboot.email");
// 声明方式二
return ExchangeBuilder.directExchange("exchange.direct.springboot.email").build();
/**
* 声明队列
*/
@Bean
public Queue emailQueue()
// 声明方式一
// return new Queue("queue.direct.springboot.email");
// 声明方式二
return QueueBuilder.durable("queue.direct.springboot.email").build();
/**
* 交换器和队列绑定
*/
@Bean
@Resource
public Binding emailBiding(Queue emailQueue, DirectExchange emailDirectExchange)
// 将路由使用路由键绑定到交换器上
return BindingBuilder.bind(emailQueue).to(emailDirectExchange).with("springboot.email.routing.key");
复制代码
监听队列
- 监听的队列必须存在,否则将会报错
- 监听的队列消费完成会自动确认消息
- 如果多个队列同时监听一个队列,则消息会轮训地由不同方法处理
- 可以在参数中指定接收类型,消息将会自动转为对应类型
- 也可以指定Message参数获取对应消息信息
- org.springframework.amqp.core.Message
- 获取消息属性:message.getMessageProperties()
- 获取消息内容:message.getBody()
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息订阅监听
*/
@Component
public class SubscriberListener
/**
* direct监听,相同监听队列消息将会轮流处理
*/
@RabbitListener(queues = "queue.direct.springboot.email")
public void receiver01(String msg)
System.out.println("receiver01 message = " + msg);
@RabbitListener(queues = "queue.direct.springboot.email")
public void receiver02(String msg)
System.out.println("receiver02 message = " + msg);
复制代码
2.1.3 消息发布订阅
1.先启动订阅者,可以看到队列声明
2. 启动发布者,然后发布消息
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class PublishController
@Resource
private RabbitTemplate rabbitTemplate;
@RequestMapping("/direct")
public Object direct(String message)
try
// 指定发送的交换器和路由键
rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "springboot.email.routing.key", message);
return message;
catch (AmqpException e)
System.out.println(e.getMessage());
return "网络中断,请稍后再试~";
复制代码
3.订阅者会轮流收到信息
receiver01 message = direct
receiver02 message = direct
receiver01 message = direct
receiver02 message = direct
receiver01 message = direct
receiver02 message = direct
复制代码
2.2 topic - 主题交换器
2.2.1 消息发送者
声明topic交换器
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BlogPublisherConfig
@Bean
public Exchange blogTopicExchange()
return ExchangeBuilder.topicExchange("exchange.topic.springboot.blog").build();
复制代码
声明controller
@RequestMapping("/topic")
public Object topic(String routingKey, String message)
rabbitTemplate.convertAndSend("exchange.topic.springboot.blog", routingKey, message);
return routingKey + " : " + message;
复制代码
2.2.2 消息接收者
声明交换器、三个队列、队列的绑定
- *:匹配一个串
- #:匹配一个或者多个串
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class BlogSubscriberConfig
/**
* 主题交换器
*/
@Bean
public TopicExchange blogTopicExchange()
return ExchangeBuilder.topicExchange("exchange.topic.springboot.blog").build();
@Bean
public Queue blogJavaQueue()
return QueueBuilder.durable("queue.topic.springboot.blog.java").build();
@Bean
public Queue blogMqQueue()
return QueueBuilder.durable("queue.topic.springboot.blog.mq").build();
@Bean
public Queue blogAllQueue()
return QueueBuilder.durable("queue.topic.springboot.blog.all").build();
@Bean
@Resource
public Binding blogJavaBinding(TopicExchange blogTopicExchange, Queue blogJavaQueue)
return BindingBuilder.bind(blogJavaQueue).to(blogTopicExchange).with("springboot.blog.java.routing.key");
@Bean
@Resource
public Binding blogMqBinding(TopicExchange blogTopicExchange, Queue blogMqQueue)
return BindingBuilder.bind(blogMqQueue).to(blogTopicExchange).with("springboot.blog.mq.routing.key");
@Bean
@Resource
public Binding blogAllBinding(TopicExchange blogTopicExchange, Queue blogAllQueue)
// #: 匹配一个或者多个 *:匹配一个
return BindingBuilder.bind(blogAllQueue).to(blogTopicExchange).with("springboot.blog.#.routing.key");
复制代码
监听队列
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class BlogService
/**
* topic监听
*/
@RabbitListener(queues = "queue.topic.springboot.blog.java")
public void blogJavaListener(String message)
System.out.println("blogJavaListener message = " + message);
@RabbitListener(queues = "queue.topic.springboot.blog.mq")
public void blogMqListener(String message)
System.out.println("blogMqListener message = " + message);
@RabbitListener(queues = "queue.topic.springboot.blog.all")
public void blogAllaListener(String message)
System.out.println("blogAllListener message = " + message);
复制代码
2.2.3 消息发布订阅
- 发布者发送消息
- 订阅者收到消息
- 全匹配和模糊匹配
- 全匹配无论是哪个都会被匹配上
blogJavaListener message = hello
blogAllListener message = hello
blogAllListener message = hello
blogMqListener message = hello
复制代码
2.3 fanout - 广播交换器
2.3.1 消息发送者
声明fanout交换器
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class NoticePublisherConfig
@Bean
public Exchange radioFanoutExchange()
return ExchangeBuilder.fanoutExchange("exchange.fanout.springboot.radio").build();
复制代码
声明controller
@RequestMapping("/fanout")
public Object fanout(String message)
rabbitTemplate.convertAndSend("exchange.fanout.springboot.radio", null, message);
return message;
复制代码
2.32 消息接收者
创建交换器、路由键、绑定
- 不需要使用路由键
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
public class NoticeSubscriberConfig
@Bean
public FanoutExchange radioFanoutExchange()
return ExchangeBuilder.fanoutExchange("exchange.fanout.springboot.radio").build();
@Bean
public Queue radioQueue()
return QueueBuilder.durable("queue.fanout.springboot.radio").build();
@Bean
@Resource
public Binding radioBinding(FanoutExchange radioFanoutExchange, Queue radioQueue)
// 广播交换器绑定没有路由键,只要绑定即可收到
return BindingBuilder.bind(radioQueue).to(radioFanoutExchange);
复制代码
监听队列
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class NoticeService
@RabbitListener(queues = "queue.fanout.springboot.radio")
public void radioListener(String message)
System.out.println("radioListener message = " + message);
复制代码
2.3.3 消息发布订阅
发布者发送消息
订阅者收到消息
radioListener message = fanout
复制代码
2.4 headers - 头交换器
2.4.1 消息发送者
- headers模式通过头匹配,会忽略路由键
- 发送者需要创建队列
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HeadersPublisherConfig
@Bean
public Exchange radioHeadersExchange()
return ExchangeBuilder.headersExchange("exchange.headers.springboot.headers").build();
复制代码
创建controller发送消息
- MessageProperties和Message包是:org.springframework.amqp.core
- 需要创建MessageProperties对象用于设置头信息
- Message用于存储消息和消息属性信息
@RequestMapping("/headers")
public Object headers(@RequestParam Map<String, String> param)
MessageProperties properties = new MessageProperties();
properties.setHeader("name", param.get("name"));
properties.setHeader("token", param.get("token"));
Message mqMessage = new Message(param.get("message").getBytes(), properties);
rabbitTemplate.convertAndSend("exchange.headers.springboot.headers", null, mqMessage);
return properties;
复制代码
2.4.2 消息接收者
接收者和上面三种一样,同样需要声明交换器、队列、绑定
- 在队列绑定时需要使用不同规则
- BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match()
- 所有字段属性和值全部匹配
- BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match()
- 任意字段属性和值全部匹配
- BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll("name", "token").exist()
- 指定所有属性字段存在
- BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny("name", "token").exist()
- 指定任意属性存在
- BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match()
- headerMap中存放的属性就是发送者中封装的属性,属性完全匹配则正确路由到此处
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class HeadersSubscriberConfig
@Bean
public HeadersExchange headersExchange()
return ExchangeBuilder.headersExchange("exchange.headers.springboot.headers").build();
@Bean
public Queue headersQueue01()
return QueueBuilder.durable("queue.headers.springboot.01").build();
@Bean
public Queue headersQueue02()
return QueueBuilder.durable("queue.headers.springboot.02").build();
@Bean
public Queue headersQueue03()
return QueueBuilder.durable("queue.headers.springboot.03").build();
@Bean
@Resource
public Binding headers01Binding(HeadersExchange headersExchange,Queue headersQueue01)
Map<String, Object> key = new HashMap<>(4);
key.put("name", "java");
key.put("token", "001");
return BindingBuilder.bind(headersQueue01).to(headersExchange).whereAll(key).match();
@Bean
@Resource
public Binding headers02Binding(HeadersExchange headersExchange,Queue headersQueue02)
Map<String, Object> key = new HashMap<>(4);
key.put("name", "java");
key.put("token", "002");
return BindingBuilder.bind(headersQueue02).to(headersExchange).whereAny(key).match();
@Bean
@Resource
public Binding headers03Binding(HeadersExchange headersExchange,Queue headersQueue03)
// name和token都需要存在
return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAll("name", "token").exist();
// 任意name或者token存在
// return BindingBuilder.bind(headersQueue03).to(headersExchange).whereAny("name", "token").exist();
复制代码
队列监听
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class HeadersService
@RabbitListener(queues = "queue.headers.springboot.01")
public void headers01Listener(String message)
System.out.println("headers01Listener message = " + message);
@RabbitListener(queues = "queue.headers.springboot.02")
public void headers02Listener(String message)
System.out.println("headers02Listener message = " + message);
@RabbitListener(queues = "queue.headers.springboot.03")
public void headers03Listener(String message)
System.out.println("headers03Listener message = " + message);
复制代码
2.4.3 消息发布订阅
- 发送消息
- 接收消息
headers01Listener message = headers
headers02Listener message = headers
headers03Listener message = headers
headers02Listener message = headers
headers03Listener message = headers
headers03Listener message = headers
三、发送者异常监控
3.1 发送者异常种类
基本处理流程
- 补偿(兜底)方案
模拟broker宕机:修改发送者端口如5673,然后启动,发送消息,端口不对无法连接主机
- 错误信息:java.net.ConnectException: Connection timed out: connect
- 补偿方案:加入异常处理,如果不可达则返回错误
- 这种错误在发送的时候就已经可以发现,直接将错误返回给调用方即可
@RequestMapping("/direct")
public Object sendEmail(String msg)
try
rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "queue.email.routing.key", msg);
return msg;
catch (AmqpException e)
System.out.println("发送出现异常:" + e.getMessage());
return "网络中断,请稍后再试";
复制代码
模拟无交换器异常
- 错误信息
- ERROR 4880 --- [.200.57.39:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40)
- 错误说明:如果没有交换器并不会报错,只会输出一条日志
- 补偿方案:需要采用发送回调来确认是否成功发送消息
模拟无路由异常
- 错误信息:无任何提示,消息直接被丢弃
- 补偿方案:需要采用发送回调来确认是否成功发送消息
3.2 消息发送回调
因为消息是异步发送,所以需要确保消息能正确发送
所以可配置RabbitTemplate然后指定回调信息
步骤01:修改配置文件,配置回调参数
- publisher-confirm-type
- org.springframework.boot.autoconfigure.amqp.RabbitProperties#publisherConfirmType
- org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: tianxin
password: tianxin
# 开启消息发broker回调
publisher-confirm-type: correlated
# 开启路由消息路由回调
publisher-returns: true
# 强制确认,也可以在代码中开启
template:
mandatory: true
复制代码
/**
* The type of publisher confirms to use.
*/
public enum ConfirmType
/**
* Use @code RabbitTemplate#waitForConfirms() (or @code waitForConfirmsOrDie()
* within scoped operations.
*/
SIMPLE,
/**
* Use with @code CorrelationData to correlate confirmations with sent
* messsages.
*/
CORRELATED,
/**
* Publisher confirms are disabled (default).
*/
NONE
复制代码
步骤02:配置RabbitTemplate,设置交换器确认回调和路由回调
- setConfirmCallback:无论成功与否都会调用
- setReturnCallback:错误时才调用
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Objects;
@Configuration
public class CustomRabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory)
RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 开启mandatory为true才能触发回调方法,无论消息推送结果如何强制调用回调方法
rabbitTemplate.setMandatory(true);
// 设置连接工厂信息
rabbitTemplate.setConnectionFactory(connectionFactory);
// 消息发broker回调:发送者到broker的exchange是否正确找到
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
System.out.println("setConfirmCallback 消息数据:" + correlationData);
if (Objects.nonNull(correlationData))
System.out.println("setConfirmCallback 消息数据:" + correlationData.getReturnedMessage());
System.out.println("setConfirmCallback 消息确认:" + ack);
System.out.println("setConfirmCallback 原因:" + cause);
System.out.println("-----------------------------------");
);
// 消息路由回调:从交换器路由到队列是否正确发送
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
System.out.println("setReturnCallback 消息:" + message);
System.out.println("setReturnCallback 回应码:" + replyCode);
System.out.println("setReturnCallback 回应信息:" + replyText);
System.out.println("setReturnCallback 交换器:" + exchange);
System.out.println("setReturnCallback 路由键:" + routingKey);
System.out.println("-----------------------------------");
);
return rabbitTemplate;
复制代码
- 路由回调和消息回调
/**
* A callback for publisher confirmations.
*
*/
@FunctionalInterface
public interface ConfirmCallback
/**
* Confirmation callback.
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
/**
* A callback for returned messages.
*
*/
@FunctionalInterface
public interface ReturnCallback
/**
* Returned message callback.
* @param message the returned message.
* @param replyCode the reply code.
* @param replyText the reply text.
* @param exchange the exchange.
* @param routingKey the routing key.
*/
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
复制代码
步骤03:测试controller
- convertAndSend(String exchange, String routingKey, final Object object, @Nullable CorrelationData correlationData)
- 指定CorrelationData(关联数据/对比数据)
- CorrelationData中可以指定消息id和回调消息
- "id": "dataId", data: "biz数据"
测试无交换器
- http://127.0.0.1:8071/noExchange?message=direct
- 找不到交换器:直接回调setConfirmCallback,不再继续调用
@RequestMapping("/noExchange")
public Object noExchange(String message)
try
// 连接不上路由,则消息直接丢弃
String id = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend("noExchange", "springboot.email.routing.key", message, new CorrelationData(id));
return "ok";
catch (AmqpException e)
System.out.println(e.getMessage());
return e.getMessage();
复制代码
setConfirmCallback 消息数据:CorrelationData [id=9aca9a83-5815-455b-acf0-71b0caed534c]
setConfirmCallback 消息数据:null
setConfirmCallback 消息确认:false
setConfirmCallback 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noExchange' in vhost '/', class-id=60, method-id=40)
复制代码
测试无路由
- http://127.0.0.1:8071/noQueue?message=direct
- 找不到路由:先回调setReturnCallback再回调setConfirmCallback
@RequestMapping("/noQueue")
public Object noQueue(String message)
try
// 发送不到队列 ,则消息直接丢弃
String id = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "noQueue", message, new CorrelationData(id));
return "ok";
catch (AmqpException e)
System.out.println(e.getMessage());
return e.getMessage();
复制代码
setReturnCallback 消息:(Body:'direct' MessageProperties [headers=spring_returned_message_correlation=a4b6e77c-4b13-48e4-9a2e-21bd6ef4a697, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
setReturnCallback 回应码:312
setReturnCallback 回应信息:NO_ROUTE
setReturnCallback 交换器:exchange.direct.springboot.email
setReturnCallback 路由键:noQueue
-----------------------------------
setConfirmCallback 消息数据:CorrelationData [id=a4b6e77c-4b13-48e4-9a2e-21bd6ef4a697]
setConfirmCallback 消息数据:(Body:'direct' MessageProperties [headers=spring_listener_return_correlation=42813c45-b804-4303-b9f0-10a73dad71ca, spring_returned_message_correlation=a4b6e77c-4b13-48e4-9a2e-21bd6ef4a697, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.direct.springboot.email, receivedRoutingKey=noQueue, deliveryTag=0])
setConfirmCallback 消息确认:true
setConfirmCallback 原因:null
复制代码
测试消息正常发送
- http://127.0.0.1:8071/direct/confirm?message=direct
- 消息发送成功:只回调setConfirmCallback
@RequestMapping("/direct/confirm")
public Object directConfirm(String message)
try
String id = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "springboot.email.routing.key", message, new CorrelationData(id));
return "ok";
catch (AmqpException e)
System.out.println(e.getMessage());
return "网络中断,请稍后再试~";
复制代码
setConfirmCallback 消息数据:CorrelationData [id=9bb8a203-2345-4a7e-8bfd-8ad0226da4dc]
setConfirmCallback 消息数据:null
setConfirmCallback 消息确认:true
setConfirmCallback 原因:null
复制代码
指定回调消息的id和消息数据
- http://127.0.0.1:8071/correlationData/message?msg=direct
- 可以指定更多业务类型
- org.springframework.amqp.core.Message
- org.springframework.amqp.core.MessageProperties
@RequestMapping("/correlationData/message")
public Object correlationDataMessage(String msg)
try
String id = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData();
correlationData.setId(id);
// 指定回调更多信息
MessageProperties properties = new MessageProperties();
properties.setMessageId(id);
Message message = new Message(msg.getBytes(), properties);
correlationData.setReturnedMessage(message);
rabbitTemplate.convertAndSend("exchange.direct.springboot.email", "springboot.email.routing.key", msg, correlationData);
return msg;
catch (AmqpException e)
System.out.println(e.getMessage());
return "网络中断,请稍后再试~";
复制代码
setConfirmCallback 消息数据:CorrelationData [id=9f598758-4b0b-4e4a-981a-e7e04eab1335]
setConfirmCallback 消息数据:(Body:'[B@1465d3ea(byte[6])' MessageProperties [headers=, messageId=9f598758-4b0b-4e4a-981a-e7e04eab1335, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
setConfirmCallback 消息确认:true
setConfirmCallback 原因:null
复制代码
四、消息持久化
4.1 持久化说明
- 消息在网络发送、网络传输、存盘等都有可能出现意外而导致消息丢失
- 例如如果队列、交换器、消息其中一个没有开启持久化,在broker重启后消息丢失
- 所以需要在消息发送前进行存盘,然后根据状态区分不同的消息种类,可以用来做重试等
4.2 持久化表
持久化需要创建存储消息的表结构
create table msg_log
(
id bigint primary key comment '消息唯一标识',
msg text null comment '消息体, json格式化',
exchange varchar(255) default '' null comment '交换机',
routing_key varchar(255) default '' null comment '路由键',
status int default -1 null comment '状态: -1新建 0投递中 1投递成功 2投递失败 3已消费 4人工处理 5消费失败',
try_count int default 0 null comment '重试次数',
next_try_time datetime null comment '下一次重试时间',
origin_id varchar(32) null comment '原始id',
note varchar(500) null comment '错误信息',
create_time datetime null comment '创建时间',
update_time datetime null comment '更新时间',
) comment '消息投递日志';
复制代码
4.3 持久化实现
- 使用MybatisPlus生成对应的service、mapper、domain信息,标准mybatis使用方式
- 首先需要配置rabbitTemplate配置回调信息
import com.alibaba.fastjson.JSONObject;
import com.codecoord.domain.MsgLog;
import com.codecoord.domain.MsgLogStatus;
import com.codecoord.serivce.MsgLogService;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.UUID;
@Configuration
public class CustomRabbitTemplate
@Resource
private MsgLogService msgLogService;
@Bean
public RabbitTemplate callbackRabbitTemplate(ConnectionFactory connectionFactory)
RabbitTemplate template = new RabbitTemplate();
// 需要设置连接工程
template.setConnectionFactory(connectionFactory);
// 设置强制性
template.setMandatory(true);
// setConfirmCallback: 消息发送到 Broker 后触发回调(是否正确到达Exchange中)
// 需要在配置文件中开启 publisher-confirm-type: correlated 配置
template.setConfirmCallback((correlationData, ack, cause) ->
if (Objects.nonNull(correlationData) && Objects.nonNull(correlationData.getId()))
MsgLog updateLog = new MsgLog();
updateLog.setId(Long.parseLong(correlationData.getId()));
updateLog.setUpdateTime(LocalDateTime.now());
if (ack)
updateLog.setStatus(MsgLogStatus.DELIVERY_SUCCESS);
else
updateLog.setStatus(MsgLogStatus.DELIVERY_FAIL);
msgLogService.updateById(updateLog);
else
System.out.println("消息异常处理");
// 根据ack判断是否投递成功
System.out.println("setConfirmCallback 消息数据:" + JSONObject.toJSONString(correlationData));
System.out.println("setConfirmCallback 消息确认:" + ack);
System.out.println("setConfirmCallback 原因:" + cause);
System.out.println("-----------------------------------");
);
// setReturnCallback: 启动消息失败返回,比如路由不到队列时触发回调
// 需要在配置文件中开启 publisher-returns: true 配置
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
// 消息无法投递到队列,新建消息人工处理,因为原始消息会在setConfirmCallback中被置为投递成功
MsgLog msgLog = new MsgLog();
msgLog.setMsg(message.toString());
msgLog.setExchange(exchange);
msgLog.setRoutingKey(routingKey);
msgLog.setStatus(MsgLogStatus.MANUAL_HANDLING);
msgLog.setTryCount(0);
LocalDateTime currentTime = LocalDateTime.now();
msgLog.setNote(replyText);
msgLog.setCreateTime(currentTime);
msgLog.setUpdateTime(currentTime);
// 处理原始id
MsgLog originLog = JSONObject.parseObject(new String(message.getBody()), MsgLog.class);
msgLog.setOriginId(originLog.getId().toString());
msgLogService.save(msgLog);
System.out.println("setReturnCallback 消息:" + message);
System.out.println("setReturnCallback 回应码:" + replyCode);
System.out.println("setReturnCallback 回应信息:" + replyText);
System.out.println("setReturnCallback 交换器:" + exchange);
System.out.println("setReturnCallback 路由键:" + routingKey);
System.out.println("-----------------------------------");
);
return template;
复制代码
消息发送前对消息存盘,这里使用的rabbitTemplate为新配置的模板
- http://localhost:8071/reliable?message=direct&exchange=noExchange&routingKey=reliable.routing.key
- http://localhost:8071/reliable?message=direct&exchange=exchange.direct.reliable&routingKey=noQueue
- http://localhost:8071/reliable?message=direct&exchange=exchange.direct.reliable&routingKey=reliable.routing.key
import com.alibaba.fastjson.JSONObject;
import com.codecoord.domain.MsgLog;
import com.codecoord.domain.MsgLogStatus;
import com.codecoord.serivce.MsgLogService;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.IdGenerator;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@RestController
public class RabbitReliableController
@Resource
private RabbitTemplate callbackRabbitTemplate;
@Resource
private MsgLogService msgLogService;
@Resource
private IdGenerator idGenerator;
@RequestMapping("/reliable")
public Object direct(String exchange, String routingKey, String message)
try
// 先存盘再发送,如果存盘失败则没有必要继续发送
MsgLog msgLog = saveMessageLog(exchange, routingKey, message);
CorrelationData correlationData = new CorrelationData(msgLog.getId().toString());
callbackRabbitTemplate.convertAndSend(exchange, routingKey, JSONObject.toJSONString(msgLog), correlationData);
return msgLog;
catch (AmqpException e)
System.out.println(e.getMessage());
return "网络中断,请稍后再试~";
private MsgLog saveMessageLog(String exchange, String routingKey, String msg)
MsgLog msgLog = new MsgLog();
// 测试,生产中使用id生成器
msgLog.setId(System.currentTimeMillis());
msgLog.setMsg(msg);
msgLog.setStatus(MsgLogStatus.CREATE);
msgLog.setExchange(exchange);
msgLog.setRoutingKey(routingKey);
msgLog.setTryCount(0);
LocalDateTime currentTime = LocalDateTime.now();
msgLog.setCreateTime(currentTime);
msgLog.setUpdateTime(currentTime);
msgLogService.save(msgLog);
return msgLog;
复制代码
4.3 补偿机制
- 可以使用补偿机制来将存盘的消息重新发送
- 使用springboot自带的定时任务
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.codecoord.domain.MsgLog;
import com.codecoord.serivce.MsgLogService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
@Component
@EnableScheduling
public class RabbitMqJob
@Resource
private MsgLogService msgLogService;
@Resource
private RabbitTemplate callbackRabbitTemplate;
@Scheduled(cron = "10/10 * * * * ?")
public void msgResend()
// 每个消息最多重试三次
LambdaQueryWrapper<MsgLog> retryMsg = Wrappers.<MsgLog>lambdaQuery()
.eq(MsgLog::getStatus, -1)
.lt(MsgLog::getTryCount, 3);
List<MsgLog> msgLogList = msgLogService.list(retryMsg);
for (MsgLog msgLog : msgLogList)
msgLog.setTryCount(msgLog.getTryCount() + 1);
msgLog.setUpdateTime(LocalDateTime.now());
LambdaUpdateWrapper<MsgLog> updateWrapper = Wrappers.<MsgLog>lambdaUpdate()
.eq(MsgLog::getId, msgLog.getId());
boolean update = msgLogService.update(msgLog, updateWrapper);
System.out.println("重试状态更新:" + update);
callbackRabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), msgLog.getMsg(),
new CorrelationData(msgLog.getId().toString()));
复制代码
4.5 消息测试
消息存盘之后效果如下
深度分析springaop,一文带你彻底搞懂springaop底层原理!
SpringAOP我们为什么要使用AOP(面向切面编程)?当我们在现实中完成实际的项目时,我们总是需要在一个“动作”进行前,进行中,或进行后进行一些操作,比如当我们在运行程序时,我们想要进行日志保存,或者在每一个方法... 查看详情
一文带你彻底搞懂docker中的cgroup(代码片段)
前言进程在系统中使用CPU、内存、磁盘等计算资源或者存储资源还是比较随心所欲的,我们希望对进程资源利用进行限制,对进程资源的使用进行追踪。这就让cgroup的出现成为了可能,它用来统一将进程进行分组... 查看详情
一文彻底搞懂zookeeper(代码片段)
本文是基于CentOS7.9系统环境,进行Zookeeper的学习和使用1.Zookeeper简介1.1什么是ZookeeperZookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。本质上,就是文件系统+通知机制1.2Zookeeper工作机制Zookeepe... 查看详情
一文彻底搞懂zookeeper(代码片段)
本文是基于CentOS7.9系统环境,进行Zookeeper的学习和使用1.Zookeeper简介1.1什么是ZookeeperZookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。本质上,就是文件系统+通知机制1.2Zookeeper工作机制Zookeepe... 查看详情
一文彻底搞懂slam技术(代码片段)
什么是SLAM?SLAM (simultaneouslocalizationandmapping),也称为CML(ConcurrentMappingandLocalization),即时定位与地图构建,或并发建图与定位。问题可以描述为:将一个机器人放入未知环境中的未知位置,是否有办法让机器人一边逐步描... 查看详情
一文彻底搞懂slam技术(代码片段)
什么是SLAM?SLAM (simultaneouslocalizationandmapping),也称为CML(ConcurrentMappingandLocalization),即时定位与地图构建,或并发建图与定位。问题可以描述为:将一个机器人放入未知环境中的未知位置,是否有办法让机器人一边逐步描... 查看详情
一文带你搞懂内存泄漏!!!(代码片段)
好文推荐:作者:codelang检测内存是否泄漏非常简单,只要在任意位置调用Debug.dumpHprofData(file)即可,通过拿到hprof文件进行分析就可以知道哪里产生了泄漏,但dump的过程会suspend所有的java线程,导致用户界... 查看详情
一文彻底搞懂前端沙箱(代码片段)
什么是“沙箱”沙箱(Sandbox)[1]也称作:“沙箱/沙盒/沙盘”。沙箱是一种安全机制,为运行中的程序提供隔离环境。通常是作为一些来源不可信、具破坏力或无法判定程序意图的程序提供实验之用。沙箱能够安全的执行不受信... 查看详情
一文彻底搞懂docker中的namespace(代码片段)
什么是namespacenamespace是对全局系统资源的一种封装隔离。这样可以让不同namespace的进程拥有独立的全局系统资源。这样改变一个namespace的系统资源只会影响当前namespace中的进程,对其它namespace中的资源没有影响。以前Linux也... 查看详情
一文彻底搞懂kafka(代码片段)
Kafka的学习和使用本文是基于CentOS7.9系统环境,进行Kafka的学习和使用一、Kafka的简介1.1Kafka基本概念(1)什么是KafkaKafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域(2)消息队列点对点模式... 查看详情
一文彻底搞懂hbase(代码片段)
本文是基于CentOS7.9系统环境,进行HBase的学习和使用一、HBase的简介1.1HBase基本概念HBase是一种分布式、可扩展、支持海量数据存储的NoSQL数据库,可以解决HDFS随机写的问题1.2HBase数据模型逻辑上,HBase的数据模型同关系... 查看详情
一文彻底搞懂hbase(代码片段)
本文是基于CentOS7.9系统环境,进行HBase的学习和使用一、HBase的简介1.1HBase基本概念HBase是一种分布式、可扩展、支持海量数据存储的NoSQL数据库,可以解决HDFS随机写的问题1.2HBase数据模型逻辑上,HBase的数据模型同关系... 查看详情
一文带你搞懂“数据”在内外网环境下怎么流通
大学还没毕业,刚出来实习,在搞网络安全的同时我连内外网网络数据包的走向,我都搞不明白怎么去搞安全,所以下面咱们一起学习网络数据包在内外网环境下是怎么进行流通的。1.内网和外网的含义 ... 查看详情
一文彻底搞懂leveldb架构(代码片段)
leveldbleveldb是一个写性能十分优秀的存储引擎,是典型的LSM-tree的实现。LSM的核心思想是为了换取最大的写性能而放弃掉部分读性能。那么,为什么leveldb写性能高?简单来说它就是尽量减少随机写的次数。leveldb首先将... 查看详情
一文彻底搞懂leveldb架构(代码片段)
leveldbleveldb是一个写性能十分优秀的存储引擎,是典型的LSM-tree的实现。LSM的核心思想是为了换取最大的写性能而放弃掉部分读性能。那么,为什么leveldb写性能高?简单来说它就是尽量减少随机写的次数。leveldb首先将... 查看详情
一文彻底搞懂leveldb架构(代码片段)
leveldbleveldb是一个写性能十分优秀的存储引擎,是典型的LSM-tree的实现。LSM的核心思想是为了换取最大的写性能而放弃掉部分读性能。那么,为什么leveldb写性能高?简单来说它就是尽量减少随机写的次数。leveldb首先将... 查看详情
一文搞懂全排列组合子集问题(建议收藏)(代码片段)
前言Hello,大家好,我是bigsai,longtimenosee!在刷题和面试过程中,我们经常遇到一些排列组合类的问题,而全排列、组合、子集等问题更是非常经典问题。本篇文章就带你彻底搞懂全排列!求全排列?全排列即:n个元素取n个元... 查看详情
❤️野指针?悬空指针?❤️一文带你搞懂!(代码片段)
🎈作者:Linux猿🎈简介:CSDN博客专家🏆,C/C++、面试、刷题、算法尽管咨询我,关注我,有问题私聊!🎈关注专栏:C/C++面试通关集锦 (优质好文持续更新中……)... 查看详情