华为云mrs基于hudi和hetuengine构建实时数据湖最佳实践(代码片段)

华为云官方博客 华为云官方博客     2022-10-21     270

关键词:

数据湖与实时数据湖是什么?

各个行业企业都在构建企业级数据湖,将企业内多种格式数据源汇聚的大数据平台,通过严格的数据权限和资源管控,将数据和算力开放给各种使用者。一份数据支持多种分析,是数据湖最大的特点。如果数据湖的数据,从数据源产生后,可以在1分钟以内实时进入到数据湖存储,支持各种交互式分析,这种数据湖通常叫做实时数据湖,如果可以做到15分钟之内,也可称为准实时数据湖。构建实时数据湖,正在成为5G和IOT时代,支撑各个企业实时分析业务的数据湖新目标。

华为MRS实时数据湖方案介绍

  1. 生产库数据通过CDC工具(debezium)实时录入到MRS集群中Kafka的指定topic里;
  2. 在MRS集群启动一个SparkStreaming任务,实时读取Kafka指定topic里的数据;
  3. 同时该SparkStreaming任务将读取到的数据进行解析处理并写入到一张hudi表中;
  4. 写入hudi表的同时可以指定该数据也写入hive表;
  5. 通过MRS提供的交互式查询引擎HetuEngine对数据进行快速的交互式查询。

使用华为MRS实时数据湖方案的优势:

  1. ACID事务能力得以保证,湖内一份数据满足所有的分析业务需求,减少数据搬迁,减少数据冗余;
  2. 数据一致性保证,保证增量数据与入湖后数据一致性检测;
  3. 数据加工流转,在一个存储层内闭环,数据流动更高效;
  4. 基于HetuEngine引擎实现交互式查询,性能不降低。

下面会针对方案的三个关键组件:CDC工具,数据存储引擎Hudi,交互式查询引擎HetuEngine进行详细的介绍

样例数据简介

生产库MySQL原始数据(前10条,共1000条):

CDC工具

简介

CDC(changed data capture)为动态数据抓取,常见的方式分为同步和异步。同步CDC主要是采用触发器记录新增数据,基本能够做到实时增量抽取。而异步CDC则是通过分析已经commit的日志记录来得到增量数据信息。常见的CDC工具有Canal, DataBus, Maxwell, Debezium, OGG等。本方案采用debezium作为CDC工具

对接步骤

具体参考:https://fusioninsight.github.io/ecosystem/zh-hans/Data_Integration/DEBEZIUM/

完成对接后,针对MySQL生产库分别做增、改、删除操作对应的kafka消息

增加操作: insert into hudi.hudisource values (1001,“蒋语堂”,38,“女”,“图”,"《星球大战》",28732);

对应kafka消息体:

更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie’ WHERE uid=1001;

对应kafka消息体:

删除操作: delete from hudi.hudisource where uid=1001;

对应kafka消息体:

Hudi

简介

Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写。具有以下的特性

  • ACID事务能力,支持实时入湖和批量入湖。
  • 多种视图能力(读优化视图/增量视图/实时视图),支持快速数据分析。
  • MVCC设计,支持数据版本回溯。
  • 自动管理文件大小和布局,以优化查询性能准实时摄取,为查询提供最新数据。
  • 支持并发读写,基于snapshot的隔离机制实现写入时可读取。
  • 支持原地转表,将存量的历史表转换为Hudi数据集。

样例代码解析

使用Hudi实时入湖的样例代码分三个部分

  • Kafka数据消费
  • 数据内容解析、处理
  • 解析后数据的写入

Kafka数据消费部分样例代码:

String savePath = "hdfs://hacluster/huditest2/";
String groupId = "group1";
System.out.println("groupID is: " + groupId);
String brokerList = "172.16.5.51:21005";
System.out.println("brokerList is: " + brokerList);
String topic = "hudisource";
System.out.println("topic is: " + topic);
String interval = "5";

HashMap<String, Object> kafkaParam = new HashMap<>();
kafkaParam.put("bootstrap.servers", brokerList);
kafkaParam.put("group.id", groupId);
kafkaParam.put("auto.offset.reset", "earliest");
kafkaParam.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParam.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

HashSet<String> topics = new HashSet<>();
topics.add(topic);

String[] topicArray = topic;
Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArray));
ConsumerStrategy consumerStrategy = ConsumerStrategies.Subscribe(topicSet, kafkaParam);

//本地调试
SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("hudi-java-demo");

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.streaming.kafka.maxRatePerPartition", "10");
conf.set("spark.streaming.backpressure.enabled", "true");

JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        consumerStrategy);

数据内容解析、处理部分样例代码:

JavaDStream<List> lines =
        directStream.filter(
                //过滤空行和脏数据
                new Function<ConsumerRecord<String, String>, Boolean>() 
                    public Boolean call(ConsumerRecord<String, String> v1) throws Exception                      
                        if (v1.value() == null) 
                            return false;
                        
                        try
                            String op = debeziumJsonParser.getOP(v1.value());
                        catch (Exception e)
                            return false;
                        
                        return true;
                    
                
        ).map(
                new Function<ConsumerRecord<String, String>, List>() 
                    public List call(ConsumerRecord<String, String> v1) throws Exception 
                        //将debezium接进来的数据解析写进List                       
                        String op = debeziumJsonParser.getOP(v1.value());
                        JSONObject json_obj = JSON.parseObject(v1.value());                     
                        Boolean is_delete = false;
                        String out_str = "";
                        if(op.equals("c"))                            
                            out_str =  json_obj.getJSONObject("payload").get("after").toString();
                        
                        else if(op.equals("u"))                            
                            out_str =   json_obj.getJSONObject("payload").get("after").toString();
                        
                        else 
                            is_delete = true;
                            out_str =   json_obj.getJSONObject("payload").get("before").toString();
                        
                        LinkedHashMap<String, String> jsonMap = JSON.parseObject(out_str, new TypeReference<LinkedHashMap<String, String>>() 
                        );
                        int cnt =0;
                        List out_list = new ArrayList();
                        for (Map.Entry<String, String> entry : jsonMap.entrySet()) 
                            out_list.add(entry.getValue());
                            cnt++;
                        
                        out_list.add(is_delete);
                        String commitTime = Long.toString(System.currentTimeMillis());

                        out_list.add(commitTime);
                        System.out.println(out_list);

                        out_list.add(op);

                        return out_list;
                    
                );

debezium更新字段解析样例代码:

public class debeziumJsonParser 
    public static String getOP(String message)
        JSONObject json_obj = JSON.parseObject(message);
        String op = json_obj.getJSONObject("payload").get("op").toString();
        return  op;
    

解析后数据的写入hudi表,hive表样例代码:

lines.foreachRDD(new VoidFunction<JavaRDD<List>>() 
    @Override
    public void call(JavaRDD<List> stringJavaRDD) throws Exception 
        if (!stringJavaRDD.isEmpty()) 
            System.out.println("stringJavaRDD collect---"+stringJavaRDD.collect());
            List<Row> rowList =new ArrayList<>();
            //把数据上一步数据写进stringJavaRdd
            for(List row: stringJavaRDD.collect())

                String uid = row.get(0).toString();
                String name = row.get(1).toString();
                String age = row.get(2).toString();
                String sex = row.get(3).toString();
                String mostlike = row.get(4).toString();
                String lastview = row.get(5).toString();
                String totalcost = row.get(6).toString();
                Boolean _hoodie_is_deleted = Boolean.valueOf(row.get(7).toString());      
                String commitTime = row.get(8).toString();
                String op = row.get(9).toString();
                Row returnRow = RowFactory.create(uid, name, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, commitTime, op);
                rowList.add(returnRow);

            
            JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);
            //写入表的字段schema设计
            List<StructField> fields = new ArrayList<>();
            fields.add(DataTypes.createStructField("uid", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));
            fields.add(DataTypes.createStructField("commitTime", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));
            StructType schema = DataTypes.createStructType(fields);
            Dataset<Row> dataFrame = sqlContext.createDataFrame(stringJavaRdd, schema);
            Dataset<Row> rowDataset = dataFrame.withColumn("ts", dataFrame.col("commitTime"))
                    .withColumn("uuid", dataFrame.col("uid"));        

            //将数据写入hudi表以及hive表
            rowDataset.write().format("org.apache.hudi")
                    .option("PRECOMBINE_FIELD_OPT_KEY", "ts")
                    .option("RECORDKEY_FIELD_OPT_KEY", "uuid")
                    .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")                  
                    .option("hoodie.table.name", "huditesttable")
                    .option("hoodie.upsert.shuffle.parallelism", "10")                  
                    .option("hoodie.delete.shuffle.parallelism", "10")
                    .option("hoodie.insert.shuffle.parallelism", "10")
                    .option("hoodie.bulkinsert.shuffle.parallelism", "10")
                    .option("hoodie.finalize.write.parallelism", "10")
                    .option("hoodie.cleaner.parallelism", "10")
                    .option("hoodie.datasource.write.operation", "upsert")                  
                    .option("hoodie.datasource.hive_sync.enable", "true")                    
                    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor")
                    .option("hoodie.datasource.hive_sync.database", "default")
                    .option("hoodie.datasource.hive_sync.table", "hudidebezium")
                    .option("hoodie.datasource.hive_sync.use_jdbc", "false")
                    .mode(SaveMode.Append)
                    .save(savePath);
        
    
);

jssc.start();
jssc.awaitTermination();
jssc.close();

Hudi任务提交命令

source /opt/client/bigdata_env
source /opt/client/Hudi/component_env
spark-submit --master yarn --deploy-mode client --jars /opt/hudi-demo4/fastjson-1.2.4.jar --class hudiIn /opt/hudi-demo4/HudiJavaDemo-1.0-SNAPSHOT.jar

HetuEngine

简介

HetuEngine是华为FusionInsight MRS提供的高性能分布式SQL查询、数据虚拟化引擎。能与大数据生态无缝融合,实现海量数据秒级查询;支持多源异构协同,提供数据湖内一站式SQL融合分析。

同时HetuEngine拥有开放的接口,能够支持各报表、分析软件对接,具体可参考生态地图:https://fusioninsight.github.io/ecosystem/zh-hans/

下面我们以帆软FineBI为例进行查询、分析。

配置FineBI对接HetuEngine

JDBC URL: jdbc:presto://172.16.5.51:29860,172.16.5.52:29860/hive/default?serviceDiscoveryMode=hsbroker&tenant=default

查看初始同步数据:

通过HetuEngine检查增、改、删除操作

Mysql增加操作对应hive表结果:

Mysql更改操作对应hive表结果:

Mysql删除操作对应hive表结果:

报表:

电影喜爱度分析:

电影标签喜爱度分析:

云小课|mrs基础原理之hudi介绍(代码片段)

阅识风云是华为云信息大咖,擅长将复杂信息多元化呈现,其出品的一张图(云图说)、深入浅出的博文(云小课)或短视频(云视厅)总有一款能让您快速上手华为云。更多精彩内容请单击此处。摘要:Hudi是数据湖的文件... 查看详情

技术干货|阿里云基于hudi构建lakehouse实践探索

简介:阿里云高级技术专家王烨(萌豆)在ApacheHudi与ApachePulsar联合Meetup杭州站上的演讲整理稿件,本议题介绍了阿里云如何使用Hudi和OSS对象存储构建Lakehouse,为大家分享了什么是Lakehouse,阿里云数据库OLAP团队如何... 查看详情

技术干货|阿里云基于hudi构建lakehouse实践探索「内附干货ppt下载渠道」

简介: 阿里云高级技术专家王烨(萌豆)在ApacheHudi与ApachePulsar联合Meetup杭州站上的演讲整理稿件,本议题介绍了阿里云如何使用Hudi和OSS对象存储构建Lakehouse,为大家分享了什么是Lakehouse,阿里云数据库OLAP团队如何... 查看详情

mrshetuegine的数据虚拟化实践

摘要:华为MRS云原生数据湖平台的HetuEngine就是一款解决大数据时代跨源跨域问题的数据虚拟化引擎。本文分享自华为云社区《基于华为云原生数据湖MRSHetuEgine的数据虚拟化实践》,作者:前锋。数据虚拟化是指一种... 查看详情

fusioninsightmrsflinkdatastreamapi读写hudi实践(代码片段)

...客户存在使用FlinkDataStreamAPI读写Hudi的诉求。本文分享自华为云社区《FusionInsightMRSFlinkDataStreamAPI读写Hudi实践》,作者:yangxiao_mrs。目前Hudi只支持FlinkSQL进行数据读写ÿ 查看详情

一文带你体验mrshetuengine如何实现跨源跨域分析(代码片段)

摘要: HetuEngine作为MRS服务中交互式分析&多源统一SQL引擎,亲自全程体验其如何实现多数据源的跨源跨域分析能力。本文分享自华为云社区《MRSHetuEngine体验跨源跨域分析【玩转华为云】》,作者:龙哥手记。H... 查看详情

mrs+apachezeppelin,让数据分析更便捷

...同步自建一套Hadoop生态成本太高!因此我们通过结合华为云MRS服务构建数据中台。本文分享自华为云社区《MRS大数据平台结合ApacheZeppelin让数据分析更便捷》,作者:dullman。ApacheZeppelin:一款大 查看详情

浅析华为云基于hbasemttr上的优化实践

摘要:主要介绍华为云在HBase2.x内核所做的一些MTTR优化实践。本文分享自华为云社区《华为云在HBaseMTTR上的优化实践》,作者:搬砖小能手。随着HBase在华为云的广泛应用,HBase的数据节点规模也越来越大。最新版... 查看详情

浅析华为云基于hbasemttr上的优化实践

摘要:主要介绍华为云在HBase2.x内核所做的一些MTTR优化实践。本文分享自华为云社区《华为云在HBaseMTTR上的优化实践》,作者:搬砖小能手。随着HBase在华为云的广泛应用,HBase的数据节点规模也越来越大。最新版... 查看详情

云小课|mrs数据分析-通过sparkstreaming作业消费kafka数据(代码片段)

阅识风云是华为云信息大咖,擅长将复杂信息多元化呈现,其出品的一张图(云图说)、深入浅出的博文(云小课)或短视频(云视厅)总有一款能让您快速上手华为云。更多精彩内容请单击此处。摘要:SparkStreaming是一种构... 查看详情

华为云mrs支持lakeformation能力,打造一站式湖仓,释放数据价值

...ff1a;对云端用户而言,业务价值发现是最重要的,华为MRS支持LakeFormation后,成功降低了数据应用的成本,帮助客户落地“存”与“算”的管理,加快推进了数智融合进程,更大程度地释放业务数据价值。本... 查看详情

基于数据湖格式构建流式增量数仓—cdc

...diCDC实现4.湖格式Streaming的优化2021年中Databricks发布了一篇基于DeltaLake实现CDC场景的介绍文档,2022年初我们在阿里云EMR内部DeltaLake版本实现的CDC的能力,同期在ApacheHudi提案了Hudi基于Spark实现CDC的设计文档和实现代码。结合... 查看详情

云小课|mrs基础原理之flink组件介绍(代码片段)

阅识风云是华为云信息大咖,擅长将复杂信息多元化呈现,其出品的一张图(云图说)、深入浅出的博文(云小课)或短视频(云视厅)总有一款能让您快速上手华为云。更多精彩内容请单击此处。摘要:Flink是一个批处理和... 查看详情

华为云iot体验:基于iot平台构建智慧路灯应用(代码片段)

基于IoT平台构建智慧路灯应用基于华为云IoT平台,快速开发属于自己的智慧路灯应用。本文基于华为云iot实验制作而成1.资源下载(本文默认已经注册并登录华为云平台)在linux桌面下打开终端面板下载资源,下载保存“HubSi... 查看详情

云小课|mrs基础原理之clickhouse组件介绍

阅识风云是华为云信息大咖,擅长将复杂信息多元化呈现,其出品的一张图(云图说)、深入浅出的博文(云小课)或短视频(云视厅)总有一款能让您快速上手华为云。更多精彩内容请单击此处。摘要:在2016年开源的高性... 查看详情

37手游基于flinkcdc+hudi湖仓一体方案实践(代码片段)

...f1a; 介绍了37手游为何选择Flink作为计算引擎,并如何基于FlinkCDC+Hudi构建新的湖仓一体方案。本文作者是37手游大数据开发徐润柏,介绍了37手游为何选择Flink作为计算引擎,并如何基于FlinkCDC+Hudi构建新的湖仓一... 查看详情

云小课|mrs基础原理之oozie任务调度(代码片段)

阅识风云是华为云信息大咖,擅长将复杂信息多元化呈现,其出品的一张图(云图说)、深入浅出的博文(云小课)或短视频(云视厅)总有一款能让您快速上手华为云。更多精彩内容请单击此处。摘要:Oozie是一个基于工作... 查看详情

mrs+lakeformation:打造一站式湖仓,释放数据价值

摘要:华为LakeFormation是企业级的一站式湖仓构建服务。本文分享自华为云社区《华为云MRS支持LakeFormation能力,打造一站式湖仓,释放数据价值】》,作者:breakDawn。1背景1.1数仓和数据湖的概念数据分析技术在2010~2... 查看详情