如何组合两个结果并将其传递到 apache-beam 管道中的下一步

     2023-02-16     187

关键词:

【中文标题】如何组合两个结果并将其传递到 apache-beam 管道中的下一步【英文标题】:How to combine two results and pipe it to next step in apache-beam pipeline 【发布时间】:2020-11-12 22:35:09 【问题描述】:

见下面代码sn-p, 我希望["metric1", "metric2"] 成为我对 RunTask.process 的输入。但是它分别用“metric1”和“metric2”运行了两次

def run():
  
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  p = beam.Pipeline(options=pipeline_options)

  root = p | 'Get source' >> beam.Create([
      "source_name" # maybe ["source_name"] makes more sense since my process function takes an array as an input?
  ])

  metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1")) #let's say it returns ["metic1"]
  metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2")) #let's say it returns ["metic2"]

  metric3 = (metric1, metric2) | beam.Flatten() | beam.ParDo(RunTask()) # I want ["metric1", "metric2"] to be my input for RunTask.process. However it was run twice with "metric1" and "metric2" respectively

  

【问题讨论】:

【参考方案1】:

我了解您希望以遵循以下语法的方式加入两个 PCollection:['element1','element2']。为了实现这一点,您可以使用CoGroupByKey() 而不是Flatten()。

考虑到您的代码 sn-p,语法将:

def run():
  
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  p = beam.Pipeline(options=pipeline_options)

  root = p | 'Get source' >> beam.Create([
      "source_name" # maybe ["source_name"] makes more sense since my process function takes an array as an input?
  ])

  metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1")) #let's say it returns ["metic1"]
  metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2")) #let's say it returns ["metic2"]

  metric3 = (
       (metric1, metric2) 
       | beam.CoGroupByKey() 
       | beam.ParDo(RunTask()) 
 )

我想指出 Flatten() 和 CoGroupByKey() 之间的区别。

1) Flatten()接收两个或多个存储相同数据类型的PCollection,并将它们合并为一个逻辑PCollection。例如,

import apache_beam as beam

from apache_beam import Flatten, Create, ParDo, Map

p = beam.Pipeline()

adress_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

street = p | 'CreateEmails' >> beam.Create(adress_list)
city = p | 'CreatePhones' >> beam.Create(city_list)

resul =(
    (street,city)
    |beam.Flatten()
    |ParDo(print)
)

p.run()

还有输出,

('leo', 'George St. 32')
('ralph', 'Pyrmont St. 30')
('mary', '10th Av.')
('carly', 'Marina Bay 1')
('leo', 'Sydney')
('ralph', 'Sydney')
('mary', 'NYC')
('carly', 'Brisbane')

请注意,两个 PCollection 都在输出中。但是,一个附加到另一个。

2) CoGroupByKey() 执行两个或多个具有相同键类型的键值 PCollection 之间的关系连接。使用此方法,您将通过键执行连接,而不是像 Flatten() 中所做的那样追加。下面是一个例子,

import apache_beam as beam

from apache_beam import Flatten, Create, ParDo, Map

p = beam.Pipeline()

address_list = [
    ('leo', 'George St. 32'),
    ('ralph', 'Pyrmont St. 30'),
    ('mary', '10th Av.'),
    ('carly', 'Marina Bay 1'),
]
city_list = [
    ('leo', 'Sydney'),
    ('ralph', 'Sydney'),
    ('mary', 'NYC'),
    ('carly', 'Brisbane'),
]

street = p | 'CreateEmails' >> beam.Create(address_list)
city = p | 'CreatePhones' >> beam.Create(city_list)

results = (
    (street, city)
    | beam.CoGroupByKey()
    |ParDo(print)
    #| beam.io.WriteToText('delete.txt')
    
)

p.run()

还有输出,

('leo', (['George St. 32'], ['Sydney']))
('ralph', (['Pyrmont St. 30'], ['Sydney']))
('mary', (['10th Av.'], ['NYC']))
('carly', (['Marina Bay 1'], ['Brisbane']))

请注意,您需要一个 主键 才能加入结果。此外,此输出是您所期望的。

【讨论】:

【参考方案2】:

或者,使用侧面输入:

metrics3 = metric1 | beam.ParDo(RunTask(), metric2=beam.pvalue.AsIter(metric2))

在 RunTask 进程()中:

def process(self, element_from_metric1, metric2):
  ...

【讨论】:

如何在json android中返回多行并将其传递给sqlite

】如何在jsonandroid中返回多行并将其传递给sqlite【英文标题】:howtogetmultiplerowsreturnedinjsonandroidandpassittosqlite【发布时间】:2017-12-1504:09:57【问题描述】:我正在尝试通过JSON将下面提到的phpjson结果的输出输入到我的android应用程序... 查看详情

传递在 Bootstrap 日期选择器中选择的日期的结果,并将其放在 URL 链接中

...的“入住”和“退房”日期的酒店网站。单击按钮时,这两个日期都应传递到“预订”网站。例如,如 查看详情

如何遍历 recyclerview 适配器中的项目获取结果并将其传递给片段

】如何遍历recyclerview适配器中的项目获取结果并将其传递给片段【英文标题】:Howtoiterateoveritemsinsidearecyclerviewadaptergetaresultandpassittoafragment【发布时间】:2021-11-2919:01:31【问题描述】:下面是我的recyclerview适配器,每当我的recycle... 查看详情

如何添加 Croppie 结果以上传 php 并将结果传递到其他 php 页面

】如何添加Croppie结果以上传php并将结果传递到其他php页面【英文标题】:HowtoaddCroppieresulttouploadphpandpasstheresulttootherphppage【发布时间】:2016-11-0412:58:01【问题描述】:我正在尝试开发一个应用程序,该应用程序可以从facebooksdk或... 查看详情

拆分数据集并将子集并行传递给函数,然后重新组合结果

】拆分数据集并将子集并行传递给函数,然后重新组合结果【英文标题】:Splitdatasetandpassthesubsetsinparalleltofunctionthenrecombinetheresults【发布时间】:2013-06-2509:50:30【问题描述】:这是我尝试使用foreach包所做的事情。我有600行和58000... 查看详情

如何将变量从 Activity 传递到 Fragment,并将其传回?

】如何将变量从Activity传递到Fragment,并将其传回?【英文标题】:HowtopassavariablefromActivitytoFragment,andpassitback?【发布时间】:2013-06-3010:52:08【问题描述】:我目前正在制作一个安卓应用程序,我想在活动和片段之间传递一个日期... 查看详情

如何创建对象的新实例并将其传递到数组 SwiftUI

】如何创建对象的新实例并将其传递到数组SwiftUI【英文标题】:HowtocreatenewinstanceofobjectandpassitintoarraySwiftUI【发布时间】:2021-04-1019:02:54【问题描述】:我想创建一个简单的程序来编辑这个JSON:https://pastebin.com/7jXyvi6Y我创建了Smoo... 查看详情

Swift 4 - 如何传递设备令牌并将其发送到参数中的 sql

】Swift4-如何传递设备令牌并将其发送到参数中的sql【英文标题】:Swift4-Howtopassdevicetokenandsendittosqlintheparameter【发布时间】:2018-03-1311:14:52【问题描述】:您好,我在将device_token传递给我的参数swift4Xcode9.2时遇到问题视图控制器ov... 查看详情

如何将图像从前端(reactjs)传递到后端(nodejs)并将其上传到firebase存储?

】如何将图像从前端(reactjs)传递到后端(nodejs)并将其上传到firebase存储?【英文标题】:Howtopassimagefromfrontend(reactjs)tobackend(nodejs)anduploadittofirebasestorage?【发布时间】:2020-12-2513:09:51【问题描述】:我想将图像形式的前端(反... 查看详情

迭代每一行并将两列的值传递给查询并将每个结果附加到表中

】迭代每一行并将两列的值传递给查询并将每个结果附加到表中【英文标题】:ierateeachrowandpassthevaluesoftwocolumnstoaqueryandappendeachresulttothetable【发布时间】:2020-11-1915:11:16【问题描述】:我有一个表格,其结构如下,名称为table1sid... 查看详情

我如何将两个数据框列值作为键传递给2键到一个值字典,然后将结果传递到另一列?(代码片段)

...10000.0等)在我的数据框中,我有'UnitA','UnitB',乘数列。如何将值从UnitA和UnitB传递到字典并将值放入'乘数?我可以通过以下方法对单个键值字典myDict('A':'1),('B':2)执行此操作:df['Co 查看详情

如何组合不同的特征并将其提供给文本分类算法

】如何组合不同的特征并将其提供给文本分类算法【英文标题】:howtocombineandfeeddifferentfeaturestoanalgorithmfortextclassification【发布时间】:2016-04-1518:50:56【问题描述】:我有一些120k文本文件和12个类别,我想将这些文档分类到其中... 查看详情

如何传递 HttpInputStream 的内容并将其发送到 c# .net 中的另一个 Rest Api

】如何传递HttpInputStream的内容并将其发送到c#.net中的另一个RestApi【英文标题】:HowtopassonthecontentofaHttpInputStreamandsendittoanotherRestApiinc#.net【发布时间】:2019-07-0823:01:56【问题描述】:我有一个RESTAPIWeb服务,它充当中间件,将调用... 查看详情

使用闭包获取 API 结果并将其传递给另一个 ViewController

】使用闭包获取API结果并将其传递给另一个ViewController【英文标题】:GettingAPIResultwithclosureandpassingitintoanotherViewController【发布时间】:2022-01-1400:30:18【问题描述】:我在viewModel中得到一个API结果,如下所示:classHomePageViewModelvarap... 查看详情

如何将数据源从一个模块引用到另一个模块并将其作为变量传递给根模块?

】如何将数据源从一个模块引用到另一个模块并将其作为变量传递给根模块?【英文标题】:Howtoreferenceadatasourcefromamoduletoanothermoduleandpassitasavariabletorootmodule?【发布时间】:2020-03-2316:08:11【问题描述】:我的terraform目录结构如下... 查看详情

如何从服务器端加密数据并将其传递到客户端(javascript)并解密和使用它

】如何从服务器端加密数据并将其传递到客户端(javascript)并解密和使用它【英文标题】:Howtoencryptdatafromserversideandpassittoclientside(javascript)anddecryptanduseit【发布时间】:2014-04-2408:18:53【问题描述】:我在mvc的服务器端生成了一... 查看详情

如何将管道任务中的变量传递给 terraform 任务并将其应用到我的 terraform 代码中?

】如何将管道任务中的变量传递给terraform任务并将其应用到我的terraform代码中?【英文标题】:HowdoIpassavariablefromapipelinetaskintoaterraformtaskandapplyitinmyterraformcode?【发布时间】:2021-12-2822:52:48【问题描述】:所以我有一个带有任务... 查看详情

如何保留我的距离表示分数字段并将其映射到我的结果实体中?

】如何保留我的距离表示分数字段并将其映射到我的结果实体中?【英文标题】:HowcanIkeepmy,distancerepresenting,scorefieldandmapitintomyresultingentity?【发布时间】:2019-12-0719:28:51【问题描述】:我正在使用以下查询:NativeSearchQuerynsq=newNat... 查看详情