Spark 2.0 数据集与数据帧

     2023-04-15     190

关键词:

【中文标题】Spark 2.0 数据集与数据帧【英文标题】:Spark 2.0 Dataset vs DataFrame 【发布时间】:2016-11-14 19:44:36 【问题描述】:

从 spark 2.0.1 开始我有一些问题。我阅读了很多文档,但到目前为止找不到足够的答案:

两者有什么区别 df.select("foo") df.select($"foo") 我是否理解正确 myDataSet.map(foo.someVal) 是类型安全的,不会转换为 RDD,但会保留在 DataSet 表示/没有额外开销(2.0.0 的性能明智) 所有其他命令,例如select, .. 只是语法糖。它们不是类型安全的,可以使用映射。如果没有 map 语句,我怎么能 df.select("foo") 类型安全? 为什么我应该使用 UDF / UADF 而不是地图(假设地图保留在数据集表示中)?

【问题讨论】:

有一个项目旨在为 Spark 提供更多类型安全性,同时保持高效执行路径:typelevel/frameless 【参考方案1】:
    df.select("foo")df.select($"foo") 之间的区别在于签名。前一个取至少一个String,后一个取零个或多个Columns。除此之外没有实际区别。

    myDataSet.map(foo.someVal) 类型检查,但由于任何Dataset 操作都使用RDD 对象,并且与DataFrame 操作相比,开销很大。我们来看一个简单的例子:

    case class FooBar(foo: Int, bar: String)
    val ds = Seq(FooBar(1, "x")).toDS
    ds.map(_.foo).explain
    
    == Physical Plan ==
    *SerializeFromObject [input[0, int, true] AS value#123]
    +- *MapElements <function1>, obj#122: int
       +- *DeserializeToObject newInstance(class $line67.$read$$iw$$iw$FooBar), obj#121: $line67.$read$$iw$$iw$FooBar
          +- LocalTableScan [foo#117, bar#118]
    

    如您所见,此执行计划需要访问所有字段并且必须访问DeserializeToObject

    没有。一般来说,其他方法不是语法糖,并且会生成明显不同的执行计划。例如:

    ds.select($"foo").explain
    
    == Physical Plan ==
    LocalTableScan [foo#117]
    

    与之前显示的计划相比,它可以直接访问列。与其说是 API 的限制,不如说是操作语义不同的结果。

    如果没有 map 语句,我如何 df.select("foo") 类型安全?

    没有这样的选择。虽然类型化列允许您将静态 Dataset 转换为另一个静态类型化 Dataset

    ds.select($"bar".as[Int])
    

    没有类型安全。还有一些其他尝试包括类型安全优化操作,like typed aggregations,但这个实验性 API。

    为什么我应该使用 UDF / UADF 而不是地图

    这完全取决于您。 Spark 中的每个分布式数据结构都有自己的优点和缺点(参见例如Spark UDAF with ArrayType as bufferSchema performance issues)。

就我个人而言,我发现静态类型的 Dataset 最没用:

不提供与 Dataset[Row] 相同的优化范围(尽管它们共享存储格式和一些执行计划优化,但它并不能完全受益于代码生成或堆外存储),也不能访问所有DataFrame 的分析能力。

类型化转换是黑盒,有效地为优化器创建分析障碍。例如,选择(过滤器)不能被推送到类型转换:

ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].filter(x => true).where($"foo" === 1).explain
== Physical Plan ==
*Filter (foo#133 = 1)
+- *Filter <function1>.apply
   +- *HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
      +- Exchange hashpartitioning(foo#133, 200)
         +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
            +- LocalTableScan [foo#133, bar#134]

相比:

ds.groupBy("foo").agg(sum($"bar") as "bar").as[FooBar].where($"foo" === 1).explain
== Physical Plan ==
*HashAggregate(keys=[foo#133], functions=[sum(cast(bar#134 as double))])
+- Exchange hashpartitioning(foo#133, 200)
   +- *HashAggregate(keys=[foo#133], functions=[partial_sum(cast(bar#134 as double))])
      +- *Filter (foo#133 = 1)
         +- LocalTableScan [foo#133, bar#134] 

这会影响谓词下推或投影下推等功能。

没有RDDs 那样灵活,仅原生支持一小部分类型。

当使用as 方法转换Dataset 时,带有Encoders 的“类型安全”是有争议的。由于未使用签名对数据形状进行编码,因此编译器只能验证 Encoder 是否存在。

相关问题:

Perform a typed join in Scala with Spark Datasets Spark 2.0 DataSets groupByKey and divide operation and type safety

【讨论】:

感谢您的这篇非常有用的帖子。你认为你可以详细说明关于编码器类型安全的最后一点。你的意思是编译器可以基于 Encoder 对你的操作进行类型检查,但是不能保证你读取的数据(比如从 parquet 文件中)确实匹配这个 schema? @allstar 差不多。 as 应该比 as[U]: Dataset[U] 更多 as[U]: Try[Dataset[U]]。就像asInstanceOf... 但这只是冰山一角 - 很难保持在“可检查”路径中,具有复杂的管道(连接,Aggregators...)【参考方案2】:

Spark Dataset 比 Spark Dataframe 更强大。小例子 - 您只能创建 RowTuple 或任何原始数据类型的 Dataframe,但 Dataset 也可以让您创建任何非原始类型的 Dataset。即您可以从字面上创建对象类型的Dataset

例如:

case class Employee(id:Int,name:String)

Dataset[Employee]   // is valid
Dataframe[Employee] // is invalid

【讨论】:

我可以知道拒绝我的答案的原因吗?我分享我的实践经验。你的回答是正确的,并不代表我的回答不正确。 我相信过去已经有人指出了这一点,但不知何故,评论已被删除 - 你在这里提出的这一点是无效的,因为你比较了不同类型的对象。 Dataset 是一个类型构造函数。 DataFame 是一个类型,更具体地说,它是一个别名 Dataset[Row]。您真正应该比较的是Dataset[Row]Dataset[U],其中不是Row 的子类。 @user6910411 完全同意。【参考方案3】:

DATAFRAME:DataFrame 是一种抽象,允许数据的架构视图。

案例类人(姓名:字符串,年龄:整数,地址:字符串)

定义类人

scala > val df = List (Person (“Sumanth”, 23, “BNG”)

DATAFRAME VS DATASET

DATASET:Data Set 是 Dataframe API 的扩展,是最新的抽象,它试图提供 RDD 和 Dataframe 的最佳性能。

【讨论】:

Spark 2.0 将 json 读入带有引号的数据帧中 - 与 spark 1.6 不同的行为......错误?

】Spark2.0将json读入带有引号的数据帧中-与spark1.6不同的行为......错误?【英文标题】:Spark2.0readingjsonintoadataframewithquotesinakey-differentbehaviourthanspark1.6...bug?【发布时间】:2016-08-1019:56:54【问题描述】:不幸的是,我们不得不处理杂... 查看详情

Apache Spark - 注册 UDF - 返回数据帧

】ApacheSpark-注册UDF-返回数据帧【英文标题】:ApacheSpark-registeringaUDF-returningdataframe【发布时间】:2016-12-2010:24:47【问题描述】:我有一个返回数据框的UDF。类似下面的那个scala>predict_churn(Vectors.dense(2.0,1.0,0.0,3.0,4.0,4.0,0.0,4.0,5.0,2.0... 查看详情

Spark 2.0+ 即使数据帧被缓存,如果它的源之一发生变化,它会重新计算吗?

】Spark2.0+即使数据帧被缓存,如果它的源之一发生变化,它会重新计算吗?【英文标题】:Spark2.0+Eventhedataframeiscached,ifoneofitssourcechanges,itwouldrecompute?【发布时间】:2019-05-1902:18:17【问题描述】:这是我的用例。有多个来源df1到df4... 查看详情

如何将 Spark 数据集与 Thrift 一起使用

】如何将Spark数据集与Thrift一起使用【英文标题】:HowcanIuseSparkDatasetwithThrift【发布时间】:2016-02-0610:37:43【问题描述】:我的数据格式是用apachethrift定义的,代码由scrooge生成。我使用parquet将其存储在spark中,非常类似于blog中的... 查看详情

将 spark 数据帧转换为 dask 数据帧

】将spark数据帧转换为dask数据帧【英文标题】:Convertsparkdataframetodaskdataframe【发布时间】:2018-07-1820:11:19【问题描述】:有没有办法直接将Spark数据帧转换为Dask数据帧?我目前正在使用Spark的.toPandas()函数将其转换为pandas数据帧... 查看详情

Spark 从超级数据帧优化方法生成子数据帧

】Spark从超级数据帧优化方法生成子数据帧【英文标题】:SparkgenerateSubDataframesfromasuperDataframeOptimizedapproach【发布时间】:2017-09-1909:43:14【问题描述】:这是输入Spark数据帧。根据“代码类型”,我需要将此数据帧拆分为子数据帧... 查看详情

加入大型 Spark 数据帧

】加入大型Spark数据帧【英文标题】:JoiningLargeSparkdataframes【发布时间】:2021-01-2901:52:27【问题描述】:我有两个数据框数据帧1:|--name:string(nullable=true)|--items:array(nullable=true)||--element:string(containsNull=true)数据帧2|--item:string(nullable=... 查看详情

Spark 数据帧 CSV 与 Parquet

】Spark数据帧CSV与Parquet【英文标题】:SparkdataframeCSVvsParquet【发布时间】:2018-02-1100:36:01【问题描述】:我是Spark的初学者,并试图了解Spark数据帧的机制。当从csv和parquet加载数据时,我正在比较sparksql数据帧上的sql查询的性能。... 查看详情

缓存后正在重新评估 Spark 数据帧

】缓存后正在重新评估Spark数据帧【英文标题】:Sparkdataframeisbeingre-evaluatedafteracache【发布时间】:2019-10-0417:13:32【问题描述】:我在spark数据帧上使用缓存时遇到了一些问题。我的期望是,在数据帧上缓存之后,数据帧会在第一... 查看详情

Spark:数据帧聚合(Scala)

】Spark:数据帧聚合(Scala)【英文标题】:Spark:DataFrameAggregation(Scala)【发布时间】:2018-01-2407:01:15【问题描述】:我有以下要求在scala中聚合Spark数据帧上的数据。而且,我有两个数据集。数据集1包含分布在几个不同列上的每个... 查看详情

Spark 数据帧执行

】Spark数据帧执行【英文标题】:Sparkdataframeexecution【发布时间】:2017-03-2607:56:59【问题描述】:我想了解spark数据帧的执行。我已经浏览了日志并解释了计划,但我不清楚。我的问题是说我有spark程序,并且我有一些如下所示的... 查看详情

如何使用 spark 2.1 将联合数据帧并行到一个数据帧

】如何使用spark2.1将联合数据帧并行到一个数据帧【英文标题】:howtoparalleluniondataframestoonedataframewithspark2.1【发布时间】:2019-08-0103:32:06【问题描述】:我希望将数据合并到另一个数据帧的foreach循环中的一个数据帧中,但似乎有... 查看详情

从 Pandas 数据帧到 Spark 数据帧的转换需要大量时间

】从Pandas数据帧到Spark数据帧的转换需要大量时间【英文标题】:ConversionfromPandasdataframetoSparkdataframetakeshugeamountoftime【发布时间】:2021-03-0207:26:38【问题描述】:我使用48核远程机器,但是对于大小为(1009224,232)的pandas数据帧,如... 查看详情

Spark 数据帧分区计数

】Spark数据帧分区计数【英文标题】:Sparkdataframepartitioncount【发布时间】:2018-06-0703:21:03【问题描述】:我对spark如何在spark数据框中创建分区感到困惑。这是步骤列表和分区大小i_df=sqlContext.read.json("jsonfiles")//numpartitionsreturnedis4,... 查看详情

将 Pandas 数据帧转换为 Spark 数据帧的 TypeError

】将Pandas数据帧转换为Spark数据帧的TypeError【英文标题】:TypeErrorconvertingPandasdataframetoSparkdataframe【发布时间】:2016-06-2419:03:47【问题描述】:我有一个名为pdf的Pandas数据框,它只是四列float64。以下是前五行:pdf[:5]x1x2x3y09.08206012... 查看详情

将大型 Spark 数据帧从数据块写入 csv 失败

】将大型Spark数据帧从数据块写入csv失败【英文标题】:Failureinwritinglargesparkdataframestocsvfromdatabricks【发布时间】:2020-03-2417:06:08【问题描述】:我正在处理数据块中的大型spark数据帧,当我尝试将最终数据帧写入csv格式时,它给... 查看详情

将 spark 数据帧写入 postgres 数据库

】将spark数据帧写入postgres数据库【英文标题】:WritesparkdataframetopostgresDatabase【发布时间】:2016-08-0809:40:13【问题描述】:spark集群设置如下:conf[\'SparkConfiguration\']=SparkConf()\\.setMaster(\'yarn-client\')\\.setAppName("test")\\.set("spark.execu 查看详情

使用 Python 将 Dask 数据帧转换为 Spark 数据帧

】使用Python将Dask数据帧转换为Spark数据帧【英文标题】:ConvertDaskDataframetoSparkdataframeusingPython【发布时间】:2021-02-2513:31:15【问题描述】:我想将DaskDataframe转换为SparkDataframe。让我们考虑这个例子:importdask.dataframeasdddask_df=dd.read_... 查看详情