每个用户的 GCP PubSub(或 GCP 任务)同步处理

     2023-02-16     281

关键词:

【中文标题】每个用户的 GCP PubSub(或 GCP 任务)同步处理【英文标题】:GCP PubSub (or GCP Tasks) Synchronous Processing per User 【发布时间】:2021-03-04 19:26:29 【问题描述】:

我有一个需要处理一组事件的用例。我需要他们对每个用户进行整体并行处理但串行处理。这可以在 PubSub 中完成吗(可能是 GCP 任务?)?

例如:

6 个事件同时进入(User_A_Event_1、User_A_Event_2、User_B_Event_1、User_B_Event_2、User_C_Event_1、User_D_Event_1)。

我想按 UserID 对它们进行分组,并行处理每个用户,然后依次处理每个事件(在成功完成前一个事件之前,不会开始后续事件处理)。比如:

用户 A 串行处理:处理 User_A_Event_1 --> 处理 User_A_Event_2 用户 B 串行处理:处理 User_B_Event_1 --> 处理 User_B_Event_2 用户 C 串行处理:处理 User_C_Event_1 用户 D 串行处理:处理 User_D_Event_1

如果重要的话,我不知道哪些用户会在什么时间举办活动。我们可能几个月都看不到用户的任何事件,然后开始收到大量事件。

我正在尝试找出一种在 GCP PubSub 中实现此目的的方法,但我也对其他解决方案持开放态度。我的偏好是通过推而不是拉来执行此操作,因为我可以在队列中没有任何内容的情况下进行很长时间。

感谢您的帮助。

克雷格

【问题讨论】:

3 个问题: 1. 你怎么知道一个事件在另一个事件之前?你有时间戳吗?增量ID?还要别的吗? 2、你提前知道用户数吗?如果是这样,提供新用户的流程是什么? 3. 您需要实时还是可以按批次(例如每小时)处理事件? 我确实有一个时间戳,但我实际上对 FIFO 没问题。当事件发生时,我也会知道用户 ID。我不知道用户什么时候会有事件,但我系统中的所有用户最终可能每天都会有事件。不幸的是,我确实需要它,批量处理无法满足我的需求。 好吧,事实上,如果你有 2 个事件发生的时间非常接近,你想要一个像“锁”这样的东西来不并行处理它们。您知道同一用户的 2 个事件之间可以有的高频率(或最低间隔)吗?你知道一个事件的最长处理时间吗? 【参考方案1】:

Cloud Pub/Sub's ordered delivery 可以在这里提供帮助。您将使用用户作为订购键。这意味着 Cloud Pub/Sub 将按照服务从您的发布者那里收到消息的顺序将消息传递给您的订阅者。在您事先不知道用户集以及特定用户的事件可能很少或突发的情况下,有序交付将具有您想要的属性。

在订阅方面,所做出的保证取决于订阅者的类型。对于客户端库(使用流式拉取),您提供的回调将针对具有相同键的消息一次执行完成。对于使用拉取的订阅者,每个拉取请求将按照接收顺序包含密钥的消息,并且密钥的消息一次仅在一个拉取响应中未完成。对于推送订阅者,订购密钥的每条消息都将单独发送到您的端点,并且在确认同一密钥的上一条消息之前不会发送下一条消息。

请注意,Cloud Pub/Sub 的有序交付仍然具有至少一次交付语义,这意味着可以重新交付已确认的消息,这也会导致针对同一密钥重新交付后续消息。

有关详细信息,请参阅Medium post about ordering。

【讨论】:

GCP Pubsub 未传递消息的数量不会改变

】GCPPubsub未传递消息的数量不会改变【英文标题】:GCPPubsubnumofundeliveredmessageswontchange【发布时间】:2021-08-2423:59:27【问题描述】:我正在使用pubsub来触发我定义为最多具有10个实例的云功能。当大约300条或更多消息到达主题并开... 查看详情

如何在 apache camel 中执行 gcp pubsub 消息的并行处理

】如何在apachecamel中执行gcppubsub消息的并行处理【英文标题】:howtoperformparallelprocessingofgcppubsubmessagesinapachecamel【发布时间】:2020-08-1317:15:03【问题描述】:我在下面有这段代码,它从pubsub源主题获取消息->根据模板对其进行转... 查看详情

主题的 GCP PubSub 权限

】主题的GCPPubSub权限【英文标题】:GCPPubSubPermissionsforatopic【发布时间】:2019-06-2009:40:03【问题描述】:我正在尝试创建一个仅对特定pubsub主题具有权限的服务帐户。我创建一个主题:gcloudpubsubtopicscreatemytopic然后创建一个服务帐... 查看详情

如何将 GCP Pubsub 订阅的消息转发到另一个主题?

】如何将GCPPubsub订阅的消息转发到另一个主题?【英文标题】:HowcanIforwardaGCPPubsubsubscription\'smessagestoanothertopic?【发布时间】:2017-04-0111:35:34【问题描述】:我希望能够将GoogleCloudPlatform(GCP)Pubsub订阅的消息转发到另一个GCP主题,... 查看详情

如何使用 gsutil 向 GCP 存储桶添加 pubsub 通知

】如何使用gsutil向GCP存储桶添加pubsub通知【英文标题】:HowtoaddpubsubnotificationtoGCPstoragebucketusinggsutil【发布时间】:2021-07-2510:21:52【问题描述】:我想为GCp存储桶添加pubsub通知。因此,每当文件放入GCP存储桶时,就会发送pubsub通... 查看详情

从现有的 GCP pubsub 订阅中消费

】从现有的GCPpubsub订阅中消费【英文标题】:ConsumefromExistingGCPpubsubSubscription【发布时间】:2020-03-0412:44:43【问题描述】:我正在尝试使用此存储库中托管的示例代码。https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-sa... 查看详情

GCP PubSub 主题推送问题

】GCPPubSub主题推送问题【英文标题】:GCPPubSubtopicpushissue【发布时间】:2019-05-1007:52:38【问题描述】:我在gke(gcp)上部署了一个nodejsdocker映像应用程序。这个应用程序只是一个等待通知的消息订阅者。我创建了一个主题(pubsub)... 查看详情

GCP 云功能未正确接收/确认 PubSub 消息

】GCP云功能未正确接收/确认PubSub消息【英文标题】:GCPCloudFunctionnotcorrectlypickingup/acknowledgingPubSubmessages【发布时间】:2020-12-0621:25:52【问题描述】:我在GoogleCloudPlatform中设置了一些数据处理工作流程。这些位置处理物理地址并... 查看详情

GCP - 如何添加关于发送到 pubsub 死信队列的消息数量的警报?

】GCP-如何添加关于发送到pubsub死信队列的消息数量的警报?【英文标题】:GCP-howtoaddalertonnumberofmessagessenttoapubsubdeadletterqueue?【发布时间】:2020-10-2204:03:10【问题描述】:我的应用程序处理来自pubsub主题的消息,如果失败,消息... 查看详情

从存储中读取 JSON 数组并发送到 GCP PubSub

...517:21:56【问题描述】:我在GoogleCloudStorage中有多个文件,每个文件都包含如下JSON数组-"Data":["Country":"IN","Order":"1033616591","Method":"LCDzoneEsamed 查看详情

Golang 的稳定 GCP PubSub API

】Golang的稳定GCPPubSubAPI【英文标题】:StableGCPPubSubAPIforGolang【发布时间】:2016-12-0711:17:55【问题描述】:我们有一个基础架构,我们将GCPPubsub用作全局pubsub,将redis用作本地pubsub。因此我们应该创建一个组件来订阅GCPpubsub并将传... 查看详情

如何从 GCP 函数 GUI 调用 PubSub 函数

】如何从GCP函数GUI调用PubSub函数【英文标题】:HowinvokeaPubSubfunctionfromtheGCPFunctionsGUI【发布时间】:2020-07-1005:08:23【问题描述】:我部署了以下功能:interfaceMessageDatareviewId:string;exportconstapplyPreAssessRules=functions.pubsub.topic("applyPreAsses 查看详情

GCP - 验证 PubSub 推送的云功能 https 端点的所有权

】GCP-验证PubSub推送的云功能https端点的所有权【英文标题】:GCP-VerifyownershipofacloudfunctionhttpsendpointforaPubSubpush【发布时间】:2017-12-0818:41:03【问题描述】:很确定没有办法做到这一点,但如果有其他人有任何想法,那就太好了。... 查看详情

GCP PubSub Spring Boot 重复提取消息

】GCPPubSubSpringBoot重复提取消息【英文标题】:GCPPubSubSpringBootrepeatextractmessage【发布时间】:2019-10-2508:47:35【问题描述】:我需要有关gcppub/sus问题的帮助。我有一个进程向pubsub发送100条带有过滤器的消息,另一个应用程序(在spri... 查看详情

使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置

】使用GCPpubsub的SpringCloudStream消费者的并发设置【英文标题】:ConcurrencysettingsforSpringCloudStreamconsumerwithGCPpubsub【发布时间】:2020-06-0123:44:05【问题描述】:我的应用程序正在使用绑定到GCPpubsub的SpringCloudStream接收消息。我正在试... 查看详情

Gcp PubSub 未将消息推送到 REST 端点

】GcpPubSub未将消息推送到REST端点【英文标题】:GcpPubSubnotpushingmessagetoRESTEndpoint【发布时间】:2017-07-1717:38:32【问题描述】:我遇到了一个问题,GcpPubSub没有将消息推送到我注册的端点。让我详细叙述一下我执行的步骤。注意,... 查看详情

在 GCP 上使用 Pubsub 时如何解决身份验证范围不足的问题

】在GCP上使用Pubsub时如何解决身份验证范围不足的问题【英文标题】:HowtosolveinsufficientauthenticationscopeswhenusePubsubonGCP【发布时间】:2020-07-0213:45:08【问题描述】:我正在尝试构建2个微服务(在JavaSpringBoot中)以使用GCPPub/Sub相互... 查看详情

GCP-PUBSUB:-sun.security.provider.certpath.SunCertPathBuilderException:无法找到请求目标的有效证书路径

】GCP-PUBSUB:-sun.security.provider.certpath.SunCertPathBuilderException:无法找到请求目标的有效证书路径【英文标题】:GCP-PUBSUB:-sun.security.provider.certpath.SunCertPathBuilderException:unabletofindvalidcertificationpathtorequestedtarget【发布时 查看详情