spark-自定义排序(代码片段)

rzcong rzcong     2023-03-12     181

关键词:

考察spark自定义排序

方式一:自定义一个类继承Ordered和序列化,Driver端将数据变成RDD,整理数据转成自定义类类型的RDD,使用本身排序即可。

package com.rz.spark.base

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext

// 自定义排序
object CustomSort1 

  def main(args: Array[String]): Unit = 
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 78 100", "xiaolong 66 66")

    // 将Driver端的数据并行化变成RDD
    val lines:RDD[String] = sc.parallelize(users)

    // 切分整理数据
    val userRDD: RDD[User] = lines.map(line => 
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      //(name, age, fv)
      new User(name, age, fv)
    )

    // 不满足要求
    // tpRDD.sortBy(tp=> tp._3, false)

    // 将RDD里面封装在User类型的数据进行排序
    val sorted: RDD[User] = userRDD.sortBy(u=>u)
    val result = sorted.collect()
    println(result.toBuffer)


    sc.stop()
  


// shuffle时数据要通过网络传输,需要对数据进行序列化
class User(val name:String, val age:Int, val fv:Int) extends Ordered[User] with Serializable 
  override def compare(that: User): Int = 
    if (this.fv == that.fv)
      this.age - that.age
    else
      -(this.fv - that.fv)
    
  

  override def toString: String = s"name: $name, age: $age, fv: $fv"

方式2:自定义一个类继承Ordered和序列化,Driver端将数据变成RDD,整理数据转成元组类型的RDD,使用就自定义类做排序规则。

package com.rz.spark.base

import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object CustomSort2 
  def main(args: Array[String]): Unit = 
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // 将Driver端的数据并行化变成RDD
    val lines:RDD[String] = sc.parallelize(users)

    // 切分整理数据
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => 
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    )

    // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序)  class Boy不是多例
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> new Boy(tp._2,tp._3))
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  


// shuffle时数据要通过网络传输,需要对数据进行序列化
class Boy(val age:Int, val fv:Int) extends Ordered[Boy] with Serializable 
  override def compare(that: Boy): Int = 
    if (this.fv == that.fv)
      this.age - that.age
    else
      -(this.fv - that.fv)
    
  

方式3:作用多例的case class来做排序规则

package com.rz.spark.base

import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object CustomSort3 
  def main(args: Array[String]): Unit = 
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // 将Driver端的数据并行化变成RDD
    val lines:RDD[String] = sc.parallelize(users)

    // 切分整理数据
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => 
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    )

    // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序)
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> Man(tp._2,tp._3))
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  


// shuffle时数据要通过网络传输,需要对数据进行序列化
// case class 本身已经实现序列化且多例 (缺点是规则写死,无法用新的规则排序,可用隐式转换实现)
case class Man(age:Int, fv:Int) extends Ordered[Man]
  override def compare(that: Man): Int = 
    if (this.fv == that.fv)
      this.age - that.age
    else
      -(this.fv - that.fv)
    
  

方式4,通过隐式参数指定灵活的排序规则

package com.rz.spark.base

import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object CustomSort4 
  def main(args: Array[String]): Unit = 
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // 将Driver端的数据并行化变成RDD
    val lines:RDD[String] = sc.parallelize(users)

    // 切分整理数据
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => 
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    )

    // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序)
    // 传入一个Ordering类型的隐式参数
    import SortRules.OrderingHero
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> Hero(tp._2,tp._3))
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  


// shuffle时数据要通过网络传输,需要对数据进行序列化
// case class 本身已经实现序列化,不指定固定的排序规则,由隐式参数指定
case class Hero(age:Int, fv:Int)

方式5:元组有自己的compareTo方法,充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个。如果还满足不了再自定义排序的类来排序

package com.rz.spark.base

import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object CustomSort5 
  def main(args: Array[String]): Unit = 
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // 将Driver端的数据并行化变成RDD
    val lines:RDD[String] = sc.parallelize(users)

    // 切分整理数据
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => 
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    )

    // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序)
    // 充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> (-tp._3,tp._2))
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  

方式6:和方式5相似,但是用到自定义的隐式参数作排序规则

package com.rz.spark.base

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext

object CustomSort6   
  def main(args: Array[String]): Unit = 
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 排序规则:首先按照颜值的降序,如果产值相等,再按照年龄的升序
    val users = Array("xiaohong 30 50","xiaobai 90 50","xiaozhang 66 50", "xiaolong 66 66")

    // 将Driver端的数据并行化变成RDD
    val lines:RDD[String] = sc.parallelize(users)

    // 切分整理数据
    val userRDD: RDD[(String, Int, Int)] = lines.map(line => 
      val fields = line.split(" ")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toInt
      (name, age, fv)
    )

    // 排序(传入了一个排序规则, 不会改变数据的格式,只会以改变顺序)
    // 充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个
    val sorted: RDD[(String, Int, Int)] = userRDD.sortBy(tp=> (-tp._3,tp._2))
    val result = sorted.collect()
    println(result.toBuffer)

    sc.stop()
  

 

spark-自定义排序(代码片段)

一、自定义排序规则-封装类importorg.apache.spark.rdd.RDDimportorg.apache.spark.SparkConf,SparkContext/***实现自定义的排序*/objectMySort1defmain(args:Array[String]):Unit=//1.spark程序的入口valconf:SparkConf=newSparkConf().setAppName("MySort1").setMaster("local[2]")valsc:SparkC... 查看详情

sparksql自定义函数(代码片段)

目录1SparkSQL自定义函数1.1自定义函数分类1.2自定义UDF1.3自定义UDAF2开窗函数2.1、概述2.2.准备工作2.3.聚合开窗函数2.4.排序开窗函数2.4.1ROW_NUMBER顺序排序2.4.2RANK跳跃排序2.4.3DENSE_RANK连续排序2.4.4NTILE分组排名1SparkSQL自定义函数1.1自定... 查看详情

spark自定义分区器实现(代码片段)

...的分区器会产生数据倾斜等原因,这个时候就需要我们自定义分区,按照我们指定的字段进行分区。具体的流程步骤如下:1、创建一个自定义的分区类,并继承Partitioner,注意这个partitioner是spark的partitioner2、重写partitioner中的方... 查看详情

Spark UDF 到自定义排序结构数组

】SparkUDF到自定义排序结构数组【英文标题】:SparkUDFtocustomsortarrayofstructs【发布时间】:2020-01-2418:35:24【问题描述】:我正在尝试使用UDF根据我定义的自定义排序对结构数组进行排序。这是我希望获得的结果类型的示例:input_tbl... 查看详情

spark学习之路(十九)sparksql的自定义函数udf(代码片段)

讨论QQ:1586558083在Spark中,也支持Hive中的自定义函数。自定义函数大致可以分为三种:UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等UDAF(User-DefinedAggregationFuncation),用户自定义聚合函数,类似在groupby之后使... 查看详情

spark学习rdd自定义分区和缓存(代码片段)

一,简介二,自定义分区规则  2.1 普通的分组TopN实现  2.2 自定义分区规则TopN实现三,RDD的缓存  3.1RDD缓存简介  3.2RDD缓存方式    正文一,简介  在之前的文章中,我们知道RDD的有一个特征:... 查看详情

textphp自定义排序(代码片段)

查看详情

vbscriptvba:自定义排序(代码片段)

查看详情

spark基础知识四(代码片段)

...绕spark的其他特性和应用。主要包括以下几个方面spark自定义分区spark中的共享变量spark程序的序列化问题spark中的application/job/stage/task之间的关系sparkonyarn原理和机制spark的资源分配方式1.spark自定义分区1.1自定义分区说明在对RDD数... 查看详情

phpfacetwp自定义排序方面(代码片段)

查看详情

phpfacetwp自定义排序方面(代码片段)

查看详情

collections自定义list排序规则,进行自定义排序(代码片段)

//这里的顺序,是我自己定义的一个List<String>String[]regulation="诸葛亮","鲁班","貂蝉","吕布";finalList<String>regulationOrder=Arrays.asList(regulation);String[]ordered="貂蝉","诸葛亮","吕布","貂蝉","鲁班","诸葛亮","貂蝉","鲁班" 查看详情

php自定义查询-订单-排序(代码片段)

查看详情

htmlpods-按自定义日期排序(代码片段)

查看详情

php订购(排序)自定义查询(代码片段)

查看详情

phppods按自定义日期排序(代码片段)

查看详情

spark自定义udf输入类型为array报错(代码片段)

定义udf如下vallist2string=udf(style:Array[String],num:Array[Long])=>  style.zip(num).map(t=>t._1+":"+t._2).mkString("<br>")输入为两个数组,输出为string 报错如下Causedby:java.lang.ClassCastEx 查看详情

php标签按自定义字段排序标记排序(代码片段)

查看详情