Flink Real-Time Data Warehouse Project in Practice, Part 4: Splitting Log Data into DWD (Code Snippets)

一阵暖风     2022-12-30     619


[Flink Real-Time Data Warehouse] Data Warehouse Project in Practice, Part 4: Splitting Log Data - Traffic Domain [DWD]

DWD layer design points:
(1) The DWD layer is designed according to dimensional modeling theory; it stores the fact tables of the dimensional model.
(2) DWD table names follow the convention dwd_<data domain>_<table name>.
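For example, the five traffic-domain fact tables produced in this section are named dwd_traffic_page_log, dwd_traffic_start_log, dwd_traffic_display_log, dwd_traffic_action_log, and dwd_traffic_error_log.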

1. Unprocessed Transaction Fact Tables in the Traffic Domain

1.1 Main Tasks

1.1.1 Data Cleansing (ETL)

Some records may be lost or truncated in transit, leaving the JSON structure incomplete, so this dirty data must be filtered out.
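In step 3 of the code below, this is done by passing each record through JSON.parseObject inside a try/catch; records that fail to parse are routed to a dirty-data side output and printed rather than silently dropped.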

1.1.2 Repairing the New/Old Visitor Flag

The is_new field under the common section of the log marks visitor status: 1 means a new visitor, 0 means a returning visitor. Because the reliability of front-end tracking data cannot be guaranteed, returning visitors may be mislabeled as new, so the flag needs to be repaired.
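Concretely, step 5 of the code below keys the stream by mid and keeps each device's first visit date in keyed state, then applies these rules:
(1) is_new is "1" and the state is empty: a genuine new visitor; store today's date in state.
(2) is_new is "1" and the state holds a date other than today: the device has been seen on an earlier day, so is_new is corrected to "0".
(3) is_new is "1" and the state holds today's date: repeat visits on the first day; the flag is left unchanged.
(4) is_new is "0" and the state is empty: an old visitor the job has no record of (for example, the job started after the first visit); the state is set to yesterday's date so it stays consistent with the flag.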

1.1.3 Splitting the Stream with Side Outputs

This section splits the log stream into five transaction fact tables and writes them to Kafka (one topic per table; see step 8 of the code):
Traffic-domain page-view transaction fact table
Traffic-domain start transaction fact table
Traffic-domain action transaction fact table
Traffic-domain display transaction fact table
Traffic-domain error transaction fact table

1.2 Diagram

1.3 Code

The code comes from Shangguigu (尚硅谷); follow the Shangguigu official WeChat account and reply "大数据" to obtain the source code and materials.

Only the main pipeline code is shown below; for the utility classes and full implementation, download the source (a rough sketch of the two helper classes is given after the listing).

package com.atguigu.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

//Data flow: web/app -> Nginx -> log server (.log) -> Flume -> Kafka (ODS) -> FlinkApp -> Kafka (DWD)
//Programs:       Mock(lg.sh) -> Flume(f1) -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK)
public class BaseLogApp {

    public static void main(String[] args) throws Exception {

        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); //In production, set this to the partition count of the Kafka topic

        //1.1 Enable checkpointing
        //env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        //1.2 Configure the state backend
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/211126/ck");
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2. Create a stream by consuming the Kafka topic_log topic
        String topic = "topic_log";
        String groupId = "base_log_app_211126";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        //TODO 3. Filter out non-JSON records & convert each line into a JSONObject
        OutputTag<String> dirtyTag = new OutputTag<String>("Dirty") {
        };
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {

                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    ctx.output(dirtyTag, value);
                }
            }
        });
        //Extract the dirty-data side output and print it
        DataStream<String> dirtyDS = jsonObjDS.getSideOutput(dirtyTag);
        dirtyDS.print("Dirty>>>>>>>>>>>>");

        //TODO 4. Key the stream by mid
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        //TODO 5. Use keyed state to repair the new/old visitor flag
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = keyedStream.map(new RichMapFunction<JSONObject, JSONObject>() {
            private ValueState<String> lastVisitState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastVisitState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-visit", String.class));
            }

            @Override
            public JSONObject map(JSONObject value) throws Exception {

                //Get the is_new flag & ts, and convert the timestamp to a date string
                String isNew = value.getJSONObject("common").getString("is_new");
                Long ts = value.getLong("ts");
                String curDate = DateFormatUtil.toDate(ts);

                //Get the date stored in state
                String lastDate = lastVisitState.value();

                //Check whether the is_new flag is "1"
                if ("1".equals(isNew)) {
                    if (lastDate == null) {
                        lastVisitState.update(curDate);
                    } else if (!lastDate.equals(curDate)) {
                        value.getJSONObject("common").put("is_new", "0");
                    }
                } else if (lastDate == null) {
                    lastVisitState.update(DateFormatUtil.toDate(ts - 24 * 60 * 60 * 1000L));
                }
                return value;
            }
        });

        //TODO 6. Split the stream with side outputs: page logs stay on the main stream; start, display, action, and error logs go to side outputs
        OutputTag<String> startTag = new OutputTag<String>("start") {
        };
        OutputTag<String> displayTag = new OutputTag<String>("display") {
        };
        OutputTag<String> actionTag = new OutputTag<String>("action") {
        };
        OutputTag<String> errorTag = new OutputTag<String>("error") {
        };
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {

                //Try to get the error info
                String err = value.getString("err");
                if (err != null) {
                    //Write the record to the error side output
                    ctx.output(errorTag, value.toJSONString());
                }

                //Remove the error info
                value.remove("err");

                //Try to get the start info
                String start = value.getString("start");
                if (start != null) {
                    //Write the record to the start side output
                    ctx.output(startTag, value.toJSONString());
                } else {

                    //Get the common info, page id, and timestamp
                    String common = value.getString("common");
                    String pageId = value.getJSONObject("page").getString("page_id");
                    Long ts = value.getLong("ts");

                    //Try to get the display data
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null && displays.size() > 0) {
                        //Iterate over the displays & write each one to the display side output
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            display.put("common", common);
                            display.put("page_id", pageId);
                            display.put("ts", ts);
                            ctx.output(displayTag, display.toJSONString());
                        }
                    }

                    //Try to get the action data
                    JSONArray actions = value.getJSONArray("actions");
                    if (actions != null && actions.size() > 0) {
                        //Iterate over the actions & write each one to the action side output
                        for (int i = 0; i < actions.size(); i++) {
                            JSONObject action = actions.getJSONObject(i);
                            action.put("common", common);
                            action.put("page_id", pageId);
                            ctx.output(actionTag, action.toJSONString());
                        }
                    }

                    //Remove the display and action data & emit the page log on the main stream
                    value.remove("displays");
                    value.remove("actions");
                    out.collect(value.toJSONString());
                }
            }
        });

        //TODO 7. Extract the side-output streams
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
        DataStream<String> actionDS = pageDS.getSideOutput(actionTag);
        DataStream<String> errorDS = pageDS.getSideOutput(errorTag);

        //TODO 8. Print each stream and write it to its topic
        pageDS.print("Page>>>>>>>>>>");
        startDS.print("Start>>>>>>>>");
        displayDS.print("Display>>>>");
        actionDS.print("Action>>>>>>");
        errorDS.print("Error>>>>>>>>");

        String page_topic = "dwd_traffic_page_log";
        String start_topic = "dwd_traffic_start_log";
        String display_topic = "dwd_traffic_display_log";
        String action_topic = "dwd_traffic_action_log";
        String error_topic = "dwd_traffic_error_log";

        pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(page_topic));
        startDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(start_topic));
        displayDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(display_topic));
        actionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(action_topic));
        errorDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(error_topic));

        //TODO 9. Start the job
        env.execute("BaseLogApp");

    }
}

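The helper classes referenced above are not reproduced in the post. Below is a minimal sketch of what MyKafkaUtil and DateFormatUtil might look like, assuming the Flink 1.13-era FlinkKafkaConsumer/FlinkKafkaProducer connector API and the hadoop102:9092 broker address used elsewhere in this post; the real implementations in the downloadable source may differ.

package com.atguigu.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

public class MyKafkaUtil {

    //Assumption: broker address taken from the hostnames used elsewhere in this post
    private static final String BOOTSTRAP_SERVERS = "hadoop102:9092";

    //Build a string-deserializing Kafka source for the given topic and consumer group
    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.setProperty("group.id", groupId);
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
    }

    //Build a string-serializing Kafka sink for the given topic
    public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
        return new FlinkKafkaProducer<>(BOOTSTRAP_SERVERS, topic, new SimpleStringSchema());
    }
}

package com.atguigu.utils;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class DateFormatUtil {

    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    //Convert an epoch-millisecond timestamp to a yyyy-MM-dd date string
    public static String toDate(Long ts) {
        return DATE_FORMATTER.format(Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault()));
    }
}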

1.4 Data Testing

1.4.1 Testing Dirty Data

[root@hadoop102 kafka]# bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_log
>{"common":{"ar":
>

In the IDEA console the record is printed on the Dirty side-output stream; nothing is written to the Kafka output topics.

1.4.2 Testing err and start Records

Input data

[root@hadoop102 kafka]# bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_log
>{"common":{"ar":
>{"common":{"ar":"110000","ba":"Xiaomi","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","os":"Android 11.0","uid":"513","vc":"v2.1.134"},"err":{"error_code":2633,"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)"},"start":{"entry":"notice","loading_time":12438,"open_ad_id":7,"open_ad_ms":4407,"open_ad_skip_ms":0},"ts":1651217959000}
>

Output data

[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_start_log
{"common":{"ar":"110000","uid":"513","os":"Android 11.0","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","vc":"v2.1.134","ba":"Xiaomi"},"start":{"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":4407,"loading_time":12438,"open_ad_id":7},"ts":1651217959000}
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_error_log
{"common":{"ar":"110000","uid":"513","os":"Android 11.0","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","vc":"v2.1.134","ba":"Xiaomi"},"err":{"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":2633},"start":{"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":4407,"loading_time":12438,"open_ad_id":7},"ts":1651217959000}

The same records are also printed in the IDEA console.

1.4.3 Testing display, action, and page Records

Input data

{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"err":{"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":1559},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"displays":[{"display_type":"query","item":"15","item_type":"sku_id","pos_id":1,"order":1},{"display_type":"query","item":"26","item_type":"sku_id","pos_id":3,"order":2},{"display_type":"query","item":"31","item_type":"sku_id","pos_id":2,"order":3},{"display_type":"promotion","item":"29","item_type":"sku_id","pos_id":5,"order":4},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":2,"order":5},{"display_type":"recommend","item":"1","item_type":"sku_id","pos_id":1,"order":6}],"actions":[{"item":"5","action_id":"favor_add","item_type":"sku_id","ts":1651217964522}],"ts":1651217961000}

Output data

dwd_traffic_page_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_page_log
{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"ts":1651217961000}
dwd_traffic_display_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_display_log
{"display_type":"query","page_id":"good_detail","item":"15","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":1,"order":1,"ts":1651217961000}
{"display_type":"query","page_id":"good_detail","item":"26","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":3,"order":2,"ts":1651217961000}
{"display_type":"query","page_id":"good_detail","item":"31","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":2,"order":3,"ts":1651217961000}
{"display_type":"promotion","page_id":"good_detail","item":"29","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":5,"order":4,"ts":1651217961000}
{"display_type":"query","page_id":"good_detail","item":"9","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":2,"order":5,"ts":1651217961000}
{"display_type":"recommend","page_id":"good_detail","item":"1","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":1,"order":6,"ts":1651217961000}
dwd_traffic_action_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_action_log
{"page_id":"good_detail","item":"5","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","action_id":"favor_add","item_type":"sku_id","ts":1651217964522}
dwd_traffic_error_log
{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"err":{"msg":" Exception in thread \\  java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":1559},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"displays":[{"display_type":"query","item":"15","item_type":"sku_id","pos_id":1,"order":1},{"display_type":"query","item":"26","item_type":"sku_id","pos_id":3,"order":2},{"display_type":"query","item":"31","item_type":"sku_id","pos_id":2,"order":3},{"display_type":"promotion","item":"29","item_type":"sku_id","pos_id":5,"order":4},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":2,"order":5},{"display_type":"recommend","item":"1","item_type":"sku_id","pos_id":1,"order":6}],"actions":[{"item":"5","action_id":"favor_add","item_type":"sku_id","ts":1651217964522}],"ts":1651217961000}

The same records are also printed in the IDEA console.
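A note on the output format: in the display and action records above, common appears as an escaped JSON string rather than a nested object, because the main program extracts it with value.getString("common") and attaches the resulting string to each record.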
