Azure Databricks Scala:如何替换相应层次结构之后的行

     2023-04-17     174

关键词:

【中文标题】Azure Databricks Scala:如何替换相应层次结构之后的行【英文标题】:Azure Databricks Scala : How to replace rows following a respective hirarchy 【发布时间】:2020-07-05 18:22:55 【问题描述】:

记住以下数据集:

我想获得

如您所见,基本上这个想法是遵循 ACTUAL_ID 列指示的路径,直到它为空(如果它还没有)

我尝试使用 udf 来传递完整的初始 Dataframe,并且递归会找到我想要的内容,但似乎无法将 Dataframes 传递给 UDF。我也考虑过替换一行的值,但似乎这是不可能的。

我最近的尝试:

def calculateLatestImdate(df: DataFrame, lookupId: String) : String = 
  var foundId = df.filter($"ID" === lookupId).select($"ACTUAL_ID").first.getAs[String]("ID");
  if (foundId == "" || foundId == null)
  
    lookupId
  
  else
  
    calculateLatestImdate(df, foundId);
  


val calculateLatestImdateUdf = udf((df:DataFrame, s:String) => 
  calculateLatestImdate(df,s)
)

val df = sc.parallelize(Seq(("1", "", "A"), ("2", "3", "B"), ("3", "6", "C"), ("4", "5", "D"), ("5", "", "E"), ("6", "", "F"))).toDF("ID","ACTUAL_ID", "DATA")

val finalDf = df.withColumn("FINAL_ID", when(isEmpty($"ACTUAL_ID"), $"ID").otherwise(calculateLatestImdateUdf(df, $"ACTUAL_ID")))

【问题讨论】:

【参考方案1】:

这对我来说有点像一个图形问题,所以我使用 Scala 和图形框架想出了一个答案。它利用了connectedComponents 算法和图框的outDegrees 方法。我假设根据您的样本数据,每棵树的末端都是唯一的,但需要检查这个假设。我有兴趣了解更多数据的性能如何,但请告诉我您对解决方案的看法。

完整的脚本:

// NB graphframes had to be installed separately with the right Scala version 
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._


// Create the test data

// Vertices dataframe
val v2 = sqlContext.createDataFrame(List(
  ( 1, 0, "A" ), ( 2, 3, "B" ), ( 3, 6, "C" ),
    ( 4, 5, "D" ), ( 5, 0, "E" ), ( 6, 0, "F" )
)).toDF("id", "actual_id", "data")

// Edge dataframe
val e2 = sqlContext.createDataFrame(List(
  (2, 3, "is linked to"),
  (3, 6, "is linked to"),
  (4, 5, "is linked to")
)).toDF("src", "dst", "relationship")


// Create the graph frame
val g2 = GraphFrame(v2, e2)
print(g2)


// The connected components adds a component id to each 'group'
sc.setCheckpointDir("/tmp/graphframes-example-connected-components")

val components = g2.connectedComponents.run() // doesn't work on Spark 1.4
display(components)




// "end" of tree nodes have no outDegree, so add that in to the component df
val endOfTree = components.join(g2.outDegrees, Seq("id"), "left")
  .select("component", "data")
  .where("outDegree is null")

endOfTree.show()


components.as("c").join(endOfTree.as("t"), $"c.component" === $"t.component")
  .select($"c.id", $"c.component", $"t.data")
  .orderBy("id")
  .show()

我的结果:

如果您的数据已经在数据框中,只需使用selectwhere 过滤器就可以轻松地从原始数据框生成边缘数据框,例如

// Create the GraphFrame from the dataframe
val v2 = df

val e2 = df
  .select("id", "actual_id")
  .withColumn("rel", lit("is linked to"))
  .where("actual_id > 0")
  .toDF("src", "dst", "rel")

val g2 = GraphFrame(v2, e2)
print(g2)

g2.vertices.show()
g2.edges.show()

【讨论】:

您展示的内容似乎有效,我唯一不喜欢的是我需要将连接放在单独的数据框中(您的变量 e2),这对于一般情况是未知的但我想它可以通过一个简单的 select 来计算,其中 actual_id 为空。 我已经用一个例子更新了我的答案,但你是对的 - 在创建 GraphFrame 之前生成边缘数据框很容易,只需 selectwhere。我有兴趣用一些有意义的音量来尝试这两种方法——你在看什么样的音量?如果需要,我可能会创建一些虚拟数据。 这将用于 80MB(大约几十万行)的 csv。我还没有尝试过你的方法,但会尝试检查它。 嗨,你是怎么处理这个音量的?我很想知道它是如何执行的,以及图形方法是否最终成为解决这个特定问题的好方法?【参考方案2】:

相信我已经找到了问题的答案。

def calculateLatestId(df: DataFrame) : DataFrame = 
  var joinedDf = df.as("df1").join(df.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "outer").withColumn("FINAL_ID", when($"df2.ID".isNull, $"df1.ID").when($"df2.ACTUAL_ID".isNotNull, $"df2.ACTUAL_ID").otherwise($"df2.ID")).select($"df1.*", $"FINAL_ID").filter($"df1.ID".isNotNull)

  val differentIds = joinedDf.filter($"df1.ACTUAL_ID" =!= $"FINAL_ID")

  joinedDf = joinedDf.withColumn("ACTUAL_ID", $"FINAL_ID").drop($"FINAL_ID")
  
  if(differentIds.count > 0)
  
    calculateLatestId(joinedDf)
  
  else
  
    joinedDf = joinedDf.as("df1").join(joinedDf.as("df2"), $"df1.ACTUAL_ID" === $"df2.ID", "inner").select($"df1.ID", $"df2.*").drop($"df2.ID")
    joinedDf
  

我相信可以通过某种方式提高性能,可能是通过减少每次迭代后的行数并在最后进行某种连接 + 清理。

【讨论】:

很高兴知道您的问题已解决。您可以接受它作为答案(单击答案旁边的复选标记以将其从灰色切换为已填充。)。这对其他社区成员可能是有益的。谢谢。

Azure Datalake Store Gen2 使用 scala spark 库从 Databricks 读取文件

】AzureDatalakeStoreGen2使用scalaspark库从Databricks读取文件【英文标题】:AzureDatalakeStoreGen2readfilesfromDatabricksusingascalasparklibrary【发布时间】:2020-06-2808:22:42【问题描述】:我正在尝试在AzureDatabricks(不是笔记本)上部署Scala库来执行... 查看详情

使用 6.4 版扩展支持(包括 Apache Spark 2.4.5、Scala 2.11)在 azure databricks 上启动集群时出现问题

】使用6.4版扩展支持(包括ApacheSpark2.4.5、Scala2.11)在azuredatabricks上启动集群时出现问题【英文标题】:Problemstartingclusteronazuredatabrickswithversion6.4ExtendedSupport(includesApacheSpark2.4.5,Scala2.11)【发布时间】:2022-01-2317:51:10【问题描述】:... 查看详情

Azure Databricks:如何在 Databricks 群集中添加 Spark 配置

】AzureDatabricks:如何在Databricks群集中添加Spark配置【英文标题】:AzureDatabricks:HowtoaddSparkconfigurationinDatabrickscluster【发布时间】:2020-02-2911:33:02【问题描述】:我正在使用SparkDatabricks集群并希望添加自定义的Spark配置。有一个关于... 查看详情

如何强制 Azure 数据工厂数据流使用 Databricks

】如何强制Azure数据工厂数据流使用Databricks【英文标题】:HowtoforceAzureDataFactoryDataFlowstouseDatabricks【发布时间】:2019-09-2819:36:06【问题描述】:我正在使用Azure数据工厂及其新的数据流功能。这是一个应该使用Databricks进行数据转... 查看详情

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

】如何在Databricks的PySpark中使用在Scala中创建的DataFrame【英文标题】:HowtoUseDataFrameCreatedinScalainDatabricks\'PySpark【发布时间】:2019-11-1712:50:22【问题描述】:我的Databricks笔记本使用Python。笔记本中的一些代码是用Scala编写的(使用%... 查看详情

作业终止后如何将生成的文件从 Azure Databricks 导出到 Azure DevOps?

】作业终止后如何将生成的文件从AzureDatabricks导出到AzureDevOps?【英文标题】:HowtoexportfilesgeneratedtoAzureDevOpsfromAzureDatabricksafterajobterminates?【发布时间】:2021-10-2103:07:23【问题描述】:我们正在使用AzureDevOps向Databricks提交训练作... 查看详情

如何获取 Azure Databricks 笔记本运行详细信息

】如何获取AzureDatabricks笔记本运行详细信息【英文标题】:HowtofetchAzureDatabricksnotebookrundetails【发布时间】:2020-10-0606:31:18【问题描述】:我正在使用Azure数据工厂运行我的databricks笔记本,它在运行时创建作业集群,现在我想知... 查看详情

如何列出 Azure Databricks 中的所有挂载点?

】如何列出AzureDatabricks中的所有挂载点?【英文标题】:HowtolistallthemountpointsinAzureDatabricks?【发布时间】:2020-09-2416:38:03【问题描述】:我尝试使用这个%fslsdbfs:/mnt,但我想知道这样做会给我所有的挂载点吗?【问题讨论】:【... 查看详情

Azure Databricks - 解释 databricks 中的安装语法

】AzureDatabricks-解释databricks中的安装语法【英文标题】:AzureDatabricks-Explainthemountingsyntaxindatabricks【发布时间】:2019-12-1206:55:43【问题描述】:我是azure和databricks的新手,我了解了如何安装blob和使用,但我有一些疑问,我还没有... 查看详情

如何在 Python 中从 Azure Databricks 插入 Azure SQL 数据库

】如何在Python中从AzureDatabricks插入AzureSQL数据库【英文标题】:HowtoINSERTINTOAzureSQLdatabasefromAzureDatabricksinPython【发布时间】:2019-04-0109:19:34【问题描述】:由于pyodbc无法安装到Azuredatabricks,我正在尝试使用jdbc通过Python将数据插入Az... 查看详情

如何使用 SAS 读取 Azure databricks 中的 blob

】如何使用SAS读取Azuredatabricks中的blob【英文标题】:HowtoreadablobinAzuredatabrickswithSAS【发布时间】:2020-07-0719:22:41【问题描述】:我是Databricks的新手。我编写示例代码来读取AzureDatabricks中的存储Blob。blob_account_name="sars"blob_container_n... 查看详情

使用 Python/Scala 的 Databricks 雪花表

】使用Python/Scala的Databricks雪花表【英文标题】:SnowflakeTablefromDatabricksusingPython/Scala【发布时间】:2021-10-1210:28:14【问题描述】:谁能帮帮我?我想使用Python/Scala从Databricks在Snowflake中创建一个表并将数据放入其中。下面是我的代... 查看详情

如何在 Azure Databricks PySpark 中执行存储过程?

】如何在AzureDatabricksPySpark中执行存储过程?【英文标题】:HowtoexecuteastoredprocedureinAzureDatabricksPySpark?【发布时间】:2020-06-0619:28:45【问题描述】:我能够在AzureDatabricks中使用PySpark执行简单的SQL语句,但我想改为执行存储过程。... 查看详情

如何使用 Azure Synapse 在 Databricks 上删除表或删除行?

】如何使用AzureSynapse在Databricks上删除表或删除行?【英文标题】:HowtodroptableordroprowonDatabrickswithAzureSynapse?【发布时间】:2021-08-2202:35:44【问题描述】:我想使用DROPTABLE删除我的表,但我的Databricks上出现错误JDBC,但如果我使用SE... 查看详情

如何在 Databricks 上将 Azure Synapse Dataframe 转换为 JSON?

】如何在Databricks上将AzureSynapseDataframe转换为JSON?【英文标题】:HowtoconvertAzureSynapseDataframeintoJSONonDatabricks?【发布时间】:2021-08-1812:24:04【问题描述】:我可以将我的AzureSynapseDataframe转换为JSON吗?因为当我尝试它时,它出错了。... 查看详情

如何从 Azure Databricks 在 Azure Blob 中创建一个空文件夹

】如何从AzureDatabricks在AzureBlob中创建一个空文件夹【英文标题】:HowtocreateaemptyfolderinAzureBlobfromAzuredatabricks【发布时间】:2020-10-1418:11:55【问题描述】:我有一个场景,我想列出AzureBlob目录中的所有文件夹。如果没有文件夹存在... 查看详情

如何在 Azure Databricks 中使用 JDBC 从 PostgreSQL 读取表数据?

】如何在AzureDatabricks中使用JDBC从PostgreSQL读取表数据?【英文标题】:HowtoreadtabledatafromPostgreSQLusingJDBCinAzureDatabricks?【发布时间】:2020-09-0503:03:32【问题描述】:我正在尝试使用pyspark读取Azure云订阅中可用的postgreSQL表,但出现以... 查看详情

如何在 Azure Databricks 中保存自定义 python 类对象?

】如何在AzureDatabricks中保存自定义python类对象?【英文标题】:HowdoyousaveacustompythonclassobjectinAzureDatabricks?【发布时间】:2021-12-1710:34:49【问题描述】:我已经编写了一个python类,用于在分类建模之前对数据进行预处理。我用sklear... 查看详情