mongosparkconnector中的分区器(代码片段)

softlin softlin     2022-12-01     303

关键词:

??MongoSpark为入口类,调用MongoSpark.load,该方法返回一个MongoRDD类对象,Mongo Spark Connector框架本质上就是一个大号的自定义RDD,加了些自定义配置、适配几种分区器规则、Sql的数据封装等等,个人认为相对核心的也就是分区器的规则实现;弄清楚了其分析器也就搞明白了Mongo Spark Connector 。

当前实现的分区器(Partitioner):

??MongoPaginateByCountPartitioner 基于总数的分页分区器
??MongoPaginateBySizePartitioner 基于大小的分页分区器
??MongoSamplePartitioner 基于采样的分区器
??MongoShardedPartitioner 基于分片的分区器
??MongoSinglePartitioner 单分区分区器
??MongoSplitVectorPartitioner 基于分割向量的分区器

??这里根据源码简单介绍MongoSinglePartitioner与MongoSamplePartitioner分区器,这或许就是用得最多的两种分区器,他的默认分区器(DefaultMongoPartitioner)就是MongoSamplePartitioner分区器;
该分区默认的PartitionKey为_id、默认PartitionSizeMB为64MB、默认每个分区采样为10;

MongoSamplePartitioner

??该类的核心也是唯一的方法为:partitions方法,下面为该方法的执行流程与核心逻辑;
??1、检查执行buildInfo指令检查Mongo版本用于判断是否支持随机采样聚合运算,版本大于3.2。 hasSampleAggregateOperator方法。Mongo3.2版本中才新增了数据采样功能。
??Mongodb中的语法为:

db.cName.aggregate([
  $sample: size: 10  
])

??上示例N等于10,如果N大于collection中总数据的5%,那么$sample将会执行collection扫描、sort,然后选择top N条文档;如果N小于5%,对于wiredTiger而言则会遍历collection并使用“伪随机”的方式选取N条文档,对于MMAPv1引擎则在_id索引上随机选取N条文档。
??2、执行collStats,用于获取集合的存储信息,如行数、大小、存储大小等等信息;
??matchQuery: 过滤条件
??partitionerOptions: ReadConfig传进去的分析器选项
??partitionKey: 分区key,默认为_id
??partitionSizeInBytes: 分区大小,默认64MB
??samplesPerPartition: 每个分区默认采样数量,默认10
??count: 集合总条数
??avgObjSizeInBytes: 对象平均字节数
??numDocumentsPerPartition: 每个分区文档数, ??partitionSizeInBytes / avgObjSizeInBytes:分区大小/对象平均大小
??numberOfSamples: 采样数量,samplesPerPartition * count / numDocumentsPerPartition,每个分区采样数*集合总数/每个分区文档数

技术图片

??如每个分区文档数大于集合总文档数,则将直接创建单分区,不采取采样数据方式创建分区,因为此时数据量太少单个分区已经可以容得下无需多个分区;

一、创建单分区

??在MongoSinglePartitioner类中通过PartitionerHelper.createPartitions执行相关逻辑;
??_id作为partitionKey,

二、通过采样数据创建分区

技术图片

??指定采样条件、采样数据量、PartitionKey、排序条件等,获取采样数据;
集合拆分:

def collectSplit(i: Int): Boolean = (i % samplesPerPartition == 0) || !matchQuery.isEmpty && i == count – 1

右侧边界:

val rightHandBoundaries = samples.zipWithIndex.collect 
case (field, i) if collectSplit(i) => field.get(partitionKey)

??获取右侧边界,使用采样数据数组索引对每个分区采样数求余等于0对采样数据进行过滤取右侧边界(如匹配条件不为空则再取最后一条数据);
??如采样得到62条数据,并且没有存在匹配条件,根据上述的采样数据过滤条件最后取得7条数据,分别为数据数组索引为0、索引为10、20、30、40、50、60的7条数据,数据的值为PartitionKey默认就是集合中_id字段的值;

技术图片

创建分区(Partitions)

技术图片

??获取得到PartitionKey、rightHandBoundaries后就可以调用PartitionerHelper.createPartitions创建Partition;下面为创建Partition的具体逻辑;
??使用PartitionKey创建查询边界,每个分区具有不同的查询边界,有最大、最小边界; 此处创建分区Partition并在每个分区中指定了查询边界;
??上面获取得到了7条数据,此处将创建8个分区;下面给出了简单数据用于说明该分区边界条件的基本逻辑与实现;

??1、创建Min、1、3、5、7、9、11、13、Max的序列
??2、创建1、3、5、7、9、11、13、Max序列
??3、使用zip将两个序列拉链式的合并:合并后的数据为:
??4、Min,1、1,3、3,5、5,7、7,9、9,11、11,13、13,Max

??Partition的边界条件将使用上面的边界条件,8条数据八个Partition一个对应;
??0 Partition的边界条件为:小于1
??1 Partition的边界条件为:大于等于1 小于 3
??2 Partition的边界条件为:大于等于3 小于 5
??3 Partition的边界条件为:大于等于5 小于 7
??4 Partition的边界条件为:大于等于7小于 9
??5 Partition的边界条件为:大于等于9 小于 11
??6 Partition的边界条件为:大于等于11 小于 13
??7 Partition的边界条件为:大于等于13
??上面的8个Partition为8个MongoPartition对象,每个对象的index、查询边界与上面所说的一一对应;
??在MongoRDD类的compute方法中可以看到根据对应的分区与上面创建分区时所建立的边界条件用于计算(从Mongo中获取对应数据);

技术图片

MongoSinglePartitioner

??创建单分区分区器时,直接调用PartitionerHelper.createPartitions方法创建分区,该类并无其他逻辑,并且固定的PartitionKey为_id,右侧边界条件为空集合,然后创建id为0的MongoPartition对象,并无查询边界;





































使用自定义分区器对 Pyspark 中的数据框进行分区

】使用自定义分区器对Pyspark中的数据框进行分区【英文标题】:PartitioningofDataFrameinPysparkusingCustomPartitioner【发布时间】:2018-10-1307:45:23【问题描述】:寻找有关在Pyspark中使用自定义分区程序的一些信息。我有一个数据框,其中... 查看详情

apache spark中的自定义分区器

】apachespark中的自定义分区器【英文标题】:custompartitionerinapachespark【发布时间】:2016-02-0220:33:41【问题描述】:我正在学习《学习火花:闪电般的大数据分析》一书中的示例://custompartitionerclassDomainNamePartitioner(numParts:Int)extendsP... 查看详情

是否可以使用“$”装饰器访问标准 SQL 中的 BigQuery 分区?

】是否可以使用“$”装饰器访问标准SQL中的BigQuery分区?【英文标题】:IsitpossibletoaccessaBigQuerypartitioninStandardSQLusingthe\'$\'decorator?【发布时间】:2016-11-1617:31:09【问题描述】:在GoogleBigQuery中,我在使用标准SQL查询分区表时尝试... 查看详情

多维数据集分区:DSV 设计器中的事实表只是多维数据集分区之一吗?

】多维数据集分区:DSV设计器中的事实表只是多维数据集分区之一吗?【英文标题】:cubepartitions:isthefacttableintheDSVdesignerjustoneofthecubepartitions?【发布时间】:2009-07-0118:03:53【问题描述】:一个事实表有16个分区。它们全部16具有... 查看详情

通过自定义分区器对雪花中的大表进行分区

】通过自定义分区器对雪花中的大表进行分区【英文标题】:Partitioninglargetableinsnowflakethroughacustompartitioner【发布时间】:2020-07-0802:03:27【问题描述】:我们有一张雪花大表,里面有超过550亿条记录。用户通过提供YEAR和SERIAL_NUMBER... 查看详情

spark自定义分区器实现(代码片段)

...tioner,注意这个partitioner是spark的partitioner2、重写partitioner中的方法 ov 查看详情

BigQuery、日期分区表和装饰器

...NTIME的伪列。使用date装饰器语法,您可以将记录添加到表中的某个分区。我想知道在幕后是否也使用伪列_PARTITIO 查看详情

列中的 BigQuery 日期分区

】列中的BigQuery日期分区【英文标题】:BigQuerydatepartitionfromcolumn【发布时间】:2017-03-0511:20:54【问题描述】:我试图了解如何从包含多天数据的csv文件中加载日期分区表。我想我正在寻找一个类似的特性关系数据库提供它们在特... 查看详情

Big Query 中的表未分区

】BigQuery中的表未分区【英文标题】:TableisnotpartitionedinBigQuery【发布时间】:2021-07-1910:31:11【问题描述】:BigQuery中的表表示它已对表进行了分区,但数据并未根据日期存储在不同的表中。TablenotPartitionedExpectedPartitionedTable【问题... 查看详情

[qnx自适应分区用户指南]1.4系统架构-自适应分区线程调度器

...eutrino架构之上设计的,主要用于解决嵌入式系统设计中的下面问题:在系统过载时保证指定的最小CPU时间份额。防止不重要或不可信的引用程序独占系统资源。更多信息参考theAdaptivePartitioning User'sGuide。 查看详情

如何修复 hadoop 中的“非法分区”错误?

】如何修复hadoop中的“非法分区”错误?【英文标题】:Howtofixthe"Illegalpartition"errorinhadoop?【发布时间】:2013-02-2219:14:48【问题描述】:我已经编写了一个自定义分区器。当我的减少任务数量大于1时,作业失败。这是我得... 查看详情

MapReduce 中的自定义动态分区

】MapReduce中的自定义动态分区【英文标题】:CustomDynamicPartitionsinMapReduce【发布时间】:2019-02-0619:38:31【问题描述】:我正在使用MapReduce来处理我的数据。我需要将输出存储在日期分区下。我的排序键是日期字符串。现在,如果... 查看详情

spark自定义分区器

1、spark中默认的分区器:  Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。只有Key-Value... 查看详情

Hadoop 分区器

】Hadoop分区器【英文标题】:Hadooppartitioner【发布时间】:2014-12-2200:14:44【问题描述】:我想问一下,它是在Mappers中实现的吗?如何衡量使用默认哈希分区器的性能-是否有更好的分区器来减少数据倾斜?谢谢【问题讨论】:【... 查看详情

如果在 Hadoop Map Reduce 中定义了自定义分区器,默认哈希分区器是不是仍然有效?

】如果在HadoopMapReduce中定义了自定义分区器,默认哈希分区器是不是仍然有效?【英文标题】:DoesthedefaulthashpartitionerstillworkifacustompartitionerisdefinedinHadoopMapReduce?如果在HadoopMapReduce中定义了自定义分区器,默认哈希分区器是否仍... 查看详情

组合器和分区器之间的区别

】组合器和分区器之间的区别【英文标题】:Differencebetweencombinerandpartitioner【发布时间】:2016-07-2508:26:22【问题描述】:我是MapReduce的新手,我只是无法弄清楚分区器和组合器的区别。我知道两者都在map和reduce任务之间的中间步... 查看详情

[qnx自适应分区用户指南]8自适应分区线程调度器和其他线程调度器

[QNX自适应分区用户指南]目录优先级和线程调度策略仅与一个自适应分区相关;如果线程调度器需要平衡预算,则优先级顺序在分区内受到遵守,而在分区之间则无法被遵守。您可以将线程调度程序与现有的FIFO、循环和零星调度... 查看详情

为分区器输出单个文件

】为分区器输出单个文件【英文标题】:Outputtingsinglefileforpartitioner【发布时间】:2014-06-1710:26:54【问题描述】:尝试获得与键数一样多的reducerpublicclassCustomPartitionerextendsPartitioner<Text,Text>publicintgetPartition(Textkey,Textvalue,intnumRed... 查看详情