Spark Common Programming Tips (Code Snippets)

fansy1990     2022-12-07

Keywords:

Spark, common programming tips



Spark Version : 2.2.0

1. Basics

1.1 Scala

1. Computing a mean

Description: compute the mean of an Array[Double] that may contain NaN values (the NaN entries are dropped before averaging).

scala> val data = Array(1,2,3.0)
data: Array[Double] = Array(1.0, 2.0, 3.0)

scala> def sum_num(arr:Array[Double]) = arr.foldLeft((0.0,0))((acc,elem) => if(elem.equals(Double.NaN)) acc else (acc._1+ elem,acc._2+1))
sum_num: (arr: Array[Double])(Double, Int)

scala> def avg(arr:Array[Double]) :Double= {
     |     val (sum , num) = sum_num(arr)
     |     sum/num
     |   }
avg: (arr: Array[Double])Double

scala> avg(data)
res51: Double = 2.0

scala> val data1 = Array(1,2,3.0,Double.NaN)
data1: Array[Double] = Array(1.0, 2.0, 3.0, NaN)

scala> avg(data1)
res52: Double = 2.0

Implemented with foldLeft.
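A filter-based variant reads more directly and sidesteps a subtlety of the version above: elem.equals(Double.NaN) only works because of boxing (elem == Double.NaN is always false), whereas isNaN states the intent explicitly. A minimal sketch, with avg2 as a purely illustrative name:

// Drop the NaN entries first, then average; an empty or all-NaN input yields NaN,
// matching the foldLeft version (0.0 / 0 == Double.NaN).
def avg2(arr: Array[Double]): Double = {
  val clean = arr.filter(x => !x.isNaN)
  if (clean.isEmpty) Double.NaN else clean.sum / clean.length
}

avg2(Array(1, 2, 3.0, Double.NaN))   // 2.0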

1.2 Spark

1. Creating a DataFrame
scala> case class P(name:String,age:Int,salary:Double)
defined class P

scala> val data = sc.makeRDD(Array(P("tom",23,19888),P("kate",56,2300))).toDF
data: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> data.show
+----+---+-------+
|name|age| salary|
+----+---+-------+
| tom| 23|19888.0|
|kate| 56| 2300.0|
+----+---+-------+
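The toDF call above works because spark-shell pre-imports the SparkSession implicits (and provides sc). In a standalone application they have to be brought in explicitly. A minimal sketch, using a Seq instead of an RDD; the object name and app name are chosen only for illustration:

import org.apache.spark.sql.SparkSession

// Case classes used with toDF should sit at the top level of the file,
// not inside a method, so that Spark can derive an Encoder for them.
case class P(name: String, age: Int, salary: Double)

object ToDfDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("toDF-demo").master("local[*]").getOrCreate()
    import spark.implicits._   // pre-imported in spark-shell, explicit here

    val df = Seq(P("tom", 23, 19888), P("kate", 56, 2300)).toDF()
    df.show()
    spark.stop()
  }
}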

2. Extracting a substring

Description: extract part of a string column of a DataFrame.

scala> case class A(name:String)
defined class A

scala> val data = sc.makeRDD(Array(A("123456"),A("abcdef"))).toDF
data: org.apache.spark.sql.DataFrame = [name: string]

scala> data.select(substring(col("name") ,0 , 3)).show
+---------------------+                                             
|substring(name, 0, 3)|
+---------------------+
|                  123|
|                  abc|
+---------------------+

scala> data.select(substring(col("name") ,1 , 3)).show
+---------------------+
|substring(name, 1, 3)|
+---------------------+
|                  123|
|                  abc|
+---------------------+

substring(column, startPos, length): the start position is actually 1-based, so passing 0 and 1 gives the same result.
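The same extraction can also be written as a SQL expression, or given an explicit alias instead of the generated column name. A minimal sketch on the data above (the import is only needed outside spark-shell, which pre-imports org.apache.spark.sql.functions._):

import org.apache.spark.sql.functions.{col, substring}

// SQL-expression form; positions are 1-based here as well, so this yields "234" and "bcd"
data.selectExpr("substring(name, 2, 3) as mid").show()

// Column-function form with an explicit alias
data.select(substring(col("name"), 2, 3).as("mid")).show()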
3. Getting values out of a DataFrame after collect

scala> case class P(name:String,age:Int,salary:Double)
defined class P

scala> val data = sc.makeRDD(Array(P("tom",23,19888),P("kate",56,2300))).toDF
data: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> data.collect
res62: Array[org.apache.spark.sql.Row] = Array([tom,23,19888.0], [kate,56,2300.0])

scala> data.collect.map(row => (row.getString(0),row.getInt(1),row.getDouble(2)))
res61: Array[(String, Int, Double)] = Array((tom,23,19888.0), (kate,56,2300.0))

scala> data.collect.map(row => (row.getString(0),row.getInt(1),row.getDouble(2))).foreach(println(_))
(tom,23,19888.0)
(kate,56,2300.0)

After an action (such as collect), a DataFrame gives back an Array[Row]. Values are read from a Row with the typed getXXX() accessors, where XXX matches the column type; for a Double column, that is getDouble().
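Positional indexes are easy to get wrong. Two alternatives worth knowing, a minimal sketch on the same data: Row.getAs[T] looks a value up by column name, and converting to a typed Dataset returns the case class directly (the required encoder implicits are already in scope in spark-shell).

// Look values up by column name instead of position
data.collect.map(row => (row.getAs[String]("name"), row.getAs[Int]("age"), row.getAs[Double]("salary")))

// Or collect typed objects via a Dataset[P]; prints P(tom,23,19888.0) and P(kate,56,2300.0)
data.as[P].collect.foreach(println)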
4. Custom UDF over primitive-type columns

Primitive types here means Double, String, Int, Float, and so on.

scala> case class P(name:String,age:Int,salary:Double)
defined class P

scala> val data = sc.makeRDD(Array(P("tom",23,19888),P("kate",56,2300))).toDF

scala> val handle_cols = udf {(name:String,age:Int,salary:Double) => name+"_"+age+"_" + salary }
handle_cols: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function3>,StringType,Some(List(StringType, IntegerType, DoubleType)))

scala> data.select(handle_cols(col("name"),col("age"),col("salary"))).show()
+----------------------+
|UDF(name, age, salary)|
+----------------------+
|        tom_23_19888.0|
|        kate_56_2300.0|
+----------------------+

5. Custom UDF over primitive-type columns, passing in a fixed value (data from item 4)
scala> val handle_cols = udf {(name:String,age:Int,salary:Double,splitter:String) => name+splitter+age+splitter + salary }
handle_cols: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function4>,StringType,Some(List(StringType, IntegerType, DoubleType,StringType)))

scala> data.select(handle_cols(col("name"),col("age"),col("salary"),lit("_"))).show()
+-------------------------+
|UDF(name, age, salary, _)|
+-------------------------+
|           tom_23_19888.0|
|           kate_56_2300.0|
+-------------------------+

To pass in a fixed value, use the lit function.
6. Renaming the new column produced by a UDF (data from item 5)


scala> data.select(handle_cols(col("name"),col("age"),col("salary"),lit("_")) as "all").show()
+--------------+
|           all|
+--------------+
|tom_23_19888.0|
|kate_56_2300.0|
+--------------+

Renaming is done with the as function.
7. Grouping by a column and collecting the grouped values (array type)

scala> case class YV(y:String, v:Double)
defined class YV

scala> val data = sc.makeRDD(Array(YV("2018",1.0),YV("2018",5.0),YV("2018",4.0),YV("2019",1.0),YV("2019",3.0))).toDF
data: org.apache.spark.sql.DataFrame = [y: string, v: double]

scala> data.show()
+----+---+
|   y|  v|
+----+---+
|2018|1.0|
|2018|5.0|
|2018|4.0|
|2019|1.0|
|2019|3.0|
+----+---+
scala> data.groupBy("y").agg(collect_list(col("v")))
res66: org.apache.spark.sql.DataFrame = [y: string, collect_list(v): array<double>]

scala> data.groupBy("y").agg(collect_list(col("v"))).show
+----+---------------+
|   y|collect_list(v)|
+----+---------------+
|2019|     [1.0, 3.0]|
|2018|[1.0, 5.0, 4.0]|
+----+---------------+
8. Flattening an array column of a DataFrame into primitive values (data from item 7)
scala> data.groupBy("y").agg(collect_list(col("v")) as "v_list").withColumn("v_list",explode(col("v_list")))
res72: org.apache.spark.sql.DataFrame = [y: string, v_list: double]

scala> data.groupBy("y").agg(collect_list(col("v")) as "v_list").withColumn("v_list",explode(col("v_list"))).show
+----+------+
|   y|v_list|
+----+------+
|2019|   1.0|
|2019|   3.0|
|2018|   1.0|
|2018|   5.0|
|2018|   4.0|
+----+------+
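If the position of each element inside the original array is also needed, posexplode can replace explode; it produces a pos column (the 0-based index) alongside the value column. A minimal sketch on the grouped data above:

// Keeps the element index: output columns become y, pos, col
data.groupBy("y").agg(collect_list(col("v")) as "v_list")
  .select(col("y"), posexplode(col("v_list")))
  .show()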

2. Advanced

2.1 Spark

1. Custom UDF over an array column
scala> case class YV(y:String, v:Double)
defined class YV

scala> val data = sc.makeRDD(Array(YV("2018",1.0),YV("2018",5.0),YV("2018",4.0),YV("2019",1.0),YV("2019",3.0))).toDF
data: org.apache.spark.sql.DataFrame = [y: string, v: double]

scala> val handle_arr= udf{(arr:scala.collection.mutable.WrappedArray[Double]) => arr.sorted}
handle_arr: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(DoubleType,false),Some(List(ArrayType(DoubleType,false))))

scala> data.groupBy("y").agg(collect_list(col("v")) as "v_list").show
+----+---------------+
|   y|         v_list|
+----+---------------+
|2019|     [1.0, 3.0]|
|2018|[1.0, 5.0, 4.0]|
+----+---------------+

scala> data.groupBy("y").agg(collect_list(col("v")) as "v_list").withColumn("v_list", handle_arr(col("v_list")))
res69: org.apache.spark.sql.DataFrame = [y: string, v_list: array<double>]

scala> data.groupBy("y").agg(collect_list(col("v")) as "v_list").withColumn("v_list", handle_arr(col("v_list"))).show()
+----+---------------+
|   y|         v_list|
+----+---------------+
|2019|     [1.0, 3.0]|
|2018|[1.0, 4.0, 5.0]|
+----+---------------+

The v_list column here is an array type; a UDF that processes such a column must declare its parameter as scala.collection.mutable.WrappedArray[Double].
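For this particular case Spark also ships a built-in, sort_array, which does the same job without a UDF or the WrappedArray parameter. A minimal sketch:

// Built-in equivalent of the sorting UDF; pass asc = false for descending order
data.groupBy("y").agg(collect_list(col("v")) as "v_list")
  .withColumn("v_list", sort_array(col("v_list")))
  .show()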
2. Combining primitive columns into a struct

scala> case class YVV(y:String, v1:Double,v2:Double)
defined class YVV

scala> val data = sc.makeRDD(Array(YVV("2018",1.0,2.0),YVV("2018",5.0,4.0),YVV("2018",4.0,4.5),YVV("2019",1.0,0.3),YVV("2019",3.0,9.7))).toDF
data: org.apache.spark.sql.DataFrame = [y: string, v1: double ... 1 more field]

scala> data.show
+----+---+---+
|   y| v1| v2|
+----+---+---+
|2018|1.0|2.0|
|2018|5.0|4.0|
|2018|4.0|4.5|
|2019|1.0|0.3|
|2019|3.0|9.7|
+----+---+---+
scala> data.select(col("y"),struct("v1","v2") as "v1_v2").show()
+----+---------+
|   y|    v1_v2|
+----+---------+
|2018|[1.0,2.0]|
|2018|[5.0,4.0]|
|2018|[4.0,4.5]|
|2019|[1.0,0.3]|
|2019|[3.0,9.7]|
+----+---------+

scala> data.select(col("y"),struct("v1","v2") as "v1_v2")
res76: org.apache.spark.sql.DataFrame = [y: string, v1_v2: struct<v1: double, v2: double>]

scala> data.select(col("y"),struct("v1","v2") as "v1_v2").select(col("y"),col("v1_v2.v1"))
res77: org.apache.spark.sql.DataFrame = [y: string, v1: double]

scala> data.select(col("y"),struct("v1","v2") as "v1_v2").select(col("y"),col("v1_v2.v1")).show
+----+---+
|   y| v1|
+----+---+
|2018|1.0|
|2018|5.0|
|2018|4.0|
|2019|1.0|
|2019|3.0|
+----+---+

Two primitive columns are combined with the struct function; the data inside the struct is then accessed as newStructColumn.originalColumn.
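All fields of a struct column can also be expanded back into top-level columns at once with the columnName.* pattern. A minimal sketch on the data above:

// Expands v1_v2 back into separate v1 and v2 columns
data.select(col("y"), struct("v1", "v2") as "v1_v2")
  .select("y", "v1_v2.*")
  .show()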
3. Custom UDF over a struct column (data from item 2)

scala> val handle_struct = udf{(x:org.apache.spark.sql.Row) => Array(x.getDouble(0),x.getDouble(1)).max}
handle_struct: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,None)

scala> data.select(col("y"),struct("v1","v2") as "v1_v2").withColumn("v1_v2",handle_struct(col("v1_v2")))
res79: org.apache.spark.sql.DataFrame = [y: string, v1_v2: double]

scala> data.select(col("y"),struct("v1","v2") as "v1_v2").withColumn("v1_v2",handle_struct(col("v1_v2"))).show
+----+-----+
|   y|v1_v2|
+----+-----+
|2018|  2.0|
|2018|  5.0|
|2018|  4.5|
|2019|  1.0|
|2019|  9.7|
+----+-----+

The UDF here returns the maximum of v1 and v2 inside the struct.
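When the goal is just the row-wise maximum of two numeric columns, the built-in greatest function gives the same result without building a struct or writing a UDF. A minimal sketch:

// Row-wise max of v1 and v2, taken directly from the original columns
data.select(col("y"), greatest(col("v1"), col("v2")) as "v1_v2").show()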
4. Custom UDF over an Array[Struct] column (data from item 2)

scala> data.select(col("y"),struct("v1","v2") as "v1_v2").groupBy("y").agg(collect_list(col("v1_v2")) as "v_list")
res81: org.apache.spark.sql.DataFrame = [y: string, v_list: array<struct<v1:double,v2:double>>]

scala> data.select(col("y"),struct("v1","v2") as "v1_v2").groupBy("y").agg(collect_list(col("v1_v2")) as "v_list").show(false)
+----+---------------------------------+
|y   |v_list                           |
+----+---------------------------------+
|2019|[[1.0,0.3], [3.0,9.7]]           |
|2018|[[1.0,2.0], [5.0,4.0], [4.0,4.5]]|
+----+---------------------------------+

scala> val handle_arr_struct= udf{(arr:Seq[org.apache.spark.sql.Row]) => (arr.map(x=>x.getDouble(0)).max, arr.map(x=>x.getDouble(1)).max)}
handle_arr_struct: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StructType(StructField(_1,DoubleType,false), StructField(_2,DoubleType,false)),None)

scala> data.select(col("y"),struct("v1","v2") as "v1_v2").groupBy("y").agg(collect_list(col("v1_v2")) as "v_list").withColumn("v_list",handle_arr_struct(col("v_list")))
res84: org.apache.spark.sql.DataFrame = [y: string, v_list: struct<_1: double, _2: double>]

scala> data.select(col("y"),struct("v1","v2") as "v1_v2").groupBy("y").agg(collect_list(col("v1_v2")) as "v_list").withColumn("v_list",handle_arr_struct(col("v_list"))).show()
+----+---------+
|   y|   v_list|
+----+---------+
|2019|[3.0,9.7]|
|2018|[5.0,4.5]|
+----+---------+

Note that the UDF parameter must be declared as Seq[Row]; inside the function, each Row's values are again read with the getXXX() accessors.

Think about it: what exactly does the UDF above return?
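For this specific computation (the per-group maxima of v1 and v2), the same numbers can also be obtained with plain built-in aggregations, with no struct or UDF involved. A minimal sketch, assuming org.apache.spark.sql.functions._ is in scope as it is in spark-shell:

// Per-group maxima via built-in aggregate functions
data.groupBy("y").agg(max("v1") as "max_v1", max("v2") as "max_v2").show()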

See also https://github.com/fansy1990/article/blob/master/Spark/Spark常用编程技巧V1.0.md, which is updated from time to time.
