如何使用当前的 pubsub 订阅者从 google Pub/Sub 系统获取消息

     2023-02-16     46

关键词:

【中文标题】如何使用当前的 pubsub 订阅者从 google Pub/Sub 系统获取消息【英文标题】:How to get messages from googles Pub/Sub sytsem by using the current pubsub subsciber 【发布时间】:2019-06-09 09:09:58 【问题描述】:

我需要使用基于 python 的订阅者从 googles Pub/Sub 系统接收已发布的消息。

为此,我做了以下步骤:

在 Web 控制台上,我创建了一个项目、一个注册表、一个遥测主题、一个设备,并将订阅主题附加到遥测主题 我的代码可以通过 mqtt 桥发布消息以及 pubsub 库的发布功能 我可以使用以下 cmd 在终端上提取此消息:
gcloud pubsub subscriptions pull --auto-ack projects/project_id/subscriptions/subscription_topic

在下面你会看到我的代码中重要的 sn-p。它基于 git-examples,但在 google-cloud-pubsub 包的 0.39.1 版本中似乎不再存在某些功能。一个例子是subscriber.subscription_path() 方法。

def receive_messages(subscription_path, service_account_json):
    import time
    from google.cloud import pubsub_v1
    subscriber = pubsub_v1.SubscriberClient(credentials=service_account_json)

    #subscription_path = subscriber.subscription_path(
    #   project_id, subscription_name)

    def callback(message):
        print('Received message: '.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    print('Listening for messages on '.format(subscription_path))
    while True:
        time.sleep(60)

当我运行这个函数时,无数线程在后台一点一点启动,但似乎没有一个线程退出或启动回调函数。

我希望安装了所有要求:
pip3 freeze

asn1crypto==0.24.0
cachetools==3.0.0
certifi==2018.11.29
cffi==1.11.5
chardet==3.0.4
cryptography==2.4.2
google-api-core==1.7.0
google-api-python-client==1.7.5
google-auth==1.6.2
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.2.0
google-cloud-bigquery==1.8.1
google-cloud-core==0.29.1
google-cloud-datastore==1.7.3
google-cloud-monitoring==0.31.1
google-cloud-pubsub==0.39.1
google-resumable-media==0.3.2
googleapis-common-protos==1.5.6
grpc-google-iam-v1==0.11.4
grpcio==1.17.1
httplib2==0.12.0
idna==2.8
keyring==10.1
keyrings.alt==1.3
oauthlib==3.0.0
paho-mqtt==1.4.0
protobuf==3.6.1
pyasn1==0.4.5
pyasn1-modules==0.2.3
pycparser==2.19
pycrypto==2.6.1
pycurl==7.43.0
pygobject==3.22.0
PyJWT==1.6.4
python-apt==1.4.0b3
pytz==2018.9
pyxdg==0.25
redis==3.0.1
requests==2.21.0
requests-oauthlib==1.2.0
RPi.GPIO==0.6.5
rsa==4.0
SecretStorage==2.3.1
six==1.12.0
unattended-upgrades==0.1
uritemplate==3.0.0
urllib3==1.24.1
virtualenv==16.2.0
我在 debian 上以及 Windows 10 上运行该代码并更新了 gcloud:
gcloud components update

在过去的一周里,我一直在尝试不同的解决方案,或者开始使用看似过时的谷歌示例。此外,似乎比代码示例更早的文档也无济于事。所以我希望这里有人可以帮助我最终通过 Pub/Sub-Sytsem 接收基于 python 的客户端消息。

我希望我能提供最重要的信息,并提前感谢您为帮助我所做的努力。

【问题讨论】:

也许这对你的github.com/googleapis/google-cloud-python有帮助 您好 Tamir,感谢您的帮助,但不幸的是,我已经知道这个存储库并且已经尝试保留其中描述的示例,但不幸的是它导致了完全相同的行为。我开始复制以前创建的主题,然后在后台启动了无限数量的线程,没有收到任何消息,也没有调用消息的回调函数。但是,如果我使用 gcloud 工具,我可以订阅并接收同一主题下的消息。最好的问候 【参考方案1】:

python 文档站点here 上维护的示例应该是最新的。在运行任何代码之前,请确保您已遵循“为了使用此库,您首先需要完成以下步骤”部分中的所有步骤。特别是,您可能没有正确设置身份验证,我认为您不应该手动传递凭据路径。

【讨论】:

您好丹尼尔,非常感谢您的帮助。当然,遵循“为了使用此库,您首先需要执行以下步骤”中列出的步骤。我还可以使用下面的“Cloud Tools for PowerShell”发布和订阅。 # Publish: gcloud pubsub topics publish projects / project_id / topics / topic_id --message = "Test Message" # Pull subsirbe: gcloud pubsub subscriptions pull --auto-ack projects / project_id / subscriptions / subscription_id 我已经创建了一个订阅客户端和一个公共客户端,如以下文档中所述:googleapis.github.io/google-cloud-python/latest/pubsub # 发布者客户端将无限期挂在调用“future = publisher. publish" (topic_name, b'My first message !, "spam = 'eggs')" 没有发布过消息。 # 订阅者客户端要么在函数“subscription_respond =subscriber.create_subscription() 中无限期保留name = subscription_name, topic = topic_name)”,要么在没有这个函数的情况下启动future =subscriber.subscribe (subscription_name, callback) 在后台无限多任务而无需调用回调。 快速问题:你说上面是你代码的“重要部分”。在您代码的其他部分的任何时候,您是否正在使用多处理创建子流程?分叉是known 导致grpc 挂起。我的另一个建议是在创建订阅后确保您的待办事项中有未完成的消息,以便订阅者可以阅读。【参考方案2】:
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received message.")
message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, 
callback=callback)
print(f"Listening for messages on subscription_path..\n")


try:

    streaming_pull_future.result(timeout=timeout)
except TimeoutError:
    streaming_pull_future.cancel()  # Trigger the shutdown.
    streaming_pull_future.result()  # Block until the shutdown is 

【讨论】:

您的答案可以通过额外的支持信息得到改进。请edit 添加更多详细信息,例如引用或文档,以便其他人可以确认您的答案是正确的。你可以找到更多关于如何写好答案的信息in the help center。

如何通过 terraform 使用服务帐户创建 google cloud pubsub pull 订阅?

】如何通过terraform使用服务帐户创建googlecloudpubsubpull订阅?【英文标题】:Howtocreateagooglecloudpubsubpullsubscriptionswithserviceaccountbyterraform?【发布时间】:2021-10-1902:49:34【问题描述】:在google_pubsub_subscription的terraform文档中,它提到在... 查看详情

如何通过 terraform 使用服务帐户创建谷歌云 pubsub 订阅?

】如何通过terraform使用服务帐户创建谷歌云pubsub订阅?【英文标题】:Howtocreateagooglecloudpubsubsubscriptionswithserviceaccountbyterraform?【发布时间】:2021-04-1120:26:31【问题描述】:从这个文档中,我想通过terraform创建同样的东西。https://c... 查看详情

Google Pubsub - 接收推送订阅的传递尝试

...由Pubsub推送订阅触发的Google云功能。我想知道给定消息的当前传递尝试。在请求订阅中,它通过设置死信主题来工作,但是我无法在推送订阅消息属性中获得传递尝试。尝试配置死信主题,deliver 查看详情

Pubsub 拉取订阅和并发

...已经阅读了来自Google的关于pubsub并发的文档。他们的示例使用Executor订阅主题。这被配置为具有4个线程,默认为1个拉拔器(因此2个拉拔器将使用8个线程)。当我startAsync时,我认为客户端打开了一个流式 查看详情

Google PubSub 订阅无法从 StatusCode.UNAVAILABLE [code=8a75] 错误中恢复

】GooglePubSub订阅无法从StatusCode.UNAVAILABLE[code=8a75]错误中恢复【英文标题】:GooglePubSubSubscriptioncannotrecoverfromStatusCode.UNAVAILABLE[code=8a75]error【发布时间】:2018-08-3013:07:12【问题描述】:例外情况:线程Thread-ConsumeBidirectionalStream中的... 查看详情

如何使用 GCP 在 pubsub 模型中一次向所有订阅者发送消息

】如何使用GCP在pubsub模型中一次向所有订阅者发送消息【英文标题】:HowtosendmessagetoallsubscribersatonceinpubsubmodelusingGCP【发布时间】:2021-03-1602:53:30【问题描述】:使用google云平台实现pubsub模型,使用函数创建topic、subscriber、publish... 查看详情

将频道与 google pubsub poll 订阅者一起使用

】将频道与googlepubsubpoll订阅者一起使用【英文标题】:usingchannelswithgooglepubsubpollsubscriber【发布时间】:2019-01-1400:05:00【问题描述】:我正在尝试在golang中创建一个googlepubsub订阅者,我一次接收100条消息,然后将它们写入influx。... 查看详情

Google PubSub:使用 AppEngine 推送端点订阅时出现 SSL 错误

】GooglePubSub:使用AppEngine推送端点订阅时出现SSL错误【英文标题】:GooglePubSub:SSLerrorwhensubscribingusinganAppEnginepushendpoint【发布时间】:2015-05-2422:12:11【问题描述】:我创建了一个使用推送端点订阅的主题。端点是一个简单的AppEngin... 查看详情

尝试在 Google PubSub python 中创建主题订阅时出错

...间】:2021-03-1701:40:59【问题描述】:我正在尝试在python中使用Google的pubsub_v1库创建对主题的订阅。我已经使用库成功创建了一个主题(创建后我可以在云控制台中看到它)。但是,我在尝试创建订阅时遇到问题 查看详情

如何在 Flask Server 上的 Google Pubsub 订阅回调中屈服以进行流式传输

...1:36【问题描述】:我想通过Flask端点获得流,但事件是在订阅者的回调中发送到主题(GoogleCloudPubSub)的。请看下面的代码:defa_s 查看详情

从 google pubsub 到 spark 流的数据摄取速度很慢

...isslow【发布时间】:2019-11-3009:36:36【问题描述】:我正在使用谷歌云DataprocSpark集群来运行Spark流式传输作业,该作业从多个PubSub订阅中读取数据并写入BigQuery。PubSub有500万个元素,滑动窗口为2分钟,批次/窗口为3 查看详情

Google Pubsub Python 客户端库订阅者随机崩溃

】GooglePubsubPython客户端库订阅者随机崩溃【英文标题】:GooglePubsubPythonClientlibrarysubscribercrashesrandomly【发布时间】:2018-08-2702:30:40【问题描述】:有人可以帮助我使用GooglePubsubPython客户端库吗?我正在密切关注https://cloud.google.com/... 查看详情

Google pubsub 死字在 golang 中不起作用

...golang【发布时间】:2020-08-2410:42:21【问题描述】:我尝试使用googlepubsub死字。我使用控制台云为1个订阅启用死信。我已经将死信主题和maxAttemptDelivery属性设置为5。我的期望是如果1条消息在1个订阅中重新传递超过5次,则该消息... 查看详情

GCP PubSub:Python 中的同步拉取订阅者?

】GCPPubSub:Python中的同步拉取订阅者?【英文标题】:GCPPubSub:SynchronousPullSubscriberinPython?【发布时间】:2018-06-2004:04:31【问题描述】:全部,我正在尝试学习如何使用GCPPubSub,并且我可以通过CLI命令(创建主题、订阅、发布到主... 查看详情

pubsub 订阅者出错:超出最大消息大小

】pubsub订阅者出错:超出最大消息大小【英文标题】:Errorinpubsubsubscriber:Maxmessagesizeexceeded【发布时间】:2017-06-2318:45:27【问题描述】:我正在为我的应用程序使用GoogleCloudPubsub。pubsub主题的订阅者是用Javascript编写的,并在Nodejs... 查看详情

没有消息时的 Google Cloud PubSub 费用

...019-07-3112:47:58【问题描述】:我正在查看Pub/Sub定价,如果订阅者点击端点但没有收到任何消息,我无法获得价格。因为我的想法是24小时运行工人。【问题讨论】:【参考方案1】:使用今天的定价模式,如果您的订阅者发送Pull... 查看详情

Google pubsub_v1 订阅者拉“打开的文件太多”

】Googlepubsub_v1订阅者拉“打开的文件太多”【英文标题】:Googlepubsub_v1subscriberpull"toomanyfilesopen"【发布时间】:2019-09-2618:17:15【问题描述】:好像有问题谷歌-云-pubsub==0.39.1google-api-python-client==1.7.8当凭据变坏时执行拉入循... 查看详情

从 PubSub 推送订阅认证到云功能

...ion【发布时间】:2021-05-0603:05:49【问题描述】:我们正在使用PubSub进行排队,利用指向http触发云功能的推送订阅。根据this文档,CloudRun和AppEngine都将对来自PubSub的请求进行身份验证,但未列出云功能。我们使用了其他谷歌服务... 查看详情