按最近的时间戳对 Spark DataFrame 中的行进行重复数据删除

     2023-04-15     218

关键词:

【中文标题】按最近的时间戳对 Spark DataFrame 中的行进行重复数据删除【英文标题】:Dedupe rows in Spark DataFrame by most recent timestamp 【发布时间】:2018-09-20 17:45:51 【问题描述】:

我有一个DataFrame,其架构如下:

root
|- documentId
|- timestamp
|- anotherField

例如,

"d1", "2018-09-20 10:00:00", "blah1"
"d2", "2018-09-20 09:00:00", "blah2"
"d1", "2018-09-20 10:01:00", "blahnew"

请注意,为了便于理解(以及我的方便),我将时间戳显示为字符串。它实际上是一个long,表示自纪元以来的毫秒数。

如此处所示,有重复的行(第 1 行和第 3 行)具有相同的 documentId 但不同的 timestamp(以及可能不同的其他字段)。我想对每个 documentId 进行重复数据删除并仅保留最近的(基于 timestamp)行。

一个简单的df.groupBy("documentId").agg(max("timestamp), ...) 似乎不太可能在这里工作,因为我不知道如何保留对应于满足max("timestamp") 的行中的其他字段。

所以,我想出了一个复杂的方法。

// first find the max timestamp corresponding to each documentId
val mostRecent = df
    .select("documentId", "timestamp")
      .groupBy("documentId")
        .agg(max("timestamp"))

// now join with the original df on timestamp to retain
val dedupedDf = df.join(mostRecent, Seq("documentId", "timestamp"), "inner")

生成的dedupedDf 应该只包含与每个documentId 的最新条目相对应的那些行。

虽然这可行,但我认为这不是正确(或有效)的方法,因为我使用的是 join,这似乎没有必要。

我怎样才能做得更好?我正在寻找纯粹的基于“DataFrame”的解决方案,而不是基于 RDD 的方法(因为 DataBricks 人员在研讨会上反复告诉我们使用 DataFrames 而不是 RDDs)。

【问题讨论】:

您没有使用正常意义上的“重复(行)”。请使用足够的单词和句子来清楚地表达您的意思。这里显然有多行具有相同的子行值。 【参考方案1】:

查看下面的代码可以帮助您实现目标,

val df = Seq(
  ("d1", "2018-09-20 10:00:00", "blah1"),
  ("d2", "2018-09-20 09:00:00", "blah2"),
  ("d1", "2018-09-20 10:01:00", "blahnew")
).toDF("documentId","timestamp","anotherField")

import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"documentId").orderBy($"timestamp".desc)
val Resultdf = df.withColumn("rownum", row_number.over(w))
     .where($"rownum" === 1).drop("rownum")

Resultdf.show()

输入:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d1|2018-09-20 10:00:00|       blah1|
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+

输出:

+----------+-------------------+------------+
|documentId|          timestamp|anotherField|
+----------+-------------------+------------+
|        d2|2018-09-20 09:00:00|       blah2|
|        d1|2018-09-20 10:01:00|     blahnew|
+----------+-------------------+------------+

【讨论】:

这是实现结果的最有效方式吗?

按日期排序 Spark DataFrame 列的数组

】按日期排序SparkDataFrame列的数组【英文标题】:SortbydateanArrayofaSparkDataFrameColumn【发布时间】:2016-11-1411:10:08【问题描述】:我的DataFrame格式如下:+---+------------------------------------------------------+|Id|DateInfos|+---+---------------------- 查看详情

Spark:保存按“虚拟”列分区的 DataFrame

】Spark:保存按“虚拟”列分区的DataFrame【英文标题】:Spark:saveDataFramepartitionedby"virtual"column【发布时间】:2016-02-1616:07:50【问题描述】:我正在使用PySpark执行经典的ETL作业(加载数据集、处理它、保存它),并希望将我... 查看详情

如何按时间戳对数组进行排序?

】如何按时间戳对数组进行排序?【英文标题】:Howtosortanarraybytimestamp?【发布时间】:2020-01-0121:12:06【问题描述】:我正在尝试按时间戳值对array进行排序。我想按升序对它们进行排序,如果它们中的任何一个具有不确定的属性... 查看详情

在 Cloudkit.JS 中按时间戳对记录进行排序

】在Cloudkit.JS中按时间戳对记录进行排序【英文标题】:SortrecordsbytimestampinCloudkit.JS【发布时间】:2015-06-2618:48:21【问题描述】:我正在尝试按创建的时间戳对这些记录进行排序,但我使用cloudkit-js示例代码作为基础。我看到他们... 查看详情

Flutter 按时间戳对 Firebase 快照进行排序

】Flutter按时间戳对Firebase快照进行排序【英文标题】:FluttersortFirebasesnapshotbytimestamp【发布时间】:2019-02-0823:59:57【问题描述】:我正在尝试按时间戳对快照进行排序,但返回原始顺序。数据结构看起来像这样我有两个快照,时... 查看详情

在 SQL 中:按相关元素时间戳对唯一元素进行排序

】在SQL中:按相关元素时间戳对唯一元素进行排序【英文标题】:InSQL:orderuniqueelementsbyrelatedelementstimestamp【发布时间】:2017-03-2110:01:52【问题描述】:我有两个表“贷款”和“付款”。贷款具有贷款状态,并且可能附有付款。付... 查看详情

如何在 Spark Dataframe 上获取按结果分组的元组?

】如何在SparkDataframe上获取按结果分组的元组?【英文标题】:HowtogetaTupleforthegroupedbyresultonaSparkDataframe?【发布时间】:2018-03-1304:29:16【问题描述】:我正在尝试根据id对实体进行分组,运行以下代码我有这个数据框:valpet_type_cou... 查看详情

按时间戳对组内的行进行排序

】按时间戳对组内的行进行排序【英文标题】:orderrowswithingroupbytimestamp【发布时间】:2013-10-2423:36:07【问题描述】:我正在尝试为私人消息构建和收件箱。$sqlInbox="SELECTsender,receiver,message,parent,rViewFROMmessagesWHEREreceiver=\'$log_username\... 查看详情

Spark DataFrame通过GroupBy删除重复项保持第一

】SparkDataFrame通过GroupBy删除重复项保持第一【英文标题】:SparkDataFrameRemovingduplicatesviaGroupBykeepfirst【发布时间】:2016-11-2300:30:05【问题描述】:我正在使用groupBy函数从sparkDataFrame中删除重复项。对于每个组,我只想占据第一行,... 查看详情

如何按 Seq[org.apache.spark.sql.Column] 降序排序 spark DataFrame?

】如何按Seq[org.apache.spark.sql.Column]降序排序sparkDataFrame?【英文标题】:HowtosortsparkDataFramebySeq[org.apache.spark.sql.Column]indescendingorder?【发布时间】:2020-01-0904:13:36【问题描述】:有如下DataFrame:importspark.implicits._valdf=List(("id 查看详情

当时间戳未被归类为索引时,如何按时间戳对数据帧进行切片?

】当时间戳未被归类为索引时,如何按时间戳对数据帧进行切片?【英文标题】:HowcanIsliceadataframebytimestamp,whentimestampisn\'tclassifiedasindex?【发布时间】:2016-10-0509:59:23【问题描述】:如何使用时间戳拆分我的pandas数据帧?我拨打d... 查看详情

如何在 Spark SQL(DataFrame)的 UDF 中使用常量值

】如何在SparkSQL(DataFrame)的UDF中使用常量值【英文标题】:HowtouseconstantvalueinUDFofSparkSQL(DataFrame)【发布时间】:2015-04-0207:01:56【问题描述】:我有一个包含timestamp的数据框。要按时间(分钟、小时或天)聚合,我尝试过:valtoSeg... 查看详情

如何保证 Spark Dataframe 中的重新分区

】如何保证SparkDataframe中的重新分区【英文标题】:HowtoguaranteerepartitioninginSparkDataframe【发布时间】:2016-08-1615:38:51【问题描述】:我对ApacheSpark很陌生,我正在尝试按美国州重新分区数据框。然后我想将每个分区分成自己的RDD并... 查看详情

根据文件中的时间戳对文件进行排序

】根据文件中的时间戳对文件进行排序【英文标题】:Sortingfilesbasedonatimestampwithinthefile【发布时间】:2012-02-0920:31:55【问题描述】:我有一堆文件,其中包含一个带有时间戳的ascii标头WITHIN文件,然后是一大块二进制数据。我想... 查看详情

从 DataFrame 中按分区收集集合

】从DataFrame中按分区收集集合【英文标题】:CollectcollectionsbypartitionsfromDataFrame【发布时间】:2022-01-1615:00:40【问题描述】:我有按列分区的DataFrame:valdfDL=spark.read.option("delimiter",",").option("header",true).csv(file.getPath.toUri.getPath).repar 查看详情

如何从 Spark 2.0 中的 DataFrame 列创建数据集?

】如何从Spark2.0中的DataFrame列创建数据集?【英文标题】:HowtocreateDatasetfromDataFramecolumninSpark2.0?【发布时间】:2016-12-0610:32:02【问题描述】:是否可以从Spark2.0中的Dataframe列创建数据集?我有以下问题:我想从按日期分区的镶木... 查看详情

Spark Dataframe GroupBy 和计算复杂聚合函数

】SparkDataframeGroupBy和计算复杂聚合函数【英文标题】:SparkDataframeGroupByandcomputeComplexaggregatefunction【发布时间】:2017-10-0407:51:45【问题描述】:使用Spark数据框,我需要使用以下方法计算百分比复杂的公式:按“KEY”分组并计算“... 查看详情

使用 DataFrame API 时,自联接无法按预期工作

】使用DataFrameAPI时,自联接无法按预期工作【英文标题】:Self-joinnotworkingasexpectedwiththeDataFrameAPI【发布时间】:2017-10-3107:42:28【问题描述】:我正在尝试使用自联接从表中获取最新记录。它可以使用spark-sql工作,但不能使用sparkD... 查看详情