spark自定义分区器实现(代码片段)

gxiaobai gxiaobai     2022-12-22     290

关键词:

在spark中,框架默认使用的事hashPartitioner分区器进行对rdd分区,但是实际生产中,往往使用spark自带的分区器会产生数据倾斜等原因,这个时候就需要我们自定义分区,按照我们指定的字段进行分区。具体的流程步骤如下:

1、创建一个自定义的分区类,并继承Partitioner,注意这个partitioner是spark的partitioner

2、重写partitioner中的方法

  override def numPartitions: Int = ???
override def getPartition(key: Any): Int = ???

代码实现:
测试数据集:
cookieid,createtime,pv
cookie1,2015-04-10,1
cookie1,2015-04-11,5
cookie1,2015-04-12,7
cookie1,2015-04-13,3
cookie1,2015-04-14,2
cookie1,2015-04-15,4
cookie1,2015-04-16,4
cookie2,2015-04-10,2
cookie2,2015-04-11,3
cookie2,2015-04-12,5
cookie2,2015-04-13,6
cookie2,2015-04-14,3
cookie2,2015-04-15,9
cookie2,2015-04-16,7

  指定按照第一个字段进行分区

步骤1:
package _core.sourceCodeLearning.partitioner

import org.apache.spark.Partitioner
import scala.collection.mutable.HashMap

/**
  * Author Mr. Guo
  * Create 2019/6/23 - 12:19
  */
class UDFPartitioner(args: Array[String]) extends Partitioner 

  private val partitionMap: HashMap[String, Int] = new HashMap[String, Int]()
  var parId = 0
  for (arg <- args) 
    if (!partitionMap.contains(arg)) 
      partitionMap(arg) = parId
      parId += 1
    
  

  override def numPartitions: Int = partitionMap.valuesIterator.length

  override def getPartition(key: Any): Int = 
    val keys: String = key.asInstanceOf[String]
    val sub = keys
    partitionMap(sub)
  

  步骤2:

主类测试:

package _core.sourceCodeLearning.partitioner

import org.apache.spark.SparkConf, TaskContext
import org.apache.spark.sql.SparkSession

/**
  * Author Mr. Guo
  * Create 2019/6/23 - 12:21
  */
object UDFPartitionerMain 
  def main(args: Array[String]): Unit = 
    val conf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")

    val rdd = ssc.sparkContext.textFile("file:///E:\\\\TestFile\\\\analyfuncdata.txt")
    val transform = rdd.filter(_.split(",").length == 3).map(x => 
      val arr = x.split(",")
      (arr(0), (arr(1), arr(2)))
    )
    val keys: Array[String] = transform.map(_._1).collect()
    val partiion = transform.partitionBy(new UDFPartitioner(keys))
    partiion.foreachPartition(iter => 
      println(s"**********分区号:$TaskContext.getPartitionId()***************")
      iter.foreach(r => 
        println(s"分区:$TaskContext.getPartitionId()###" + r._1 + "\\t" + r._2 + "::" + r._2._1)
      )
    )
    ssc.stop()
  

  运行结果:

技术图片

这样就是按照第一个字段进行了分区,当然在分区器的中,对于key是可以根据自己的需求随意的处理,比如添加随机数等等

spark基础知识四(代码片段)

...绕spark的其他特性和应用。主要包括以下几个方面spark自定义分区spark中的共享变量spark程序的序列化问题spark中的application/job/stage/task之间的关系sparkonyarn原理和机制spark的资源分配方式1.spark自定义分区1.1自定义分区说明在对RDD数... 查看详情

spark自定义分区器

...器:  Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。只有Key-Value类型的RDD才有分区器... 查看详情

spark中的分区和自定义分区器中的重新分区和排序给出数组越界异常

】spark中的分区和自定义分区器中的重新分区和排序给出数组越界异常【英文标题】:repartitionandsortwithinpartitionandcustompartitionerinsparkgivingarrayoutofboundexception【发布时间】:2016-06-1008:50:21【问题描述】:6我试图实现here的解释。当... 查看详情

spark自定义排序与分区(代码片段)

Spark自定义排序与分区前言:随着信息时代的不断发展,数据成了时代主题,今天的我们徜徉在数据的海洋中;由于数据的爆炸式增长,各种数据计算引擎如雨后春笋般冲击着这个时代。作为时下最主流的计算引擎之一Spark也是... 查看详情

Spark 数据集自定义分区器

】Spark数据集自定义分区器【英文标题】:Sparkdatasetcustompartitioner【发布时间】:2017-02-0615:25:44【问题描述】:能否请您帮我找到用于将sales数据集重新分区为N大小相等的分区的JavaAPI?大小相等是指行数相等。Dataset<Row>sales=s... 查看详情

apache spark中的自定义分区器

】apachespark中的自定义分区器【英文标题】:custompartitionerinapachespark【发布时间】:2016-02-0220:33:41【问题描述】:我正在学习《学习火花:闪电般的大数据分析》一书中的示例://custompartitionerclassDomainNamePartitioner(numParts:Int)extendsP... 查看详情

mongosparkconnector中的分区器(代码片段)

...goRDD类对象,MongoSparkConnector框架本质上就是一个大号的自定义RDD,加了些自定义配置、适配几种分区器规则、Sql的数据封装等等,个人认为相对核心的也就是分区器的规则实现;弄清楚了其分析器也就搞明白了MongoSparkConnector。... 查看详情

idea本地运行spark项目[演示自定义分区器]并查看hdfs结果文件

...创建Scala项目(四)添加Spark库文件到项目(五)创建自定义分区器(六)测试自定义分区器三、打包上传,提交运行(一)新建测试自定义分区器类(二)利用IDEA将项目打成Jar包(三)上传jar包到服务器(四)提交到Spark集群... 查看详情

mapreduce之自定义分区器partitioner(代码片段)

@目录问题引出默认Partitioner分区自定义Partitioner步骤Partition分区案例实操分区总结问题引出要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)默认Partitione... 查看详情

kafka2.5.0自定义分区器(代码片段)

自定义分区器:importorg.apache.kafka.clients.producer.Partitioner;importorg.apache.kafka.common.Cluster;importorg.apache.kafka.common.PartitionInfo;importjava.util.List;importjava.util.Map;/***@authorKing老师* 查看详情

spark自定义分区(partitioner)

...下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略。为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法:01packageorg.apache.spark0203/** 查看详情

[mapreduce_8]mapreduce中的自定义分区实现(代码片段)

 0.说明  设置分区数量&&编写自定义分区代码   1.设置分区数量  分区(Partition)  分区决定了指定的Key进入到哪个Reduce中  默认hash分区,算法//返回的分区号(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks&nb... 查看详情

jvm:如何实现一个自定义类加载器?(代码片段)

JVM:如何实现一个自定义类加载器?1.为什么要自定义加载器2.如何实现自定义加载器3.能不能自己写一个java.lang.String1.为什么要自定义加载器原因:1、存放在自定义路径上的类,需要通过自定义类加载器去加载。【注... 查看详情

spark-partitionby

...源码理解参考:https://www.cnblogs.com/liuming1992/p/6377540.html自定义分区函数自己根据业务数据减缓数据倾斜问题:要实现自定义的分区器,你需要继承org.apache.spark.Partitioner类并实现下面三个方法 查看详情

基于kubernetes调度框架的自定义调度器实现(代码片段)

文章目录基于kubernetes调度框架的自定义调度器实现调度流程自定义调度器调度框架(SchedulingFramework)设计目标调度过程和绑定过程扩展点(ExtensionPoints)扩展点源码分析调度框架插件的启用或者禁用示例:基... 查看详情

用户自定义类加载器及具体实现(代码片段)

用户自定义类加载器在Java的日常应用程序开发中,类的加载几乎是由上节3种类加载器(引导、扩展和系统类加载器)相互配合执行的,在必要时,我们还可以自定义类加载器,来定制类的加载方式。为什... 查看详情

kafka之消息生成者基本知识(代码片段)

...发送二原理解析2.1基本知识2.2拦截器2.2.1基本结构2.2.2自定义拦截器2.3序列化器2.3.1基本方法2.3.2自定义序列化器2.4分区器2.4.1基本方法2.4.2自定义分区器2.5消息累加器2.1基本知识2.6Sender线程生产者一消息提供者开发1.1过程1.2代码实... 查看详情

(转)jvm——自定义类加载器(代码片段)

背景:为什么要自定义,如何自定义,实现过程转载:http://blog.csdn.net/SEU_Calvin/article/details/523151250. 为什么需要自定义类加载器  网上的大部分自定义类加载器文章,几乎都是贴一段实现代码,然后分析一两句自定义Cl... 查看详情