如何使用 GCP 在 pubsub 模型中一次向所有订阅者发送消息

     2023-02-16     44

关键词:

【中文标题】如何使用 GCP 在 pubsub 模型中一次向所有订阅者发送消息【英文标题】:How to send message to all subscribers at once in pubsub model using GCP 【发布时间】:2021-03-16 02:53:30 【问题描述】:

使用google云平台实现pubsub模型,使用函数创建topic、subscriber、publish和pullmsg函数。

func pullMsgs(projectID, subID string, jsonPath string) error 
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, projectID, option.WithCredentialsFile(jsonPath))
    if err != nil 
        return fmt.Errorf("pubsub.NewClient: %v", err)
    

    // Consume 10 messages.
    var mu sync.Mutex
    received := 0
    sub := client.Subscription(subID)
    cctx, cancel := context.WithCancel(ctx)
    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) 
        mu.Lock()
        defer mu.Unlock()
        // fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
        fmt.Println("Got message: n", string(msg.Data))
        msg.Ack()
        received++
        if received == 10 
            cancel()
        
    )
    if err != nil 
        return fmt.Errorf("Receive: %v", err)
    
    return nil

pullmsg 函数使用订阅 ID 从发布者获取发布的消息。 假设该模型有 3 个特定主题的订阅者。如果发布者为该主题发布消息。 pullmsg 函数必须执行 3 次才能为所有订阅者获取该消息。 有没有什么方法可以一次将发布的消息发送给所有订阅者。

【问题讨论】:

按照设计,每个订阅(以及每个订阅者)都会收到同一消息的不同副本。您能否澄清这 3 个订阅者是否处于同一进程中?如果是这样,是否有 1 个订阅者不够的原因? 【参考方案1】:

我不明白您为什么要一次从 3 个订阅中提取。因为这些消息可能不会在同一时间/大约同时到达。因此,如果您想合并这些消息,您的代码必须等到它收到所有消息。这并不是 Pub/Sub 的真正目的。另一方面,如果您确实想要合并这三个消息,我建议您为每个订阅创建单独的一段代码。

话虽如此,您可以使用 同步 拉取机制或 异步 拉取机制来拉取所有消息。如果您不希望您的代码被阻塞以便它基本上同时监听所有订阅,您可以使用异步拉取。

代码流可以是:

创建非阻塞订阅者函数。 当 x 条消息到达时让这个 pull 函数停止(中断一个 while 循环)。 创建三个传递您的三个主题 ID 的函数实例。 为您处理创建另一个函数。

更多信息可以在here找到。

【讨论】:

在 GCP 上使用 Pubsub 时如何解决身份验证范围不足的问题

】在GCP上使用Pubsub时如何解决身份验证范围不足的问题【英文标题】:HowtosolveinsufficientauthenticationscopeswhenusePubsubonGCP【发布时间】:2020-07-0213:45:08【问题描述】:我正在尝试构建2个微服务(在JavaSpringBoot中)以使用GCPPub/Sub相互... 查看详情

如何使用 gsutil 向 GCP 存储桶添加 pubsub 通知

】如何使用gsutil向GCP存储桶添加pubsub通知【英文标题】:HowtoaddpubsubnotificationtoGCPstoragebucketusinggsutil【发布时间】:2021-07-2510:21:52【问题描述】:我想为GCp存储桶添加pubsub通知。因此,每当文件放入GCP存储桶时,就会发送pubsub通... 查看详情

需要一个解决方案来实现xmpp在群聊中添加朋友并在ios中一次向他们发送消息

...:我在添加朋友、发送邀请以及在群聊中遇到困难,例如使用xmpp为所有人发送一条消息。我知道我需要使用XEP-0045。但 查看详情

如何在cakephp中一次发送多封电子邮件

】如何在cakephp中一次发送多封电子邮件【英文标题】:Howtosendamultipleemailsatatimeincakephp【发布时间】:2011-09-0621:46:07【问题描述】:我需要一次发送多封电子邮件,任何人都可以举个例子吗?或任何想法?我需要一次向我的所有... 查看详情

GCP PubSub 使用 Cloud Functions 神秘/无声地失败

】GCPPubSub使用CloudFunctions神秘/无声地失败【英文标题】:GCPPubSubmysteriously/silentlyfailingwithCloudFunctions【发布时间】:2021-11-1413:47:35【问题描述】:我有大约十几个GCF函数(Python),它们每天运行一次。为了保持正确的顺序,我使用Pub... 查看详情

如何在 apache camel 中执行 gcp pubsub 消息的并行处理

】如何在apachecamel中执行gcppubsub消息的并行处理【英文标题】:howtoperformparallelprocessingofgcppubsubmessagesinapachecamel【发布时间】:2020-08-1317:15:03【问题描述】:我在下面有这段代码,它从pubsub源主题获取消息->根据模板对其进行转... 查看详情

如何从 GCP 函数 GUI 调用 PubSub 函数

】如何从GCP函数GUI调用PubSub函数【英文标题】:HowinvokeaPubSubfunctionfromtheGCPFunctionsGUI【发布时间】:2020-07-1005:08:23【问题描述】:我部署了以下功能:interfaceMessageDatareviewId:string;exportconstapplyPreAssessRules=functions.pubsub.topic("applyPreAsses 查看详情

GCP PubSub:通过 CURL 类型的请求发布消息

...6-2023:47:07【问题描述】:有没有人有一个工作示例,说明如何通过CURL类型的命令直接从shell将消息发布到GCPPubSub主题?我正在尝试不使用CLI并且不使用客户端库,但我对OAUTH的东西越来越感兴趣了。如果有一个在Linux上运行的bas 查看详情

GCP 云功能未正确接收/确认 PubSub 消息

...些位置处理物理地址并返回一些关于它们的指标。工作流使用CloudFunctions和PubSub流的组合。在工作 查看详情

如何将 GCP Pubsub 订阅的消息转发到另一个主题?

】如何将GCPPubsub订阅的消息转发到另一个主题?【英文标题】:HowcanIforwardaGCPPubsubsubscription\'smessagestoanothertopic?【发布时间】:2017-04-0111:35:34【问题描述】:我希望能够将GoogleCloudPlatform(GCP)Pubsub订阅的消息转发到另一个GCP主题,... 查看详情

GCP - 如何添加关于发送到 pubsub 死信队列的消息数量的警报?

】GCP-如何添加关于发送到pubsub死信队列的消息数量的警报?【英文标题】:GCP-howtoaddalertonnumberofmessagessenttoapubsubdeadletterqueue?【发布时间】:2020-10-2204:03:10【问题描述】:我的应用程序处理来自pubsub主题的消息,如果失败,消息... 查看详情

GCP PubSub - 使用 orderingKey (Nodejs) 进行批处理

】GCPPubSub-使用orderingKey(Nodejs)进行批处理【英文标题】:GCPPubSub-BatchingwithorderingKey(Nodejs)【发布时间】:2021-09-0519:48:52【问题描述】:使用googlePubSub批处理和orderingKey时出现不稳定的行为。如果我使用批量PubSub而不使用orderingKey,... 查看详情

在 Django 中一次更新所有模型

】在Django中一次更新所有模型【英文标题】:UpdateallmodelsatonceinDjango【发布时间】:2013-03-2014:46:33【问题描述】:我正在尝试在Django(python)中以特定顺序一次更新所有对象的位置字段。我现在就是这样做的,但问题是它会产生大... 查看详情

在 GCP PubSub 和 Spring Boot 中捕获发布错误

】在GCPPubSub和SpringBoot中捕获发布错误【英文标题】:CatchingpublisherrorsinGCPPubSubandSpringBoot【发布时间】:2020-10-1309:41:26【问题描述】:我有一个SpringBoot应用程序,它需要偶尔将消息发布到GCPPubSub。我按照SpringBoot页面(https://spring.io... 查看详情

使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置

】使用GCPpubsub的SpringCloudStream消费者的并发设置【英文标题】:ConcurrencysettingsforSpringCloudStreamconsumerwithGCPpubsub【发布时间】:2020-06-0123:44:05【问题描述】:我的应用程序正在使用绑定到GCPpubsub的SpringCloudStream接收消息。我正在试... 查看详情

每个用户的 GCP PubSub(或 GCP 任务)同步处理

】每个用户的GCPPubSub(或GCP任务)同步处理【英文标题】:GCPPubSub(orGCPTasks)SynchronousProcessingperUser【发布时间】:2021-03-0419:26:29【问题描述】:我有一个需要处理一组事件的用例。我需要他们对每个用户进行整体并行处理但串行... 查看详情

GCP Pubsub 未传递消息的数量不会改变

...change【发布时间】:2021-08-2423:59:27【问题描述】:我正在使用pubsub来触发我定义为最多具有10个实例的云功能。当大约300条或更多消息到达主题并开始触发该功能时,突然未确认的消息数量停止增长,它只是没有改变,尽管我知... 查看详情

从现有的 GCP pubsub 订阅中消费

...【发布时间】:2020-03-0412:44:43【问题描述】:我正在尝试使用此存储库中托管的示例代码。https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp 查看详情