关键词:
数据仓库分层
- 把复杂问题简单化,把一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解
- 清晰的数据结构,每一层都有它的作用域,这样我们在使用表的时候能更方便的定位和理解。 便于维护数据的准确性,当数据出现问题的时候,可以不用修复所有的数据,只需要从有问题的步骤开始修复
- 减少重复开发,规范数据分层,通过中间层数据,能够减少极大的重复计算,增加一次计算结果的复用性
- 隔离原始数据,使得真是数据与统计数据接耦
分层结构图
- ODS层(原始数据层)
原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。 - DWD层(明细数据层)
结构和粒度与ODS
层保持一致,对ODS
层数据进行清洗(去除空值,脏数据,超过极限范围的数据),也有公司叫DWI
。 - DWS层(服务数据层)
以DWD
为基础,进行轻度汇总。一般聚集到以用户当日,设备当日,商家当日,商品当日等等的粒度。在这层通常会有以某一个维度为线索,组成跨主题的宽表,比如,一个用户的当日的签到数、收藏数、评论数、抽奖数、订阅数、点赞数、浏览商品数、添加购物车数、下单数、支付数、退款数、点击广告数组成的多列表。 - ADS层(数据应用层)
数据应用层,也有公司或书把这层命名为APP
层、DAL
层等。面向实际的数据需求,以DWD
或者DWS
层的数据为基础,组成的各种统计报表。统计结果最终同步到RDS
以供BI
或应用系统查询使用。
Hive运行引擎Tez
性能优于MapReduce
,用Hive
直接编写程序,假设有四个有依赖关系的MapReduce
作业,绿色是Rgmallce Task
,云状表示写屏蔽,需要将中间结果持久化写到HDFS
。Tez
可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS
,且中间节点较少,从而大大提升DAG
作业的性能。
数仓搭建之ODS & DWD
ODS层
原始数据层,存放原始数据,直接加载原始日志、数据,数据保持原貌不做处理。
创建启动日志表ods_start_log
hive (gmall)>
drop table if exists ods_start_log;
CREATE EXTERNAL TABLE `ods_start_log`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_start_log';
创建事件日志表ods_event_log
hive (gmall)>
drop table if exists ods_event_log;
CREATE EXTERNAL TABLE `ods_event_log`(`line` string)
PARTITIONED BY (`dt` string)
STORED AS
INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/warehouse/gmall/ods/ods_event_log';
ODS层加载数据脚本
#!/bin/bash
# 定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n $1 ] ;then
log_date=$1
else
log_date=`date -d "-1 day" +%F`
fi
echo "===日志日期为 $log_date==="
$hive -e "load data inpath '/origin_data/gmall/log/topic_start/$log_date' into table "$APP".ods_start_log partition(dt='$log_date')"
$hive -e "load data inpath '/origin_data/gmall/log/topic_event/$log_date' into table "$APP".ods_event_log partition(dt='$log_date')"
DWD层数据解析
对ODS
层数据进行清洗(去除空值,脏数据,超过极限范围的数据,行式存储改为列存储,改压缩格式)
创建启动日志基础明细表
其中event_name
和event_json
用来对应事件名和整个事件。这个地方将原始日志1对多的形式拆分出来了。操作的时候我们需要将原始日志展平,需要用到UDF
和UDTF
。
hive (gmall)>
drop table if exists dwd_base_start_log;
CREATE EXTERNAL TABLE `dwd_base_start_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`event_name` string,
`event_json` string,
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_start_log/';
创建事件日志基础明细表
hive (gmall)>
drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE `dwd_base_event_log`(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`event_name` string,
`event_json` string,
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/';
自定义UDF函数(解析公共字段)
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;
public class BaseFieldUDF extends UDF
public String evaluate(String line, String jsonkeysString)
// 0 准备一个sb
StringBuilder sb = new StringBuilder();
// 1 切割jsonkeys mid uid vc vn l sr os ar md
String[] jsonkeys = jsonkeysString.split(",");
// 2 处理line 服务器时间 | json
String[] logContents = line.split("\\\\|");
// 3 合法性校验
if (logContents.length != 2 || StringUtils.isBlank(logContents[1]))
return "";
// 4 开始处理json
try
JSONObject jsonObject = new JSONObject(logContents[1]);
// 获取cm里面的对象
JSONObject base = jsonObject.getJSONObject("cm");
// 循环遍历取值
for (int i = 0; i < jsonkeys.length; i++)
String filedName = jsonkeys[i].trim();
if (base.has(filedName))
sb.append(base.getString(filedName)).append("\\t");
else
sb.append("").append("\\t");
sb.append(jsonObject.getString("et")).append("\\t");
sb.append(logContents[0]).append("\\t");
catch (JSONException e)
e.printStackTrace();
return sb.toString();
自定义UDTF函数(解析具体事件字段)
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import java.util.ArrayList;
public class EventJsonUDTF extends GenericUDTF
//该方法中,我们将指定输出参数的名称和参数类型:
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("event_name");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("event_json");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
//输入1条记录,输出若干条结果
@Override
public void process(Object[] objects) throws HiveException
// 获取传入的et
String input = objects[0].toString();
// 如果传进来的数据为空,直接返回过滤掉该数据
if (StringUtils.isBlank(input))
return;
else
try
// 获取一共有几个事件(ad/facoriters)
JSONArray ja = new JSONArray(input);
if (ja == null)
return;
// 循环遍历每一个事件
for (int i = 0; i < ja.length(); i++)
String[] result = new String[2];
try
// 取出每个的事件名称(ad/facoriters)
result[0] = ja.getJSONObject(i).getString("en");
// 取出每一个事件整体
result[1] = ja.getString(i);
catch (JSONException e)
continue;
// 将结果返回
forward(result);
catch (JSONException e)
e.printStackTrace();
//当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出
@Override
public void close() throws HiveException
业务术语
- 用户
用户以设备为判断标准,在移动统计中,每个独立设备认为是一个独立用户。Android
系统根据IMEI
号,IOS
系统根据OpenUDID
来标识一个独立用户,每部手机一个用户。 - 新增用户
首次联网使用应用的用户。如果一个用户首次打开某app
,那这个用户定义为新增用户;卸载再安装的设备,不会被算作一次新增。新增用户包括日新增用户、周新增用户、月新增用户。 - 活跃用户
打开应用的用户即为活跃用户,不考虑用户的使用情况。每天一台设备打开多次会被计为一个活跃用户。 - 周(月)活跃用户
某个自然周(月)内启动过应用的用户,该周(月)内的多次启动只记一个活跃用户。 - 月活跃率
月活跃用户与截止到该月累计的用户总和之间的比例。 - 沉默用户
用户仅在安装当天(次日)启动一次,后续时间无再启动行为。该指标可以反映新增用户质量和用户与APP
的匹配程度。 - 版本分布
不同版本的周内各天新增用户数,活跃用户数和启动次数。利于判断App
各个版本之间的优劣和用户行为习惯。 - 本周回流用户
上周未启动过应用,本周启动了应用的用户。 - 连续n周活跃用户
连续n周,每周至少启动一次。 - 忠诚用户
连续活跃5周以上的用户 - 连续活跃用户
连续2周及以上活跃的用户 - 近期流失用户
连续n(2<= n <= 4)
周没有启动应用的用户(第n+1
周没有启动过) - 留存用户
某段时间内的新增用户,经过一段时间后,仍然使用应用的被认作是留存用户;这部分用户占当时新增用户的比例即是留存率。例如,5月份新增用户200,这200人在6月份启动过应用的有100人,7月份启动过应用的有80人,8月份启动过应用的有50人;则5月份新增用户一个月后的留存率是50%,二个月后的留存率是40%,三个月后的留存率是25%。 - 用户新鲜度
每天启动应用的新老用户比例,即新增用户数占活跃用户数的比例。 - 单次使用时长
每次启动使用的时间长度。 - 日使用时长
累计一天内的使用时间长度。 - 启动次数计算标准
IOS
平台应用退到后台就算一次独立的启动;Android
平台我们规定,两次启动之间的间隔小于30秒,被计算一次启动。用户在使用过程中,若因收发短信或接电话等退出应用30秒又再次返回应用中,那这两次行为应该是延续而非独立的,所以可以被算作一次使用行为,即一次启动。业内大多使用30秒这个标准,但用户还是可以自定义此时间间隔。
大数据项目之电商数仓-用户行为数据采集(代码片段)
数据仓库简介数据仓库是为企业所有决策制定过程,提供所有系统数据支持的战略集合,通过数据仓库中的数据的分析,可以帮助企业改进业务流程、控制成本、提高产品质量等。项目需求实时采集买点的用户行为数... 查看详情
大数据项目之电商数仓-业务数据仓库(代码片段)
电商业务流程简介电商术语SKU,库存量单位,即库存进出计量的基本单元,可以是以件,盒,托盘等为单位。SKU这是对于大型连锁超市DC(配送中心)物流管理的一个必要的方法。现在已经被引申为产... 查看详情
大数据项目之电商数仓-用户行为数据仓库(代码片段)
数据仓库分层把复杂问题简单化,把一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解清晰的数据结构,每一层都有它的作用域,这样我们在使用表的时候能更方便的定... 查看详情
大数据项目之电商数仓数据仓库概念项目需求及架构设计(代码片段)
文章目录1.数据仓库概念2.项目需求及架构设计2.1项目需求分析2.1.1采集平台2.1.2离线需求2.1.3实时需求2.1.4思考题2.2项目框架2.2.1技术选型2.2.2系统数据流程设计2.2.3框架版本选型2.2.3.1Apache框架版本2.2.4服务器选型2.2.4.1物理机:... 查看详情
数据仓库之电商数仓--1用户行为数据采集(代码片段)
目录一、数据仓库概念二、项目需求及架构设计2.1项目需求分析2.2项目框架2.2.1技术选型2.2.2系统数据流程设计2.2.3框架版本选型2.2.4服务器选型2.2.5集群规模2.2.6集群资源规划设计三、数据生成模块3.1目标数据3.1.1页面日志3.1.2事... 查看详情
大数据项目之电商数仓日志采集flumesourcechannelsinkkafka的三个架构
文章目录4.用户行为数据采集模块4.3日志采集Flume4.3.1Kafka的三个架构4.3.1.1source4.3.1.2channel4.3.1.3sink4.3.1.4kafkasource4.3.1.5kafkasink4.3.1.6kafkachannel4.3.1.6.1第一个结构4.3.1.6.2第二个结构4.3.1.6.3第三个结构4.用户行为数据采集模块4.3日志采集F... 查看详情
大数据项目之电商数仓-业务数据仓库(代码片段)
电商业务流程简介电商术语SKU,库存量单位,即库存进出计量的基本单元,可以是以件,盒,托盘等为单位。SKU这是对于大型连锁超市DC(配送中心)物流管理的一个必要的方法。现在已经被引申为产... 查看详情
电商数仓——(师承尚硅谷)大数据实战项目(代码片段)
数仓实战1.概念技术选型:搭建环境三台ECS创建wts用户:useraddwtspasswdwts输入两边密码cd/home有无wts?让wts有sudoer权力:[root@hadoop100~]#vim/etc/sudoers修改/etc/sudoers文件,在%wheel这行下面添加一行,如下所示 查看详情
大数据项目之数仓相关知识
第1章数据仓库概念数据仓库(DW):为企业指定决策,提供数据支持的,帮助企业,改进业务流程,提高产品质量等。DW的输入数据通常包括:业务数据,用户行为数据和爬虫数据等 ODS: 数据备份 D... 查看详情
2023/2/10大数据实习日志(代码片段)
今日学习内容一、数据仓库(DataWarehouse)概念数据仓库,顾名思义就是用于存储数据的仓库,是为企业制定决策,提供数据支持的。可以帮助企业,改进业务流程、提高产品质量等。二、数仓数据分类数... 查看详情
大数据实战之用户画像概念项目概述及环境搭建(代码片段)
下面跟着我一起来学习大数据获取用户画像:项目Profile课程安排 :用户画像概念1、用户画像概述1.1、产生背景早期的用户画像起源于交互设计之父AlanCooper提出的”Personasareaconcreterepresentationoftargetusers.”。认为用户画像... 查看详情
电商数仓笔记1(数据仓库概念,项目需求及架构设计,数据生成模块)(代码片段)
电商数仓一、数据仓库概念二、项目需求及架构设计1、项目需求分析2、项目框架(1)技术选型(2)系统数据流程设计(3)框架版本选型(4)服务器选型(5)集群规模(6)集群资... 查看详情
二.flink实时项目电商用户行为之实时流量统计(代码片段)
...块创建和数据准备在Flink-project下新建一个mavenmodule作为子项目,命名为gmall-network-flow。在这个子模块中,我们同样并没有引入更多的依赖,所以也不需要改动pom文件。在src/main/目录下,将apache服务器的日志文件apache.log复制到资... 查看详情
hive数仓项目之访问咨询主题看板增量的流程(代码片段)
往期内容:Hive数仓项目架构说明、环境搭建及数据仓库基础知识Hive数仓项目之数仓分层、数仓工具的使用Hive数仓项目之访问咨询主题看板:数据的采集、转换、分析导出今日内容:访问咨询主题看板_增量的流程(操作)1.... 查看详情
实时即未来,大数据项目车联网之重启机制及数据积压(代码片段)
文章目录1checkpoint配置2任务重启策略3分区发现4数据积压问题4.1什么是数据积压4.2数据积压的原因4.3数据积压的后果4.4积压解决方案4.5解决数据积压方法1checkpoint配置l选择合适的Checkpoint存储方式lCheckPoint存储方式存在三种官方文... 查看详情
湖仓一体电商项目:3万字带你从头开始搭建12个大数据项目基础组件(代码片段)
文章目录一、搭建Zookeeper1、上传zookeeper并解压,配置环境变量2、在node3节点配置zookeeper3、将配置好的zookeeper发送到node4,node5节点4、各个节点上创建数据目录,并配置zookeeper环境变量5、各个节点创建节点ID6、各个节点启动zookee... 查看详情
hive数仓项目之需求分析建模分析优化方案(代码片段)
往期内容:Hive数仓项目架构说明、环境搭建及数据仓库基础知识Hive数仓项目之数仓分层、数仓工具的使用Hive数仓项目之访问咨询主题看板:数据的采集、转换、分析导出Hive数仓项目之访问咨询主题看板增量的流程... 查看详情
助力工业物联网,工业大数据项目之数据采集(代码片段)
文章目录01:Sqoop命令回顾02:YARN资源调度及配置03:MR的Uber模式04:Sqoop采集数据格式问题05:问题解决:Avro格式06:Sqoop增量采集方案回顾01:Sqoop命令回顾目标:掌握Sqoop常用命令的使用路径step... 查看详情