揭秘字节跳动埋点数据实时动态处理引擎(附源码)(代码片段)

zhisheng_blog zhisheng_blog     2022-12-21     660

关键词:

1.序篇-先说结论

宝贝们,还记得前几天博主去的火山引擎大数据场嘛,其中比较令大家感兴趣的就是最后一讲,字节一站式埋点平台的 flink 标准化清洗及拆流任务。

其中大家感觉比较流啤的就是的就是字节做到了:

  1. 不重启任务可以上下线新的拆流及清洗规则,所有的规则变更都不需要涉及到任务的重启。

  2. 清洗 udf,rpc 接口热加载

总的来说就是任务永不停,不可能停止的,好么,beiber。

字节火山引擎 PPT。公众号回复 20210724 获取。

6

本文博主就主要介绍第一点,即做到规则动态变化,可以做到动态添加一个 sink kafka topic,动态删除一个 sink kafka topic,而不重启任务。相信能抛砖引玉,给大家一些启发。

本文从以下几个章节详细介绍框架实现:

  1. 背景篇-为啥需要这么个框架

  2. 定义、目标篇-做这个框架的目标、预期效果是什么

  3. 难点剖析篇-此框架建设的难点、业界目前的实现

  4. 数据建设篇-框架具体方案设计

  5. 数据保障篇-框架的保障方案

  6. 总结与展望篇

2.背景篇-为啥需要这么个框架

首先来看看字节他们做这件事情的背景:

  1. 任务重启造成数据的延迟:对于字节这种企业来说且每天都会新上线很多的埋点,把这些新的埋点拆流条件加入 flink 任务就要重启,但是字节客户端日志流量都是千万级别 qps 的,就意味着这个 flink 任务一旦重启耗时肯定是很长的,这对时延敏感的业务是不可接受的。

  2. 减少对于原始客户端日志的烟囱式消费,节约资源

  3. 统一标准化的埋点平台:用户能通过埋点平台用到正确的数据

  4. 与埋点平台联动的、统一化的、标准化的流式数据处理平台:用户能通过这个平台去获取想要的统一标准化的数据

  5. 数据的分级保障能力:Dump 日志,日志的产出需要分优先级进行保障(死保、尽力保...),用户能放心的用数据

如图:

因此诞生了这个框架。

3.定义、目标篇-做这个框架的目标、预期效果是什么

上述的痛点很多,本节就从最痛的任务重启的延迟角度出发解决问题,揭秘字节动态配置化的 flink 任务的实现。

预期效果如下:

      1.即在任务不停止的情况下可以动态的上线一个动态规则、一个 sink kafka topic,上线某个、某类埋点对应的流数据的 kafka topic

如图左边是修改配置,添加了一个拆流规则以及对应 topic,右边这个规则 topic 就开始产出数据,对应的 console consumer 就消费到了复合规则的数据。(gif 加载可能比较慢)

8

      2.即在任务不停止的情况下可以动态的下线一个动态规则、一个 sink kafka topic,下线某个、某类埋点对应的流数据的 kafka topic

如图左边是修改配置,删除了一个拆流规则以及对应 topic,右边这个规则 topic 就不产出数据了,对应的 console consumer 就没有新数据可以消费了。(gif 加载可能比较慢)

9

      3.总体效果如下:

4.难点剖析篇-此框‍架建设的难点、业界目前的实现

首先带大家分析下,实现这个框架,最基本的模块都需要包含什么:

  1. flink 任务:本身就是一个 Map 任务,逻辑简单

  2. 动态上下线规则配置:肯定得有一个动态配置中心去告诉 flink 任务需要新上下线一个 kafka topic

  3. 动态规则过滤引擎:flink 任务监听到规则发生动态变化之后,要热更新规则,将新的规律规则应用起来。需要一个动态代码执行引擎

  4. 动态上下线 Kafka topic:目前大多数公司用的是 flink 自带的 kafka-connector,一旦涉及到需要添加一个下游,就需要添加一个 kafka producer operator,因为涉及到多加了一个 operator,那肯定得重启任务。需要动态添加删除 producer 的能力。

5.数据建设篇-框架具体方案设计

5.1.方案设计

5.1.1.方案

先说说方案选择的结论:

  1. flink 入口任务:Map 模型使用 ProcessFunction 底层算子

  2. 动态上下线规则配置:配置中心开源的有很多,这里为了实现轻量化,实现简单,使用 zookeeper 作为动态规则配置中心。当然如果对 zk 压力大,也可以使用广播配置实现。

  3. 动态规则引擎:规则引擎很多,比如常见的可以使用 JavaScript、Groovy、jython、mvel2、freemarker 等等,太多了。考虑到性能、易用性选用 janino 将动态规则动态编译出 class。然后作为动态规则引擎使用。后面会详述选用 janino 的原因。

  4. 动态上下线 Kafka topic:去除 flink-kafka-connector,直接在 ProcessFunction 中使用原生 kafka-clients 输出数据,维护一个 producer 池。

整体方案架构图如图所示:

5.1.2.预期效果

5.1.2.1.上线配置

4

5.1.2.2.下线配置

5

5.2.具体实现

整个任务的实现非常简单。

本地运行,可以参考下面两篇安装 zk 和 kafka。

  • zk:https://www.jianshu.com/p/5491d16e6abd

  • kafka:https://www.jianshu.com/p/dd2578d47ff6

5.2.1.flink 任务入口逻辑

首先来看看整个任务的入口逻辑,ProcessFunction 的功能很简单:

  1. 针对数据源的每一条日志数据,遍历动态规则引擎池

  2. 只要这条数据满足某一条规则的条件,就将这条日志数据写出到规则对应的 topic 中

 env.addSource(new UserDefinedSource())
    .process(new ProcessFunction<ClientLogSource, ClientLogSink>() 
        // 动态规则配置中心
        private ZkBasedConfigCenter zkBasedConfigCenter;
        // kafka producer 管理中心
        private KafkaProducerCenter kafkaProducerCenter;

        @Override
        public void open(Configuration parameters) throws Exception 
            super.open(parameters);
            this.zkBasedConfigCenter = ZkBasedConfigCenter.getInstance();
            this.kafkaProducerCenter = KafkaProducerCenter.getInstance();
        

        @Override
        public void processElement(ClientLogSource clientLogSource, Context context, Collector<ClientLogSink> collector)
                throws Exception 
            
            // 遍历所有的动态规则
            this.zkBasedConfigCenter.getMap().forEach(new BiConsumer<Long, DynamicProducerRule>() 
                @Override
                public void accept(Long id, DynamicProducerRule dynamicProducerRule) 
                    // 验证该条数据是否符合该条规则
                    if (dynamicProducerRule.eval(clientLogSource)) 
                        // 将符合规则的数据发向对应规则的 topic 中
                        kafkaProducerCenter.send(dynamicProducerRule.getTargetTopic(), clientLogSource.toString());
                    

                
            );
        

        @Override
        public void close() throws Exception 
            super.close();
            // 关闭规则池
            this.zkBasedConfigCenter.close();
            // 关闭 producer 池
            this.kafkaProducerCenter.close();
        
    );

env.execute();

5.2.2.动态上下线规则配置

来看 flink ProcessFunction 中的核心点,第一部分就是 ZkBasedConfigCenter。其功能包含:

  1. 任务启动时,初始化加载 zk 配置,初始化规则池,将规则池中的配置规则编译成 class 可执行规则

  2. 监听 zk 配置变更,将新增配置加入规则池,将下线配置从规则池删除

5.2.2.1.动态规则 schema 设计

动态规则包含的内容与用户需求息息相关:

举例:用户需要将在首页上报 +  id > 300 用户的客户端日志都写入 topic_id_bigger_than_300_and_main_page 的 kafka topic 中。

那么针对这个 flink 任务来说就有以下三项用户的输入:

  1. 动态规则的过滤条件:即上游每一条数据过来之后检验这条数据是否满足规则条件。上面这个例子的条件就是 clientLogSource.getId() > 300 && clientLogSource.getPage().equals("首页");其中 clientLogSource 是原始日志 model

  2. 动态规则要写入的 topic 名称:这条规则过滤出来的数据要写入哪个 topic。上面这个例子的 topic 就是 topic_id_bigger_than_300_and_main_page

  3. 动态规则的唯一 id:唯一标识一个过滤规则的 id

针对上述要求设计动态规则配置的 schema 如下:


    "id-数值类型 string": 
  "condition-过滤条件": "1==1",
  "targetTopic-目标 topic 名称": "tuzisir1"
 
 "1": 
  "condition": "clientLogSource.getId() > 300 && clientLogSource.getPage().equals(\\"首页\\")",
  "targetTopic": "topic_id_bigger_than_300_and_main_page"
 ,
 "2": 
  "condition": "clientLogSource.getPage().equals(\\"个人主页\\")",
  "targetTopic": "topic_profile_page"
 

对应动态规则 model 设计如下:

public class DynamicProducerRule implements Evaluable 

    // 具体过滤规则
    private String condition;

    // 具体写入 topic
    private String targetTopic;

    // 使用 janino 编译的规则过滤器
    private Evaluable evaluable;

    public void init(Long id) 
        try 
            // 使用 janino 初始化规则
            Class<Evaluable> clazz = JaninoUtils.genCodeAndGetClazz(id, targetTopic, condition);
            this.evaluable = clazz.newInstance();
         catch (Exception e) 
            throw new RuntimeException(e);
        
    

    @Override
    public boolean eval(ClientLogSource clientLogSource) 
        return this.evaluable.eval(clientLogSource);
    


重点在于 Evaluable 接口,动态生成代码就是继承了这个接口,用于执行过滤规则的基础接口。

代码动态生成下面会详细介绍。

public interface Evaluable 

    // 动态规则接口过滤方法
    boolean eval(ClientLogSource clientLogSource);


5.2.2.2.基于 zk 的动态配置中心

使用了 zk 作为动态配置中心,来动态监听规则配置以及更新规则池。

public class ZkBasedConfigCenter 
    
    // zk config 变化监听器
    private TreeCache treeCache;

    // zk 客户端
    private CuratorFramework zkClient;

    private ZkBasedConfigCenter() 
        try 
            open();
         catch (Exception e) 
            throw new RuntimeException(e);
        
    

    // !!!规则池!!!规则池!!!规则池
    private ConcurrentMap<Long, DynamicProducerRule> map = new ConcurrentHashMap<>();

    private void open() throws Exception 

        // 初始化规则
        // 初始化 zk config 监听器
        // 当有配置变更时
        // 调用 private void update(String json) 更新规则

    

    public void close() 
        this.treeCache.close();
        this.zkClient.close();
    

    private void update(String json) 

        Map<Long, DynamicProducerRule>
                result = getNewMap(json);

        // 1.将新增规则添加进规则池
        // 2.将下线规则从规则池删除
    

    private Map<Long, DynamicProducerRule> getNewMap(String json) 
        // 将新规则解析,并使用 janino 进行初始化
    

可以使用一个固定路径的配置,如图博主使用的是 /kafka-config 这个路径

7

5.2.3.动态规则引擎

目前字节使用的引擎是 Groovy,但是博主常用 flink sql,sql 中的代码生成是使用 janino 做的,因此就比较了 janino 和 groovy 的性能差异,janino 编译出的原生 class 性能接近原生 class,是 Groovy 的 4 倍左右。其他的引擎不考虑,要么易用性差,要么性能差。

Notes:性能这一点真的是很重要,1:4 的差距可以说是差别很大了。如果你的场景也是大流量,非常耗费性能的场景,建议直接入手 janino!!!

来看看具体的 benchmark case 代码:

// ClientLogSource 是原始日志
boolean eval(flink.examples.datastream._01.bytedance.split.model.ClientLogSource clientLogSource) 
    return String.valueOf(clientLogSource.getId()).equals("1");

上面这段代码,在博主 mac 本地执行,每次循环执行 5kw 次,总计执行 5 次 得出的结果如下:

java:847 ms
janino:745 ms
groovy:4110 ms

java:1097 ms
janino:1170 ms
groovy:4052 ms

java:916 ms
janino:1117 ms
groovy:4311 ms

java:915 ms
janino:1112 ms
groovy:4382 ms

java:921 ms
janino:1104 ms
groovy:4321 ms

重复执行了很多次:java object : janino 编译原生 class :groovy :几乎都是 1:1:4 的耗时。所以此处我们选择性能更好的 janino。

public class JaninoUtils 

    public static Class<Evaluable> genCodeAndGetClazz(Long id, String topic, String condition) throws Exception 
        // 动态生成代码
        // 初始化 Class<Evaluable> 并返回
    


5.2.4.动态上下线 Kafka topic

来看入口类中的第二个核心点,就是 KafkaProducerCenter。其功能包含:

  1. 维护所有的 producer 池

  2. 提供消息发送接口

public class KafkaProducerCenter 

    // kafka producer 池
    private final ConcurrentMap<String, Producer<String, String>> producerConcurrentMap
            = new ConcurrentHashMap<>();

    private Producer<String, String> getProducer(String topicName) 

        // 如果 kafka producer 池中有当前 topic 的 producer,则直接返回
        // 如果没有,则初始化一个新的 producer 然后返回

    

    public void send(String topicName, String message) 

        final ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
                "", message);
        try 
            RecordMetadata metadata = getProducer(topicName).send(record).get();
         catch (Exception e) 
            throw new RuntimeException(e);
        
    

    public void close() 
        // 关闭所有 producer 连接
    

上面就是所有的代码、逻辑实现方案。其实整体看下来是非常简单的。

6. 数据保障篇-框架的保障方案

  1. 配置中心挂了怎么办?

为这个任务分配独立的队列资源,每当这个任务加载到最新配置时,都将配置在本地存储一份。当配置中心挂了的时候,还可以直接加载机器本地的配置,不至于什么都产出不了。

  1. 怎么保障用户的配置是无误的?

  • 上线前审批:有专门的埋点管理人员进行逻辑验证及管理

  • 上线前自动化测试:在埋点管理平台自动化验证逻辑正确性,保障上线到 flink 任务里的配置都是正确的

  • AOP 异常处理、报警:在环境中做 AOP 异常处理,将异常数据 dump 到专用异常 topic 中,也需要自动化把报警信息透出

  • 结果验证:针对最终的结果需要有数据准确性验证机制

7. 总结与展望篇

7.1.总结

本文主要揭秘、实现了字节跳动埋点数据实时动态处理引擎。

7.2.展望

  1. 本文主要实现了拆流的动态化,输出数据和输入数据完全相同,但是很多情况下,下游只需要其中的一些字段。因此之后还可以做到对于 sink message 字段、消息的个性化。比如可以加一个动态化的 Map 逻辑,将数据源中的 ClientLogSource 转化为任何用户想要的 Model。比如使用 Dynamic Message 或者使用代码生成去做。

  2. 目前过滤条件完全是 java 语法,之后可以扩展成为 sql 语法,提高可读性

  3. 函数、rpc 热加载

字节跳动埋点数据流建设与治理实践

本文将介绍字节跳动在埋点数据流业务场景遇到的需求和挑战以及具体实践。文|石伟 来自字节跳动数据平台开发套件团队出品| 字节跳动数据平台埋点数据流埋点数据流在字节跳动埋点数据流主要处理的数据是埋点,埋点... 查看详情

火山引擎dataleap:揭秘字节跳动数据血缘架构演进之路

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群DataLeap是火山引擎数智平台VeDI旗下的大数据研发治理套件产品,帮助用户快速完成数据集成、开发、运维、治理、资产、安全等全套... 查看详情

火山引擎项亮揭秘字节跳动基于hpc的大规模机器学习技术|直播分享报名

机器学习是当前领先的AI范式,到目前为止取得了非常可观的成就,当前机器学习也是一个非常时髦的话题。2021年12月火山引擎云产品发布会上正式发布了 AI全系产品,其中的AI开发平台就是全流程、高效率、高性能... 查看详情

免费活动字节跳动背后的音视频技术揭秘

音视频技术在近几年呈现突飞猛进的发展,一方面满足了企业对于业务高速增长的需求,另一方面也为业务的发展创造了更多的可能性。活动介绍10月29日|北京LiveVideoStack将联合火山引擎的5位技术专家在本专题中,展... 查看详情

字节跳动背后的音视频技术揭秘

在过去的一年中,我们可以看到多媒体特别是音视频技术的能力在严峻的挑战下,为各行各业带来了巨大的变化。疫情过后,又会有哪些多媒体新技术、新实践呈现在大众的视野当中?为行业的发展与应用带来哪... 查看详情

字节跳动大规模实践埋点自动化测试框架设计

大数据时代,多数的web或app产品都会使用第三方或自己开发相应的数据系统,进行用户行为数据或其它信息数据的收集,在这个过程中,埋点是比较重要的一环。埋点收集的数据一般有以下作用:驱动决策ÿ... 查看详情

字节跳动开源序列推理引擎lightseq(代码片段)

背景介绍2017年Google提出了Transformer[1]模型,之后在它基础上诞生了许多优秀的预训练语言模型和机器翻译模型,如BERT[2]、GPT系列[13]等,不断刷新着众多自然语言处理任务的能力水平。与此同时,这些模型的参数... 查看详情

字节跳动开源序列推理引擎lightseq(代码片段)

背景介绍2017年Google提出了Transformer[1]模型,之后在它基础上诞生了许多优秀的预训练语言模型和机器翻译模型,如BERT[2]、GPT系列[13]等,不断刷新着众多自然语言处理任务的能力水平。与此同时,这些模型的参数... 查看详情

史上最全!2020面试阿里,字节跳动90%被问到的jvm面试题(附答案)

前言:最近老是收到小伙伴的私信问我能不能帮忙整理出一份JVM相关的面试题出来,说自己在大厂去面试的时候这一块问的是特别多的,每次自己学的时候每次都学不到重点去。这不他来了,一份详细的JVM面试真题给大家整理在... 查看详情

速度超快!字节跳动开源序列推理引擎lightseq(代码片段)

这应该是业界第一款完整支持Transformer、GPT等多种模型高速推理的开源引擎。2017年Google提出了Transformer[1]模型,之后在它基础上诞生了许多优秀的预训练语言模型和机器翻译模型,如BERT[2]、GPT系列[13]等,不断刷新着众多自然语... 查看详情

字节跳动大数据开发面试题-附答案(代码片段)

此面试题来自牛客网友分享的字节跳动应届一面,面试时长一小时。网友情况:985本硕。参考答案由本公众号提供。如有错误,欢迎指正!以下为面试过程中提问,岗位为大数据开发:自我介绍+项目介... 查看详情

字节跳动双11电商直播技术大揭秘

...团队中负责直播客户端SDK团队的徐鸿,请他来给大家揭秘双11抖音电商直播背后的技术实践,同时徐鸿也分享了他对音视频技术未来发展趋势的看法。字节跳动技术访谈#010#LiveVideoStack:徐老师,请问您目前在字节跳动... 查看详情

字节跳动总包50w的前端岗,压中原题了!(附答案)

大家都知道字节跳动已经大规模扩招半年了!半年来,身边的亲戚,朋友,同事不是入职了某条,就是在去往字节跳动面试的路上!我身边的一朋友更是百折不挠,去年就开始疯狂地投简历!时刻... 查看详情

字节跳动开源数据集成引擎bitsail的演进历程与能力解析

导读BitSail是字节跳动开源数据集成引擎,支持多种异构数据源间的数据同步,并提供离线、实时、全量、增量场景下全域数据集成解决方案,目前支撑了字节内部和火山引擎多个客户的数据集成需求。经过字节跳动各大业务线... 查看详情

字节跳动头条研发---测试开发一面面经(附答案)

...里有不少刚入门测试的同学,这两天抽空整理了一份字节测开实习的面试题答案,说实话这个题目真挺简单的,如果你面大厂碰到此类面试题,也算是运气很好啦。大家也可以先自测一下,看看自己 查看详情

面试阿里,字节跳动必问jvm问题!你不进来看看吗?附答案

...器,是线程私有的,就是一个指针,指向方法区中的方法字节码。?静态域:static定义的静态成员。?常量池:编译时被确定并保存在.class文件中的(final)常量值和一些文本修饰的符号引用(类和接口的全限定名,字段的名称和描... 查看详情

字节跳动java面试题,附详细答案解析

本篇文章主要内容数据缓存为何要使用缓存哪类数据适合缓存缓存的利与弊如何保证缓存和数据库一致性不更新缓存,而是删除缓存先操作缓存,还是先操作数据库非要保证数据库和缓存数据强一致该怎么办缓存和数据... 查看详情

presto在字节跳动的内部实践与优化

引言在字节跳动内部,Presto主要支撑了Ad-hoc查询、BI可视化分析、近实时查询分析等场景,日查询量接近100万条。功能性方面完全兼容SparkSQL语法,可以实现用户从SparkSQL到Presto的无感迁移;性能方面实现JoinReorder&#... 查看详情