关键词:
在阅读spark mllib源码的时候,发现一个出镜率很高的函数——aggregate和treeAggregate,比如matrix.columnSimilarities()中。为了好好理解这两个方法的使用,于是整理了本篇内容。
由于treeAggregate是在aggregate基础上的优化版本,因此先来看看aggregate是什么.
更多内容参考我的大数据学习之路
aggregate
先直接看一下代码例子:
import org.apache.spark.sql.SparkSession
object AggregateTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("tf-idf").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
// 创建rdd,并分成6个分区
val rdd = spark.sparkContext.parallelize(1 to 12).repartition(6)
// 输出每个分区的内容
rdd.mapPartitionsWithIndex((index:Int,it:Iterator[Int])=>{
Array((s" $index : ${it.toList.mkString(",")}")).toIterator
}).foreach(println)
// 执行agg
val res1 = rdd.aggregate(0)(seqOp, combOp)
}
// 分区内执行的方法,直接加和
def seqOp(s1:Int, s2:Int):Int = {
println("seq: "+s1+":"+s2)
s1 + s2
}
// 在driver端汇总
def combOp(c1: Int, c2: Int): Int = {
println("comb: "+c1+":"+c2)
c1 + c2
}
}
这段代码的主要目的就是为了求和。考虑到spark分区并行计算的特性,在每个分区独立加和,最后再汇总加和。
过程可以参考下面的图片:
首先看一下map阶段,即在每个分区内计算加和。初始情况如蓝色方块所示,内容为:
分区号:里面的内容
如,0分区内的数据为6和8
当执行seqop时,会说先用初始值0开始遍历累加,原理类似如下:
rdd.mapPartitions((it:Iterator)=>{
var sum = init_value // 默认为0
it.foreach(sum + _)
sum
})
因此屏幕上会出现下面的内容,由于分区之间是并行的,所以最后的结果是乱序的:
seq: 0:6
seq: 0:1
seq: 0:3
seq: 1:9
seq: 3:10
seq: 0:2
seq: 0:5
seq: 5:7
seq: 12:12
seq: 0:4
seq: 4:11
seq: 6:8
计算完成后,依次遍历每个分区结果,进行累加:
comb: 0:10
comb: 10:13
comb: 23:2
comb: 25:24
comb: 49:15
comb: 64:14
aggregate的源码也比较简单:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
treeAggregate
treeAggregate在aggregate的基础上做了一些优化,因为aggregate是在每个分区计算完成后,把所有的数据拉倒driver端,进行统一的遍历合并,这样如果数据量很大,在driver端可能会OOM。
因此treeAggregate在中间多加了一层合并。
先来看看代码,没有任何的变化:
import org.apache.spark.sql.SparkSession
object TreeAggregateTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("tf-idf").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val rdd = spark.sparkContext.parallelize(1 to 12).repartition(6)
rdd.mapPartitionsWithIndex((index:Int,it:Iterator[Int])=>{
Array(s" $index : ${it.toList.mkString(",")}").toIterator
}).foreach(println)
val res1 = rdd.treeAggregate(0)(seqOp, combOp)
println(res1)
}
def seqOp(s1:Int, s2:Int):Int = {
println("seq: "+s1+":"+s2)
s1 + s2
}
def combOp(c1: Int, c2: Int): Int = {
println("comb: "+c1+":"+c2)
c1 + c2
}
}
输出的结果则发生了变化,首先分区内的操作不变:
3 : 3,10
2 : 2
0 : 6,8
1 : 1,9
4 : 4,11
5 : 5,7,12
seq: 0:3
seq: 0:6
seq: 3:10
seq: 6:8
seq: 0:2
seq: 0:1
seq: 1:9
seq: 0:4
seq: 4:11
seq: 0:5
seq: 5:7
seq: 12:12
...
在合并的时候发生了 变化:
comb: 10:13
comb: 23:24
comb: 14:2
comb: 16:15
comb: 47:31
配合下面的流程图,可以更好的理解:
搭配treeAggregate的源码来看一下:
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int = 2): U = withScope {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.length == 0) {
Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
} else {
// 这里都没什么变化,在分区中遍历数据累加
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition =
(it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
// 关键是这下面的内容 !!!!
// 首先获得当前的分区数
var numPartitions = partiallyAggregated.partitions.length
// 计算合适的并行度,我这里相当于6^(1/2),也就是2.4左右,ceill向上取整后变成3.
// max(3,2)得到最后的结果为3。即每个树的分枝有3个叶子节点
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// 遍历分区,通过对scale取模进行合并计算
// 这里判断一下,当前的分区数是否还够分。如果少于条件值 scale+(p/scale),就停止分区
while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
numPartitions /= scale
val curNumPartitions = numPartitions
// 重新定义分区id,并按照分区id重新分区,执行合并计算
partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
(i, iter) => iter.map((i % curNumPartitions, _))
}.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
// 最后统计结果
partiallyAggregated.reduce(cleanCombOp)
}
}
spark中的应用
// matrix求相似度
def columnSimilarities(threshold: Double): CoordinateMatrix = {
... columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
}
// 统计每一个向量的相关数据,里面包含了min max 等等很多信息
def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)(
(aggregator, data) => aggregator.add(data),
(aggregator1, aggregator2) => aggregator1.merge(aggregator2))
updateNumRows(summary.count)
summary
}
了解了treeAggregate之后,后续就可以看matrix的并行求解相似度的源码了!敬请期待吧...
参考
r语言利器之ddply和aggregate
ddply和aggregate是两个用来整合数据的功能强大的函数。 aggregate(x,...) 关于aggregate()函数的使用在《R语言实战》中P105有简单描述,这里重新说一下。此函数主要有一下几种用法: ##DefaultS3method: aggregate(x,...) #... 查看详情
sparkmllib之vector向量深入浅出
SparkMLlib里面提供了几种基本的数据类型,虽然大部分在调包的时候用不到,但是在自己写算法的时候,还是很需要了解的。MLlib支持单机版本的localvectors向量和martix矩阵,也支持集群版本的matrix矩阵。他们背后使用的都是ScalaNLP中... 查看详情
sparkmllib学习分类算法之逻辑回归算法
SparkMLlib学习分类算法之逻辑回归算法(一),逻辑回归算法的概念(参考网址:http://blog.csdn.net/sinat_33761963/article/details/51693836) 逻辑回归与线性回归类似,但它不属于回归分析家族(主要为二分类),而属于分类家族... 查看详情
sparkmllib之使用breeze操作矩阵向量
这下面的练习中,需要自己将spark的jar包添加进来。在使用Breeze库时,需要导入相关包:importbreeze.linalg._importbreeze.numerics._ 具体练习如下:packageleaningimportbreeze.linalg._importbreeze.numerics._importbreeze.stats.distributions.Rand/*** 查看详情
sparkmllib分类算法之决策树学习
SparkMLlib分类算法之决策树学习(一)决策树的基本概念 决策树(DecisionTree)是在已知各种情况发生概率的基础上,通过构成决策树来求取净现值的期望值大于等于零的概率,评价项目风险,判断其可行性的决策分析方法... 查看详情
如何利用sparkmllib进行个性推荐?
在现今的推荐技术和算法中,最被大家广泛认可和采用的就是基于协同过滤的推荐方法。协同过滤(CollaborativeFiltering,简称CF)是利用集体智慧的一个典型方法。换句话说,就是借鉴和你相关人群的观点来进行推荐。MLlib中的协同过... 查看详情
14.sparkmllib之快速入门(代码片段)
简介??MLlib是Spark提供提供机器学习的库,专为在集群上并行运行的情况而设计。MLlib包含很多机器学习算法,可在Spark支持的所有编程语言中使用。??MLlib设计理念是将数据以RDD的形式表示,然后在分布式数据集上调用各种算法。... 查看详情
细说linq之aggregate
前言Linq中有关常见的方法我们已经玩的得心应手,而对于那些少用的却是置若罔闻(夸张了点),但只有在实际应用中绞尽脑汁想出的方法还不如内置的Linq方法来的实际和简洁之前在Ruby中演示了一段代码来讲述Ruby的神奇,下... 查看详情
sparkmllib之水塘抽样算法(reservoirsampling)(代码片段)
1.理解 问题定义可以简化如下:在不知道文件总行数的情况下,如何从文件中随机的抽取一行? 首先想到的是我们做过类似的题目吗?当然,在知道文件行数的情况下,我们可以很容易的用C运行库的rand函数随机的获得一... 查看详情
线段树单点修改区间修改单点查询值区间查询最大值最小值区间和之模板(代码片段)
...左儿子3#definersonrt<<1|1//==rt*2+1右儿子4#defineint_midintmid=tree[rt].l+tree[rt].r>>15inta[N];//初始值6structnode7intl,r;8llval,lazy;9tree[N*4];10voidpush_up(intrt)11//tree[rt].val=min(tree[lson].val,tree[rson].val);12//tree[rt].val=max(tree[lson].val,tree[rson].val);13tree[rt... 查看详情
easyui扩展之tree的simpledata加载
实例化。这里增加了三个属性,可以指定idFiled,textFiled和parentField。所以这里的simpleData可以不严格转换成tree的数据格式。 $(function(){ $(‘#tt3‘).tree({ checkbox:true, url:‘tree_data_simp.php‘, parentField:"pid", textFiled:"name", 查看详情
sparkmllib学习之线性回归
SparkMLlib学习之线性回归(一)回归的概念 1,回归与分类的区别 分类模型处理表示类别的离散变量,而回归模型则处理可以取任意实数的目标变量。但是二者基本的原则类似,都是通过确定一个模型,将输入特征映射... 查看详情
sparkmllib数据类型
MLlib支持几种数据类型:本地向量(localvectors),和存储在本地或者基于RDD的分布式矩阵(matrices)。底层的线性代数转换操作是基于Breeze和jblas实现的。在MLlib中有监督学习算法使用的训练样本数据类型被称为“带标签的点(labeledp... 查看详情
sparkmllib和ml类里面的区别
mllib是老的api,里面的模型都是基于RDD的,模型使用的时候api也是有变化的(model这里是naiveBayes),(1:在模型训练的时候是naiveBayes.run(data:RDD[LabeledPoint])来训练的,run之后的返回值是一个NaiveBayesModel对象,就可以使用NaiveBayesModel.p... 查看详情
sparkmllib数据类型(代码片段)
MLlib支持几种数据类型:本地向量(localvectors),和存储在本地或者基于RDD的分布式矩阵(matrices)。底层的线性代数转换操作是基于Breeze和jblas实现的。在MLlib中有监督学习算法使用的训练样本数据类型被称为“带标签的... 查看详情
easyui之tree组件与treegrid组件
...做的截图:虽然,后台的逻辑上并不难,但是这个EasyUI的tree组件和treegrid组件,确实让我废了一大把力气》》好了废话不多说,开始正题:一:tree组件每个节点都具备以下属性:id:节点ID,对加载远程数据很重要。text:显示节... 查看详情
sparkmllib中分类和回归算法
SparkMLlib中分类和回归算法:-分类算法: pyspark.mllib.classification -朴素贝叶斯Naive... 查看详情
sparkmllib介绍
SparkMLlib介绍Spark之所以在机器学习方面具有得天独厚的优势,有以下几点原因:(1)机器学习算法一般都有很多个步骤迭代计算的过程,机器学习的计算需要在多次迭代后获得足够小的误差或者足够收敛才会停止,迭代时如果... 查看详情