sparksql字段血缘在vivo互联网的实践

vivo互联网      2022-04-26     527

关键词:

 作者:vivo互联网服务器团队-Hao Guangshi


一、背景

字段血缘是在表处理的过程中将字段的处理过程保留下来。为什么会需要字段血缘呢?

有了字段间的血缘关系,便可以知道数据的来源去处,以及字段之间的转换关系,这样对数据的质量,治理有很大的帮助。

Spark SQL 相对于 Hive 来说通常情况下效率会比较高,对于运行时间、资源的使用上面等都会有较大的收益。

平台计划将 Hive 任务迁移到 Spark SQL 上,同时也需要实现字段血缘的功能。

二、前期调研

开发前我们做了很多相关调研,从中得知 Spark 是支持扩展的:允许用户对 Spark SQL 的 SQL 解析、逻辑计划的分析和检查、逻辑计划的优化、物理计划的形成等进行扩展。

该方案可行,且对 Spark 的源码没有改动,代价也比较小,确定使用该方案。

三、Spark SQL 扩展

3.1 Spark 可扩展的内容

SparkSessionExtensions是比较重要的一个类,其中定义了注入规则的方法,现在支持以下内容:

  • 【Analyzer Rules】逻辑计划分析规则
  • 【Check Analysis Rules】逻辑计划检查规则
  • 【Optimizer Rules.】 逻辑计划优化规则
  • 【Planning Strategies】形成物理计划的策略
  • 【Customized Parser】自定义的sql解析器
  • 【(External) Catalog listeners catalog】监听器

在以上六种可以用户自定义的地方,我们选择了【Check Analysis Rules】。因为该检查规则在方法调用的时候是不需要有返回值的,也就意味着不需要对当前遍历的逻辑计划树进行修改,这正是我们需要的。

而【Analyzer Rules】、【Optimizer Rules】则需要对当前的逻辑计划进行修改,使得我们难以迭代整个树,难以得到我们想要的结果。

3.2 实现自己的扩展

class ExtralSparkExtension extends (SparkSessionExtensions => Unit) 
override def apply(spark: SparkSessionExtensions): Unit =

//字段血缘
spark.injectCheckRule(FieldLineageCheckRuleV3)

//sql解析器
spark.injectParser case (_, parser) => new ExtraSparkParser(parser)


上面按照这种方式实现扩展,并在 apply 方法中把自己需要的规则注入到 SparkSessionExtensions 即可,除了以上四种可以注入的以外还有其他的规则。要让 ExtralSparkExtension 起到作用的话我们需要在spark-default.conf 下配置 spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension 在启动 Spark 任务的时候即可生效。

注意到我们也实现了一个自定义的SQL解析器,其实该解析器并没有做太多的事情。只是在判断如果该语句包含insert的时候就将 SQLText(SQL语句)设置到一个为 FIELD_LINE_AGE_SQL,之所以将SQLText放到 FIELD_LINE_AGE_SQL 里面。因为在 DheckRule 里面是拿不到SparkPlan的我们需要对SQL再次解析拿到 SprkPlan,而FieldLineageCheckRuleV3的实现也特别简单,重要的在另一个线程实现里面。

这里我们只关注了insert语句,因为插入语句里面有从某些个表里面输入然后写入到某个表。

class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging

override def parsePlan(sqlText: String): LogicalPlan =
val lineAgeEnabled = SparkSession.getActiveSession
.get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
logDebug(s"SqlText: $sqlText")
if(sqlText.toLowerCase().contains("insert"))
if(lineAgeEnabled)
if(FIELD_LINE_AGE_SQL_COULD_SET.get())
//线程本地变量在这里
FIELD_LINE_AGE_SQL.set(sqlText)

FIELD_LINE_AGE_SQL_COULD_SET.remove()


delegate.parsePlan(sqlText)

//调用原始的sqlparser
override def parseExpression(sqlText: String): Expression =
delegate.parseExpression(sqlText)

//调用原始的sqlparser
override def parseTableIdentifier(sqlText: String): TableIdentifier =
delegate.parseTableIdentifier(sqlText)

//调用原始的sqlparser
override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
delegate.parseFunctionIdentifier(sqlText)

//调用原始的sqlparser
override def parseTableSchema(sqlText: String): StructType =
delegate.parseTableSchema(sqlText)

//调用原始的sqlparser
override def parseDataType(sqlText: String): DataType =
delegate.parseDataType(sqlText)

3.3 扩展的规则类

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) 

val executor: ThreadPoolExecutor =
ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)

override def apply(plan: LogicalPlan): Unit =
val sql = FIELD_LINE_AGE_SQL.get
FIELD_LINE_AGE_SQL.remove()
if(sql != null)
//这里我们拿到sql然后启动一个线程做剩余的解析任务
val task = new FieldLineageRunnableV3(sparkSession,sql)
executor.execute(task)



很简单,我们只是拿到了 SQL 然后便启动了一个线程去得到 SparkPlan,实际逻辑在FieldLineageRunnableV3。

3.4 具体的实现方法

3.4.1 得到 SparkPlan

我们在 run 方法中得到 SparkPlan:

override def run(): Unit = 
val parser = sparkSession.sessionState.sqlParser
val analyzer = sparkSession.sessionState.analyzer
val optimizer = sparkSession.sessionState.optimizer
val planner = sparkSession.sessionState.planner
............
val newPlan = parser.parsePlan(sql)
PASS_TABLE_AUTH.set(true)
val analyzedPlan = analyzer.executeAndCheck(newPlan)

val optimizerPlan = optimizer.execute(analyzedPlan)
//得到sparkPlan
val sparkPlan = planner.plan(optimizerPlan).next()
...............
if(targetTable != null)
val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
//projection
projectionLineAge(levelProject, sparkPlan.child)
//predication
predicationLineAge(predicates, sparkPlan.child)
...............

为什么要使用 SparkPlan 呢?当初我们考虑的时候,物理计划拿取字段关系的时候是比较准的,且链路比较短也更直接。

在这里补充一下 Spark SQL 解析的过程如下:

Spark

经过SqlParser后会得到逻辑计划,此时表名、函数等都没有解析,还不能执行;经过Analyzer会分析一些绑定信息,例如表验证、字段信息、函数信息;经过Optimizer 后逻辑计划会根据既定规则被优化,这里的规则是RBO,当然 Spark 还支持CBO的优化;经过SparkPlanner后就成了可执行的物理计划。

我们看一个逻辑计划与物理计划对比的例子:

一个 SQL 语句:

select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3

逻辑计划是这样的:

Spark

物理计划是这样的:

Spark

显然简化了很多。

得到 SparkPlan 后,我们就可以根据不同的SparkPlan节点做迭代处理。

我们将字段血缘分为两种类型:projection(select查询字段)、predication(wehre查询条件)。

这两种是一种点对点的关系,即从原始表的字段生成目标表的字段的对应关系。

想象一个查询是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:

Spark

那么我们迭代查询的结果应该为

 id ->tab1.id ,

 name->tab1.name,tabb2.name,

 age→tabb2.age。

注意到有该变量 val levelProject = new ArrayBuffer ArrayBuffer[NameExpressionHolder],通过projecti-onLineAge 迭代后 levelProject 存储了顶层id,name,age对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

当然也不是简单的递归迭代,还需要考虑特殊情况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都需要特殊考虑。

例子及效果:

SQL:

with A as (select id,name,age from tab1 where id > 100 ) ,
C as 查看详情

sparksql字段血缘扩展实践!(代码片段)

目录一、背景二、前期调研三、SparkSQL扩展3.1Spark可扩展的内容3.2实现自己的扩展3.3扩展的规则类3.4具体的实现方法四、总结一、背景字段血缘是在表处理的过程中将字段的处理过程保留下来。为什么会需要字段血缘呢?有了... 查看详情

vivo互联网机器学习平台的建设与实践

vivo互联网产品团队-Wangxiao随着广告和内容等推荐场景的扩展,算法模型也在不断演进迭代中。业务的不断增长,模型的训练、产出迫切需要进行平台化管理。vivo互联网机器学习平台主要业务场景包括游戏分发、商店、商城、内... 查看详情

异构混排在vivo互联网的技术实践

作者:vivo互联网算法团队-ShenJiyi本文根据沈技毅老师在“2022vivo开发者大会"现场演讲内容整理而成。混排层负责将多个异构队列的结果如广告、游戏、自然量等进行融合,需要在上下游和业务多重限制下取得最优解,相对复杂... 查看详情

redis内存优化在vivo的探索与实践

...内存优化建议及内存异常处理相关的优化措施。分享vivo互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。 查看详情

全链路数据血缘在满帮的实践

...数据血缘是数据资产的重要组成部分,用于分析表和字段从数据源到当前表的血缘路径,以及血缘字段之间存在的关系是否满足,并关注数据一致性以及表设计的合理。它描述了数据从收集,生产到服务的全链路... 查看详情

vivo大数据日志采集agent设计实践

作者:vivo互联网存储技术团队-QiuSidi在企业大数据体系建设过程中,数据采集是其中的首要环节。然而,当前行业内的相关开源数据采集组件,并无法满足企业大规模数据采集的需求与有效的数据采集治理,所以大部分企业都采... 查看详情

redis在vivo推送平台的应用与优化实践

作者:vivo互联网服务器团队-YuQuan一、推送平台特点vivo推送平台是vivo公司向开发者提供的消息推送服务,通过在云端与客户端之间建立一条稳定、可靠的长连接,为开发者提供向客户端应用实时推送消息的服务,支持百亿级的... 查看详情

vivo服务端监控体系建设实践

作者:vivo互联网服务器团队-ChenNingning本文根据“2022vivo开发者大会"现场演讲内容整理而成。经过几年的平台建设,vivo监控平台产品矩阵日趋完善,在vivo终端庞大的用户群体下,承载业务运行的服务数量众多,监控服务体系是... 查看详情

事件驱动架构在vivo内容平台的实践(代码片段)

作者:vivo互联网服务器团队-GaoXiang一、什么是事件驱动架构当下,随着微服务的兴起,容器化技术的发展,以及云原生、serverless概念的普及,事件驱动再次引起业界的广泛关注。所谓事件驱动的架构,也就是使用事件来实现跨... 查看详情

vivo基于jacoco的测试覆盖率设计与实践

作者:vivo互联网服务器团队-XuShen本文主要介绍vivo内部研发平台使用JaCoCo实现测试覆盖率的实践,包括JaCoCo原理介绍以及在实践过程中遇到的新增代码覆盖率统计问题和频繁发布导致覆盖率丢失问题的解决办法。一、为什么需要... 查看详情

httpclient在vivo内销浏览器的高并发实践优化

作者:vivo互联网服务器团队-ZhiGuangquanHttpClient作为Java程序员最常用的Http工具,其对Http连接的管理能简化开发,并且提升连接重用效率;在正常情况下,HttpClient能帮助我们高效管理连接,但在一些并发高,报文体较大的情况下... 查看详情

vivo服务端监控架构设计与实践(代码片段)

作者:vivo互联网服务器团队-DengHaibo一、业务背景当今时代处在信息大爆发的时代,信息借助互联网的潮流在全球自由的流动,产生了各式各样的平台系统和软件系统,越来越多的业务也会导致系统的复杂性。当核心业务出现了... 查看详情

vivo大规模特征存储实践

本文首发于vivo互联网技术微信公众号?链接:https://mp.weixin.qq.com/s/u1LrIBtY6wNVE9lzvKXWjA作者:黄伟锋本文旨在介绍vivo内部的特征存储实践、演进以及未来展望,抛砖引玉,吸引更多优秀的想法。一、需求分析AI技术在vivo内部应用越... 查看详情

vivo前端智能化实践:机器学习在自动网页布局中的应用

作者:vivo互联网前端团队-SuNing在设计稿转网页中运用基于self-attention机制设计的机器学习模型进行设计稿的布局,能够结合dom节点的上下文得出合理的方案。一、背景切图作为前端的传统手艺却是大多数前端开发者都不愿面对... 查看详情

端到端qoe优化实践,视频播放体验优化,视频评测体系构建,基于大数据的vmaf质量计算...

...画质评估模型及相关数据库建立3.端上网络轻量化加速vivo互联网视频播放体验优化Topic《vivo互联网视频播放体验优化的探索与实践》王道环 vivo互联网研发经理随着vivo互联网在视频业务领域的不断扩展,在多样化的业务场景... 查看详情

大数据sparksql连接查询中的谓词下推处理

本文首发于vivo互联网技术微信公众号作者:李勇目录:1.SparkSql2.连接查询和连接条件3.谓词下推4.内连接查询中的谓词下推规则4.1.Join后条件通过AND连接4.2.Join后条件通过OR连接4.3.分区表使用OR连接过滤条件1.SparkSqlSparkSql是架构在S... 查看详情

dubbo泛化调用在vivo统一配置系统的应用

作者:vivo互联网服务器团队-WangFei、LinYupanDubbo泛化调用特性可以在不依赖服务接口API包的场景中发起远程调用,这种特性特别适合框架集成和网关类应用开发。本文结合在实际开发过程中所遇到的需要远程调用多个三方系统的问... 查看详情