无法在 Airflow Python 3 中发布 Pubsub 消息

     2023-02-16     77

关键词:

【中文标题】无法在 Airflow Python 3 中发布 Pubsub 消息【英文标题】:Unable to publish Pubsub message in Airflow Python 3 【发布时间】:2019-08-07 00:18:10 【问题描述】:

我无法在带有 Python 3 的 Airflow 中使用 PubSubHook 发布。在 Python 2 中一切正常,但在 Python 3 中我收到此错误 models.py:1760 ERROR - Object of type 'bytes' is not JSON serializable。似乎在 Python 3 中对消息进行编码会导致 JSON 序列化程序无法处理的字节。

以下在 Python 2 中可以正常工作:

def send_message_to_pubsub(message):
    pubsub_message = 'data': b64encode(message)
    hook = PubSubHook(gcp_conn_id='google_cloud_default')
    hook.publish('project-name', 'topic-name', [pubsub_message])

示例here 不适用于 Python 3。

更新 1:

尝试了以下但得到错误:

def send_message_to_pubsub():
    message = 'Test message'
    pubsub_message = 'data': b64encode(message).decode()
    hook = PubSubHook(gcp_conn_id='google_cloud_default')
    hook.publish('project-name', 'topic-name', [pubsub_message])

base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test [2019-03-18 17:10:28,903] models.py:1760 ERROR - a bytes-like object is required, not 'str'
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test Traceback (most recent call last):
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test     result = task_copy.execute(context=context)
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 95, in execute
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test     return_value = self.execute_callable()
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 100, in execute_callable
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test     return self.python_callable(*self.op_args, **self.op_kwargs)
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test   File "/home/airflow/gcs/dags/pubsub-test-dag.py", line 31, in send_message_to_pubsub
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test     pubsub_message = 'data': b64encode(message).decode()
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/base64.py", line 58, in b64encode
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test     encoded = binascii.b2a_base64(s, newline=False)
base_task_runner.py:101 INFO - Job 1962: Subtask pub_sub_test TypeError: a bytes-like object is required, not 'str'

更新 2:

尝试使用以下方法,导致不同的错误。这次来自 JSON 序列化器:

def send_message_to_pubsub():
    message = 'Test message'
    pubsub_message = 'data': b64encode(message.encode())
    hook = PubSubHook(gcp_conn_id='google_cloud_default')
    hook.publish('project', 'topic', [pubsub_message]) 

[2019-03-19 10:44:29,845] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test [2019-03-19 10:44:29,841] models.py:1760 ERROR - Object of type 'bytes' is not JSON serializable
[2019-03-19 10:44:29,846] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test Traceback (most recent call last):
[2019-03-19 10:44:29,846] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
[2019-03-19 10:44:29,847] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     result = task_copy.execute(context=context)
[2019-03-19 10:44:29,847] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 95, in execute
[2019-03-19 10:44:29,847] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     return_value = self.execute_callable()
[2019-03-19 10:44:29,847] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 100, in execute_callable
[2019-03-19 10:44:29,848] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-03-19 10:44:29,848] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/home/airflow/gcs/dags/pubsub-test-dag.py", line 33, in send_message_to_pubsub
[2019-03-19 10:44:29,848] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     hook.publish('project', 'topic', [pubsub_message])
[2019-03-19 10:44:29,848] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_pubsub_hook.py", line 75, in publish
[2019-03-19 10:44:29,849] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     topic=full_topic, body=body)
[2019-03-19 10:44:29,849] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/discovery.py", line 795, in method
[2019-03-19 10:44:29,849] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     actual_path_params, actual_query_params, body_value)
[2019-03-19 10:44:29,850] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/model.py", line 151, in request
[2019-03-19 10:44:29,850] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     body_value = self.serialize(body_value)
[2019-03-19 10:44:29,850] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/model.py", line 260, in serialize
[2019-03-19 10:44:29,850] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     return json.dumps(body_value)
[2019-03-19 10:44:29,851] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/json/__init__.py", line 231, in dumps
[2019-03-19 10:44:29,851] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     return _default_encoder.encode(obj)
[2019-03-19 10:44:29,853] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/json/encoder.py", line 199, in encode
[2019-03-19 10:44:29,853] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     chunks = self.iterencode(o, _one_shot=True)
[2019-03-19 10:44:29,853] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test   File "/opt/python3.6/lib/python3.6/json/encoder.py", line 257, in iterencode
[2019-03-19 10:44:29,854] base_task_runner.py:101 INFO - Job 2172: Subtask pub_sub_test     return _iterencode(o, 0)
[2019-03-19 10:44:29,852] models.py:1791 INFO - Marking task as FAILED.

【问题讨论】:

您的代码 sn-p 中的哪一行代码抛出了该错误消息? 如上所述,它在 models.py 中出错。似乎错误的原因是字符串在 Python 2 和 Python 3 中的存储方式。气流似乎正在使用 models.py 中的 JSON 序列化程序序列化请求(到 PubSub 的 REST 端点)。但是,JSON 序列化程序仅适用于字符串数据,并且在 Python 3 中对数据进行编码,如上所述,创建消息的字节版本。 您使用的是哪个版本的 Airflow? Airflow 版本 1.10.1 通过 GCP Cloud Composer,Python 版本 3.6 b64encode(message) 的结果是 Python 3 的字节数。将其更改为 b64encode(message).decode() 【参考方案1】:

这个问题有两个方面。

    根据base64 documentation,您的消息必须是bytes 类型而不是str。要验证这一点,请尝试assert isinstance(message, bytes)。这会出错。

解决方案取决于您的消息来自何处。

如果您的消息是一个字符串,您应该在发送到 base64 之前将其编码为字节:
b64encode(message.encode())
如果您的消息应该是 bytes 类型,您应该更改在 Python 中的读取方式。
    根据 Python 中的 JSON module documentation,不支持 byte 类型。它们必须是 str 类型。这意味着您发送到 PubSub API 的任何内容都必须是字符串。所以你可以把它解码成这样的字符串:
pubsub_message = 'data': b64encode(message.encode()).decode()

【讨论】:

根据更新 1 中的代码摘录,消息是纯文本。尝试b64encode(message.encode()),但这次出现了不同的错误。请参阅更新 2 两种解决方案都试过了吗? pubsub_message = 'data': b64encode(message.encode()).decode() 应该按照您的第二个建议进行尝试。它适用于pubsub_message = 'data': b64encode(message.encode()).decode()。非常感谢您的帮助!!!

使用 Python 和 Airflow 在电子邮件中发送 Redshift 查询结果

】使用Python和Airflow在电子邮件中发送Redshift查询结果【英文标题】:SendingRedshiftQueryResultinEmailwithPythonandAirflow【发布时间】:2021-08-2913:35:03【问题描述】:我正在制作每日DAG,它将在Redshift中运行查询并将结果表通过电子邮件发... 查看详情

Airflow Bigquery Hook:如何将结果保存在 python 变量中?

】AirflowBigqueryHook:如何将结果保存在python变量中?【英文标题】:AirflowBigqueryHook:howtosaveresultsinpythonvariable?【发布时间】:2021-04-0712:53:45【问题描述】:我在气流代码中使用了bigquery钩子。查询示例:selectcount(*)from\'table-name\';所... 查看详情

如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?

】如何使用Python在Airflow中成功触发另一个DAG时触发DAG?【英文标题】:HowtoTriggeraDAGonthesuccessofaanotherDAGinAirflowusingPython?【发布时间】:2020-08-1407:32:12【问题描述】:我有一个pythonDAGParentJob和DAGChildJob。ChildJob中的任务应在成功完... 查看详情

Google Cloud Composer (Apache Airflow) 无法访问日志文件

】GoogleCloudComposer(ApacheAirflow)无法访问日志文件【英文标题】:GoogleCloudComposer(ApacheAirflow)cannotaccesslogfiles【发布时间】:2020-04-2102:20:10【问题描述】:我在GoogleCloudComposer(托管Airflow)中运行DAG,它在本地Airflow中运行良好。它所... 查看详情

无法在带有气流的 jinja 模板中使用 python 变量

...python变量【英文标题】:Can\'tusepythonvariableinjinjatemplatewithAirflow【发布时间】:2021-02-0311:16:50【问题描述】:我正在尝试使用Airflow在AWSEMR上运行11步,并遵循此code作为参考。由于使用EmrAddStepsOperator和EmrStepSensor进行11步会重复太多... 查看详情

如何使用 AirFlow 运行 python 文件的文件夹?

】如何使用AirFlow运行python文件的文件夹?【英文标题】:HowtouseAirFlowtorunafolderofpythonfiles?【发布时间】:2017-01-2908:03:24【问题描述】:我在一个Python文件文件夹中有一系列Python任务:file1.py、file2.py、...我阅读了Airflow文档,但没... 查看详情

在puckel/docker-airflow中启用凭据(代码片段)

我正在使用puckel/docker-airflow来部署气流。目前,网络服务器不要求任何登录凭据。如何添加用户?也许我必须在docker-compose.yml中添加一些环境变量,但我无法找到它。docker-compose文件是here提前致谢。答案创建您自己的airflow.cfg(... 查看详情

如何在 Airflow 中创建条件任务

】如何在Airflow中创建条件任务【英文标题】:HowtocreateaconditionaltaskinAirflow【发布时间】:2017-09-2611:11:29【问题描述】:我想在Airflow中创建一个条件任务,如下面的架构中所述。预期的情况如下:任务1执行如果Task1成功,则执行T... 查看详情

在 Airflow DAG 中导入本地模块(python 脚本)

】在AirflowDAG中导入本地模块(python脚本)【英文标题】:Importinglocalmodule(pythonscript)inAirflowDAG【发布时间】:2018-10-1311:49:26【问题描述】:我正在尝试将本地模块(python脚本)导入我的DAG。目录结构:airflow/├──dag│  ├──_... 查看详情

如何在 Airflow 中运行 Spark 代码?

】如何在Airflow中运行Spark代码?【英文标题】:HowtorunSparkcodeinAirflow?【发布时间】:2017-02-1104:10:08【问题描述】:地球人你好!我正在使用Airflow来安排和运行Spark任务。这次我发现的只是Airflow可以管理的pythonDAG。DAG示例:spark_co... 查看详情

无法导入 Airflow 插件

】无法导入Airflow插件【英文标题】:Can\'timportAirflowplugins【发布时间】:2017-10-1000:10:54【问题描述】:跟随气流教程here。问题:网络服务器返回以下错误BrokenDAG:[/usr/local/airflow/dags/test_operator.py]cannotimportnameMyFirstOperator注意事项:... 查看详情

无法从 Airflow 应用程序访问 Vault 服务器

】无法从Airflow应用程序访问Vault服务器【英文标题】:UnabletoreachVaultserverfromAirflowapplication【发布时间】:2021-09-2414:39:46【问题描述】:我正在尝试使用docker-compose在我的本地计算机上使用Airflow将Vault设置为ssecrets后端,但无法建... 查看详情

airflow是啥意思

空调滤芯上airflow表示空气的流动方法的意思,现在市面上买到的空调滤芯,大部分的侧面都有箭头指示,最常见的是用airflow和箭头的方式指示,另一种是用UP和箭头的方式指示。airflow标志是帮助安装的技师区分正反面的,装反... 查看详情

无法在 Python 3.6 中运行 dlib 模块

】无法在Python3.6中运行dlib模块【英文标题】:CantrundlibmoduleinPython3.6【发布时间】:2018-07-2606:55:16【问题描述】:我已经在我的conda环境中为我的python3.6安装了dlib...但是当我在我的pythonidle中运行一组代码时,它显示没有为\'dlib\'... 查看详情

无法在 python 3.8 中访问嵌套的 JSON

】无法在python3.8中访问嵌套的JSON【英文标题】:Can\'treachnestedJSONinpython3.8【发布时间】:2020-05-0822:05:27【问题描述】:处理来自Websockets订阅的响应。响应如下:\'jsonrpc\':\'2.0\',\'method\':\'subscription\',\'params\':\'channel\':\'book.BTC-PERPETU... 查看详情

如何使用 Python 访问 Amazon EMR 错误消息

...发布时间】:2021-06-0401:08:52【问题描述】:我正在运行由Airflow启动的EMR集群,我需要某种方式将错误消息传递回Airflow。Airflow在Python中运行,所以我需要在python中完成。目前,错误日志位于配置详细信息下的“日志URI”部分。访... 查看详情

airflow常见问题汇总(代码片段)

airflow常见问题的排查记录如下:airflow的scheduler进程在执行一个任务后就挂起进入假死状态出现这个情况的一般原因是scheduler调度器生成了任务,但是无法发布出去。而日志中又没有什么错误信息。可能原因是Borker连接依赖库没安... 查看详情

无法在 Python 3.x 中登录

】无法在Python3.x中登录【英文标题】:Can\'tmakelogininPython3.x【发布时间】:2018-07-1803:55:56【问题描述】:我已经创建了一个数据库,需要检查那个数据库是否有相应的用户名和密码……如果有,它应该打印“成功”,否则不会…... 查看详情