大数据进阶之路——sparksql日志分析(代码片段)

孙中明 孙中明     2023-01-10     206

关键词:

基本方案

用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击…)
用户行为轨迹、流量日志

日志数据内容:

  • 1)访问的系统属性: 操作系统、浏览器等等
  • 2)访问特征:点击的url、从哪个url跳转过来的(referer)、页面上的停留时间等
  • 3)访问信息:session_id、访问ip(访问城市)等
2013-05-19 13:00:00     http://www.taobao.com/17/?tracker_u=1624169&type=1      B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1        http://hao.360.cn/      1.196.34.243 

数据处理流程

  • 1) 数据采集
    Flume: web日志写入到HDFS

  • 2)数据清洗
    脏数据
    Spark、Hive、MapReduce 或者是其他的一些分布式计算框架
    清洗完之后的数据可以存放在HDFS(Hive/Spark SQL)

  • 3)数据处理
    按照我们的需要进行相应业务的统计和分析
    Spark、Hive、MapReduce 或者是其他的一些分布式计算框架

  • 4)处理结果入库
    结果可以存放到RDBMS、NoSQL

  • 5)数据的可视化
    通过图形化展示的方式展现出来:饼图、柱状图、地图、折线图
    ECharts、HUE、Zeppelin

数据清洗

首先通过debug 找到分割后各个字段的对应的

  • 报错
java.io.IOException: Could not locate executable null\\bin\\winutils.exe in the Hadoop binaries.

执行第一步数据清洗时候,数据能打印出来,但是不能写入本地文件,这是因为本地没有hadoop伪分布式系统

装一个插件即可

https://hiszm.lanzous.com/iWyqmhrgk0f

下载上述插件,然后,新建目录并且放入到目录里面
C:\\Data\\hadoop\\bin

然后再系统环境变量添加
HADOOP_HOME
C:\\Data\\hadoop

package org.sparksql

import org.apache.spark.sql.SparkSession

object SparkFormatApp 

  def main(args: Array[String]): Unit = 

    //SparkSession是spark的入口类
    val spark = SparkSession.builder().appName("SparkFormatApp")
                .master("local[2]").getOrCreate()
    val access = spark.sparkContext.textFile("10000_access.log")

    //access.take(10).foreach(println)

    access.map(line=>
      val splits = line.split(" ")
      val ip = splits(0)
      val time = splits(3) + " " + splits(4)
      val traffic = splits(9)
      val url =  splits(11).replace("\\"","")
     //(ip,DateUtils.parse(time),traffic,traffic,url)
      DateUtils.parse(time) + "\\t" + url + "\\t" + traffic + "\\t" + ip
    ).saveAsTextFile("output")

    //.take(10).foreach(println)
    //.saveAsTextFile("output")

    spark.stop()

  


一般的日志处理方式,我们是需要进行分区的,
按照日志中的访问时间进行相应的分区,比如:d,h,m5(每5分钟一个分区)

二次清洗

  • 输入:访问时间、访问URL、耗费的流量、访问IP地址信息
  • 输出:URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天
package org.sparksql

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.LongType, StringType, StructField, StructType

//访问日志工具转换类
object AccessConvertUtils 

  val struct=StructType(
    Array(
      StructField("url",StringType),
      StructField("cmsType",StringType),
      StructField("cmsId",LongType),
      StructField("traffic",LongType),
      StructField("ip",StringType),
      StructField("city",StringType),
      StructField("time",StringType),
      StructField("day",StringType)
    )
  )

//根据输入的每一行信息转化成输出的样式
  def parseLog(log:String)=
    try
      val splits=log.split("\\t")
      val url =splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain="http://www.imooc.com/"
      val cms=url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")
      var cmsType = ""
      var cmsId = 0l
      if(cmsTypeId.length > 1)
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      

      val city = IpUtils.getCity(ip)
      val time = splits(0)
      val day =  time.substring(0,10).replaceAll("-","")
      Row(url,cmsType,cmsId,traffic,ip,city,time,day)
    catch 
      case e : Exception => Row(0)
    
  



  • IP=>省份

使用github上已有的开源项目
1)git clone https://github.com/wzhe06/ipdatabase.git

2)编译下载的项目:mvn clean package -DskipTests

3)安装jar包到自己的maven仓库

mvn install:install-file -Dfile=C:\\Data\\ipdatabase\\target\\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

  1. 拷贝相关文件不然会报错

java.io.FileNotFoundException: file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
  1. 测试

package org.sparksql

import org.apache.spark.sql.SparkSession

object SparkCleanApp 

  def main(args: Array[String]): Unit = 
    //SparkSession是spark的入口类
    val spark = SparkSession.builder().appName("SparkFormatApp")
      .master("local[2]").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("access.log")

    //accessRDD.take(10).foreach(println)

    val accessDF = spark.createDataFrame(accessRDD.map(x=>AccessConvertUtils.parseLog(x)),AccessConvertUtils.struct)

    accessDF.printSchema()
    accessDF.show()

    spark.stop
  




root
 |-- url: string (nullable = true)
 |-- cmsType: string (nullable = true)
 |-- cmsId: long (nullable = true)
 |-- traffic: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- city: string (nullable = true)
 |-- time: string (nullable = true)
 |-- day: string (nullable = true)




+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|                 url|cmsType|cmsId|traffic|             ip|city|               time|     day|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc....|  video| 4500|    304|  218.75.35.226| 浙江省|2017-05-11 14:09:14|20170511|
|http://www.imooc....|  video|14623|     69| 202.96.134.133| 广东省|2017-05-11 15:25:05|20170511|
|http://www.imooc....|article|17894|    115| 202.96.134.133| 广东省|2017-05-11 07:50:01|20170511|
|http://www.imooc....|article|17896|    804|  218.75.35.226| 浙江省|2017-05-11 02:46:43|20170511|
|http://www.imooc....|article|17893|    893|222.129.235.182| 北京市|2017-05-11 09:30:25|20170511|
|http://www.imooc....|article|17891|    407|  218.75.35.226| 浙江省|2017-05-11 08:07:35|20170511|
|http://www.imooc....|article|17897|     78| 202.96.134.133| 广东省|2017-05-11 19:08:13|20170511|
|http://www.imooc....|article|17894|    658|222.129.235.182| 北京市|2017-05-11 04:18:47|20170511|
|http://www.imooc....|article|17893|    161|   58.32.19.255| 上海市|2017-05-11 01:25:21|20170511|
|http://www.imooc....|article|17895|    701|    218.22.9.56| 安徽省|2017-05-11 13:37:22|20170511|
|http://www.imooc....|article|17892|    986|  218.75.35.226| 浙江省|2017-05-11 05:53:47|20170511|
|http://www.imooc....|  video|14540|    987|   58.32.19.255| 上海市|2017-05-11 18:44:56|20170511|
|http://www.imooc....|article|17892|    610|  218.75.35.226| 浙江省|2017-05-11 17:48:51|20170511|
|http://www.imooc....|article|17893|      0|    218.22.9.56| 安徽省|2017-05-11 16:20:03|20170511|
|http://www.imooc....|article|17891|    262|   58.32.19.255| 上海市|2017-05-11 00:38:01|20170511|
|http://www.imooc....|  video| 4600|    465|  218.75.35.226| 浙江省|2017-05-11 17:38:16|20170511|
|http://www.imooc....|  video| 4600|    833|222.129.235.182| 北京市|2017-05-11 07:11:36|20170511|
|http://www.imooc....|article|17895|    320|222.129.235.182| 北京市|2017-05-11 19:25:04|20170511|
|http://www.imooc....|article|17898|    460| 202.96.134.133| 广东省|2017-05-11 15:14:28|20170511|
|http://www.imooc....|article|17899|    389|222.129.235.182| 北京市|2017-05-11 02:43:15|20170511|
+--------------------+-------+-----+-------+---------------+----+-------------------+--------+

调优点:

  1. 控制文件输出的大小: coalesce
  2. 分区字段的数据类型调整:spark.sql.sources.partitionColumnTypeInference.enabled
  3. 批量插入数据库数据,提交使用batch操作
package org.sparksql

import org.apache.spark.sql.DataFrame, SparkSession
import org.apache.spark.sql.functions._
object TopNApp 
  //最受欢迎
  def videoAccessTopN(spark: SparkSession, accessDF: DataFrame) = 
    import spark.implicits._
    val videoTopNDF = accessDF.filter($"day"==="20170511"&& $"cmsType" === "video")
      .groupBy("day","cmsId").agg(count("cmsId")
      .as("times")).orderBy($"times".desc)
    videoTopNDF.show()

    accessDF.createOrReplaceTempView("access_log")
    val videoTopNDF1 = spark.sql("select day,cmsId,count(1) as times from access_log where day='20170511' and cmsType = 'video' group by day,cmsId order by times desc")

    videoTopNDF1.show()


  

  def main(args: Array[String]): Unit = 
    //SparkSession是spark的入口类
    val spark = SparkSession.builder().appName("SparkFormatApp")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
      .master("local[2]").getOrCreate()

    val accessDF= spark.read.format("parquet").load("output2/")
    accessDF.printSchema()
    accessDF.show(false)

    videoAccessTopN(spark,accessDF)
    spark.stop()
  













+--------大数据日志可视化分析(hadoop+sparksql)(代码片段)

...具环境等的说明92.1IDEA简介92.2HTML/CSS简介92.3Spark简介102.4SparkSQL简介102.5.Hadoop简介112.6.ECharts简介112.7.Mysql简介112.5本章小结123需求分析133.1功能需求分析133.2业务流程分析133.3数据流图173.4数 查看详情

日志分析为例进入大数据sparksql的世界共10章

第1章初探大数据本章将介绍为什么要学习大数据、如何学好大数据、如何快速转型大数据岗位、本项目实战课程的内容安排、本项目实战课程的前置内容介绍、开发环境介绍。同时为大家介绍项目中涉及的Hadoop、Hive相关的知识... 查看详情

大数据进阶之路——scala入门(代码片段)

文章目录概述安装JavaVSScalaval和var基本数据类型lazy在Scala中的应用开发工具IDEAMaven概述https://www.scala-lang.org/Scalacombinesobject-orientedandfunctionalprogramminginoneconcise,high-levellanguage.Scala’sstatictypeshelpavoidb 查看详情

大数据进阶之路——scala函数和对象(代码片段)

文章目录函数方法定义默认参数命名参数可变参数条件语句循环语句面向对象概述类的定义和使用抽象类伴生类和伴生对象case和trait函数方法定义 def方法名(参数:参数类型):返回值类型= //方法体 //最后一行作为返回值(不需... 查看详情

maxcompute文章索引

...ute2.0生态开放之路及最新发展10年老兵带你看尽MaxCompute大数据运算挑战与实践一分钟了解阿里云产品:大数据计算服务MaxCompute概述数加平台如何通过Serverless架构实现普惠大数据淘宝大数据之路 应用案例: 日志分析:云... 查看详情

开发者的进阶之路:用语法树来实现预编译

...,会产生一系列的token流,token是一个保存着type和value的数据结构。词法分析将源代码的每一个关键词都分割出来,之后通过语法分析器进行语法模板匹配,最后生成语法树。语法树的应用这一阶段,开发者可以对语法树进行修... 查看详情

数据分析师教程_从小白到“数据分析师”大神进阶之路

...//www.xuetuwuyou.com课程特色——成长六部曲小白脱白篇—>数据分析篇—>数据挖掘篇—>Python加薪冲刺篇—>大数据分析师进阶篇——>精英项目实战篇一、课程目标数据分析师秉承着总结凝练最先进的商业数据分析实践为... 查看详情

sparksql读取mysql中的数据日志分析

"F:\ProgramFiles\Java\jdk1.7.0_15\bin\java"-Didea.launcher.port=7539"-Didea.launcher.bin.path=F:\ProgramFiles(x86)\JetBrains\IntelliJIDEASpark\bin"-Dfile.encoding= 查看详情

flume学习之路flume的基础介绍(代码片段)

...体开发流程:从Hadoop的业务开发流程图中可以看出,在大数据的业务处理过程中,对于数据的采集是十分重要的一步,也是不可避免的一步。许多公司的平台每天会产生大量的日志(一般为流式数据,如,搜索引擎的pv,查询等... 查看详情

数学之路-数据分析进阶-区间预计与如果检验

...向该客户推送相关广告。客户服务部门推荐了客户A,在数据库随机抽取了100个客户资料的前4个季度平均季消费数据(在这里用平均随机数模拟数据),客户A平均季消费为元,检測其是否消费处于中上水平(位于中位数以上)>... 查看详情

大数据分析学习之路

  一、大数据分析的五个基本方面  二、如何选择适合的数据分析工具  三、如何区分三个大数据热门职业  四、从菜鸟成为数据科学家的9步养成方案   一、大数据分析的五个基本方面  1,可视化分析  大... 查看详情

sparksql优化之路——hive篇(代码片段)

文章目录前言优化方向数据存储结构优化分区设计分桶设计数据压缩存储格式数据生产者应注意的事项优化场景个别Task运行缓慢源端数据倾斜处理过程中的数据倾斜不合理的哈系分布大小表JoinTask数量多源数据小文件多写入时小... 查看详情

java进阶之路-自定义持久层框架(代码片段)

1持久层以及JDBC问题分析1.1什么是持久层数据访问层又称为DAL或Dao层,有时候也称为是持久层,其功能主要是负责数据库的访问,简单的说法就是实现对数据表的Select(查询),Insert(插入),Update&#... 查看详情

spark手机流量日志处理使用sparksql按月统计流量使用量最多的用户(代码片段)

...f0c;RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪手机流量日志处理SparkSQL简介依赖引入SparkSQL快速入门案例手机流量日志数据格式与处理要求处理程序... 查看详情

大数据之spark:sparksql

目录1.数据分析方式1)命令式2)SQL3)总结2.SparkSQL前世今生1)发展历史3.Hive和SparkSQL4.数据分类和SparkSQL适用场景1)结构化数据2)半结构化数据3)总结1.数据分析方式1)命令式在前面的RDD部分,非常明显可以感觉的到是命令式的,主要特征是... 查看详情

大数据spark及sparksql数据倾斜现象和解决思路(代码片段)

数据倾斜分类join其中一个表数据量小,key比较集中分发到某一个或几个reduce的数据远高于平均值大表与小表,空值过多这些空值都由一个reduce处理,处理慢groupbygroupby维度太少,某字段量太大处理某值的reduce非常... 查看详情

大数据技术之_27_电商平台数据分析项目_02_预备知识+scala+sparkcore+sparksql+sparkstreaming+java对象池(代码片段)

第0章预备知识0.1Scala0.1.1Scala操作符0.1.2拉链操作0.2SparkCore0.2.1SparkRDD持久化0.2.2Spark共享变量0.3SparkSQL0.3.1RDD、DataFrame与DataSet0.3.2DataSet与RDD互操作0.3.3RDD、DataFrame与DataSet之间的转换0.3.4用户自定义聚合函数(UDAF)0.3.5开窗函数0.4Sp 查看详情

通过扩展sparksql,打造自己的大数据分析引擎

SparkSQL的Catalyst,这部分真的很有意思,值得去仔细研究一番,今天先来说说Spark的一些扩展机制吧,上一次写Spark,对其SQL的解析进行了一定的魔改,今天我们按套路来,使用砖厂为我们提供的机制ÿ... 查看详情