关键词:
一,基本概念累加器是Spark的一种变量,顾名思义该变量只能增加。有以下特点:
1,累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加。
2,累加器不会改变Spark Lazy计算的特点。只会在Job触发的时候进行相关累加操作。
3,现有累加器的类型。
相信有很多学习大数据的道友,在这里我给大家说说我滴群哦,大数据海量知识分享,784789432.在此我保证,绝对大数据的干货,等待各位的到来,我们一同从入门到精通吧!
二,累加器的使用
Driver端初始化,并在Action之后获取值。
val accum = sc.accumulator(0, "test Accumulator")
accum.value
Executor端进行计算
accum+=1;
三,累加器的重点类
Class Accumulator extends Accumulable
主要是实现了累加器的初始化及封装了相关的累加器操作方法。同时在类对象构建的时候向我们的Accumulators注册了累加器。累加器的add操作的返回值类型和我们传入的值类型可以不一样。所以,我们一定要定义好如何累加和合并值。也即add方法
object Accumulators:
该方法在Driver端管理着我们的累加器,也包含了特定累加器的聚合操作。
trait AccumulatorParam[T] extends AccumulableParam[T, T]:
AccumulatorParam的addAccumulator操作的泛型封装,具体的实现还是要再具体实现类里面实现addInPlace方法。
object AccumulatorParam:
主要是进行隐式类型转换的操作。
TaskContextImpl:
在Executor端管理着我们的累加器。
四,累加器的源码解析
1,Driver端的初始化
val accum = sc.accumulator(0, "test Accumulator")
val acc = new Accumulator(initialValue, param, Some(name))
主要是在Accumulable(Accumulator)中调用了,这样我们就可以使用Accumulator使用了。
Accumulators.register(this)
2,Executor端的反序列化得到我们对象的过程
首先,我们的value_ 可以看到其并不支持序列化
@volatile @transient private var value_ : R = initialValue // Current value on master
其初始化是在我们反序列化的时候做的,反序列化还完成了Accumulator向我们的TaskContextImpl的注册
反序列化是在调用ResultTask的RunTask方法的时候做的
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
过程中会调用
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException
in.defaultReadObject()
value_ = zero
deserialized = true
// Automatically register the accumulator when it is deserialized with the task closure.
//
// Note internal accumulators sent with task are deserialized before the TaskContext is created
// and are registered in the TaskContext constructor. Other internal accumulators, such SQL
// metrics, still need to register here.
val taskContext = TaskContext.get()
if (taskContext != null)
taskContext.registerAccumulator(this)
3,累加器的累加
accum+=1;
param.addAccumulator(value_, term)
根据不同的累加器参数有不同的实现AccumulableParam
如,int类型。最终调用的AccumulatorParam特质的addAccumulator方法。
trait AccumulatorParam[T] extends AccumulableParam[T, T]
def addAccumulator(t1: T, t2: T): T =
addInPlace(t1, t2)
然后,调用的是各个具体实现的addInPlace方法
implicit object IntAccumulatorParam extends AccumulatorParam[Int]
def addInPlace(t1: Int, t2: Int): Int = t1 + t2
def zero(initialValue: Int): Int = 0
返回后更新了我们的Accumulators的value_的值。
4,Accumulator的各个节点累加的之后的聚合操作
在Task类的run方法里面得到并返回的
(runTask(context), context.collectAccumulators())
最终在DAGScheduler里面调用了updateAccumulators(event)
在updateAccumulators方法中
Accumulators.add(event.accumUpdates)
具体内容如下:
def add(values: Map[Long, Any]): Unit = synchronized
for ((id, value) <- values)
if (originals.contains(id))
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
else
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
5,最后我们就可以获取到累加器的值了
accum.value
五,累加器使用注意事项
累加器不会改变我们RDD的Lazy的特性,之后再Action之后完成计算和更新。
但是假如出现两个Action公用一个转化操作,如map,在map里面进行累加器累加,那么每次action都会累加,造成某些我们不需要的结果。
六,自定义累加器
自定义累加器输出
七,总结
主要牵涉点就是序列化及类加载执行,这是深入玩spark的必须.
spark实战系列sparkstreaming累加器的使用
SparkStreaming累加器的使用最近有同学问我累计器的用法,今天就用spark和flink各写一个demo,其实累计器的场景还是非常多的,比如我们要统计黑名单中数据出现的次数,或者是统计程序处理成功了多少数据,失败了多少数据,所以使用好... 查看详情
spark源码跟踪累加器accumulators(代码片段)
累加器Accumulators一,累加器作用及其原理1.1,作用1.2,原理二,累加器关键源码跟踪阅读2.1,测试代码2.2,跟踪源码2.2.1,add调用2.2.2,merge调用三,累加器在行动算子和转换算子中执行有何不同... 查看详情
spark系列——累加器与广播变量
一、简介在Spark中,提供了两种类型的共享变量:累加器(accumulator)与广播变量(broadcastvariable):累加器:用来对信息进行聚合,主要用于累计计数等场景;广播变量:主要用于在节点间高效分发大对象。二、累加器这里先看一个... 查看详情
spark中自定义累加器(代码片段)
通过继承AccumulatorV2可以实现自定义累加器。官方案例可参考:http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators下面是我自己写的一个统计卡种数量的案例。packagecom.shuai7boy.myscalacodeimportorg.apache.spark.SparkConf,SparkContextimportor... 查看详情
spark2.4.8共享变量之累加器(代码片段)
Spark2.4.8共享变量之累加器一、共享变量二、累加器三、基础演示四、实验案例五、简要分析一、共享变量通常,当传递给Spark操作(例如map或reduce)的函数在远程集群节点上执行时,它会在函数中使用的所有变量的单独副本... 查看详情
spark学习之路spark的广播变量和累加器(代码片段)
...量?2.5 定义广播变量需要的注意点?2.6 注意事项三、累加器 3.1 为什么要将一个变量定义为一个累加器?3.2 图解累加器3.3 如何定义一个累加器?3.4 如何还原一个累加器?3. 查看详情
spark累加器
...在共享的变量,而为了实现共享变量spark实现了两种类型-累加器与广播变量,对于其概念与理解可以参考:共享变量(广播变量和累加器) 。可能需要注意:Spark累加器(Accumulator)陷阱及解决办法因此,我们便可以利用累加器与... 查看详情
大数据之spark:sparkcore
目录7.RDD累加器和广播变量1)累加器2)广播变量7.RDD累加器和广播变量在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。... 查看详情
为啥我的spark累加器一直为0
参考技术A累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数,累加器值未累加我们都知... 查看详情
spark入门3(累加器和广播变量)
...如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。二、广播变量 通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时, 查看详情
spark学习之路spark的广播变量和累加器[转](代码片段)
...供了两种有限的共享变量:广播变量(broadcastvariable)和累加器(accumulator)广播变量broadcastvariable为什么要将变量定义成广播变量?如果我们要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端... 查看详情
spark编程进阶
6.1简介累加器:用来对信息进行聚合;广播变量:用来高效分发较大的对象 6.2累加器通常在向Spark传递函数时,可以使用驱动器程序中定义的变量,但是集群中运行的每个人物都会得到这些变量的一份新的副本,更新这些副... 查看详情
并发之striped64(l累加器)(代码片段)
并发之Striped64(累加器) 对于该类的实现思想: Striped64是在java8中添加用来支持累加器的并发组件,它可以在并发环境下使用来做某种计数,Striped64的设计思路是在竞争激烈的时候尽量分散竞争,在实现上,Stri... 查看详情
spark基础学习笔记21:rdd检查点与共享变量
...变量的传递2、使用广播变量时变量的传递(二)累加器1、累加器功能2、不使用累加器3、使用累加器零、本讲学习 查看详情
sparkstreaming源码解读之数据清理内幕彻底解密
...同时也会产生RDD, 在这个过程中除了基本的RDD外还有累加器、广播变量等,对应SparkStreaming也有自己的对象、源数据及数据清理机制, 在运行中每个Ba 查看详情
spark广播变量累加器
广播变量objectMaindefmain(args:Array[String]):Unit=valsparkConf=newSparkConf().setAppName("readMysql").setMaster("local[2]")valsparkContext=newSparkContext(sparkConf)valrdd:RDD[Int]=sparkContext.paralle 查看详情
spark累加器实验
以下代码用Pyspark+IPython完成统计日志空行的数量:读取日志,创建RDD:myrdd = sc.textFile("access.log")不使用累加器:In [68]: s = 0In [69]: def f(x): ...: 查看详情
spark篇---spark中广播变量和累加器
一、前述Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量。累机器相当于统筹大变量,常用于计数,统计。二、具体原理1、广播变量广播变量理解图  ... 查看详情