实验7spark初级编程实践(代码片段)

Z.Q.Fengᯤ⁵ᴳ Z.Q.Fengᯤ⁵ᴳ     2023-03-04     183

关键词:


一、实验目的

  1. 掌握使用 Spark 访问本地文件和 HDFS 文件的方法
  2. 掌握 Spark 应用程序的编写、编译和运行方法

二、实验平台

  1. 操作系统:Ubuntu18.04(或Ubuntu16.04)
  2. Spark版本:3.2.0
  3. Hadoop版本:3.3.2

三、实验步骤

1. 准备工作

(1)安装spark及其API程序

安装 sparkUbuntu下安装Spark3.2.0教程
安装 sbtUbuntu下为Spark安装配置sbt

(2)配置环境变量

修改你的 .bashrc 文件:

vim ~/.bashcrc

添加以下内容至文件顶部:

export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

使修改立即生效:

source ~/.bashrc

(3)启动Hadoop

进入 Hadoop 安装目录:

cd /usr/local/hadoop

启动并使用 jps 检查结点:

./sbin/start-dfs.sh
jps

2. Spark读取文件系统的数据

(1)在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数;
创建 test.txt:

echo -e "Hello\\nThis is a test\\nBye!" >> ~/test.txt

启动 spark-shell

cd  /usr/local/spark
./bin/spark-shell

Scala 命令:

val textFile=sc.textFile("file:///home/hadoop/test.txt")
textFile.count()

输出如下:

scala> val textFile=sc.textFile("file:///home/hadoop/test.txt")
textFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/test.txt MapPartitionsRDD[1] at textFile at <console>:23

scala> textFile.count()
res0: Long = 3

(2)在spark-shell中读取HDFS系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;

上传 test.txt 文件至 HDFS 中(终端中执行,退出 spark-shell):

/usr/local/hadoop/bin/hdfs dfs -put ~/test.txt

Scala 命令如下(spark-shell):

val textFile=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt")
textFile.count()

输出如下:

scala> val textFile=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt")
textFile: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/user/hadoop/test.txt MapPartitionsRDD[3] at textFile at <console>:23

scala> textFile.count()
res1: Long = 3

(3)编写独立应用程序(推荐使用Scala语言),读取HDFS系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;通过sbt工具将整个应用程序编译打包成 JAR包,并将生成的JAR包通过 spark-submit 提交到 Spark 中运行命令。

进入 spark 安装目录:

cd /usr/local/spark
mkdir mycode && cd mycode

创建 HDFStest 目录并编写 Scala 文件:

mkdir -p HDFStest/src/main/scala
vim ./HDFStest/src/main/scala/HDFStest.scala

代码如下:

/* HDFStest.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
 
object HDFStest 
    def main(args: Array[String]) 
        val logFile = "hdfs://localhost:9000/user/hadoop/test.txt"
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
        val logData = sc.textFile(logFile, 2)
        val num = logData.count()
        printf("The num of this file is %d\\n", num)
    

进入 HDFStest 目录,创建 simple.sbt

cd HDFStest
vim simple.sbt

内容如下:

name := "A Simple HDFS Test"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"

注意这里的 scalaVersion 是你的 Scala 版本,spark-core 是你的 spark 版本。接下来,可以通过如下代码将整个应用程序打包成 JAR:

/usr/local/sbt/sbt package

打包成功输出如下:

运行如下代码使用生成的 jar 包:

/usr/local/spark/bin/spark-submit  --class  "HDFStest" /usr/local/spark/mycode/HDFStest/target/scala-2.12/a-simple-hdfs-test_2.12-1.0.jar 2>& 1 | grep The

输出如下:

3. 编写独立应用程序实现数据去重

对于两个输入文件 A 和 B,编写 Spark 独立应用程序(推荐使用 Scala 语言),对两个文件进行合并,并剔除其中重复的内容,得到一个新文件 C。下面是输入文件和输出文件的一个样例,供参考。

输入文件 A 的样例如下:

20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z

输入文件 B 的样例如下:

20170101 y
20170102 y
20170103 x
20170104 z
20170105 y

根据输入的文件 A 和 B 合并得到的输出文件 C 的样例如下:

20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z

进入到 mycode 目录,新建 RemDup 目录,

cd /usr/local/spark/mycode
mkdir -p RemDup/src/main/scala
cd RemDup

新建 datas 目录,写入文件 A 和文件 B:

mkdir datas

注意这里 A 和 B 文件内容不能有多余的换行符或者空格!

vim ./datas/A
vim ./datas/B

编写 Scala 文件:

vim ./src/main/scala/RemDup.scala

代码如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object RemDup 
    def main(args: Array[String]) 
        val conf = new SparkConf().setAppName("RemDup")
        val sc = new SparkContext(conf)
        val dataFile = "file:///usr/local/spark/mycode/RemDup/datas"
        val data = sc.textFile(dataFile,2)
        val res = data.filter(_.trim().length>0).map(line=>(line.trim,"")).partitionBy(new HashPartitioner(1)).groupByKey().sortByKey().keys
        res.saveAsTextFile("file:///usr/local/spark/mycode/RemDup/result")
    

编写 simple.sbt 文件:

vim simple.sbt

内容如下:

name := "Remove Duplication"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"

使用如下命令打包:

/usr/local/sbt/sbt package

使用生成的 jar 包:

/usr/local/spark/bin/spark-submit --class "RemDup"  /usr/local/spark/mycode/RemDup/target/scala-2.12/remove-duplication_2.12-1.0.jar

使用如下命令查看输出:

cat result/*

输出如下:

4. 编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm 成绩:

小明 92
小红 87
小新 82
小丽 90

Database 成绩:

小明 95
小红 81
小新 89
小丽 85

Python 成绩:

小明 82
小红 83
小新 94
小丽 91

平均成绩如下:

(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)

进入到 mycode 目录,新建 AvgScore 目录,

cd /usr/local/spark/mycode
mkdir -p AvgScore/src/main/scala
cd AvgScore

新建 datas 目录,写入文件 algorithm、database、python:

mkdir datas

注意这里 algorithm、database 和 python 文件内容不能有多余的换行符或者空格!

vim ./datas/algorithm
vim ./datas/database
vim ./datas/python

编写 Scala 文件:

vim ./src/main/scala/AvgScore.scala

代码如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object AvgScore 
    def main(args: Array[String]) 
        val conf = new SparkConf().setAppName("AvgScore")
        val sc = new SparkContext(conf)
        val dataFile = "file:///usr/local/spark/mycode/AvgScore/datas"
        val data = sc.textFile(dataFile,3)

       val res = data.filter(_.trim().length>0).map(line=>(line.split(" ")(0).trim(),line.split(" ")(1).trim().toInt)).partitionBy(new HashPartitioner(1)).groupByKey().map(x => 
       	   	var n = 0
	       	var sum = 0.0
	       	for(i <- x._2)
				sum = sum + i
	       		n = n +1
    	    
	        val avg = sum/n
    	    val format = f"$avg%1.2f".toDouble
    	    (x._1,format)
	    )
       res.saveAsTextFile("file:///usr/local/spark/mycode/AvgScore/result")
    

编写 simple.sbt 文件:

vim simple.sbt

内容如下:

name := "Average Score"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.2.0"

使用如下命令打包:

/usr/local/sbt/sbt package

使用生成的 jar 包:

/usr/local/spark/bin/spark-submit --class "AvgScore"  /usr/local/spark/mycode/AvgScore/target/scala-2.12/average-score_2.12-1.0.jar

使用如下命令查看输出:

cat result/*

输出如下:


四、实验总结

第五周周二练习:实验5sparksql编程初级实践(代码片段)

1.题目:源码:importjava.util.Propertiesimportorg.apache.spark.sql.types._importorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessionimportorg.apache.spark.sql.DataFrameReaderobjectTestMySQLdefma 查看详情

实验5mapreduce初级编程实践——编程实现文件合并和去重操作(代码片段)

一、实验目的通过实验掌握基本的MapReduce编程方法;掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。二、实验平台操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04)Hadoop版本ÿ... 查看详情

实验4rdd编程初级实践(代码片段)

注意:spark的编码格式是utf-8,其他的格式会有乱码,所以文件要使用utf-8编码pom.xml:<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.o... 查看详情

实验5mapreduce初级编程实践——对给定的表格进行信息挖掘(代码片段)

一、实验目的通过实验掌握基本的MapReduce编程方法;掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。二、实验平台操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04)Hadoop版本ÿ... 查看详情

实验5mapreduce初级编程实践(python实现)(代码片段)

一、实验目的通过实验掌握基本的MapReduce编程方法;掌握用MapReduce解决一些常见数据处理问题的方法,包括数据合并、数据去重、数据排序和数据挖掘等。二、实验平台操作系统:Ubuntu18.04(或Ubuntu16.04)Hadoop... 查看详情

《移动项目实践》实验报告——初级控件(代码片段)

实验目的1、掌握Android屏幕显示与初级视图的相关知识;2、掌握包括屏幕显示基础、简单布局的用法、简单控件的用法、简单图形的用法实验内容以windows上的计算器为例,程序界面如下图所示:完成精简之后的Android... 查看详情

实验5mapreduce初级编程实践——编程实现文件合并和去重操作(代码片段)

一、实验目的通过实验掌握基本的MapReduce编程方法;掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。二、实验平台操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04)Hadoop版本ÿ... 查看详情

rdd编程初级实践(代码片段)

RDD编程初级实践一、pyspark交互式编程二、编写独立应用程序实现数据去重三、编写独立应用程序实现求平均值问题一、pyspark交互式编程本作业提供分析数据data.txt,该数据集包含了某大学计算机系的成绩,数据格式如下... 查看详情

rdd编程初级实践(代码片段)

...Jim,DataBase,90Jim,Algorithm,60Jim,DataStructure,80……请根据给定的实验数据,在pyspark中通过编程来计算以下内容:(1)该系总共有多少学生;代码如下:lines=sc.textFile("file:///usr/local/spark/sparksqldata/data.txt")r... 查看详情

rdd编程初级实践

RDD编程初级实践一、实验目的(1)熟悉Spark的RDD基本操作及键值对操作;(2)熟悉使用RDD编程解决实际具体问题的方法。二、实验平台操作系统:Ubuntu16.04Spark版本:2.4.0Python版本:3.4.3三、实验内... 查看详情

2021-06-09*rdd编程初级实践(代码片段)

RDD编程初级实践一、数据来源描述pyspark交互式编程科任老师提供分析数据data.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:Tom,DataBase,80Tom,Algorithm,50Tom,DataStructure,60Jim,DataBase,90Jim,Algorithm,60Jim,DataStruct... 查看详情

实验5sparksql编程初级实践

SparkSQL基本操作 (1)查询所有数据; (2)查询所有数据,并去除重复的数据; (3)查询所有数据,打印时去除id字段; (4)筛选出age>30的记录; (5)将数据按age分组; (6)将数据按name升序排列; (7)取出前3行数... 查看详情

sparksql编程初级实践

今下午在课上没有将实验做完,课下进行了补充,最终完成。下面附上厦门大学数据库实验室中spark实验官网提供的标准答案,以供参考。 三、实验内容和要求1.SparkSQL基本操作 将下列json数据复制到你的ubuntu系统/usr/loca... 查看详情

实验5mapreduce初级编程实践——编写程序实现对输入文件的排序(代码片段)

一、实验目的通过实验掌握基本的MapReduce编程方法;掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。二、实验平台操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04)Hadoop版本ÿ... 查看详情

实验4rdd编程初级实践

1.spark-shell交互式编程(1)该系总共有多少学生scala>vallines=sc.textFile("file:///usr/local/spark/sparklab/Data01.txt")lines:org.apache.spark.rdd.RDD[String]=file:///usr/local/spark/sparklab/Data01.txtMapPartitionsRDD[4]attextFileat<console>:24scala>valinfo=lines.map(ro... 查看详情

大数据hadoop实验报告(代码片段)

文章目录实验一熟悉常用的Linux操作和Hadoop操作1.实验目的2.实验平台3.实验内容和要求实验二熟悉常用的HDFS操作1.实验目的2.实验平台3.实验步骤实验三熟悉常用的HBase操作1.实验目的2.实验平台3.实验步骤实验四MapReduce/Spark编程初... 查看详情

大数据hadoop实验报告(代码片段)

文章目录实验一熟悉常用的Linux操作和Hadoop操作1.实验目的2.实验平台3.实验内容和要求实验二熟悉常用的HDFS操作1.实验目的2.实验平台3.实验步骤实验三熟悉常用的HBase操作1.实验目的2.实验平台3.实验步骤实验四MapReduce/Spark编程初... 查看详情

rdd编程初级实践

RDD编程初级实践一、实验目的(1)熟悉Spark的RDD基本操作及键值对操作;(2)熟悉使用RDD编程解决实际具体问题的方法。二、实验平台操作系统:Ubuntu16.04Spark版本:2.4.0Python版本:3.4.3三、实验内... 查看详情