在 Spark Dataframe 中实现 Window 的重叠分区

     2023-04-18     78

关键词:

【中文标题】在 Spark Dataframe 中实现 Window 的重叠分区【英文标题】:Achieve overlapping partitions for Window in Spark Dataframe 【发布时间】:2019-07-31 07:49:23 【问题描述】:

我的情况如下: 我有一个由符号(分类)值的时间序列组成的数据框。它看起来类似于: idx symbol partition 0 A 0 1 C 0 2 B 0 3 C 0 4 A 0 5 C 1 6 B 1 7 D 1 8 C 1 9 B 1

我现在的目标是制作一个滑动窗口并将 n 个前导值收集到一个数组中。

我通过以下方式实现了这一目标:

sliding_window = Window.partitionBy("partition").orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))

这导致以下数据框:

    idx    symbol    partition    sliding
    0      A         0            [A, C, B]
    1      C         0            [C, B, C]
    2      B         0            [B, C, A]
    3      C         0               [C, A]
    4      A         0                  [A]
    5      C         1            [C, B, D]
    6      B         1            [B, D, C]
    7      D         1            [D, C, B]
    8      C         1               [C, B]
    9      B         1                  [B]

到目前为止一切顺利。由于 Spark 中的分区性质,滑动数组在到达分区末尾时会变得更短,因为缺少另一个分区中存在的前导行的信息。对于无法避免的时间序列的结尾,但希望滑动窗口不会错过中间的任何信息(本例中的索引 3 和 4)。

所需的 Dataframe 如下所示:

    idx    symbol    partition    sliding
    0      A         0            [A, C, B]
    1      C         0            [C, B, C]
    2      B         0            [B, C, A]
    3      C         0            [C, A, C]
    4      A         0            [A, C, B]
    5      C         1            [C, B, D]
    6      B         1            [B, D, C]
    7      D         1            [D, C, B]
    8      C         1               [C, B]
    9      B         1                  [B]

最好是有重叠的分区,这样索引 5 和 6 在两个分区中都存在冗余,我可以计算所需的滑动窗口。有什么方法可以实现吗?

使用重叠数据,原始 Dataframe 将如下所示:

    idx    symbol    partition    
    0      A         0        
    1      C         0        
    2      B         0        
    3      C         0        
    4      A         0
    5      C         0
    6      B         0
    5      C         1        
    6      B         1        
    7      D         1        
    8      C         1           
    9      B         1              

所以基本上分区 1 的前两行将被复制并附加为分区 0 的最后一行。

我考虑过过滤分区边界信息并在本地计算必要的信息,然后再加入原始数据帧,但我希望有一个更简单的方法。

【问题讨论】:

你能代表你想要的数据框吗,措辞没有太多想法。 我添加了我想要的输出。 【参考方案1】:

在您的示例中,如果您只是不对窗口进行分区,它将为您提供所需的内容

sliding_window = Window.orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))

给予

 idx    symbol    block    sliding
    0      A         0        [A, C, B]
    1      C         0        [C, B, C]
    2      B         0        [B, C, A]
    3      C         0        [C, A, C]
    4      A         0        [A, C, B]
    5      C         1        [C, B, D]
    6      B         1        [B, D, C]
    7      D         1        [D, C, B]
    8      C         1           [C, B]
    9      B         1              [B]

另外,请注意,collect_list() 不遵守顺序(由于 spark 的分布式特性),因此您的符号会在列表中混淆。

【讨论】:

问题是我明确想要执行滑动窗口分布(这就是我创建“块”列的原因,以在窗口聚合期间保持分区)。不定义分区会将所有数据移动到不可扩展的单个分区。 您可以使用此技巧复制一行:***.com/questions/50624745/… 要确定您需要复制哪些行,您可以使用 max("idx").over(Window.partitionBy(" block")) 并复制那些 idx == max 或 idx == max-1 我也想过这个问题,但只是复制行(5 和 6)并不能将它们带到正确的分区。它们将在第 1 块中,但我需要它们在第 0 块中来计算滑动窗口。 我假设您事先在代码中使用了 repartition() ?如果在这段代码之后你不能移动它,我认为它是 rip。 在此之前没有重新分区调用。块列是使用 spark_partition_id 函数预先创建的。它旨在在使用窗口函数时保留现有的分区。我有一个想法

在 spark 中实现 informatica 逻辑

】在spark中实现informatica逻辑【英文标题】:Implementinginformaticalogicinspark【发布时间】:2018-06-2411:30:11【问题描述】:我们如何在spark中实现以下逻辑?如果列值为空,那么它应该返回\'\'如果ltrim(rtrim(column))为null那么它应该返回\'\... 查看详情

如何在 Apache Spark 中实现递归算法?

】如何在ApacheSpark中实现递归算法?【英文标题】:HowtoimplementrecursivealgorithmsinApacheSpark?【发布时间】:2021-09-0706:10:17【问题描述】:我有一个问题,我想在Spark中实现递归算法,并查看是否有任何建议可以在Spark中构建它,或者... 查看详情

在 Spark GraphX 中实现拓扑排序

】在SparkGraphX中实现拓扑排序【英文标题】:ImplementingtopologicalsortinSparkGraphX【发布时间】:2017-02-2718:55:31【问题描述】:我正在尝试使用Spark\'sGraphX库来实现topologicalsort。这是我目前写的代码:MyObject.scalaimportjava.util.ArrayListimports... 查看详情

如何在 Spark 中实现“交叉连接”?

】如何在Spark中实现“交叉连接”?【英文标题】:Howtoimplement"CrossJoin"inSpark?【发布时间】:2014-09-1113:38:22【问题描述】:我们计划将ApachePig代码迁移到新的Spark平台。Pig具有“Bag/Tuple/Field”概念,其行为类似于关系数据... 查看详情

从 Kafka 上的 JSON 消息在 Spark Streaming 中创建 Spark DataFrame

】从Kafka上的JSON消息在SparkStreaming中创建SparkDataFrame【英文标题】:CreateSparkDataFrameinSparkStreamingfromJSONMessageonKafka【发布时间】:2015-09-1314:05:50【问题描述】:我正在Scala中实现SparkStreaming,我从Kafka主题中提取JSON字符串,并希望将... 查看详情

如何在 Spark SQL(PySpark) 中实现自增

】如何在SparkSQL(PySpark)中实现自增【英文标题】:HowtoimplementautoincrementinsparkSQL(PySpark)【发布时间】:2016-10-2504:20:43【问题描述】:我需要在我的sparksql表中实现一个自动增量列,我该怎么做。请指导我。我正在使用pyspark2.0谢谢卡... 查看详情

使用 ForeachWriter 在 Spark 流中实现 Cassandra 接收器

】使用ForeachWriter在Spark流中实现Cassandra接收器【英文标题】:ImplementationofCassandrasinkinSparkstreamingusingForeachWriter【发布时间】:2019-02-2317:01:03【问题描述】:显然,Spark流中没有对Cassandra接收器的内置支持。我在网上找到了这个示... 查看详情

如何在 Spark UDAF 中实现 fastutils 映射?

】如何在SparkUDAF中实现fastutils映射?【英文标题】:HowdoIimplementafastutilsmapinaSparkUDAF?【发布时间】:2019-02-0523:47:53【问题描述】:我正在构建一个SparkUDAF,我将中间数据存储在一个fastutils映射中。架构如下所示:defbufferSchema=newStr... 查看详情

使用 scala 在 spark 中实现类似 MergeSort 的功能

】使用scala在spark中实现类似MergeSort的功能【英文标题】:ImplementaMergeSortlikefeatureinsparkwithscala【发布时间】:2016-05-1023:58:31【问题描述】:Spark版本1.2.1Scala版本2.10.4我有2个由数字字段关联的SchemaRDD:RDD1:(Bigtable-aboutamillionrecords)[A,... 查看详情

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

】如何在sparkscala中实现uniqueConcatenate、uniqueCount[关闭]【英文标题】:howtoimplementuniqueConcatenate,uniqueCountinsparkscala[closed]【发布时间】:2022-01-1017:01:58【问题描述】:我正在尝试转换数据,旧代码在Tibco中并使用uniqueConcatenate、uniqueC... 查看详情

Scala - 如何在 Spark 的 map 函数中实现 Try

】Scala-如何在Spark的map函数中实现Try【英文标题】:Scala-howtoimplementTryinsideamapfunctioninSpark【发布时间】:2019-03-2106:45:03【问题描述】:由于map转换中的函数抛出java.lang.NullPointerException,我的Spark作业的一个阶段失败。我的想法是... 查看详情

在Scala spark中实现动态字符串插值?

】在Scalaspark中实现动态字符串插值?【英文标题】:AchivedynamicstringinterpolationinScalaspark?【发布时间】:2020-01-1910:03:05【问题描述】:我有一个字符串,其中包含需要进入我预期数据帧的.agg函数的函数。我的数据数据框看起来像v... 查看详情

如何在 Kubernetes 环境中实现 spark-cassandra 连接器的“repartitionByCassandraReplica”?

】如何在Kubernetes环境中实现spark-cassandra连接器的“repartitionByCassandraReplica”?【英文标题】:Howtoachievespark-cassandraconnector\'s"repartitionByCassandraReplica"inKubernetesenvironment?【发布时间】:2021-09-1206:08:05【问题描述】:我看到了... 查看详情

如何在 Apache Spark Java 或 Scala 中实现这一点?

】如何在ApacheSparkJava或Scala中实现这一点?【英文标题】:HowdoIachievethisinApacheSparkJavaorScala?【发布时间】:2018-08-0102:14:44【问题描述】:汽车上的设备在行程开始时不会发送TRIPID,但会在行程结束时发送一个。如何将对应的TRIPIDS... 查看详情

任何人都可以在 Spark 中实现 CombineByKey() 而不是 GroupByKey() 来对元素进行分组吗?

】任何人都可以在Spark中实现CombineByKey()而不是GroupByKey()来对元素进行分组吗?【英文标题】:CananyoneimplementCombineByKey()insteadofGroupByKey()inSparkinordertogroupelements?【发布时间】:2020-03-2614:05:05【问题描述】:我正在尝试对我创建的RDD... 查看详情

如何在 Win32 窗口中实现 MFC 资源?

】如何在Win32窗口中实现MFC资源?【英文标题】:HowcanIimplementaMFCresourcewithinaWin32window?【发布时间】:2018-03-2102:19:44【问题描述】:我有两个项目,一个使用Win32API的旧项目,它有一个使用Win32图形的旧工具栏,另一个使用具有更... 查看详情

使用Apache Spark实现python功能

...时间】:2015-12-1906:28:12【问题描述】:我有一个要在Spark中实现的python代码,但是我无法为在Spark1.1版本中实现的RDD获得正确的逻辑。这段代码在Python中完美运行,但我想用这段代码在Spark中实现。importlxml.etreeimportcs 查看详情

如何在 Spark DataFrame 中添加常量列?

】如何在SparkDataFrame中添加常量列?【英文标题】:HowtoaddaconstantcolumninaSparkDataFrame?【发布时间】:2015-09-2518:17:14【问题描述】:我想在DataFrame中添加一个具有任意值的列(每一行都相同)。当我使用withColumn时出现错误,如下所... 查看详情