关键词:
考察spark自定义排序
方式一:自定义一个类继承Ordered和序列化,Driver端将数据变成RDD,整理数据转成自定义类类型的RDD,使用本身排序即可。
package com.rz.spark.base import org.apache.spark.rdd.RDD import org.apache.spark.SparkConf, SparkContext // 自定义排序 object CustomSort1 def main(args: Array[String]): Unit = val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 78 100", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[User] = lines.map(line => val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt //(name, age, fv) new User(name, age, fv) ) // 不满足要求 // tpRDD.sortBy(tp=> tp._3, false) // 将RDD里面封装在User类型的数据进行排序 val sorted: RDD[User] = userRDD.sortBy(u=>u) val result = sorted.collect() println(result.toBuffer) sc.stop() // shuffle时数据要通过网络传输,需要对数据进行序列化 class User(val name:String, val age:Int, val fv:Int) extends Ordered[User] with Serializable override def compare(that: User): Int = if (this.fv == that.fv) this.age - that.age else -(this.fv - that.fv) override def toString: String = s"name: $name, age: $age, fv: $fv"
方式2:自定义一个类继承Ordered和序列化,Driver端将数据变成RDD,整理数据转成元组类型的RDD,使用就自定义类做排序规则。
package com.rz.spark.base import org.apache.spark.SparkConf, SparkContext import org.apache.spark.rdd.RDD object CustomSort2 def main(args: Array[String]): Unit = val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) ) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) class Boy不是多例 val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> new Boy(tp._2,tp._3)) val result = sorted.collect() println(result.toBuffer) sc.stop() // shuffle时数据要通过网络传输,需要对数据进行序列化 class Boy(val age:Int, val fv:Int) extends Ordered[Boy] with Serializable override def compare(that: Boy): Int = if (this.fv == that.fv) this.age - that.age else -(this.fv - that.fv)
方式3:作用多例的case class来做排序规则
package com.rz.spark.base import org.apache.spark.SparkConf, SparkContext import org.apache.spark.rdd.RDD object CustomSort3 def main(args: Array[String]): Unit = val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) ) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> Man(tp._2,tp._3)) val result = sorted.collect() println(result.toBuffer) sc.stop() // shuffle时数据要通过网络传输,需要对数据进行序列化 // case class 本身已经实现序列化且多例 (缺点是规则写死,无法用新的规则排序,可用隐式转换实现) case class Man(age:Int, fv:Int) extends Ordered[Man] override def compare(that: Man): Int = if (this.fv == that.fv) this.age - that.age else -(this.fv - that.fv)
方式4,通过隐式参数指定灵活的排序规则
package com.rz.spark.base import org.apache.spark.SparkConf, SparkContext import org.apache.spark.rdd.RDD object CustomSort4 def main(args: Array[String]): Unit = val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) ) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) // 传入一个Ordering类型的隐式参数 import SortRules.OrderingHero val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> Hero(tp._2,tp._3)) val result = sorted.collect() println(result.toBuffer) sc.stop() // shuffle时数据要通过网络传输,需要对数据进行序列化 // case class 本身已经实现序列化,不指定固定的排序规则,由隐式参数指定 case class Hero(age:Int, fv:Int)
方式5:元组有自己的compareTo方法,充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个。如果还满足不了再自定义排序的类来排序
package com.rz.spark.base import org.apache.spark.SparkConf, SparkContext import org.apache.spark.rdd.RDD object CustomSort5 def main(args: Array[String]): Unit = val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) ) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) // 充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个 val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> (-tp._3,tp._2)) val result = sorted.collect() println(result.toBuffer) sc.stop()
方式6:和方式5相似,但是用到自定义的隐式参数作排序规则
package com.rz.spark.base import org.apache.spark.rdd.RDD import org.apache.spark.SparkConf, SparkContext object CustomSort6 def main(args: Array[String]): Unit = val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]") val sc = new SparkContext(conf) // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序 val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66") // 将Driver端的数据并行化变成RDD val lines:RDD[String] = sc.parallelize(users) // 切分整理数据 val userRDD: RDD[(String, Int, Int)] = lines.map(line => val fields = line.split(" ") val name = fields(0) val age = fields(1).toInt val fv = fields(2).toInt (name, age, fv) ) // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序) // 充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个 val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> (-tp._3,tp._2)) val result = sorted.collect() println(result.toBuffer) sc.stop()
spark-自定义排序(代码片段)
一、自定义排序规则-封装类importorg.apache.spark.rdd.RDDimportorg.apache.spark.SparkConf,SparkContext/***实现自定义的排序*/objectMySort1defmain(args:Array[String]):Unit=//1.spark程序的入口valconf:SparkConf=newSparkConf().setAppName("MySort1").setMaster("local[2]")valsc:SparkC... 查看详情
sparksql自定义函数(代码片段)
目录1SparkSQL自定义函数1.1自定义函数分类1.2自定义UDF1.3自定义UDAF2开窗函数2.1、概述2.2.准备工作2.3.聚合开窗函数2.4.排序开窗函数2.4.1ROW_NUMBER顺序排序2.4.2RANK跳跃排序2.4.3DENSE_RANK连续排序2.4.4NTILE分组排名1SparkSQL自定义函数1.1自定... 查看详情
spark自定义分区器实现(代码片段)
...的分区器会产生数据倾斜等原因,这个时候就需要我们自定义分区,按照我们指定的字段进行分区。具体的流程步骤如下:1、创建一个自定义的分区类,并继承Partitioner,注意这个partitioner是spark的partitioner2、重写partitioner中的方... 查看详情
Spark UDF 到自定义排序结构数组
】SparkUDF到自定义排序结构数组【英文标题】:SparkUDFtocustomsortarrayofstructs【发布时间】:2020-01-2418:35:24【问题描述】:我正在尝试使用UDF根据我定义的自定义排序对结构数组进行排序。这是我希望获得的结果类型的示例:input_tbl... 查看详情
spark学习之路(十九)sparksql的自定义函数udf(代码片段)
讨论QQ:1586558083在Spark中,也支持Hive中的自定义函数。自定义函数大致可以分为三种:UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等UDAF(User-DefinedAggregationFuncation),用户自定义聚合函数,类似在groupby之后使... 查看详情
spark学习rdd自定义分区和缓存(代码片段)
一,简介二,自定义分区规则 2.1 普通的分组TopN实现 2.2 自定义分区规则TopN实现三,RDD的缓存 3.1RDD缓存简介 3.2RDD缓存方式 正文一,简介 在之前的文章中,我们知道RDD的有一个特征:... 查看详情
textphp自定义排序(代码片段)
vbscriptvba:自定义排序(代码片段)
spark基础知识四(代码片段)
...绕spark的其他特性和应用。主要包括以下几个方面spark自定义分区spark中的共享变量spark程序的序列化问题spark中的application/job/stage/task之间的关系sparkonyarn原理和机制spark的资源分配方式1.spark自定义分区1.1自定义分区说明在对RDD数... 查看详情
phpfacetwp自定义排序方面(代码片段)
phpfacetwp自定义排序方面(代码片段)
collections自定义list排序规则,进行自定义排序(代码片段)
//这里的顺序,是我自己定义的一个List<String>String[]regulation="诸葛亮","鲁班","貂蝉","吕布";finalList<String>regulationOrder=Arrays.asList(regulation);String[]ordered="貂蝉","诸葛亮","吕布","貂蝉","鲁班","诸葛亮","貂蝉","鲁班" 查看详情
php自定义查询-订单-排序(代码片段)
htmlpods-按自定义日期排序(代码片段)
php订购(排序)自定义查询(代码片段)
phppods按自定义日期排序(代码片段)
spark自定义udf输入类型为array报错(代码片段)
定义udf如下vallist2string=udf(style:Array[String],num:Array[Long])=> style.zip(num).map(t=>t._1+":"+t._2).mkString("<br>")输入为两个数组,输出为string 报错如下Causedby:java.lang.ClassCastEx 查看详情