Google Cloud PubSub 不确认消息

     2023-02-16     262

关键词:

【中文标题】Google Cloud PubSub 不确认消息【英文标题】:Google Cloud PubSub not ack messages 【发布时间】:2019-07-02 23:38:09 【问题描述】:

我们有基于 GCP PubSub 的发布者和订阅者系统。订阅者处理单个消息的时间很长,大约 1 分钟。我们已经将订阅者确认截止时间设置为 600 秒(10 分钟)(最长 1 秒),以确保 pubsub 不会过早开始重新交付,因为基本上我们这里有长时间运行的操作。

我看到了 PubSub 的这种行为。当代码发送 ack 和监视器确认 PubSub 确认请求已被接受并且确认本身已完成并成功状态时,未确认消息的总数仍然相同。

图表上的指标对总和、计数和均值聚合调整器显示相同。上图中的 aligner 是平均值,没有启用减速器。

我正在使用 @google-cloud/pubsub Node.js 库。已经尝试过不同的版本(0.18.1、0.22.2、0.24.1),但我想问题不在其中。

下面的类可以用来检查。

TypeScript 3.1.1,节点 8.x.x - 10.x.x

import  exponential, Backoff  from "backoff";

const pubsub = require("@google-cloud/pubsub");

export interface IMessageHandler 
    handle (message): Promise<void>;


export class PubSubSyncListener 
    private readonly client;

    private listener: Backoff;

    private runningOperations: Promise<unknown>[] = [];

    constructor (
        private readonly handler: IMessageHandler,
        private readonly options: 
            /**
             * Maximal messages number to be processed simultaniosly.
             * Listener will try to keep processing number as close to provided value
             * as possible.
             */
            maxMessages: number;
            /**
             * Formatted full subscrption name /projects/projectName/subscriptions/subscriptionName
             */
            subscriptionName: string;
            /**
             * In milliseconds
             */
            minimalListenTimeout?: number;
            /**
             * In milliseconds
             */
            maximalListenTimeout?: number;
        
    ) 
        this.client = new pubsub.v1.SubscriberClient();

        this.options = Object.assign(
            minimalListenTimeout: 300,
            maximalListenTimeout: 30000
        , this.options);
    

    public async listen () 
        this.listener = exponential(
            maxDelay: this.options.maximalListenTimeout,
            initialDelay: this.options.minimalListenTimeout
        );

        this.listener.on("ready", async () => 
            if (this.runningOperations.length < this.options.maxMessages) 
                const [response] = await this.client.pull(
                    subscription: this.options.subscriptionName,
                    maxMessages: this.options.maxMessages - this.runningOperations.length
                );

                for (const m of response.receivedMessages) 
                    this.startMessageProcessing(m);
                
                this.listener.reset();
                this.listener.backoff();
             else 
                this.listener.backoff();
            
        );

        this.listener.backoff();
    

    private startMessageProcessing (message) 
        const index = this.runningOperations.length;

        const removeFromRunning = () => 
            this.runningOperations.splice(index, 1);
        ;

        this.runningOperations.push(
            this.handler.handle(this.getHandlerMessage(message))
                .then(removeFromRunning, removeFromRunning)
        );
    

    private getHandlerMessage (message) 
        message.message.ack = async () => 
            const ackRequest = 
                subscription: this.options.subscriptionName,
                ackIds: [message.ackId]
            ;

            await this.client.acknowledge(ackRequest);
        ;

        return message.message;
    

    public async stop () 
        this.listener.reset();
        this.listener = null;
        await Promise.all(
            this.runningOperations
        );
    

这基本上是异步拉取消息和立即确认的部分实现。因为建议的解决方案之一是使用同步拉动。

如果我没记错问题的症状,我在 java 存储库中发现了类似的报告问题。

https://github.com/googleapis/google-cloud-java/issues/3567

这里的最后一个细节是,确认似乎适用于少量请求。如果我在 pubsub 中触发单个消息然后立即处理它,未传递的消息数量会减少(下降到 0,因为之前只有一条消息)。

问题本身 - 发生了什么以及为什么未确认的消息数量没有在收到确认后减少?

【问题讨论】:

【参考方案1】:

引用the documentation 的话,您使用的订阅/num_undelivered_messages 指标是“订阅中未确认的消息(也称为积压消息)的数量。每 60 秒采样一次。采样后,数据不是最长可见 120 秒。"

您不应期望该指标会在确认消息后立即降低。此外,听起来好像您正在尝试将 pubsub 用于只发送一次的情况,试图确保不会再次发送消息。 Cloud Pub/Sub 不提供这些语义。它至少提供一次语义。换句话说,即使您收到了一个值,确认了它,收到了确认响应,并且看到指标从 1 下降到 0,但同一个工作人员或另一个工作人员仍然有可能并且正确地接收到该消息的完全相同的副本.尽管在实践中这不太可能,但您应该专注于构建一个允许重复的系统,而不是试图确保您的 ack 成功,这样您的消息就不会被重新传递。

【讨论】:

如何在 Firebase Cloud Functions 中确认 PubSub 消息?

】如何在FirebaseCloudFunctions中确认PubSub消息?【英文标题】:HowcanyouacknowledgeaPubSubmessageinFirebaseCloudFunctions?【发布时间】:2020-05-0502:13:05【问题描述】:我想知道确认PubSub消息(来自推送订阅者)的正确方法是什么,这是一个Fireba... 查看详情

使用 Google Cloud PubSub 不断收到“向 Cloud PubSub 发送测试消息时出错...”

】使用GoogleCloudPubSub不断收到“向CloudPubSub发送测试消息时出错...”【英文标题】:Keepgetting\'ErrorsendingtestmessagetoCloudPubSub...\'withGoogleCloudPubSub【发布时间】:2015-11-1005:14:03【问题描述】:我正在尝试将Google的推送PubSub设置到我的... 查看详情

Google PubSub Request 消息即使确认?

】GooglePubSubRequest消息即使确认?【英文标题】:GooglrPubSubRequestmessagesevenifacknowledged?【发布时间】:2021-06-1815:35:39【问题描述】:我正在接收来自我创建的订阅的消息:subscriber=pubsub_v1.SubscriberClient(credentials=credentials)subscriber.create... 查看详情

通过 Google Cloud Dataflow 将 PubSub 消息插入 BigQuery

】通过GoogleCloudDataflow将PubSub消息插入BigQuery【英文标题】:InsertPubSubmessagesintoBigQuerythroughGoogleCloudDataflow【发布时间】:2015-12-1410:57:58【问题描述】:我想使用GoogleCloudDataflow将来自某个主题的PubSub消息数据插入到BigQuery表中。一... 查看详情

获取 Google Cloud PubSub 中单条消息的大小

】获取GoogleCloudPubSub中单条消息的大小【英文标题】:GetthesizeofasinglemessageinGoogleCloudPubSub【发布时间】:2021-05-1806:00:34【问题描述】:我有一个设置,我将消息发布到GoogleCloudPubSub服务。我希望获取我发布到PubSub的每条消息的大... 查看详情

没有消息时的 Google Cloud PubSub 费用

】没有消息时的GoogleCloudPubSub费用【英文标题】:CostofGoogleCloudPubSubwhentherearenomessages【发布时间】:2019-07-3112:47:58【问题描述】:我正在查看Pub/Sub定价,如果订阅者点击端点但没有收到任何消息,我无法获得价格。因为我的想法... 查看详情

如何使用 Node.js 控制 Cloud PubSub 中的确认

】如何使用Node.js控制CloudPubSub中的确认【英文标题】:HowcanIcontrolacknowledgementinCloudPubSubusingNode.js【发布时间】:2018-03-3000:29:00【问题描述】:基本上我已经创建了一个云函数(编写了一个Node.js代码),它将触发云pubsub主题的消... 查看详情

PubSub 不确认消息

】PubSub不确认消息【英文标题】:PubSubisn\'tacknowledgingmessages【发布时间】:2020-08-0801:08:00【问题描述】:我有一个pubsub订阅(除了go-routines的数量之外的所有默认设置都是1000),并且由于某种原因,消息永远不会得到确认,因此... 查看详情

推送订阅发送多条消息远早于消息确认截止日期

...:05:21【问题描述】:我为GCS设置了Pubsub通知。(https://cloud.google.com/storage/docs/pubsub-notifications)我将P 查看详情

如何在 Google Cloud App Engine 上使用 PubSub 创建订阅者,该订阅者通过 Publisher 从 Google Cloud App Engine Flex 收听消息?

】如何在GoogleCloudAppEngine上使用PubSub创建订阅者,该订阅者通过Publisher从GoogleCloudAppEngineFlex收听消息?【英文标题】:HowtocreateasubscriberwithPubSubonGoogleCloudAppEnginethatlistenstoamessageviaPublisherfromaGoogleCloudAppEngineFlex?【发布时间】:2020- 查看详情

获取 google cloud pubsub 的指标

】获取googlecloudpubsub的指标【英文标题】:getmetricsforgooglecloudpubsub【发布时间】:2016-12-3102:20:01【问题描述】:我正在使用googlecloudpubsub,我想知道如何在pubsub中获取未发送、已发送和未发送消息的数量。googlepubsub是否为此提供... 查看详情

排空或清除 Google Cloud pubsub 主题的最佳做法 [关闭]

】排空或清除GoogleCloudpubsub主题的最佳做法[关闭]【英文标题】:BestpracticesfordrainingorclearingaGoogleCloudpubsubtopic[closed]【发布时间】:2017-01-1619:31:30【问题描述】:对于消息数量在~100k范围内的pubsub主题,使用gcloud-javaSDK排出/丢弃/清... 查看详情

google.cloud.pubsub_v1 和 google.cloud.pubsub 有啥区别?

】google.cloud.pubsub_v1和google.cloud.pubsub有啥区别?【英文标题】:Whatisthedifferencebetweengoogle.cloud.pubsub_v1andgoogle.cloud.pubsub?google.cloud.pubsub_v1和google.cloud.pubsub有什么区别?【发布时间】:2018-06-2005:04:42【问题描述】:我看到两者都在Goo... 查看详情

[已解决]Pubsub 推送订阅不确认消息

】[已解决]Pubsub推送订阅不确认消息【英文标题】:Pubsubpushsubscriptionnotacknowledgingmessages【发布时间】:2021-03-0502:55:27【问题描述】:这是我的设置。订阅A是一种推送订阅,可将消息发布到云运行部署。该部署公开了一个HTTP端点... 查看详情

Go GCP Cloud PubSub 不批量发布消息

】GoGCPCloudPubSub不批量发布消息【英文标题】:GoGCPCloudPubSubnotbatchpublishingmessages【发布时间】:2019-07-0321:11:05【问题描述】:我正在开发一个示例项目,该项目从bigquery获取输出并将其发布到pubsub。bigquery的行输出可能大于100,000。... 查看详情

如何使用 Google Cloud PubSub 和 Run 处理资源密集型长时间运行的任务?

】如何使用GoogleCloudPubSub和Run处理资源密集型长时间运行的任务?【英文标题】:HowtouseGoogleCloudPubSubandRuntohandleresource-intensivelong-runningtasks?【发布时间】:2019-12-1308:35:32【问题描述】:我有一个GoogleCloudPubSub主题,该主题有时有... 查看详情

Firebase:我可以在 Firebase 云功能中“不确认”一条 PubSub 消息吗?

】Firebase:我可以在Firebase云功能中“不确认”一条PubSub消息吗?【英文标题】:Firebase:CanI"notacknowledge"aPubSubmessageinsideaFirebasecloudfunction?【发布时间】:2018-01-1012:10:04【问题描述】:我有一个Firebase云函数,它通过PubSub消... 查看详情

C++ 中的 Google Cloud Pubsub Async Streaming API

】C++中的GoogleCloudPubsubAsyncStreamingAPI【英文标题】:GoogleCloudPubsubAsyncStreamingAPIinC++【发布时间】:2020-10-1114:33:36【问题描述】:我正在尝试查找有关通过异步grpc使用PubsubStreamingAPI的文档,但找不到任何文档。我有这个简单的代码... 查看详情