sparkstreaming实时计算框架学习01(代码片段)

数据攻城小狮子 数据攻城小狮子     2022-12-13     759

关键词:

文章目录

初探Spark Streaming

从hadoop102的8888端口接受一行或者多行文本内容,并对接收到的内容以空格分隔计算每个单词出现的次数

package Spark_Streaming
import org.apache.log4j.Level,Logger
import org.apache.spark.SparkContext,SparkConf
import org.apache.spark.streaming._
object demo01 
  def main(args: Array[String]): Unit = 
    //设置日志级别
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf=new SparkConf().setAppName("demo01").setMaster("local[2]")
    val sc=new SparkContext(conf)
    //从SparkConf创建SteamingContext并指定10s的批处理大小
    val ssc=new StreamingContext(sc,Seconds(10))
    //启动连接到hadoop102 8888端口上,使用收到的数据创建DAtream
    val lines=ssc.socketTextStream("hadoop102",8888)
    val words=lines.flatMap(_.split(" "))
    val wordCounts=words.map(x=>(x,1)).reduceByKey(_+_)
    wordCounts.print()
    //启动流计算环境StreamingContext
    ssc.start()
    ssc.awaitTermination()
  


监听hadoop102 HDFS目录/opt/datafiles,一旦有新文件加入到此目录,Spark
Streaming会计算出该时间内的单词统计数

package Spark_Streaming
import org.apache.log4j.Level,Logger
import org.apache.spark.SparkContext,SparkConf
import org.apache.spark.streaming._
object demo02 
  def main(args: Array[String]): Unit = 
    //设置日志级别
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf=new SparkConf().setAppName("demo02").setMaster("local[2]")
    val sc=new SparkContext(conf)
    //从SparkConf创建SteamingContext并指定10s的批处理大小
    val ssc=new StreamingContext(sc,Seconds(10))
    //启动连接到hadoop102 hdfs目录/opt/datafiles上
    val lines=ssc.textFileStream("hdfs://hadoop102:8020/opt/datafiles")
    val words=lines.flatMap(_.split(" "))
    val wordCounts=words.map(x=>(x,1)).reduceByKey(_+_)
    wordCounts.print()
    //启动流计算环境StreamingContext
    ssc.start()
    ssc.awaitTermination()
  



掌握DStream编程模型

DStream转换操作

使用transform将一行语句分割成单词

package Spark_Streaming
import org.apache.log4j.Level,Logger
import org.apache.spark.SparkContext,SparkConf
import org.apache.spark.streaming._
object demo03 
  def main(args: Array[String]): Unit = 
    //设置日志级别
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf=new SparkConf().setAppName("demo03").setMaster("local[2]")
    val sc=new SparkContext(conf)
    //从SparkConf创建SteamingContext并指定10s的批处理大小
    val ssc=new StreamingContext(sc,Seconds(10))
    //启动连接到hadoop102 8888端口上
    val lines=ssc.socketTextStream("hadoop102",8888)
    val words=lines.transform(rdd=>rdd.flatMap(_.split(" ")))
    words.print()
    //启动流计算环境StreamingContext
    ssc.start()
    ssc.awaitTermination()
  


DStream窗口操作

window操作是基于一个源DStream的窗口批次计算后得到新的DStream。
例如设置窗口长度为3s,滑动时间间隔为1s。截取源DStream中的元素形成新的DStream。

package Spark_Streaming
import org.apache.log4j.Level,Logger
import org.apache.spark.SparkContext,SparkConf
import org.apache.spark.streaming._
object demo04 
  def main(args: Array[String]): Unit = 
    //设置日志级别
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf=new SparkConf().setAppName("demo04").setMaster("local[2]")
    val sc=new SparkContext(conf)
    val ssc=new StreamingContext(sc,Seconds(1))
    val lines=ssc.socketTextStream("localhost",8888)
    val words=lines.flatMap(_.split(" "))
    val windowWords=words.window(Seconds(3),Seconds(1))
    windowWords.print()
    //启动流计算环境StreamingContext
    ssc.start()
    ssc.awaitTermination()
  

基本每秒输入一个字母,取出当前时刻3s这个长度中的所有元素,打印出来。第4s时已经看不到a了,说明a已经不在当前窗口中

DStream输出操作

package Spark_Streaming
import org.apache.log4j.Level,Logger
import org.apache.spark.streaming.Seconds,StreamingContext
import org.apache.spark.SparkContext,SparkConf
object demo06 
  def main(args: Array[String]): Unit = 
    //设置日志级别
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf=new SparkConf().setAppName("demo06").setMaster("local[2]")
    val sc=new SparkContext(conf)
    val ssc=new StreamingContext(sc,Seconds(10))
    val lines=ssc.socketTextStream("hadoop102",8888)
    lines.saveAsTextFiles("hdfs://hadoop102:8020/opt/datafiles/sahf","txt")
    ssc.start()
    ssc.awaitTermination()
  

如果报错显示权限不足无法写入
执行

hdfs dfs -chmod a+w /opt/datafiles
[xwk@hadoop102 ~]$ nc -l -p 8888
this is 1th line
this is 2th line
this is 3th line
this is 4 th line


4行文本分别保存在前4个目录中

使用foreachPartition,将处理结果写到MySQL数据库中

mysql> create database spark;
Query OK, 1 row affected (0.07 sec)

mysql> use spark;
Database changed
mysql> create table searchKeyWord(insert_time date,keyword varchar(30),search_count integer);
Query OK, 0 rows affected (0.66 sec)

设置窗口长度为60s,窗口滑动时间间隔10s,计算10s内每个单词出现的次数,根据出现的次数对单词进行降序排序

package Spark_Streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import java.sql.DriverManager
import org.apache.log4j.Level, Logger

object demo07 
  def main(args: Array[String]): Unit = 
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setMaster("local[3]").setAppName("WriteDataToMySQL")
    val ssc = new StreamingContext(conf, Seconds(5))
    val ItemsStream = ssc.socketTextStream("hadoop102", 8888)
    val ItemPairs = ItemsStream.map(line => (line.split(",")(0), 1))
    val ItemCount = ItemPairs.reduceByKeyAndWindow((v1:Int,v2:Int)=>v1+v2,Seconds (60),Seconds(10))
    val hottestWord = ItemCount.transform(itemRDD => 
      val top3 = itemRDD.map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).take(3)
      ssc.sparkContext.makeRDD(top3)
    )
    hottestWord.foreachRDD(rdd => 
      rdd.foreachPartition(partitionOfRecords => 
        val url = "jdbc:mysql://hadoop102:3306/spark"
        val user = "root"
        val password = "root"
        Class.forName("com.mysql.cj.jdbc.Driver")
        val conn = DriverManager.getConnection(url, user, password)
        conn.prepareStatement("delete from searchKeyWord where 1=1").executeUpdate()
        conn.setAutoCommit(false)
        val stmt = conn.createStatement()
        partitionOfRecords.foreach(record => 
          stmt.addBatch("insert into searchKeyWord (insert_time,keyword,search_count) values (now(),'" + record._1 + "','" + record._2 + "')")
        )
        stmt.executeBatch()
        conn.commit()
      )
    )
    ItemsStream.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  

nc -l -p 8888
select * from searchKeyWord;

streaming30分钟概览sparkstreaming实时计算

本文主要介绍四个问题:什么是SparkStreaming实时计算?Spark实时计算原理流程是什么?Spark2.X下一代实时计算框架StructuredStreamingSparkStreaming相对其他实时计算框架该如何技术选型?本文主要针对初学者,如果有不明白的概念可了解... 查看详情

sparkstreaming学习之一sparkstreaming初识

...TP:Xftp4  jdk1.8  scala-2.10.4(依赖jdk1.8)  spark-1.6一、SparkStreaming简介SparkStreaming是流式处理框架,是SparkAPI的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka,Flume,Twitter,ZeroMQ或者TCPsockets,... 查看详情

sparkstreaming之实时数据流计算实例(代码片段)

最近在用sparkstreaming的技术来实现公司实时号码热度排序,学习了一下sparkstreaming的相关技术,今天主要要讲一个简单sparkstreaming实时数据流技术的一个示例,帮助大家更好的理解和学习sparkstreaming编程原理。在开始实... 查看详情

sparkstreaming初步使用以及工作原理详解

...析数据。因此出现了很多流式实时计算框架,比如Storm,SparkStreaming,Samaz等框架,本文主要讲解SparkStreaming的工作原理以及如何使用。一、流式计算1.什么是流?Streaming:是一种数 查看详情

sparkstreamingsparkday11:sparkstreaming学习笔记(代码片段)

SparkDay11:SparkStreaming01-[了解]-昨日课程内容回顾主要讲解:SparkStreaming模块快速入门1、Streaming流式计算概述 -Streaming应用场景 实时报表RealTimeReport 实时增量ETL 实时预警和监控 实时搜索推荐 等等 -大数据架构:Lambd... 查看详情

sparkstreamingsparkday11:sparkstreaming学习笔记(代码片段)

SparkDay11:SparkStreaming01-[了解]-昨日课程内容回顾主要讲解:SparkStreaming模块快速入门1、Streaming流式计算概述 -Streaming应用场景 实时报表RealTimeReport 实时增量ETL 实时预警和监控 实时搜索推荐 等等 -大数据架构:Lambd... 查看详情

spark学习笔记

...Hdoop,往往有更好的运行效率。Spark包括了SparkCore,SparkSQL,SparkStreaming,MLlib和Graphx等组件。SparkCore:内存计算框架SparkSQL:及时查询SparkStreaming:实时应用的处理ML 查看详情

sparkstreaming基础理论

一、SparkStreaming的介绍(1)为什么要有SparkStreaming?  Hadoop的MapReduce及SparkSQL等只能进行离线计算,无法满足实时性要求较高的业务需求,例如实时推荐、实时网站性能分析等,流式计算可以解决这些问题。目前有三种比... 查看详情

pk2227-sparkstreaming实时流处理项目实战

PK2227-SparkStreaming实时流处理项目实战新年伊始,学习要趁早,点滴记录,学习就是进步!随笔背景:在很多时候,很多入门不久的朋友都会问我:我是从其他语言转到程序开发的,有没有一些基础性的资料给我们学习学习呢,你... 查看详情

spark学习9sparkstreaming流式数据处理组件学习(代码片段)

目录SparkStreaming相关概念概述SparkStreaming的基本数据抽象DStream处理模式操作流程中细节StreamingContextStreamingContext对象的创建StreamingContext主要用法输入源DStream两种转化无状态转化操作有状态转化操作输出操作实践(最简单的wordCount... 查看详情

.sparkstreaming(上)--实时流计算sparkstreaming原理介

Spark入门实战系列--7.SparkStreaming(上)--实时流计算SparkStreaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html1、SparkStreaming简介1.1 概述SparkStreaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时... 查看详情

多个sparkstreaming怎么控制时间

参考技术AStorm优势就在于Storm是实时的连续性的分布式的计算框架,一旦运行起来,除非你将它杀掉,否则它一直处理计算或等待计算的状态.Spark和hadoop都做不到.当然它们各自都有其应用场景,各有各的优势.可以配合使用.下面我转一... 查看详情

sparkstreaming实时计算在甜橙金融监控系统中的应用及优化

...把这些信息发送到Kafka分布式发布订阅消息系统,接着由SparkStreaming消费Kafka中的消息,同时消费记录由Zookeeper集群统一管理,这样即使Kafka宕机重启后也能找到上次的消费记录继而进行消费。在这里SparkStreaming首先从MySQL读取规则... 查看详情

spark学习笔记——sparkstreaming

...、训练机器学习模型的应用,还有自动检测异常的应用。SparkStreaming是Spark为这些应用而设计的模型。它允许用户使用一套和批处理非常接近的API来编写流式计算应用,这样就可以大量重用批处理应用的技术甚至代码。SparkStreaming... 查看详情

实战sparkstream+kafka+redis实时计算商品销售额

写在前面2016年天猫双十一当天,零点的倒计时话音未落,52秒交易额冲破10亿。随后,又迅速在0时6分28秒,达到100亿!每一秒开猫大屏上的交易额都在刷新,这种时实刷新的大屏看着感觉超爽。天猫这个大... 查看详情

sparkstreaming实时计算海量用户uv

提出需求实时统计业务系统(web,APP之类)的访问人数,即所谓UV,或者DAU指标.这个需求怕是流计算最最最常见的需求了.计算UV的关键点就在于去重,即同一个人访问两次是只计一个UV的.在离线计算中统计UV比较容易想到的方法就是用grou... 查看详情

sparkstreaming高级特性在ndcg计算实践

从storm到sparkstreaming,再到flink,流式计算得到长足发展,依托于spark平台的sparkstreaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方式实现了实时处理框架。为进一步了解sparkstreaming的相关内容,飞马网于3月20日... 查看详情

storm介绍及与sparkstreaming对比

1 Storm介绍Storm是由Twitter开源的分布式、高容错的实时处理系统,它的出现令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求。Storm常用于在实时分析、在线机器学习、持续计算、分布式远程调用和ETL等... 查看详情