基于spark的用户行为路径分析

ChouYarn ChouYarn     2022-08-20     532

关键词:

一、研究背景

  互联网行业越来越重视自家客户的一些行为偏好了,无论是电商行业还是金融行业,基于用户行为可以做出很多东西,电商行业可以归纳出用户偏好为用户推荐商品,金融行业可以把用户行为作为反欺诈的一个点,本文主要介绍其中一个重要的功能点,基于行为日志统计用户行为路径,为运营人员提供更好的运营决策。可以实现和成熟产品如adobe analysis类似的用户行为路径分析。最终效果如图。使用的是开源大数据可视化工具。如图所示,用户行为路径的数据非常巨大,uv指标又不能提前计算好(时间段未定),如果展示5级,一个页面的数据量就是10的5次方,如果有几千个页面,数据量是无法估量的,所以只能进行实时计算,而Spark非常适合迭代计算,基于这样的考虑,Spark是不错的选择。

二、解决方案

1.流程描述

  客户搜索某一起始页面的行为路径明细数据时,RPC请求到后台,调用spark-submit脚本启动spark程序,Spark程序实时计算并返回数据,前端Java解析数据并展现。

 

2.准备工作

  1.首先要有行为数据啦,用户行为日志数据必须包含必须包含以下四个字段,访问时间、设备指纹、会话id、页面名称,其中页面名称可以自行定义,用来标示一种或者一类页面,每次用户请求的时候上报此字段,服务器端收集并存储,此页面名称最好不要有重复,为后续分析打下基础。

  2.然后对行为日志进行一级清洗(基于Hive),将数据统一清洗成如下格式。设备指纹是我另一个研究的项目,还没时间贴出来。会话id就是可以定义一个会话超时时间,即20分钟用户如果没有任何动作,等20分钟过后再点击页面就认为这是下个一会话id,可通过cookie来控制此会话id。

设备指纹 会话id 页面路径(按时间升序 时间
fpid1 sessionid1 A_B_C_D_E_F_G 2017-01-13

 

A、B、C代表页面名称,清洗过程采用row_number函数,concat_ws函数,具体用法可以百度。清洗完之后落地到hive表,后续会用到。T+1清洗此数据。

  3.弄清楚递归的定义

  递归算法是一种直接或者间接调用自身函数或者方法的算法。Java递归算法是基于Java语言实现的递归算法。递归算法的实质是把问题分解成规模缩小的同类问题的子问题,然后递归调用方法来表示问题的解。递归算法对解决一大类问题很有效,它可以使算法简洁和易于理解。递归算法,其实说白了,就是程序的自身调用。它表现在一段程序中往往会遇到调用自身的那样一种coding策略,这样我们就可以利用大道至简的思想,把一个大的复杂的问题层层转换为一个小的和原问题相似的问题来求解的这样一种策略。递归往往能给我们带来非常简洁非常直观的代码形势,从而使我们的编码大大简化,然而递归的思维确实很我们的常规思维相逆的,我们通常都是从上而下的思维问题, 而递归趋势从下往上的进行思维。这样我们就能看到我们会用很少的语句解决了非常大的问题,所以递归策略的最主要体现就是小的代码量解决了非常复杂的问题。

  递归算法解决问题的特点:   

  1)递归就是方法里调用自身。   

  2)在使用递增归策略时,必须有一个明确的递归结束条件,称为递归出口。    
  3)递归算法解题通常显得很简洁,但递归算法解题的运行效率较低。所以一般不提倡用递归算法设计程序。
  4)在递归调用的过程当中系统为每一层的返回点、局部量等开辟了栈来存储。递归次数过多容易造成栈溢出等,所以一般不提倡用递归算法设计程序。

      在做递归算法的时候,一定要把握住出口,也就是做递归算法必须要有一个明确的递归结束条件。这一点是非常重要的。其实这个出口是非常好理解的,就是一个条件,当满足了这个条件的时候我们就不再递归了。

  4.多叉树的基本知识

三、Spark处理

流程概述:

1.构建一个多叉树的类,类主要属性描述,name全路径如A_B_C,childList儿子链表,多叉树的构建和递归参考了这里

2.按时间范围读取上一步预处理的数据,递归计算每一级页面的属性指标,并根据页面路径插入到初始化的Node类根节点中。

3.递归遍历上一步初始化的根节点对象,并替换其中的name的id为名称,其中借助Spark DataFrame查询数据。

4.将root对象转化成json格式,返回前端。

附上代码如下。

import java.util

import com.google.gson.Gson
import org.apache.spark.SparkContext
import org.apache.log4j.{Level, Logger => LG}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext

/**
  * 用户行为路径实时计算实现
  * Created by chouyarn on 2016/12/12.
  */

/**
  * 树结构类
  *
  * @param name      页面路径
  * @param visit     访次
  * @param pv        pv
  * @param uv        uv
  * @param childList 儿子链表
  */
class Node(
            var name: String,
            var path:Any,
            var visit: Any,
            var pv: Any,
            var uv: Any,
            var childList: util.ArrayList[Node]) extends Serializable {
  /**
    * 添加子节点
    *
    * @param node 子节点对象
    * @return
    */
  def addNode(node: Node) = {
    childList.add(node)
  }

  /**
    * 遍历节点,深度优先
    */
  def traverse(): Unit = {
    if (childList.isEmpty)
      return
    //    node.
    val childNum = childList.size

    for (i <- 0 to childNum - 1) {
      val child: Node = childList.get(i)
      child.name = child.name.split("_").last//去除前边绝对路径
      child.traverse()
    }
  }

  /**
    * 遍历节点,深度优先
    */
  def traverse(pages:DataFrame): Unit = {
    if (childList.isEmpty||childList.size()==0)
      return
    //    node.
    val childNum = childList.size

    for (i <- 0 to childNum - 1) {
      val child: Node = childList.get(i)
      child.name = child.name.split("_").last
      val id =pages.filter("page_id='"+child.name+"'").select("page_name").first().getString(0)//替换id为name
      child.name = id
      child.traverse(pages)
    }
  }

  /**
    * 动态插入节点
    *
    * @param node 节点对象
    * @return
    */
  def insertNode(node: Node): Boolean = {
    val insertName = node.name
    if (insertName.stripSuffix("_" + insertName.split("_").last).equals(name)) {
      //      node.name=node.name.split("_").last
      addNode(node)
      true
    } else {
      val childList1 = childList
      val childNum = childList1.size
      var insetFlag = false
      for (i <- 0 to childNum - 1) {
        val childNode = childList1.get(i)
        insetFlag = childNode.insertNode(node)
        if (insetFlag == true)
          true
      }
      false
    }
  }
}

/**
  * 处理类
  */
class Path extends CleanDataWithRDD {
  LG.getRootLogger.setLevel(Level.ERROR)//控制spark日志输出级别

  val sc: SparkContext = SparkUtil.createSparkContextYarn("path")
  val hiveContext = new HiveContext(sc)

  override def handleData(conf: Map[String, String]): Unit = {

    val num = conf.getOrElse("depth", 5)//路径深度
    val pageName = conf.getOrElse("pageName", "")//页面名称
    //    val pageName = "A_C"
    val src = conf.getOrElse("src", "")//标示来源pc or wap

    val pageType = conf.getOrElse("pageType", "")//向前或者向后路径
    val startDate = conf.getOrElse("startDate", "")//开始日期
    val endDate = conf.getOrElse("endDate", "")//结束日期
    //        保存log缓存以保证后续使用
    val log = hiveContext.sql(s"select fpid,sessionid,path " +
      s"from specter.t_pagename_path_sparksource " +
      s"where day between '$startDate' and '$endDate' and path_type=$pageType and src='$src' ")
      .map(s => {
        (s.apply(0) + "_" + s.apply(1) + "_" + s.apply(2))
      }).repartition(10).persist()

    val pages=hiveContext.sql("select page_id,page_name from specter.code_pagename").persist()//缓存页面字典表
    // 本地测试数据
    // val log = sc.parallelize(Seq("fpid1_sessionid1_A_B",
    //      "fpid2_sessionid2_A_C_D_D_B_A_D_A_F_B",
    //      "fpid1_sessionid1_A_F_A_C_D_A_B_A_V_A_N"))
    var root: Node = null
    /**
      * 递归将计算的节点放入树结构
      *
      * @param pageName 页面名称
      */
    def compute(pageName: String): Unit = {
      val currenRegex = pageName.r //页面的正则表达式
      val containsRdd = log.filter(_.contains(pageName)).persist() //包含页面名称的RDD,后续步骤用到
      val currentpv = containsRdd.map(s => {//计算pv
        currenRegex findAllIn (s)
      }).map(_.mkString(","))
        .flatMap(_.toString.split(","))
        .filter(_.size > 0)
        .count()

      val tempRdd = containsRdd.map(_.split("_")).persist() //分解后的RDD
      val currentuv = tempRdd.map(_.apply(0)).distinct().count() //页面uv
      val currentvisit = tempRdd.map(_.apply(1)).distinct().count() //页面访次

      //      初始化根节点或添加节点
      if (root == null) {
        root = new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]())
      } else {
        root.insertNode(new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]()))
      }

      if (pageName.split("_").size == 5||tempRdd.isEmpty()) {//递归出口
        return
      } else {
        //          确定下个页面名称正则表达式
        val nextRegex =
        s"""${pageName}_[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}""".r
        // 本地测试       
        // val nextRegex =s"""${pageName}_[A-Z]""".r
        val nextpvMap = containsRdd.map(s => {//下一级路径的pv数top9
          nextRegex findAllIn (s)
        }).map(_.mkString(","))
          .flatMap(_.toString.split(","))
          .filter(_.size > 0)
          .map(s => (s.split("_").last, 1))
          .filter(!_._1.contains(pageName.split("_")(0)))
          .reduceByKey(_ + _).sortBy(_._2, false).take(9).toMap

        nextpvMap.keySet.foreach(key => {//递归计算
          compute(pageName + "_" + key)
        })
      }
    }
    //触发计算
    compute(pageName)
    val gson: Gson = new Gson()

    root.traverse(pages)
    root.name=pages.filter("page_id='"+pageName+"'").select("page_name").first().getString(0)
    println(gson.toJson(root))//转化成JSON并打印,Alibaba fsatjson不可用,还是google得厉害。
  }

  override def stop(): Unit = {
    sc.stop()
  }
}

object Path {
  def main(args: Array[String]): Unit = {
    //    println("ss".hashCode)
    var num=5
    try {
      num=args(5).toInt
    }catch {
      case e:Exception =>
    }

    val map = Map("pageName" -> args(0),
      "pageType" -> args(1),
      "startDate" -> args(2),
      "endDate" -> args(3),
      "src" -> args(4),
      "depth" -> num.toString)
    val path = new Path()
    path.handleData(map)
  }
}

四、总结

  Spark基本是解决了实时计算行为路径的问题,缺点就是延迟稍微有点高,因为提交Job之后要向集群申请资源,申请资源和启动就耗费将近30秒,后续这块可以优化。据说spark-jobserver提供一个restful接口,为Job预启动容器,博主没时间研究有兴趣的可以研究下啦。

  fastjson在对复杂对象的转换中不如Google 的Gson。

  使用递归要慎重,要特别注意出口条件,若出口不明确,很有可能导致死循环。

基于clickhouse的用户行为(路径)分析实践(代码片段)

...丰富的多参聚合函数(parametricaggregatefunction)和基于数组+Lambda表达式的高阶函数(higher-orderfunction),将它们灵活使用可以达到魔法般的效果。在我们的体系中,ClickHouse定位点击流数仓,所以下面... 查看详情

如何做好用户分析

...需求进行前端布局调整。04用户健康度分析用户健康度是基于用户行为数据综合考虑的核心指标,体现产品的运营情况,为产品的发展进行预警。包括三大类型指标:产品基础指标、流量质量指标、产品营收指标。它们三者构成... 查看详情

基于无埋点技术的用户行为分析

...个是过程。现在国内市场上关于用户行为分析的产品分为基于前台数据的用户行为分析和基于后台数据的用户行为分析。基于前台技术的用户行为分析侧重于用户的行为分析,而基于后台技术的用户行为分析侧重于用户行为的结... 查看详情

用户行为路径分析——附python桑基图代码实现

参考技术A用户路径,就是用户在网站或APP中的访问行为路径,为了衡量网站/APP的优化效果或者营销推广效果,了解用户的行为偏好,要对访问路径的数据进行分析。用户路径分析和转化分析有点类似,转化分析能告诉我们最终... 查看详情

3.8spark用户日志分析

文章目录网站流量指标为什么要分析日志用户行为日志Spark日志分析日志挖掘的方法路径分析关联规则序列模式分类分析聚类分析统计协同过滤参考3.7SparkRDD编程本文讨论的日志处理方法中的日志,仅指用户访问日志。其实并没... 查看详情

常见用户行为分析模型:用户行为路径分析模型

用户行为路径分析同样是重要的数据分析模型,它为企业实现理想的数据驱动与布局调整提供科学指导,对精准勾勒用户画像也有重要参考价值。用户访问APP/网络,如同参观画展,观众是感受和传达画展参展方和展品的目的受... 查看详情

利用用户行为数据——基于spark平台的协同过滤实时电影推荐系统项目系列博客

系列文章目录初识推荐系统——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一)利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)项目主要效果展示——基于Spark... 查看详情

用户行为分析模型——路径分析(代码片段)

...网页或者营销渠道中,用户行为模型有比较多,基于渠道的,笔者觉得有:渠道类型渠道重要性渠道跳转与流失单渠道,多节点路径分析,漏斗功能多渠道归因分析这里多渠道指的是,单渠道多节点的... 查看详情

基于网站的用户行为分析

...;设定了一些常用的用户指标和值得关注的用户指标,基于这些分类用户指标的分析可以发现用户运营和推广中的诸多问题,其中活跃用户和流失用户的定义中已经用到了与用户行为相关的指标,这里重点介绍常用的... 查看详情

常见用户行为分析模型:用户行为路径分析模型

用户行为路径分析同样是重要的数据分析模型,它为企业实现理想的数据驱动与布局调整提供科学指导,对精准勾勒用户画像也有重要参考价值。用户访问APP/网络,如同参观画展,观众是感受和传达画展参展方和展品的目的受... 查看详情

如何用sql分析电商用户行为数据(案例)

...法当没有清晰的数据看板时我们需要先清洗杂乱的数据,基于分析模型做可视化,搭建描述性的数据看板。然后基于描述性的数据挖掘问题,提出假设做优化,或者基于用户特征数据进行预测分析找规律,基于规律设计策略。简... 查看详情

spark用户访问session分析

...构user_visit_action点击流数据 (hive表)date//日期:代表用户点击行为是在哪一天发生user_id//代表这个点击行为是哪一个用户执行的session_id//唯一标识了某个用户的一个访问sessionpage_id//页面的id,点击品类,进入某个页面action_na... 查看详情

用户行为流程分析法及在金融分析中的应用

参考技术A用户行为路径分析是一种监测用户流向,从而统计产品使用深度的分析方法。它主要根据每位用户在App或网站中的点击行为日志,分析用户在App或网站中各个模块的流转规律与特点,挖掘用户的访问或点击模式,进而... 查看详情

spark实践——基于sparkstreaming的实时日志分析系统(代码片段)

Spark实践——基于SparkStreaming的实时日志分析系统本文基于《Spark最佳实践》第6章Spark流式计算。我们知道网站用户访问流量是不间断的,基于网站的访问日志,即Weblog分析是典型的流式实时计算应用场景。比如百度统计,它可以... 查看详情

产品分析:用户行为路径及确定性

参考技术A用户行为路径,既用户在具体场景中使用产品的具体操作步骤,指用户要达成某个确定性目标,在产品上操作的功能步骤。确定性,可分为三个层级。一、如果针对产品本身,一般是指该产品能给用户带来明确的作用... 查看详情

常见用户行为分析模型:行为事件分析模型

...过理论推导,能够相对完整地揭示用户行为的内在规律。基于此帮助企业实现多维交叉分析,帮助企业建立快速反应、适应变化的敏捷商业智能决策。结合近期的思考与学习,将为大家陆续介绍不同针对用户行为的分析模型。本... 查看详情

spark项目之电商用户行为分析大数据平台之idea项目搭建及工具类介绍

一、创建Maven项目创建项目,名称为LogAnalysis二、常用工具类2.1 配置管理组建ConfigurationManager.java1importjava.io.InputStream;2importjava.util.Properties;34/**5*配置管理组件6*7*1、配置管理组件可以复杂,也可以很简单,对于简单的配置管理组... 查看详情

基于hive的淘宝用户行为数据分析

基于Hive的淘宝用户行为数据分析本文将通过阿里云天池提供的淘宝用户行为数据集,从不同维度出发,通过数据来分析淘宝用户的一些行为习惯和爱好。淘宝或商家可以根据结论做出一些举措。一、数据集介绍本数据集... 查看详情