Pubsub.pull 请求无法正常工作 - 去吧

     2023-02-16     215

关键词:

【中文标题】Pubsub.pull 请求无法正常工作 - 去吧【英文标题】:Pubsub.pull request does not working properly - go 【发布时间】:2020-02-17 10:19:36 【问题描述】:

我正在尝试使用 go 客户端库一次从 pub-sub 订阅中提取 1 条消息。但是即使订阅中存在消息,消息也不会拉取请求请求。订阅者正在等待处理所有消息。

我正在尝试使用基本代码,一次提取一条消息。我使用了两个实例,并在两个实例的后台运行脚本(创建订阅者)4 次。我已将 ack_deadline 设置为 10 秒。 我期待的结果是每个订阅者都应该在一条消息确认后从订阅中获取下一条消息。但是在最后一条消息处理完成之前,消息不会拉入实例。 为什么在一条消息处理完成后没有拉取消息?据我所知,不应该对实例或订阅者有任何依赖。 让 mi 知道需要设置的任何其他更改或参数。 提前致谢。

这是一个实例的日志:

2019/10/21 05:22:07 Got message: Message 0 at 2019-10-21 05:22:07.022772532 
2019/10/21 05:22:11 Got message: Message 1 at 2019-10-21 05:22:11.330566981 
2019/10/21 05:22:14 Got message: Message 2 at 2019-10-21 05:22:14.803031569 
2019/10/21 05:22:18 Got message: Message 3 at 2019-10-21 05:22:18.452912271 
2019/10/21 05:38:39 Acking message: Message 3 at 2019-10-21 05:38:39.471739478 
2019/10/21 05:39:10 Acking message: Message 0 at 2019-10-21 05:39:10.039336794 
2019/10/21 05:41:22 Acking message: Message 1 at 2019-10-21 05:41:22.351124342 
2019/10/21 05:50:31 Acking message: Message 2 at 2019-10-21 05:50:31.829087762 
2019/10/21 05:50:39 Got message: Message 13 at 2019-10-21 05:50:39.005916608
2019/10/21 05:50:39 Got message: Message 11 at 2019-10-21 05:50:39.00623238 
2019/10/21 05:50:39 Got message: Message 15 at 2019-10-21 05:50:39.007216256
2019/10/21 05:50:39 Got message: Message 12 at 2019-10-21 05:50:39.008066257 

第二个实例的日志:

2019/10/21 05:22:29 Got message: Message 4 at 2019-10-21 05:22:29.331569077 
2019/10/21 05:22:33 Got message: Message 5 at 2019-10-21 05:22:33.018801275 
2019/10/21 05:22:36 Got message: Message 6 at 2019-10-21 05:22:36.803434547 
2019/10/21 05:22:40 Got message: Message 7 at 2019-10-21 05:22:40.409314927 
2019/10/21 05:39:38 Acking message: Message 4 at 2019-10-21 05:39:38.349619635 
2019/10/21 05:42:42 Acking message: Message 6 at 2019-10-21 05:42:42.819874065 
2019/10/21 05:47:40 Acking message: Message 5 at 2019-10-21 05:47:40.049128075 
2019/10/21 05:50:38 Acking message: Message 7 at 2019-10-21 05:50:38.42874031 
2019/10/21 05:50:39 Got message: Message 8 at 2019-10-21 05:50:39.005090906 
2019/10/21 05:50:39 Got message: Message 9 at 2019-10-21 05:50:39.005334146 
2019/10/21 05:50:39 Got message: Message 16 at 2019-10-21 05:50:39.006427796 
2019/10/21 05:50:39 Got message: Message 14 at 2019-10-21 05:50:39.007231713 
package main
// [START pubsub_publish_with_error_handling_that_scales]
import (
    "context"
    "fmt"
    "os"
    "log"
    "time"
    "math/rand"
    pubsub "cloud.google.com/go/pubsub/apiv1"
    pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)


func main()
    f, _:= os.OpenFile("testlogfile", os.O_RDWR | os.O_CREATE | os.O_APPEND, 0666)
    defer f.Close()
    log.SetOutput(f)
    rand.Seed(time.Now().UTC().UnixNano())
    pullMsgs("sureline-dev-1264", "sub7")


func random(min, max int) int 
    return rand.Intn(max - min) + min


func pullMsgs(projectID, subscriptionID string) error 
    ctx := context.Background()
    client, err := pubsub.NewSubscriberClient(ctx)
    if err != nil 
        log.Fatal(err)
    
    defer client.Close()
    sub := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID)
// Be sure to tune the MaxMessages parameter per your project's needs, and accordingly
// adjust the ack behavior below to batch acknowledgements.
    req := pubsubpb.PullRequest
        Subscription: sub,
        MaxMessages:  1,
    

    fmt.Println("Listening..")

    for 
        res, err := client.Pull(ctx, &req)
        if err != nil 
            log.Fatal(err)
        

    // client.Pull returns an empty list if there are no messages available in the
    // backlog. We should skip processing steps when that happens.
        if len(res.ReceivedMessages) == 0 
            continue
        

        var recvdAckIDs []string
        for _, m := range res.ReceivedMessages 
            recvdAckIDs = append(recvdAckIDs, m.AckId)
        

        var done = make(chan struct)
        var delay = 0 * time.Second // Tick immediately upon reception
        var ackDeadline = 10 * time.Second

    // Continuously notify the server that processing is still happening on this batch.
        go func() 
            for 
                select 
                case <-ctx.Done():
                    return
                case <-done:
                    return
                case <-time.After(delay):
                    err := client.ModifyAckDeadline(ctx, &pubsubpb.ModifyAckDeadlineRequest
                        Subscription:       sub,
                        AckIds:             recvdAckIDs,
                        AckDeadlineSeconds: int32(ackDeadline.Seconds()),
                    )
                    if err != nil 
                        log.Fatal(err)
                    
                    delay = ackDeadline - 5*time.Second // 5 seconds grace period.
                
            
        ()

        for _, m := range res.ReceivedMessages 
            // Process the message here, possibly in a goroutine.
            log.Printf("Got message: %s at %v", string(m.Message.Data), time.Now())
            fmt.Printf("Got message: %s at %v", string(m.Message.Data), time.Now())
            myrand := random(240, 420)
            log.Printf("Sleeping %d seconds...\n", myrand)
            time.Sleep(time.Duration(myrand)*time.Second)
            err := client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest
                Subscription: sub,
                AckIds:       []stringm.AckId,
            )
            log.Printf("Acking message: %s at %v", string(m.Message.Data), time.Now())
            fmt.Printf("Acking message: %s at %v", string(m.Message.Data), time.Now())
            if err != nil 
                log.Fatal(err)
            
        

        close(done)
    

我希望输出类似于在完成第一条消息处理后从订阅中获取下一条消息。它不应该依赖于任何其他实例。

【问题讨论】:

我删除了我的答案,我没有很好地理解你的问题,我没有很好地看到日志时间条目。 【参考方案1】:

尝试以这种方式一次提取一条消息是 Cloud Pub/Sub 的反模式。在您的情况下,您的订阅者可能最终与不同的服务器进行通信,这些服务器分配了要发送给连接到它们的订阅者的消息。 Cloud Pub/Sub 期望同时从使用此方法接收消息的客户端接收多个拉取请求。因此,您应该同时处理多个未完成的拉取请求,或者您应该使用asynchronous pull via the Receive method。

【讨论】:

Ajax 请求在 Django 中无法正常工作

】Ajax请求在Django中无法正常工作【英文标题】:Ajaxrequestnotworkingproperlyindjango【发布时间】:2018-12-0301:20:47【问题描述】:ajax请求的JS代码<scripttype="text/javascript">$(document).ready(function()console.log("IN")varcartUpdateForm=$(\'.js_cart_ 查看详情

Flutter HTTP 发布请求 JSON 处理无法正常工作

】FlutterHTTP发布请求JSON处理无法正常工作【英文标题】:FlutterHTTPpostrequestJSONhandlingnotworkingproperly【发布时间】:2021-11-1423:27:07【问题描述】:我正在使用带有颤振应用程序的nodejs后端。我使用flutterhttp包发送请求。我想通过请求... 查看详情

Google PubSub Pull 与 Streaming Pull 的区别

】GooglePubSubPull与StreamingPull的区别【英文标题】:GooglePubSubPullvsStreamingPulldifferences【发布时间】:2019-10-0502:05:50【问题描述】:我正在阅读来自Google的documentation关于拉取与流拉取的内容,但不太明白。有人可以向我解释其中的区... 查看详情

jQuery AJAX 获取请求无法正常工作,返回值无法在控制台显示

】jQueryAJAX获取请求无法正常工作,返回值无法在控制台显示【英文标题】:jQueryAJAXgetrequestdoesn\'tworknormallyandthereturnvaluecan\'tbedisplayedintheconsole【发布时间】:2020-05-1407:18:27【问题描述】:我是jQuery新手,我正在尝试使用AJAX构建... 查看详情

谷歌地图 api findplace 请求无法正常工作

】谷歌地图apifindplace请求无法正常工作【英文标题】:googlemapsapifindplacerequestnotworkingproperly【发布时间】:2021-10-2716:04:32【问题描述】:我想在我的网页上显示来自已显示药房名称的JSON中的位置。关键是谷歌地图API有点超出我的... 查看详情

在zuul过滤器中修改请求正文无法正常工作

】在zuul过滤器中修改请求正文无法正常工作【英文标题】:modifyingrequestbodydoesn\'tworkproperlyinzuulfilter【发布时间】:2018-01-1023:15:17【问题描述】:我使用springcloudzuul作为API网关,在前置过滤器中,它解密来自前端应用程序的请求... 查看详情

Rails3 中的 Ajax 请求,无法正常工作

】Rails3中的Ajax请求,无法正常工作【英文标题】:AjaxrequestinRails3,can\'tgetittowork【发布时间】:2012-05-1001:06:26【问题描述】:我尝试使用AJAX重构我的项目。我有一个链接...=link_to("Pleasework","show_recent_chart",:remote=>true)...xxx控制器... 查看详情

CSRF(跨站请求伪造)在 Laravel 中无法正常工作

】CSRF(跨站请求伪造)在Laravel中无法正常工作【英文标题】:CSRF(CrossSiteRequestForgery)doesnotworkasexpectedinLaravel【发布时间】:2015-10-0410:53:15【问题描述】:我正在使用“LaravelCodeBright”学习Laravel。在本书的“表单安全”部分,它... 查看详情

Spring CorsFilter 似乎无法正常工作,但仍会收到 401 的预检请求

】SpringCorsFilter似乎无法正常工作,但仍会收到401的预检请求【英文标题】:SpringCorsFilterdoesnotseemtoworkstillreceivinga401onpreflightrequests【发布时间】:2016-11-3019:12:19【问题描述】:您好,我是SpringSecurity的新手,正在尝试弄清楚如何... 查看详情

Spring Data REST - PUT 请求自 v.2.5.7 以来无法正常工作

】SpringDataREST-PUT请求自v.2.5.7以来无法正常工作【英文标题】:SpringDataREST-PUTrequestdoesnotworkproperlysincev.2.5.7【发布时间】:2018-01-1902:03:22【问题描述】:由于版本2.5.7SpringDataREST无法正确执行PUT请求来更新具有关联资源的资源。与... 查看详情

此页面无法正常工作 www.janmukti.com 目前无法处理此请求。 HTTP 错误 500

】此页面无法正常工作www.janmukti.com目前无法处理此请求。HTTP错误500【英文标题】:Thispageisn’tworkingwww.janmukti.comiscurrentlyunabletohandlethisrequest.HTTPERROR500【发布时间】:2017-10-2113:38:09【问题描述】:这是我第一次将我的laravel项目放... 查看详情

KSOAP 生成的 SOAP 请求不会导致任何错误但无法正常工作

】KSOAP生成的SOAP请求不会导致任何错误但无法正常工作【英文标题】:KSOAPgeneratedSOAPrequestdon\'tcauseanyerrorbutnotworking【发布时间】:2017-02-1507:38:04【问题描述】:这是我的问题:由KSOAP23.6.1android生成:不会导致任何XML/SOAP错误,但W... 查看详情

Tomcat ExpiresFilter 无法正常工作

...正确过期定义的图像。浏览器不断发送已下载图像的获取请求,Tomcat以304响应。我想要的是Tomcat将使用适当的expires标头响应初始请求,并且没有任何Last-modified标头,因此浏览器将使 查看详情

Magnolia:启用缓存过滤器导致 Facebook 共享无法正常工作时,范围请求不提供内容

...a:启用缓存过滤器导致Facebook共享无法正常工作时,范围请求不提供内容【英文标题】:Magnolia:Rangerequestdoesn\'tservecontentwhencachefilterenabledresultinginFacebookSharingnottowork【发布时间】:2019-12-1909:14:48【问题描述】:当向Magnolia发送带... 查看详情

如何通过 terraform 使用服务帐户创建 google cloud pubsub pull 订阅?

】如何通过terraform使用服务帐户创建googlecloudpubsubpull订阅?【英文标题】:Howtocreateagooglecloudpubsubpullsubscriptionswithserviceaccountbyterraform?【发布时间】:2021-10-1902:49:34【问题描述】:在google_pubsub_subscription的terraform文档中,它提到在... 查看详情

Desktop Bridge allowElevation 受限功能无法正常工作,UAC 后出现错误“请求的操作需要提升”

...pBridgeallowElevation受限功能无法正常工作,UAC后出现错误“请求的操作需要提升”【英文标题】:DesktopBridgeallowElevationrestrictedcapabilitynotworkingproperly,error"requestedoperationrequireselevation"afterUAC【发布时间】:2021-05-1804:10:09【问题... 查看详情

如何处理 GCP WordPress 错误“此页面无法正常工作 example.com 当前无法处理此请求。HTTP ERROR 500

...rdPress错误“此页面无法正常工作example.com当前无法处理此请求。HTTPERROR500【英文标题】:HowtodealwithGCPWordPresserror"Thispageisn’tworkingexample.comiscurrentlyunabletohandlethisrequest.HTTPERROR500【发布时间】:2019-02-1412:36:12【问题描述】:起... 查看详情

带有 Like 语句的 MS-Access 上的 SQL 请求似乎永远无法正常工作

】带有Like语句的MS-Access上的SQL请求似乎永远无法正常工作【英文标题】:SQLrequestonMS-AccesswithLikestatementseemstobeneverworking【发布时间】:2017-07-0307:00:09【问题描述】:我在一个请求中尝试了几种“Like”语法(我使用的是VB.NET和一... 查看详情