redis消息队列有没有

author author     2023-05-09     436

关键词:

参考技术A 基于Redis消息队列-实现短信服务化
1.Redis实现消息队列原理
常用的消息队列有RabbitMQ,ActiveMQ,个人觉得这种消息队列太大太重,本文介绍下基于Redis的轻量级消息队列服务。
一般来说,消息队列有两种模式,一种是发布者订阅模式,另外一种是生产者和消费者模式。Redis的消息队列,也是基于这2种原理的实现。
发布者和订阅者模式:发布者发送消息到队列,每个订阅者都能收到一样的消息。
生产者和消费者模式:生产者将消息放入队列,多个消费者共同监听,谁先抢到资源,谁就从队列中取走消息去处理。注意,每个消息只能最多被一个消费者接收。
2.Redis消息队列使用场景
在我们的项目中,使用消息队列来实现短信的服务化,任何需要发送短信的模块,都可以直接调用短信服务来完成短信的发送。比如用户系统登录注册短信,订单系统的下单成功的短信等。
3.SpringMVC中实现Redis消息队列
因为我们短信只需要发送一次,所以我们使用的是消息队列的生产者和消费者模式。
3.1引入Maven依赖
引入Redis相应的maven依赖,这里需要spring-data-redis和jedis
//pom.xml <dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.6.0.RELEASE</version>
</dependency>

<!-- jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.5.1</version>
</dependency>12345678910111213

3.2配置redis生成者消费者模式
//applicationContext-redis.xml <?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:redis="http://www.springframework.org/schema/redis"
xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-34.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-4.0.xsd
http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd">
<description>spring-data-redis配置</description>

<bean id="redisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="$redis.host"></property>
<property name="port" value="$redis.port"></property>
<property name="usePool" value="true"></property>
</bean>

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"></property>
</bean>

<bean id="jdkSerializer"
class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />

<bean id="smsMessageListener"
class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="smsMessageDelegateListener" />
<property name="serializer" ref="jdkSerializer" />
</bean>

<bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage">
<property name="redisTemplate" ref="redisTemplate"/>
</bean>

<redis:listener-container>
<redis:listener ref="smsMessageListener" method="handleMessage"
serializer="jdkSerializer" topic="sms_queue_web_online" />
</redis:listener-container>

<!-- jedis -->
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<property name="maxIdle" value="300" /> <!-- 最大能够保持idel状态的对象数 -->
<property name="maxTotal" value="60000" /> <!-- 最大分配的对象数 -->
<property name="testOnBorrow" value="true" /> <!-- 当调用borrow Object方法时,是否进行有效性检查 -->
</bean>

<bean id="jedisPool" class="redis.clients.jedis.JedisPool">
<constructor-arg index="0" ref="jedisPoolConfig" />
<constructor-arg index="1" value="$redis.host" />
<constructor-arg index="2" value="$redis.port" type="int" />
</bean></beans>123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960

主要的配置说明:
1.序列化:一般我们向Redis发送一个消息定义的Java对象,这个对象需要序列化。这里使用JdkSerializationRedisSerializer:
<bean id="jdkSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />1

2.发送者:
<bean id="sendMessage" class="com.djt.common.cache.redis.queue.SendMessage">
<property name="redisTemplate" ref="redisTemplate"/>
</bean>123

3.监听者:
<bean id="smsMessageListener"
class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="smsMessageDelegateListener" />
<property name="serializer" ref="jdkSerializer" />
</bean>
<redis:listener-container>
<redis:listener ref="smsMessageListener" method="handleMessage"
serializer="jdkSerializer" topic="sms_queue_web_online" />
</redis:listener-container>123456789

smsMessageListener:消息监听器
redis:listener-container:定义消息监听,method:监听消息执行的方法,serializer:序列化,topic:监听主题(可以理解为队列名称)
3.3代码实现
1.定义短信消息对象SmsMessageVo
public class SmsMessageVo implements Serializable
//id
private Integer smsId; //手机号
private String mobile; //类型,1:验证码 2:订单通知
private Byte type; //短信创建时间
private Date createDate; //短信消息处理时间
private Date processTime; //短信状态,1:未发送 2:发送成功 3:发送失败
private Byte status; //短信内容
private String content; //省略setter和getter方法
...123456789101112131415161718192021222324

2.定义消息队列发送对象SendMessage
//SendMessage.javapublic class SendMessage private RedisTemplate<String, Object> redisTemplate; public RedisTemplate<String, Object> getRedisTemplate() return redisTemplate;
public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) this.redisTemplate = redisTemplate;
public void sendMessage(String channel, Serializable message)
redisTemplate.convertAndSend(channel, message);

12345678910111213141516171819202122

3.发送消息
String smsContent = templateToContent(template.getContent(),
regMsgCode);
SmsMessageVo smsMessageVo = new SmsMessageVo();
smsMessageVo.setMobile(mobile);
smsMessageVo.setType((byte) SmsType.VERIFICATION.getType());
smsMessageVo.setChannelId(1);
smsMessageVo.setContent(smsContent);
smsMessageVo.setCreateDate(new Date());
smsMessageVo.setStatus((byte) SmsSendStatus.TO_SEND.getType());
smsMessageVo.setTemplateId(1);

//先把待发送的短信存入数据库
SmsQueue smsQueue = new SmsQueue();
BeanUtils.copyProperties(smsQueue, smsMessageVo);
smsQueueService.addSmsQueue(smsQueue);

//异步发送短信到redis队列
sendMessage.sendMessage(Constants.REDIS_QUEUE_SMS_WEB, smsMessageVo);
//Constants.REDIS_QUEUE_SMS_WEB = "sms_queue_web_online",和applicationContext-redis中topic配置一样123456789101112131415161718192021

4.监听消息
//SmsMessageDelegateListener.java@Component("smsMessageDelegateListener")public class SmsMessageDelegateListener

@Autowired
private SmsQueueService smsQueueService; //监听Redis消息
public void handleMessage(Serializable message) if(message instanceof SmsMessageVo)
SmsMessageVo messageVo = (SmsMessageVo) message; //发送短信
SmsSender smsSender = SmsSenderFactory.buildEMaySender();
smsSender.setMobile(messageVo.getMobile());
smsSender.setContent(messageVo.getContent()); boolean sendSucc = false; //判断短信类型
//验证码短信
if(messageVo.getType() == (byte)SmsType.VERIFICATION.getType())
sendSucc = smsSender.send();
if(!sendSucc) return;
// 异步更新短信表状态为发送成功
final Integer smsId = messageVo.getSmsId();
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() public void run()
SmsQueue smsQueue = new SmsQueue();
smsQueue.setSmsId(smsId);
smsQueue.setStatus((byte)SmsSendStatus.SEND.getType());
smsQueue.setProcessTime(new Date());
smsQueueService.updateSmsQueue(smsQueue);

);



123456789101112131415161718192021222324252627282930313233343536373839404142434445

4.总结
下面使用一张流程图,来总结Redis消息队列和短信服务。

redis的基本使用(二)消息队列

参考技术A使用消息中间件的时候,并非每次都需要非常专业的消息中间件,假如只有一个消息队列,只有一个消费者,那就没有必要去使用专业的消息中间件,这种情况可以直接使用Redis来做消息队列。Redis的消息队列不是特别... 查看详情

将redis发布订阅模式用做消息队列和rabbitmq的区别

可靠性redis:没有相应的机制保证消息的可靠消费,如果发布者发布一条消息,而没有对应的订阅者的话,这条消息将丢失,不会存在内存中;rabbitmq:具有消息消费确认机制,如果发布一条消息,还没有消费者消费该队列,那... 查看详情

redis实现消息队列

...息ID和记录的已处理过的消息ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。为了留存消息,List类型提供了BRPOPLPUSH命令,这个命令的作用是让消费者程序从一个List中读取消... 查看详情

redis消息队列(代码片段)

  消息队列主要分为两种,分别是生产者消费者模式和发布者订阅者模式,这两种模式Redis都支持。生产消费者模式  在生产消费者(Producer/Consumer)模式下,上层应用接收到的外部请求后开始处理其当前步骤的操作,... 查看详情

redis基于(listpubsubstream消费者组)实现消息队列,基于stream结构实现异步秒杀下单(代码片段)

(目录)上一篇博文部分:Redis消息队列1、Redis消息队列-认识消息队列什么是消息队列?最简单的消息队列模型包括3个角色:使用队列的好处在于解耦:这种场景在秒杀中就变成了:这里我们可以使用一些现成的mq,比如kafka,rabbi... 查看详情

接收订单用redis做缓存好还是rabbit做消息队列好

具体对比可靠消费Redis:没有相应的机制保证消息的消费,当消费者消费失败的时候,消息体丢失,需要手动处理RabbitMQ:具有消息消费确认,即使消费者消费失败,也会自动使消息体返回原队列,同时可全程持久化,保证消息... 查看详情

消息队列消息队列选型问题

上一篇我们探讨了为什么使用消息队列,以及消息队列的缺点。今天我们来探讨一下我们到底该使用哪一种消息队列。没有最好的技术只有最合适的技术,不要为了追求最好的性能而忽略了可用性,时刻记住“过早优化是原... 查看详情

mq和redis对比

参考技术Aredis:没有相应的机制保证消息的可靠消费,如果发布者发布一条消息,而没有对应的订阅者的话,这条消息将丢失,不会存在内存中;rabbitmq:具有消息消费确认机制,如果发布一条消息,还没有消费者消费该队列,... 查看详情

redis与rabbitmq做消息队列的区别

参考技术A消息队列(MessageQueue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从MQ中取消息而不管是谁发布的。这... 查看详情

场景应用:redis如何做消息队列?(代码片段)

文章目录Redis做消息队列消息保序处理重复的消息证消息可靠性总结Redis做消息队列因为Redis中的list是双向链表也可以当队列的特性消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证... 查看详情

redis属于redis的“消息队列”:redisstream(浅析)(代码片段)

文章目录关于redisstreamredisstream使用示例添加消息使用示例时间复杂度读取消息XREAD时间复杂度XRANGE删除消息XDELXTRIM关于redisstream这以前只知道redis有类似于消息队列的发布/订阅,还真不知道它居然悄咪咪的有“消息队列”呀... 查看详情

redis属于redis的“消息队列”:redisstream(浅析)(代码片段)

文章目录关于redisstreamredisstream使用示例添加消息使用示例时间复杂度读取消息XREAD时间复杂度XRANGE删除消息XDELXTRIM关于redisstream这以前只知道redis有类似于消息队列的发布/订阅,还真不知道它居然悄咪咪的有“消息队列”呀... 查看详情

redis怎么做消息队列

参考技术Aredis只适合做缓存,做消息队列你可以看看activeMQ 查看详情

常用的消息队列

参考技术A一、redis消息队列和kafka消息队列的比较  1、Redis作为消息队列  Redis的pub-sub模式非常像西式快餐一样,快产快消,全都是因为Redis是使用内存来做存取,所有你生产的消息立马会被消费者一次性全部处理掉,并且没有留... 查看详情

一个redis消息队列实现

消息队列某次在某乎上看到有人提到消息队列的问题,然后有人在回答里提到了Redis,接着便有人在评论里指出:Redis是缓存,不是消息队列。但不幸的是,Redis的确提供一个简易的消息队列机制,可以用于... 查看详情

一个redis消息队列实现

消息队列某次在某乎上看到有人提到消息队列的问题,然后有人在回答里提到了Redis,接着便有人在评论里指出:Redis是缓存,不是消息队列。但不幸的是,Redis的确提供一个简易的消息队列机制,可以用于... 查看详情

消息队列为啥用redis实现

...信息(请求内容,请求方标识等等).参考技术A因为redis实现消息队列很简单!$this->redis->rPush($key, $val); // 右边入$this->redis->lPop($key);        // 左边出本回答被提问者采纳 查看详情

基于stream的redis消息队列(代码片段)

目录一、消息队列二、基于List结构模拟消息队列基于List的消息队列的优点:基于List的消息队列的缺点:三、基于PubSub的消息队列基于PubSub的消息队列的优点:基于PubSub的消息队列的缺点:四、基于Stream的消息队... 查看详情