redispub/sub发布订阅模式的深度解析与实现消息队列(代码片段)

刘Java 刘Java     2023-01-12     305

关键词:

详细介绍了Redis 的Pub/Sub的相关命令和优缺点,以及如何实现简单的消息队列。

1 Pub/Sub的概述

我们可以利用Redis的List数据结构实现一

个简单的消息队列,通过lpush命令写入消息,通过rpop 命令拉取消息,也可以使用BRPOP实现阻塞式的拉取消息。

上面的消息队列有一个缺点,那就是不支持消息多播机制,消息多播机制就是生产者生产的一个消息可以被多个消费者消费到,这个功能在分布式系统中非常重要。

Redis单独使用Pub/Sub模块来支持消息多播,即发布/订阅模式(publish/subscribe),它是一种消息通信模式:发布者(pub)发送消息,订阅者(sub)接收消息。

发布者会将的消息发布到一个chanel(通道)中而不是发送给指定的订阅者,发布者也不知道可能有哪些订阅者。

订阅者可以订阅一个或多个channel,只接收来自订阅的channel的消息,并且不知道有哪些(如果有)发布者,这种模式实现了消息发布者和订阅者的解耦。

Pub/Sub 与键空间无关,消息不会被持久化,与数据库也无关,在db10上发布,将可以被 db1 上的订阅者听到。如果我们需要某种范围的范围,那么只能在设置的channel名字上做区分。

2 订阅

客户端使用SUBSCRIBE channel [channel ...]命令订阅通道,可以多次执行该命令,也可以一次订阅多个通道,多个客户端可以订阅相同的通道。

该命令返回一个数组,包括三部分,依次是:命令名称(字符串“subscribe”),订阅的通道名称,目前总共订阅的通道数(包含glob通道)。这三个部分对每一个订阅的通道是连续的。

客户端执行订阅以后,除了可以继续订阅(SUBSCRIBE或者PSUBSCRIBE),取消订阅(UNSUBSCRIBE或者PUNSUBSCRIBE), PING命令和结束连接(QUIT)外, 不能执行其他操作,客户端将阻塞直到订阅通道上发布消息的到来。

如下,表示客户端一次性订阅四个通道:aaa、bba、ccc、ddd:

127.0.0.1:6379> SUBSCRIBE aaa bba ccc ddd
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "aaa"
3) (integer) 1
1) "subscribe"
2) "bba"
3) (integer) 2
1) "subscribe"
2) "ccc"
3) (integer) 3
1) "subscribe"
2) "ddd"
3) (integer) 4

请注意,如果使用redis-cli 一旦进入订阅模式就不会接受任何命令,只能使用 Ctrl-C 退出该模式。

3 取消订阅

客户端使用UNSUBSCRIBE [channel [channel ...]]命令取消订阅指定的通道,可以指定一个或者多个取消的订阅通道名称,也可以不带任何参数,此时将取消所有的订阅的通道(不包括glob通道)。

该命令返回一个数组,包括三部分,依次是:命令名称(字符串“unsubscribe”),订阅的通道名称,目前总共订阅的通道数(包含glob通道)。这三个部分对每一个取消订阅的通道是连续的。当最后一个参数为零时,我们不再订阅任何频道,客户端可以发出任何类型的 Redis 命令,因为我们处于 Pub/Sub 状态之外。

如下,表示客户端退出ccccc通道的订阅:

127.0.0.1:6379> UNSUBSCRIBE ccccc
1) "unsubscribe"
2) "cccc"
3) (integer) 0

4 模式匹配

Redis Pub/Sub 实现支持模式匹配。客户端可以订阅 glob 通道,这样就能接收发送到通道名称与给定模式匹配的通道的所有消息。

客户端使用PSUBSCRIBE pattern [pattern ...] 订阅一个或多个glob 通道。

该命令返回一个数组,包括三部分,依次是:命令名称(字符串“psubscribe”),订阅的glob通道名称,目前总共订阅的通道数(包含非glob通道)。这三个部分对每一个订阅的通道是连续的。

例如,订阅a*和*c模式:

127.0.0.1:6379> PSUBSCRIBE a* *c
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "a*"
3) (integer) 1
1) "psubscribe"
2) "*c"
3) (integer) 2

客户端使用 PUNSUBSCRIBE [pattern [pattern ...]]退订一个或多个glob 通道。也可以不带任何参数,此时将取消所有的订阅的通道(不包括非glob通道)。

该命令返回一个数组,包括三部分,依次是:命令名称(字符串“punsubscribe”),取消订阅的glob通道名称,目前总共订阅的通道数(包含非glob通道)。这三个部分对每一个取消订阅的通道是连续的。当最后一个参数为零时,我们不再订阅任何频道,客户端可以发出任何类型的 Redis 命令,因为我们处于 Pub/Sub 状态之外。

如下,取消对a*的glob通道的订阅:

127.0.0.1:6379> PUNSUBSCRIBE a*
1) "punsubscribe"
2) "a*"
3) (integer) 0

subscribe, unsubscribe, psubscribe 和punsubscribe命令的最后都返回当前客户端订阅的glob通道和通道的总数,如果为0,则客户端自动退出Pub/Sub模式。

5 发布

PUBLISH channel message命令在指定的通道上发布消息。只能在一个通道上发布消息,不能在多个通道上同时发布消息。

将返回通知的接收者数量。这里的接收者数目大于等于订阅该通道的客户端数目,因为一个客户端的glob通道和非glob通道同时匹配发布通道的话,则视为两个接收者。换句话说,如果客户端订阅了多个与已发布消息匹配的模式,或者订阅了与该消息匹配的模式和通道,则该客户端可能会多次收到同一条消息。

在接收端,收到的响应包括三部分,依次是:“message”字符串,匹配的通道名称,发布的消息内容。如果是因为glob模式匹配而接收,那么返回四部分:“pmessage”字符串,匹配的glob通道名称,发送的原始通道名称,发布的消息内容。

如果某个客户端的订阅a*和*c两个模式通道:

127.0.0.1:6379> PSUBSCRIBE a* *c
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "a*"
3) (integer) 1
1) "psubscribe"
2) "*c"
3) (integer) 2

如果发送消息的通道为ac,那么将会返回2:

127.0.0.1:6379> PUBLISH ac xxxxx
(integer) 2

在客户端,将收到两次消息:

1) "pmessage"
2) "a*"
3) "ac"
4) "xxxxx"
1) "pmessage"
2) "*c"
3) "ac"
4) "xxxxx"

6 Pub/Sub原理

每个Redis服务器进程维持着一个标识服务器状态的redis.h/redisServer结构,其中就保存着有订阅的频道 以及 订阅模式 的信息:

struct redisServer 
    // ...
    dict *pubsub_channels;  // 订阅频道
    list *pubsub_patterns;  // 订阅模式
    // ...
;

6.1 pubsub_channels

pubsub_channels是一个dict字典结构,key(数组元素)为channel,value就是某个client。当客户端订阅某一个频道之后,Redis 就会往 pubsub_channels 这个字典中新添加一条channel和client数据,不同的client可以订阅相同的channel,client以链表的方式串联起来,这样就能保存多个client对同一个channel的关系,非常的巧妙。

了解了这个结构,SUBSCRIBE 、PUBLISH 、UNSUBSCRIBE命令的实现也变得十分简单了。

SUBSCRIBE就是将channel和client加入到dict中,如果此前没有该channel,那就新增一个channel元素,然后在再增一个client链表节点,如果此前存在,则直接在链表末尾添加一个client节点。

PUBLISH只需要通过上述字典定位到具体的channel,就能找到所有订阅该channel的客户端,再把消息发送给它们就好了。

UNSUBSCRIBE也很简单,将对应channel下面的链表中的client删除即可。

6.2 pubsub_patterns

pubsub_patterns用于存储所有的glob channel,它是一个list结构,节点类型为redis.h/pubsubPattern

typedefstruct pubsubPattern 
    redisClient *client;  // 订阅模式的客户端
    robj *pattern;        // 订阅的模式
 pubsubPattern;

当使用PSUBSCRIBE命令订阅一个模式时,程序就创建一个pubsubPattern添加到 pubsub_patterns 链表中。如果另一个客户端也订阅一个模式,则向链表的后面新增一个pubsubPattern节点即可。

因此,实际上PUBLISH除了会在pubsub_channels中定位具体的channel之外,还会将指定的channel与pubsub_patterns 中的模式进行对比,如果 指定的channel 和某个模式匹配的话,那么也将 message 发送到订阅那个模式的全部客户端。

PUNSUBSCRIBE的实现也很简单,就是删除pubsub_patterns中,client和pattern信息对比一致的节点。

7 Pub/Sub缺点

发布的消息在Redis系统中不能持久化,因此,必须先执行订阅,再等待消息发布。如果先发布了消息,那么该消息由于没有订阅者,消息将被直接丢弃。

消息只管发送,不管接收,也没有ACK机制,无法保证消息的消费成功。如果某个消费者中途加入进来,或者挂掉重启,那么这之前丢失的消息也不能再次消费。

以上的缺点导致Redis的Pub/Sub模式就像个小玩具,在生产环境中几乎无用武之地,非常的尴尬!为此,Redis5.0版本新增了Stream数据结构,不但支持多播,还支持数据持久化,相比Pub/Sub更加的强大!

相关文章:

  1. https://redis.io

如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

Redis Pub/Sub 聊天室

】RedisPub/Sub聊天室【英文标题】:RedisPub/SubChatRooms【发布时间】:2016-01-1808:19:26【问题描述】:我对RedisPub/sub很陌生,所以请多多包涵。我正在尝试创建一个IRC,用户可以在其中创建自己的聊天室,有点像Gitter。以下是我到目前... 查看详情

redis pub/sub 订阅返回连接错误

】redispub/sub订阅返回连接错误【英文标题】:redispub/subsubsribereturningconnectionerror【发布时间】:2015-09-0323:22:44【问题描述】:我在Laravel5.1上并遵循此处的指南:http://laravel.com/docs/5.1/redis#pubsub我创建了一个简单的socket.io服务器,... 查看详情

redisrepository封装—redis发布订阅以及stackexchange.redis中的使用

...注明本Redis系列分享地址。http://www.cnblogs.com/tdws/tag/NoSql/RedisPub/Sub模式基本介绍Redis发布订阅—Pub/Sub模式或者说是观察者模式。我想大家即使没有使用过,也已经耳熟能详了。先简单举例说明下应用场景,在场景中我们可以分... 查看详情

Redis命令获取发布/订阅的所有可用频道?

...3104:47:47【问题描述】:我搜索redis命令列表。我找不到在redispub/sub中获取所有可用频道的命令。在meteorserver中,等价的命令是LISTCHANNELS,它列出了所有已知的频道、每个频道上存储的消息数量以及当前订阅者的数 查看详情

redisstream流的深度解析与实现高级消息队列一万字(代码片段)

...息多播功能,没有ACK机制,无法重复消费等等。RedisPub/Sub消息无法持久化,只管发送,如果出现网络断开、Redis宕机等&# 查看详情

redisstream流的深度解析与实现高级消息队列一万字(代码片段)

...息多播功能,没有ACK机制,无法重复消费等等。RedisPub/Sub消息无法持久化,只管发送,如果出现网络断开、Redis宕机等&# 查看详情

如何为即时消息系统设计 redis pub/sub?

】如何为即时消息系统设计redispub/sub?【英文标题】:Howtodesignredispub/subforaninstantmessagingsystem?【发布时间】:2012-04-2521:37:42【问题描述】:我是redispub/sub的新手。我在系统中有一个聊天工具,就像IM。所以我想使用redispub/sub。正... 查看详情

后端扩展时如何有效地发布给graphql订阅者

...。由于我没有在后端保留任何状态,因此我尝试通过实现RedisPUB/SUB来解决问题。微服 查看详情

Node.js、Socket.io、Redis pub/sub 大容量、低延迟困难

】Node.js、Socket.io、Redispub/sub大容量、低延迟困难【英文标题】:Node.js,Socket.io,Redispub/subhighvolume,lowlatencydifficulties【发布时间】:2012-05-2010:22:00【问题描述】:当结合socket.io/node.js和redispub/sub以尝试创建一个由可以处理多个传输的... 查看详情

用于 node.js 中聊天服务器的 Redis pub/sub

】用于node.js中聊天服务器的Redispub/sub【英文标题】:Redispub/subforchatserverinnode.js【发布时间】:2011-11-1919:02:29【问题描述】:我正在尝试使用RedisCookbook示例:varhttp=require(\'http\'),io=require(\'socket.io\')fs=require(\'fs\'),redis=require(\'redis\'... 查看详情

kafka深度解析

...度解析背景介绍Kafka简介  Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 查看详情

RoR + Node.js Redis sub/pub 生产环境

...2015-10-2906:15:10【问题描述】:我在生产模式下使用RoR中的Redispub/sub时遇到了一些问题。我有3个实例:RoR服务器、节点服务器和Rake任务以及处于某种状态的模型(模型状态1)RoR服务器更新id=1的模型并将事件“一”发布到Redis。(... 查看详情

fabric共识模式深度解析

...abric的共识算法的观点,普遍都认为是kafka,而kafka是主从模式,所以不能防止作恶。针对这个观点,详细解析fabric的共识模式。fabric共识模式采用的Endorse+Kafka+Commit的模式,这里我们简称EKC共识。此共识包含以下几个步骤: 查看详情

redis pub/sub 与 node.js 中的 socket.io

】redispub/sub与node.js中的socket.io【英文标题】:redispub/subwithsocket.ioinnode.js【发布时间】:2012-08-1221:02:30【问题描述】:我创建了简单的聊天应用程序。因此我使用了node.js我在网上看过很多简单的例子,都说代码运行良好。但是当... 查看详情

在 redis 订阅事件发生后,如何加入套接字 io 房间?

...【发布时间】:2021-12-0922:29:06【问题描述】:我正在使用redispub/sub,我想在redissub事件之后加入一个房间:constexpress=require(\'express\');constapp=express();co 查看详情

vue的数据双向绑定是怎么实现的

vue的数据双向绑定是通过数据劫持和发布-订阅者功能来实现的。实现步骤:1.实现一个监听者Oberver来劫持并监听所有的属性,一旦有属性发生变化就通知订阅者。2.实现一个订阅者watcher来接受属性变化的通知并执行相应的方法... 查看详情

观察者模式解析(代码片段)

...者模式(ObserverPattern),是软件设计模式中的一种,又称发布-订阅模式,属于发布-订阅架构的一种应用。  观察者模式定义了对象之间一种一对多的依赖关系。当一个对象的状态发生改变时,所有依赖于它的对象都会得到通... 查看详情

模板方法模式深度解析

...改变一个算法的结构即可重定义算法的某些特定步骤。2.模式中的角色  2.1抽象类(AbstractClass):实现了模板方法,定义了算法的骨架。  2.2具体类(ConcreteClass):实现抽象类中的抽象方法,已完成完整的算法。3.模式解读... 查看详情