spark实践——基于sparkstreaming的实时日志分析系统(代码片段)

LiBaoquan LiBaoquan     2022-11-13     147

关键词:

本文基于《Spark 最佳实践》第6章 Spark 流式计算。

我们知道网站用户访问流量是不间断的,基于网站的访问日志,即 Web log 分析是典型的流式实时计算应用场景。比如百度统计,它可以做流量分析、来源分析、网站分析、转化分析。另外还有特定场景分析,比如安全分析,用来识别 CC 攻击、 SQL 注入分析、脱库等。这里我们简单实现一个类似于百度分析的系统。

代码见 https://github.com/libaoquan95/WebLogAnalyse

1.模拟生成 web log 记录

在日志中,每行代表一条访问记录,典型格式如下:

46.156.87.72 - - [2018-05-15 06:00:30] "GET /upload.php HTTP/1.1" 200 0 "http://www.baidu.com/s?wd=spark" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" "-"

分别代表:访问 ip,时间戳,访问页面,响应状态,搜索引擎索引,访问 Agent。

简单模拟一下数据收集和发送的环节,用一个 Python 脚本随机生成 Nginx 访问日志,为了方便起见,不使用 HDFS,使用单机文件系统。

首先,新建文件夹用于存放日志文件

$ mkdir Documents/nginx
$ mkdir Documents/nginx/log
$ mkdir Documents/nginx/log/tmp

然后,使用 Python 脚本随机生成 Nginx 访问日志,并为脚本设置执行权限, 代码见 sample_web_log.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import random
import time


class WebLogGeneration(object):

    # 类属性,由所有类的对象共享
    site_url_base = "http://www.xxx.com/"

    # 基本构造函数
    def __init__(self):
        #  前面7条是IE,所以大概浏览器类型70%为IE ,接入类型上,20%为移动设备,分别是7和8条,5% 为空
        self.user_agent_dist = 0.0:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.1:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.2:"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)",
                                0.3:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.4:"Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko",
                                0.5:"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0",
                                0.6:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.7:"Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53",
                                0.8:"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19",
                                0.9:"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",
                                1:" ",
        self.ip_slice_list = [10, 29, 30, 46, 55, 63, 72, 87, 98,132,156,124,167,143,187,168,190,201,202,214,215,222]
        self.url_path_list = ["login.php","view.php","list.php","upload.php","admin/login.php","edit.php","index.html"]
        self.http_refer = [ "http://www.baidu.com/s?wd=query","http://www.google.cn/search?q=query","http://www.sogou.com/web?query=query","http://one.cn.yahoo.com/s?p=query","http://cn.bing.com/search?q=query"]
        self.search_keyword = ["spark","hadoop","hive","spark mlib","spark sql"]


    def sample_ip(self):
        slice = random.sample(self.ip_slice_list, 4) #从ip_slice_list中随机获取4个元素,作为一个片断返回
        return  ".".join([str(item) for item in slice])  #  todo


    def sample_url(self):
        return  random.sample(self.url_path_list,1)[0]


    def sample_user_agent(self):
        dist_uppon = random.uniform(0, 1)
        return self.user_agent_dist[float(\'%0.1f\' % dist_uppon)]


    # 主要搜索引擎referrer参数
    def sample_refer(self):
        if random.uniform(0, 1) > 0.2:  # 只有20% 流量有refer
            return "-"

        refer_str=random.sample(self.http_refer,1)
        query_str=random.sample(self.search_keyword,1)
        return refer_str[0].format(query=query_str[0])

    def sample_one_log(self,count = 3):
        time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
        while count >1:
            query_log = "ip - - [local_time] \\"GET /url HTTP/1.1\\" 200 0 \\"refer\\" \\"user_agent\\" \\"-\\"".format(ip=self.sample_ip(),local_time=time_str,url=self.sample_url(),refer=self.sample_refer(),user_agent=self.sample_user_agent())
            print query_log
            count = count -1

if __name__ == "__main__":
    web_log_gene = WebLogGeneration()

    #while True:
    #    time.sleep(random.uniform(0, 3))
    web_log_gene.sample_one_log(random.uniform(10, 100))

设置可执行权限的方法如下

$ chmod +x sample_web_log.py

之后,编写 bash 脚本,自动生成日志记录,并赋予可执行权限,代码见 genLog.sh

#!/bin/bash

while [ 1 ]; do
    ./sample_web_log.py > test.log

    tmplog="access.`date +\'%s\'`.log"
    cp test.log streaming/tmp/$tmplog
    mv streaming/tmp/$tmplog streaming/
    echo "`date +"%F %T"` generating $tmplog succeed"
    sleep 1
done

赋予权限

$ chmod +x genLog.sh

执行 genLog.sh 查看效果,输入 ctrl+c 终止。

$ ./genLog.sh

2.流式分析

创建 Scala 脚本,代码见 genLog.sh

import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds, StreamingContext

val batch = 10  // 计算周期(秒)
//val conf = new SparkConf().setAppName("WebLogAnalyse").setMaster("local")
//val ssc = new StreamingContext(conf, Seconds(batch))
val ssc = new StreamingContext(sc, Seconds(batch))
val input = "file:///home/libaoquan/Documents/nginx/log"  // 文件流
val lines = ssc.textFileStream(input)

// 计算总PV
lines.count().print()

// 各个ip的pv
lines.map(line => (line.split(" ")(0), 1)).reduceByKey(_+_).print()

// 获取搜索引擎信息
val urls = lines.map(_.split("\\"")(3))

// 先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算
// 输出(host, query_keys)
val searchEnginInfo = urls.map( url  => 
  // 搜索引擎对应的关键字索引
  val searchEngines = Map(
  "www.google.cn" -> "q",
  "www.yahoo.com" -> "p",
  "cn.bing.com" -> "q",
  "www.baidu.com" -> "wd",
  "www.sogou.com" -> "query"
  )
  val temp = url.split("/")
  // Array(http:, "", www.baidu.com, s?wd=hadoop)
  if(temp.length > 2)
  val host = temp(2)
  if(searchEngines.contains(host))
    val q = url.split("//?")
    if(q.length > 0) 
      val query = q(1)
      val arr_search_q = query.split(\'&\').filter(_.indexOf(searchEngines(host) + "=") == 0)
      if (arr_search_q.length > 0) 
        (host, arr_search_q(0).split(\'=\')(1))
       else 
        (host, "")
      
     else
      ("", "")
    
   else
    ("", "")
  
   else
  ("", "")
  
)

// 搜索引擎pv
searchEnginInfo.filter(_._1.length > 0).map(i => (i._1, 1)).reduceByKey(_+_).print()

// 关键字pv
searchEnginInfo.filter(_._2.length > 0).map(i => (i._2, 1)).reduceByKey(_+_).print()

// 终端pv
lines.map(_.split("\\"")(5)).map(agent => 
  val types = Seq("iPhone", "Android")
  var r = "Default"
  for (t <- types) 
  if (agent.indexOf(t) != -1)
    r = t
  
  (r, 1)
).reduceByKey(_ + _).print()

// 各页面pv
lines.map(line => (line.split("\\"")(1).split(" ")(1), 1)).reduceByKey(_+_).print()

ssc.start()
ssc.awaitTermination()

3.执行

同时开启两个终端,分别执行 genLog.sh 生成日志文件和执行 WebLogAnalyse.scala 脚本进行流式分析。

执行 genLog.sh

$ ./genLog.sh

执行 WebLogAnalyse.scala, 使用 spark-shell 执行 scala 脚本

$ spark-shell --executor-memory 5g --driver-memory 1g --master local  < WebLogAnalyse.scala 

效果如下,左边是 WebLogAnalyse.scala,右边是 genLog.sh

sparkstreaming实践和优化

...dn.net/news/detail/54500作者:徐鑫,董西成在流式计算领域,SparkStreaming和Storm时下应用最广泛的两个计算引擎。其中,SparkStreaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎。如图1所示,SparkStreaming支持的数据源... 查看详情

sparkstreaming高级特性在ndcg计算实践

从storm到sparkstreaming,再到flink,流式计算得到长足发展,依托于spark平台的sparkstreaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方式实现了实时处理框架。为进一步了解sparkstreaming的相关内容,飞马网于3月20日... 查看详情

基于spark和sparkstreaming的word2vec

...以是1小时1训练也可以1天1训练,根据具体业务来判断,sparkstreaming在线分析。由于历史问题,spark还在用1.5.0,接口上和2.1还是有点区别, 查看详情

spark配置-----sparkstreaming

SparkStreamingSparkStreaming使用SparkAPI进行流计算,这意味着在Spark上进行流处理与批处理的方式一样。因此,你可以复用批处理的代码,使用SparkStreaming构建强大的交互式应用程序,而不仅仅是用于分析数据。SparkStreaming示例(基于流... 查看详情

sparkstreaming基于案例详解(代码片段)

...代码的实战和实验演示都会详细的补充。packagecom.dt.spark.sparkstreamingimportorg.apache.spark.SparkConfimportorg.apache.spark.sql.Rowimpo 查看详情

Spark Streaming - 基于 TIMESTAMP 字段的处理

】SparkStreaming-基于TIMESTAMP字段的处理【英文标题】:SparkStreaming-TIMESTAMPfieldbasedprocessing【发布时间】:2017-02-1409:17:48【问题描述】:我对SparkStreaming还很陌生,我需要一些基本的说明,但阅读文档时我无法完全理解。用例是我有... 查看详情

大数据之spark:sparkstreaming

...流程2.数据抽象3.DStream相关操作1)Transformations2)Output/ActionSparkStreaming是一个基于SparkCore之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点。SparkStreaming的特点࿱... 查看详情

spark定制版:016~sparkstreaming源码解读之数据清理内幕彻底解密

本讲内容:a.SparkStreaming数据清理原因和现象b.SparkStreaming数据清理代码解析注:本讲内容基于Spark1.6.1版本(在2016年5月来说是Spark最新版本)讲解。上节回顾上一讲中,我们之所以用一节课来讲NoReceivers,是因为企业级SparkStreaming... 查看详情

sparkstreaming基础理论

一、SparkStreaming的介绍(1)为什么要有SparkStreaming?  Hadoop的MapReduce及SparkSQL等只能进行离线计算,无法满足实时性要求较高的业务需求,例如实时推荐、实时网站性能分析等,流式计算可以解决这些问题。目前有三种比... 查看详情

spark图解(代码片段)

...越来越得到大家的青睐,我自己最近半年在接触spark以及sparkstreaming之后,对spark技术的使用有一些自己的经验积累以及心得体会,在此分享给大家。本文依次从spark生态,原理,基本概念,sparkstreaming原理及实践,还有spark调优以... 查看详情

sparkstreaming企业运用

==========SparkStreaming是什么==========1、SParkStreaming是Spark中一个组件,基于SparkCore进行构建,用于对流式进行处理,类似于Storm。2、SparkStreaming能够和SparkCore、SparkSQL来进行混合编程。3、SparkStreaming我们主要关注:  (1)SparkSt... 查看详情

spark学习9sparkstreaming流式数据处理组件学习(代码片段)

目录SparkStreaming相关概念概述SparkStreaming的基本数据抽象DStream处理模式操作流程中细节StreamingContextStreamingContext对象的创建StreamingContext主要用法输入源DStream两种转化无状态转化操作有状态转化操作输出操作实践(最简单的wordCount... 查看详情

sparkstreaming整合flume

1目的  SparkStreaming整合Flume。参考官方整合文档(http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html)2整合方式一:基于推2.1基本要求flume和spark一个work节点要在同一台机器上,flume会在本机器上通过配置的端口推送数据streami... 查看详情

怎样利用sparkstreaming和hadoop实现近实时的会话连接

参考技术A科普Spark,Spark是什么,如何使用Spark1.Spark基于什么算法的分布式计算(很简单)2.Spark与MapReduce不同在什么地方3.Spark为什么比Hadoop灵活4.Spark局限是什么5.什么情况下适合使用Spark什么是SparkSpark是UCBerkeleyAMPlab所开源的类H... 查看详情

Spark:单个应用程序中的两个 SparkContext 最佳实践

...下面的代码中,您会注意到我有两个SparkContext,一个用于SparkStreaming,另一个用于普通SparkContext。根据最佳实践, 查看详情

是时候学习真正的spark技术了

...parksql可以说是spark中的精华部分了,我感觉整体复杂度是sparkstreaming的5倍以上,现在spark官方主推structedstreaming,sparkstreaming 维护的也不积极了,我们基于spark来构建大数据计算任务,重心也要向DataSet转移,原来基于RDD写的代... 查看详情

京东基于spark的风控系统架构实践和技术细节

京东基于Spark的风控系统架构实践和技术细节时间 2016-06-0209:36:32  炼数成金原文  http://www.dataguru.cn/article-9419-1.html主题 Spark 软件架构1.背景互联网的迅速发展,为电子商务兴起提供了肥沃的土壤。2014年... 查看详情

个推spark实践教你绕过开发那些“坑”

...理速度,特别是复杂的迭代计算。Spark主要包括SparkSQL,SparkStreaming,SparkMLLib以及图计算。Spark核心概念简介1、RDD即弹性分布式数据集,通过RDD可以执行各种算子实现数据处理和计算。比如用Spark做统计词频,即拿到一串文字进行... 查看详情