如何在任何 MQ 平台中实现这个单一并发分布式队列?

     2023-03-10     86

关键词:

【中文标题】如何在任何 MQ 平台中实现这个单一并发分布式队列?【英文标题】:How can I implement this single concurrency distributed queue in any MQ platform? 【发布时间】:2017-06-18 04:19:02 【问题描述】:

我目前正在努力寻找实现特定类型队列的解决方案,这需要以下特征:

    所有队列都必须遵守添加作业的顺序。 整个队列的并发数为 1,这意味着每个 队列 一次只能执行一个作业,而不是工作线程。 这样的队列将超过几千个。 它需要分布式并且能够扩展(例如,如果我添加了一个工人)

基本上它是一个单进程 FIFO 队列,这正是我在试用 ActiveMQ 或 RabbitMQ 等不同的消息队列软件时想要的,但是一旦我将它扩展到 2 个工作线程,它就不起作用,因为在这种情况下我希望它能够扩展并保持与单个进程队列完全相同的功能。下面我附上它应该如何在具有多个工作人员的分布式环境中工作的描述。

拓扑结构示例:(请注意,QueueWorkers 之间是多对多关系)

如何运行的示例:

+------+-----------------+-----------------+-----------------+
| Step | Worker 1        | Worker 2        | Worker 3        |
+------+-----------------+-----------------+-----------------+
| 1    | Fetch Q/1/Job/1 | Fetch Q/2/Job/1 | Waiting         |
+------+-----------------+-----------------+-----------------+
| 2    | Running         | Running         | Waiting         |
+------+-----------------+-----------------+-----------------+
| 3    | Running         | Done Q/2/Job/1  | Fetch Q/2/Job/2 |
+------+-----------------+-----------------+-----------------+
| 4    | Done Q/1/Job/1  | Fetch Q/1/Job/2 | Running         |
+------+-----------------+-----------------+-----------------+
| 5    | Waiting         | Running         | Running         |
+------+-----------------+-----------------+-----------------+

这可能不是最好的表示,但它表明,即使在 Queue 1Queue 2 中,也有更多的工作,但 Worker 3 确实如此直到前一个作业完成后才开始获取下一个作业。

这是我努力寻找好的解决方案。

我已经尝试了很多其他解决方案,例如rabbitMQ、activeMQ、apollo...这些允许我创建数千个队列,但在我尝试的时候,所有这些都将使用 worker 3 来运行队列中的下一个作业。并发是每个工人

是否有任何解决方案可以在任何 MQ 平台上实现这一点,例如 ActiveMQ、RabbitMQ、ZeroMQ 等?

谢谢你:)

【问题讨论】:

虽然这个话题确实很有趣。在 Stack Overflow 上请求场外资源是题外话。 异地资源是什么意思?如果需要,我会改写这个问题。我想问的更像是一种实现方式,而不是严格寻找程序/解决方案。 【参考方案1】:

您可以使用 Redis 列表和一个额外的“调度”队列来实现这一点,所有工作人员 BRPOP 都在为他们的工作启用该队列。调度队列中的每个作业都使用原始队列 ID 进行标记,当工作人员完成作业时,它会进入这个原始队列并在调度队列上执行RPOPLPUSH 以使下一个作业可用于任何其他工作人员。因此,调度队列最多有 num_queues 个元素。

您必须处理的一件事是当源队列为空时调度队列的初始填充。这可能只是发布者针对最初设置的每个队列的“空”标志进行的检查,并且当原始队列中没有任何东西要分派时也由工作人员设置。如果设置了这个标志,发布者可以直接LPUSH第一个作业进入调度队列。

【讨论】:

如何在 QML 中实现对象之间的单一连接?

】如何在QML中实现对象之间的单一连接?【英文标题】:HowtoimplementsingleconnectionbetweenobjectsinQML?【发布时间】:2018-12-0615:07:21【问题描述】:在我的Qt应用程序中,我有很多窗口,有时它们需要一个“返回”按钮。此按钮位于Appli... 查看详情

尝试在c中实现并发TCP服务器和客户端

...实现了TCp并发服务器和客户端。但我没有办法检查是否有任何其他标准的实施方式这个。我已经查看了标准编码的东西,但没有发现任何有用的东西。有人可以分享一些好的链接或代码,这样我就可以有一个标准的想法实现 查看详情

如何在单一方法Springboot JPA中实现多个更新的事务

】如何在单一方法SpringbootJPA中实现多个更新的事务【英文标题】:HowtoimplementTransactionalformultipleupdatesinsinglemethodSpringbootJPA【发布时间】:2021-11-2120:47:33【问题描述】:我有一个使用SPringbootJPA对DB进行多次更新、删除和保存方法... 查看详情

如何在mq中实现支持任意延迟的消息?

什么是定时消息和延迟消息?定时消息:Producer将消息发送到MQ服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。延迟消息:Producer将消息发送到MQ服... 查看详情

如何在 Python 中实现优先级队列?

】如何在Python中实现优先级队列?【英文标题】:HowtoimplementPriorityQueuesinPython?【发布时间】:2012-04-1516:31:51【问题描述】:很抱歉提出这么愚蠢的问题,但Python文档令人困惑......链接1:队列实现http://docs.python.org/library/queue.html... 查看详情

在 WebSphere MQ 中配置死信队列

...:我在我的应用程序中使用IBMWebsphere和MQ,现在我想知道如何配置(或)让Websphere了解在MQ中创建了一个死信队列,比如DEAD.QUEUE,它应该在处理来自请求/响应队列的消息时出现任何故障的情况。我正在尝试这样做,因为队列中 查看详情

是否可以在一个堆栈中实现多个队列?

...时间】:2021-12-2114:05:18【问题描述】:谁能给我解释一下如何在一个堆栈中实现多个队列//implementstacksusingplainarrayswithpushandpopfunctionsvarStack1=[];varStack2=[];//implementen 查看详情

[java]分布式消息队列(mq)

...靠性差技术选型性能、优缺点、业务场景集群架构模式,分布式、可扩展、高可用、可维护性综合成本,集群规模,人员 查看详情

如何在 JS/TS 中实现伪阻塞异步队列?

】如何在JS/TS中实现伪阻塞异步队列?【英文标题】:HowtoimplementapseudoblockingasyncqueueinJS/TS?【发布时间】:2018-04-1919:48:11【问题描述】:所以这里有一个矛盾:我想在javascript/typescript中创建一个异步阻塞队列(如果你可以在没有ty... 查看详情

如何在springcloud分布式系统中实现分布式锁?

一、简介一般来说,对数据进行加锁时,程序先通过acquire获取锁来对数据进行排他访问,然后对数据进行一些列的操作,最后需要释放锁。Redis本身用watch命令进行了加锁,这个锁是乐观锁。使用watch命令对于频繁访问的键会引... 查看详情

如何在 Java EE 环境中实现请求限制?

】如何在JavaEE环境中实现请求限制?【英文标题】:HowdoIimplementRequestthrottlinginaJavaEEenvironment?【发布时间】:2012-04-2301:36:11【问题描述】:我们的一个应用程序通过消息队列接受请求,并为每个请求拨打电话。拨打电话时涉及2个... 查看详情

如何在spring boot中实现jms队列

】如何在springboot中实现jms队列【英文标题】:Howtoimplementjmsqueueinspringboot【发布时间】:2017-04-1708:58:00【问题描述】:我已经使用初始化程序创建了一个SpringBoot项目,我正在尝试创建我的第一条消息,但我不知道从哪里开始。我... 查看详情

如何在使用 TPT 层次结构时首先在 EF 代码中实现并发

】如何在使用TPT层次结构时首先在EF代码中实现并发【英文标题】:HowtoimplementConcurrencyinEFcodefirstwhileusingaTPThierarchy【发布时间】:2011-04-0809:56:55【问题描述】:大家好!我正在尝试首先使用实体​​框架代码实现乐观并发检查(... 查看详情

如何在python中实现概率分布的合并?

】如何在python中实现概率分布的合并?【英文标题】:HowtoimplementConflationforprobabilitydistributioninpython?【发布时间】:2021-01-2810:26:09【问题描述】:我在网上寻找将几个连续概率分布组合成一个连续概率分布的方法。这种方法称为C... 查看详情

如何在 SQL 中实现聚合? (这与 GroupBy 无关)

】如何在SQL中实现聚合?(这与GroupBy无关)【英文标题】:HowtoimplementanAggregationinSQL?(ThisisnotaboutGroupBy)【发布时间】:2018-06-2903:51:22【问题描述】:在大学项目的范围内,我应该实现我的数据库的聚合。我得到了一个类似于这个... 查看详情

如何在 Scipy 中实现日志均匀分布?

】如何在Scipy中实现日志均匀分布?【英文标题】:HowtoimplementaloguniformdistributioninScipy?【发布时间】:2018-09-0708:27:00【问题描述】:我不明白如何在Scipy中实现对数均匀概率分布。根据thispost的cmets,可以通过只定义_pdf来实现。另... 查看详情

mq(队列消息的入门)(代码片段)

...递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成,通过提供消息传递和消息排队模型,它可以在分布式环境下拓展进程间的通信,对于消息中间件,常见的角色大致也就有Producer(生产者).Consumer(消费者) MQ&n... 查看详情

mq在高并发环境下,如果队列满了,如何防止消息丢失?

...下会产生消息丢失的现象?消息队列满了的情况下。3、如何解决消息丢失的问题?(1)生产者可以采用重试机制。因为消费者会不停的消费消息,可以重试将消息放入队列。如果还是不行,可以将消息记录到数据库,后期做补... 查看详情