05rdd(代码片段)

一条coding 一条coding     2022-12-06     224

关键词:

大家好,我是一条~

5小时推开Spark的大门,最后一小时,聊聊提了这么久的RDD。

话不多说,开干!

什么是RDD

Spark为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。分别是:

  • RDD : 弹性分布式数据集

  • 累加器:分布式共享只写变量

  • 广播变量:分布式共享只读变量

当前的很多框架对迭代式算法场景与交互性数据挖掘场景的处理性能非常差, 这个是RDD的提出的动机。

接下来我们重点看看RDD是如何在数据处理中使用的。

它代表一个不可变、只读的,被分区的数据集。操作 RDD 就像操作本地集合一样,有很多的方法可以调用,使用方便,而无需关心底层的调度细节。

五大特性

RDD总共有五个特征,三个基本特征,两个可选特征。

  • 分区(partition):有一个数据分片列表,可以将数据进行划分,切分后的数据能够进行并行计算,是数据集的原子组成部分。
  • 函数(compute):对于每一个分片都会有一个函数去迭代/计算执行它。
  • 依赖(dependency):每一个RDD对父RDD有依赖关系,源RDD没有依赖,通过依赖关系建立来记录它们之间的关系。
  • 优先位置(可选):每一个分片会优先计算位置(prefered location)。即要执行任务在哪几台机器上好一点(数据本地性)。
  • 分区策略(可选):对于key-value的RDD可以告诉它们如何进行分片。可以通过repartition函数进行指定。

执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)和计算模型(逻辑),执行时,需要将计算资源和计算模型进行协调和整合。

Spark 框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务。然后将任务发到已经分配资源的计算节点上, 按照指定的计算模型进行数据计算。最后得到计算结果。

创建RDD

在 Spark 中创建 RDD 的创建方式可以分为四种。

打开IDEA,创建一个Scala Class。

1.从内存中创建RDD

Spark 主要提供了两个方法:parallelizemakeRDD

import org.apache.spark.SparkConf, SparkContext

object Rdd 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sparkContext = new SparkContext(sparkConf)
    val rdd1 = sparkContext.parallelize(
      List(1,2,3,4)
    )
    val rdd2 = sparkContext.makeRDD(
      List(1,2,3,4)
    )
    rdd1.collect().foreach(println)
    rdd2.collect().foreach(println)
    sparkContext.stop()
  

输出结果

从底层代码实现来讲,makeRDD方法其实就是parallelize方法。

def makeRDD[T: ClassTag](

	seq: Seq[T],

	numSlices: Int = defaultParallelism): RDD[T] = withScope 

	parallelize(seq, numSlices)


2.从外部存储(文件)创建 RDD

由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。

和第二节提到的读取文件,统计有多少行是一样的。如果Windows系统执行如下代码出现问题,可回顾一下第二节去Spark Shell创建。

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext

object Rdd 
  def main(args: Array[String]): Unit = 
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sparkContext = new SparkContext(sparkConf)
    val fileRDD = sparkContext.textFile("src/main/java/test.txt")
    fileRDD.collect().foreach(println)
    sparkContext.stop()
  

输出结果

3.从其他RDD创建

主要是通过一个 RDD 运算完后,再产生新的 RDD。

4.直接创建RDD

使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

最后

恭喜坚持到这里的各位同学,通过5天约5个小时的学习,同学们对Spark有个简单的了解,还完成了大数据入门经典案例——WordCount。

但是,想要学好Spark仍然任重而道远,送给同学们我本人很喜欢的一句话:

道阻且长,行则将至。

流水不争先,争的是川流不息。

感谢各位5天的支持,在此谢过!最后,祝同学们新年快乐!

spark之rdd编程(代码片段)

  Spark对数据的核心抽象——弹性分布式数据集(ResilientDistributedDataset),简称RDD。RDD其实就是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有RDD以及调用RDD操作进行求值。而在这背后,Spark会... 查看详情

RDD 分区问题

...sue【发布时间】:2021-05-2913:05:32【问题描述】:使用以下代码读取csv文件rdd=(sc.sparkContext.textFile("./test123.txt").flatMap(lambdaline:line.split("\\n\\r")).map(lambdax:x.split("|")))在运行上面的代码时,spark只创建了一个分区(在我的本 查看详情

rdd转换算子与操作算子(代码片段)

一、RDD算子分类1.RDD算子分类及概述  RDD的算子分为Transformation和Action两类,Transformation是延迟执行,Action是立即执行。Transformation和Action本质上的区别是,Transformation是从一个RDD到一个RDD,Action是从一个RDD到一个值。由下图可... 查看详情

045rdd与dataframe互相转换(代码片段)

一:RDD与DataFrame互相转换1.总纲   二:DataFrame转换为RDD1.rdd  使用schema可以获取DataFrame的schema  使用rdd可以获取DataFrame的数据 三:RDD转换为DataFrame1.第一种方式  使用反射,  RDD的数据类型必须是caseclass。1impor... 查看详情

spark编程模型(上)(代码片段)

初识RDD什么是RDD?定义:Resilientdistributeddatasets(RDD),anefficient,general-purposeandfault-tolerantabstractionforsharingdatainclusterapplications.  RDD是只读的。    RDD是分区记录的集合。  RDD是容错的。---lineageRDD是高效的。  RDD不 查看详情

sparkcore之rdd编程(代码片段)

  spark提供了对数据的核心抽象——弹性分布式数据集(ResilientDistributedDataset,简称RDD)。RDD是一个分布式的数据集合,数据可以跨越集群中的多个机器节点,被分区并行执行。  在spark中,对数据的所有操作不外... 查看详情

rdd的分区依赖关系机制(代码片段)

目录一.RDD的分区二.RDD的依赖关系三.RDD机制1、使用persist()方法对RDD进行持久化​编辑2、使用cache()方法对RDD进行持久化​编辑三、容错机制一.RDD的分区RDD的分区原则是分区的个数尽量等于集群中的CPU核心(Cor... 查看详情

sparkcore应用解析(代码片段)

一。RDD概念1.1。RDD概述1.1.1。什么是RDD  RDD(ResilientDistributedDataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。在Spark中,对数据的所有操作不外乎创建RDD、转... 查看详情

spark-rdd持久化(代码片段)

spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对进行持久化操作时,每个节点都会将自己的操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition.这样的话,对于针... 查看详情

rdd基本操作之action(代码片段)

Action介绍在RDD上计算出来一个结果把结果返回给driverprogram或保存在文件系统,count(),save常用的Actionreduce()接收一个函数,作用在RDD两个类型相同的元素上,返回新元素。可以实现,RDD中元素的累加,计数,和其它类型的聚集操作... 查看详情

spark小案例——rdd,broadcast(代码片段)

RDD小案例objectRDD01defmain(args:Array[String]):Unit=valsparkConf:SparkConf=newSparkConf().setMaster("local[*]").setAppName("RDD01")valsc:SparkContext=newSparkContext(sparkC 查看详情

spark小案例——rdd,broadcast(代码片段)

RDD小案例objectRDD01defmain(args:Array[String]):Unit=valsparkConf:SparkConf=newSparkConf().setMaster("local[*]").setAppName("RDD01")valsc:SparkContext=newSparkContext(sparkCon 查看详情

rdd转dataframe(代码片段)

frompyspark.sqlimportSparkSession,Rowfrompyspark.sql.typesimportStructField,StructType,StringType,IntegerType,LongTypedata=[(‘Alex‘,‘male‘,3),(‘Nancy‘,‘female‘,6),[‘Jack‘,‘male‘,9]]#mixedrdd_=spark.sp 查看详情

pyspark的rdd代码纪录(代码片段)

pysparkrdd.py文件代码纪录代码版本为spark2.2.01.RDD及常见算子classRDD():#这里简单介绍几个典型的算子,其余的算子代码可以自己去看一看def__init__(self,jrdd,ctx,jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):"""_jrdd是个非常重要的属性,这... 查看详情

大数据技术之_19_spark学习_05_sparkgraphx应用解析小结(代码片段)

==========SparkGraphX概述==========1、SparkGraphX是什么?  (1)SparkGraphX是Spark的一个模块,主要用于进行以图为核心的计算还有分布式图的计算。  (2)GraphX他的底层计算也是RDD计算,它和RDD共用一种存储形态,在展示... 查看详情

spark几种调优方式(代码片段)

1、避免创建重复的RDD和不必要的内存空间浪费错误代码:valrdd1=sc.textFile("D:abcwordcountinputhello.txt")rdd1.map(...)valrdd2=sc.textFile("D:abcwordcountinputhello.txt")rdd2.reduce(...)错误解析:这种情况下,Spark需要从文件中加载两次hello.txt文件的内容... 查看详情

spark快速入门之rdd编程模型(代码片段)

RDD(ResilientDistributedDataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个可分区,不可变,里面的元素可并行计算的结合。RDD具有自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显示地将工作... 查看详情

spark学习rdd自定义分区和缓存(代码片段)

一,简介二,自定义分区规则  2.1 普通的分组TopN实现  2.2 自定义分区规则TopN实现三,RDD的缓存  3.1RDD缓存简介  3.2RDD缓存方式    正文一,简介  在之前的文章中,我们知道RDD的有一个特征:... 查看详情