大数据项目之电商数仓-用户行为数据仓库(代码片段)

_TIM_ _TIM_     2022-12-06     560

关键词:

数据仓库分层

  • 把复杂问题简单化,把一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解
  • 清晰的数据结构,每一层都有它的作用域,这样我们在使用表的时候能更方便的定位和理解。 便于维护数据的准确性,当数据出现问题的时候,可以不用修复所有的数据,只需要从有问题的步骤开始修复
  • 减少重复开发,规范数据分层,通过中间层数据,能够减少极大的重复计算,增加一次计算结果的复用性
  • 隔离原始数据,使得真是数据与统计数据接耦

分层结构图

  • ODS层(原始数据层)
    原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。
  • DWD层(明细数据层)
    结构和粒度与ODS层保持一致,对ODS层数据进行清洗(去除空值,脏数据,超过极限范围的数据),也有公司叫DWI
  • DWS层(服务数据层)
    DWD为基础,进行轻度汇总。一般聚集到以用户当日,设备当日,商家当日,商品当日等等的粒度。在这层通常会有以某一个维度为线索,组成跨主题的宽表,比如,一个用户的当日的签到数、收藏数、评论数、抽奖数、订阅数、点赞数、浏览商品数、添加购物车数、下单数、支付数、退款数、点击广告数组成的多列表。
  • ADS层(数据应用层)
    数据应用层,也有公司或书把这层命名为APP层、DAL层等。面向实际的数据需求,以DWD或者DWS层的数据为基础,组成的各种统计报表。统计结果最终同步到RDS以供BI或应用系统查询使用。

Hive运行引擎Tez

性能优于MapReduce,用Hive直接编写程序,假设有四个有依赖关系的MapReduce作业,绿色是Rgmallce Task,云状表示写屏蔽,需要将中间结果持久化写到HDFSTez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少,从而大大提升DAG作业的性能。

数仓搭建之ODS & DWD

ODS层

原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。
创建启动日志表ods_start_log

hive (gmall)> 
drop table if exists ods_start_log;
CREATE EXTERNAL TABLE  `ods_start_log`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
  INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_start_log';

创建事件日志表ods_event_log

hive (gmall)> 
drop table if exists ods_event_log;
CREATE EXTERNAL TABLE  `ods_event_log`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
  INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_event_log';

ODS层加载数据脚本

#!/bin/bash

# 定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive

# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n $1 ] ;then
 log_date=$1
else 
 log_date=`date  -d "-1 day"  +%F`  
fi 

echo "===日志日期为 $log_date==="
$hive -e "load data inpath '/origin_data/gmall/log/topic_start/$log_date' into table "$APP".ods_start_log partition(dt='$log_date')"
$hive -e "load data inpath '/origin_data/gmall/log/topic_event/$log_date' into table "$APP".ods_event_log partition(dt='$log_date')"

DWD层数据解析

ODS层数据进行清洗(去除空值,脏数据,超过极限范围的数据,行式存储改为列存储,改压缩格式)

创建启动日志基础明细表
其中event_nameevent_json用来对应事件名和整个事件。这个地方将原始日志1对多的形式拆分出来了。操作的时候我们需要将原始日志展平,需要用到UDFUDTF

hive (gmall)> 
drop table if exists dwd_base_start_log;
CREATE EXTERNAL TABLE `dwd_base_start_log`(
	`mid_id` string,
	`user_id` string, 
	`version_code` string, 
	`version_name` string, 
	`lang` string, 
	`source` string, 
	`os` string, 
	`area` string, 
	`model` string,
	`brand` string, 
	`sdk_version` string, 
	`gmail` string, 
	`height_width` string, 
	`app_time` string, 
	`network` string, 
	`lng` string, 
	`lat` string, 
	`event_name` string, 
	`event_json` string, 
	`server_time` string)
PARTITIONED BY (`dt` string)
stored as  parquet
location '/warehouse/gmall/dwd/dwd_base_start_log/';

创建事件日志基础明细表

hive (gmall)> 
drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE `dwd_base_event_log`(
	`mid_id` string,
	`user_id` string, 
	`version_code` string, 
	`version_name` string, 
	`lang` string, 
	`source` string, 
	`os` string, 
	`area` string, 
	`model` string,
	`brand` string, 
	`sdk_version` string, 
	`gmail` string, 
	`height_width` string, 
	`app_time` string, 
	`network` string, 
	`lng` string, 
	`lat` string, 
	`event_name` string, 
	`event_json` string, 
	`server_time` string)
PARTITIONED BY (`dt` string)
stored as  parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/';

自定义UDF函数(解析公共字段)

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF 

    public String evaluate(String line, String jsonkeysString) 
        
        // 0 准备一个sb
        StringBuilder sb = new StringBuilder();

        // 1 切割jsonkeys  mid uid vc vn l sr os ar md
        String[] jsonkeys = jsonkeysString.split(",");

        // 2 处理line   服务器时间 | json
        String[] logContents = line.split("\\\\|");

        // 3 合法性校验
        if (logContents.length != 2 || StringUtils.isBlank(logContents[1])) 
            return "";
        

        // 4 开始处理json
        try 
            JSONObject jsonObject = new JSONObject(logContents[1]);

            // 获取cm里面的对象
            JSONObject base = jsonObject.getJSONObject("cm");

            // 循环遍历取值
            for (int i = 0; i < jsonkeys.length; i++) 
                String filedName = jsonkeys[i].trim();

                if (base.has(filedName)) 
                    sb.append(base.getString(filedName)).append("\\t");
                 else 
                    sb.append("").append("\\t");
                
            

            sb.append(jsonObject.getString("et")).append("\\t");
            sb.append(logContents[0]).append("\\t");
         catch (JSONException e) 
            e.printStackTrace();
        

        return sb.toString();
    

自定义UDTF函数(解析具体事件字段)

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;

import java.util.ArrayList;

public class EventJsonUDTF extends GenericUDTF 

    //该方法中,我们将指定输出参数的名称和参数类型:
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException 

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

        fieldNames.add("event_name");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("event_json");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    

    //输入1条记录,输出若干条结果
    @Override
    public void process(Object[] objects) throws HiveException 

        // 获取传入的et
        String input = objects[0].toString();

        // 如果传进来的数据为空,直接返回过滤掉该数据
        if (StringUtils.isBlank(input)) 
            return;
         else 

            try 
                // 获取一共有几个事件(ad/facoriters)
                JSONArray ja = new JSONArray(input);

                if (ja == null)
                    return;

                // 循环遍历每一个事件
                for (int i = 0; i < ja.length(); i++) 
                    String[] result = new String[2];

                    try 
                        // 取出每个的事件名称(ad/facoriters)
                        result[0] = ja.getJSONObject(i).getString("en");

                        // 取出每一个事件整体
                        result[1] = ja.getString(i);
                     catch (JSONException e) 
                        continue;
                    

                    // 将结果返回
                    forward(result);
                
             catch (JSONException e) 
                e.printStackTrace();
            
        
    

    //当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
    @Override
    public void close() throws HiveException 

    


业务术语

  1. 用户
    用户以设备为判断标准,在移动统计中,每个独立设备认为是一个独立用户。Android系统根据IMEI号,IOS系统根据OpenUDID来标识一个独立用户,每部手机一个用户。
  2. 新增用户
    首次联网使用应用的用户。如果一个用户首次打开某app,那这个用户定义为新增用户;卸载再安装的设备,不会被算作一次新增。新增用户包括日新增用户、周新增用户、月新增用户。
  3. 活跃用户
    打开应用的用户即为活跃用户,不考虑用户的使用情况。每天一台设备打开多次会被计为一个活跃用户。
  4. 周(月)活跃用户
    某个自然周(月)内启动过应用的用户,该周(月)内的多次启动只记一个活跃用户。
  5. 月活跃率
    月活跃用户与截止到该月累计的用户总和之间的比例。
  6. 沉默用户
    用户仅在安装当天(次日)启动一次,后续时间无再启动行为。该指标可以反映新增用户质量和用户与APP的匹配程度。
  7. 版本分布
    不同版本的周内各天新增用户数,活跃用户数和启动次数。利于判断App各个版本之间的优劣和用户行为习惯。
  8. 本周回流用户
    上周未启动过应用,本周启动了应用的用户。
  9. 连续n周活跃用户
    连续n周,每周至少启动一次。
  10. 忠诚用户
    连续活跃5周以上的用户
  11. 连续活跃用户
    连续2周及以上活跃的用户
  12. 近期流失用户
    连续n(2<= n <= 4)周没有启动应用的用户(第n+1周没有启动过)
  13. 留存用户
    某段时间内的新增用户,经过一段时间后,仍然使用应用的被认作是留存用户;这部分用户占当时新增用户的比例即是留存率。例如,5月份新增用户200,这200人在6月份启动过应用的有100人,7月份启动过应用的有80人,8月份启动过应用的有50人;则5月份新增用户一个月后的留存率是50%,二个月后的留存率是40%,三个月后的留存率是25%。
  14. 用户新鲜度
    每天启动应用的新老用户比例,即新增用户数占活跃用户数的比例。
  15. 单次使用时长
    每次启动使用的时间长度。
  16. 日使用时长
    累计一天内的使用时间长度。
  17. 启动次数计算标准
    IOS平台应用退到后台就算一次独立的启动;Android平台我们规定,两次启动之间的间隔小于30秒,被计算一次启动。用户在使用过程中,若因收发短信或接电话等退出应用30秒又再次返回应用中,那这两次行为应该是延续而非独立的,所以可以被算作一次使用行为,即一次启动。业内大多使用30秒这个标准,但用户还是可以自定义此时间间隔。

大数据项目之电商数仓-用户行为数据采集(代码片段)

数据仓库简介数据仓库是为企业所有决策制定过程,提供所有系统数据支持的战略集合,通过数据仓库中的数据的分析,可以帮助企业改进业务流程、控制成本、提高产品质量等。项目需求实时采集买点的用户行为数... 查看详情

大数据项目之电商数仓-业务数据仓库(代码片段)

电商业务流程简介电商术语SKU,库存量单位,即库存进出计量的基本单元,可以是以件,盒,托盘等为单位。SKU这是对于大型连锁超市DC(配送中心)物流管理的一个必要的方法。现在已经被引申为产... 查看详情

大数据项目之电商数仓-用户行为数据仓库(代码片段)

数据仓库分层把复杂问题简单化,把一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解清晰的数据结构,每一层都有它的作用域,这样我们在使用表的时候能更方便的定... 查看详情

大数据项目之电商数仓数据仓库概念项目需求及架构设计(代码片段)

文章目录1.数据仓库概念2.项目需求及架构设计2.1项目需求分析2.1.1采集平台2.1.2离线需求2.1.3实时需求2.1.4思考题2.2项目框架2.2.1技术选型2.2.2系统数据流程设计2.2.3框架版本选型2.2.3.1Apache框架版本2.2.4服务器选型2.2.4.1物理机:... 查看详情

数据仓库之电商数仓--1用户行为数据采集(代码片段)

目录一、数据仓库概念二、项目需求及架构设计2.1项目需求分析2.2项目框架2.2.1技术选型2.2.2系统数据流程设计2.2.3框架版本选型2.2.4服务器选型2.2.5集群规模2.2.6集群资源规划设计三、数据生成模块3.1目标数据3.1.1页面日志3.1.2事... 查看详情

大数据项目之电商数仓日志采集flumesourcechannelsinkkafka的三个架构

文章目录4.用户行为数据采集模块4.3日志采集Flume4.3.1Kafka的三个架构4.3.1.1source4.3.1.2channel4.3.1.3sink4.3.1.4kafkasource4.3.1.5kafkasink4.3.1.6kafkachannel4.3.1.6.1第一个结构4.3.1.6.2第二个结构4.3.1.6.3第三个结构4.用户行为数据采集模块4.3日志采集F... 查看详情

大数据项目之电商数仓-业务数据仓库(代码片段)

电商业务流程简介电商术语SKU,库存量单位,即库存进出计量的基本单元,可以是以件,盒,托盘等为单位。SKU这是对于大型连锁超市DC(配送中心)物流管理的一个必要的方法。现在已经被引申为产... 查看详情

电商数仓——(师承尚硅谷)大数据实战项目(代码片段)

数仓实战1.概念技术选型:搭建环境三台ECS创建wts用户:useraddwtspasswdwts输入两边密码cd/home有无wts?让wts有sudoer权力:[root@hadoop100~]#vim/etc/sudoers修改/etc/sudoers文件,在%wheel这行下面添加一行,如下所示&# 查看详情

大数据项目之数仓相关知识

第1章数据仓库概念数据仓库(DW):为企业指定决策,提供数据支持的,帮助企业,改进业务流程,提高产品质量等。DW的输入数据通常包括:业务数据,用户行为数据和爬虫数据等 ODS: 数据备份 D... 查看详情

2023/2/10大数据实习日志(代码片段)

今日学习内容一、数据仓库(DataWarehouse)概念数据仓库,顾名思义就是用于存储数据的仓库,是为企业制定决策,提供数据支持的。可以帮助企业,改进业务流程、提高产品质量等。二、数仓数据分类数... 查看详情

大数据实战之用户画像概念项目概述及环境搭建(代码片段)

下面跟着我一起来学习大数据获取用户画像:项目Profile课程安排 :用户画像概念1、用户画像概述1.1、产生背景早期的用户画像起源于交互设计之父AlanCooper提出的”Personasareaconcreterepresentationoftargetusers.”。认为用户画像... 查看详情

电商数仓笔记1(数据仓库概念,项目需求及架构设计,数据生成模块)(代码片段)

电商数仓一、数据仓库概念二、项目需求及架构设计1、项目需求分析2、项目框架(1)技术选型(2)系统数据流程设计(3)框架版本选型(4)服务器选型(5)集群规模(6)集群资... 查看详情

二.flink实时项目电商用户行为之实时流量统计(代码片段)

...块创建和数据准备在Flink-project下新建一个mavenmodule作为子项目,命名为gmall-network-flow。在这个子模块中,我们同样并没有引入更多的依赖,所以也不需要改动pom文件。在src/main/目录下,将apache服务器的日志文件apache.log复制到资... 查看详情

hive数仓项目之访问咨询主题看板增量的流程(代码片段)

 往期内容:Hive数仓项目架构说明、环境搭建及数据仓库基础知识Hive数仓项目之数仓分层、数仓工具的使用Hive数仓项目之访问咨询主题看板:数据的采集、转换、分析导出今日内容:访问咨询主题看板_增量的流程(操作)1.... 查看详情

实时即未来,大数据项目车联网之重启机制及数据积压(代码片段)

文章目录1checkpoint配置2任务重启策略3分区发现4数据积压问题4.1什么是数据积压4.2数据积压的原因4.3数据积压的后果4.4积压解决方案4.5解决数据积压方法1checkpoint配置l选择合适的Checkpoint存储方式lCheckPoint存储方式存在三种官方文... 查看详情

湖仓一体电商项目:3万字带你从头开始搭建12个大数据项目基础组件(代码片段)

文章目录一、搭建Zookeeper1、上传zookeeper并解压,配置环境变量2、在node3节点配置zookeeper3、将配置好的zookeeper发送到node4,node5节点4、各个节点上创建数据目录,并配置zookeeper环境变量5、各个节点创建节点ID6、各个节点启动zookee... 查看详情

hive数仓项目之需求分析建模分析优化方案(代码片段)

 往期内容:Hive数仓项目架构说明、环境搭建及数据仓库基础知识Hive数仓项目之数仓分层、数仓工具的使用Hive数仓项目之访问咨询主题看板:数据的采集、转换、分析导出Hive数仓项目之访问咨询主题看板增量的流程​... 查看详情

助力工业物联网,工业大数据项目之数据采集(代码片段)

文章目录01:Sqoop命令回顾02:YARN资源调度及配置03:MR的Uber模式04:Sqoop采集数据格式问题05:问题解决:Avro格式06:Sqoop增量采集方案回顾01:Sqoop命令回顾目标:掌握Sqoop常用命令的使用路径step... 查看详情