sparksql字段血缘扩展实践!(代码片段)

浊酒南街 浊酒南街     2023-02-15     451

关键词:

目录

一、背景

字段血缘是在表处理的过程中将字段的处理过程保留下来。为什么会需要字段血缘呢?

有了字段间的血缘关系,便可以知道数据的来源去处,以及字段之间的转换关系,这样对数据的质量,治理有很大的帮助。

Spark SQL 相对于 Hive 来说通常情况下效率会比较高,对于运行时间、资源的使用上面等都会有较大的收益。

平台计划将 Hive 任务迁移到 Spark SQL 上,同时也需要实现字段血缘的功能。

二、前期调研

开发前我们做了很多相关调研,从中得知 Spark 是支持扩展的:允许用户对 Spark SQL 的 SQL 解析、逻辑计划的分析和检查、逻辑计划的优化、物理计划的形成等进行扩展。

该方案可行,且对 Spark 的源码没有改动,代价也比较小,确定使用该方案。

三、Spark SQL 扩展

3.1 Spark 可扩展的内容

SparkSessionExtensions是比较重要的一个类,其中定义了注入规则的方法,现在支持以下内容:

【Analyzer Rules】逻辑计划分析规则

【Check Analysis Rules】逻辑计划检查规则

【Optimizer Rules.】 逻辑计划优化规则

【Planning Strategies】形成物理计划的策略

【Customized Parser】自定义的sql解析器

【(External) Catalog listeners catalog】监听器

在以上六种可以用户自定义的地方,我们选择了【Check Analysis Rules】。因为该检查规则在方法调用的时候是不需要有返回值的,也就意味着不需要对当前遍历的逻辑计划树进行修改,这正是我们需要的。

而【Analyzer Rules】、【Optimizer Rules】则需要对当前的逻辑计划进行修改,使得我们难以迭代整个树,难以得到我们想要的结果。

3.2 实现自己的扩展

class ExtralSparkExtension extends (SparkSessionExtensions => Unit) 
  override def apply(spark: SparkSessionExtensions): Unit = 

    //字段血缘
    spark.injectCheckRule(FieldLineageCheckRuleV3)

    //sql解析器
    spark.injectParser  case (_, parser) => new ExtraSparkParser(parser) 

  

上面按照这种方式实现扩展,并在 apply 方法中把自己需要的规则注入到 SparkSessionExtensions 即可,除了以上四种可以注入的以外还有其他的规则。要让 ExtralSparkExtension 起到作用的话我们需要在spark-default.conf下配置

spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension

在启动 Spark 任务的时候即可生效。

注意到我们也实现了一个自定义的SQL解析器,其实该解析器并没有做太多的事情。只是在判断如果该语句包含insert的时候就将 SQLText(SQL语句)设置到一个为FIELD_LINE_AGE_SQL,之所以将SQLText放到FIELD_LINE_AGE_SQL里面。因为在 DheckRule 里面是拿不到SparkPlan的我们需要对SQL再次解析拿到 SprkPlan,而FieldLineageCheckRuleV3的实现也特别简单,重要的在另一个线程实现里面。

这里我们只关注了insert语句,因为插入语句里面有从某些个表里面输入然后写入到某个表。

class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging

  override def parsePlan(sqlText: String): LogicalPlan = 
    val lineAgeEnabled = SparkSession.getActiveSession
      .get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
    logDebug(s"SqlText: $sqlText")
    if(sqlText.toLowerCase().contains("insert"))
      if(lineAgeEnabled)
        if(FIELD_LINE_AGE_SQL_COULD_SET.get())
          //线程本地变量在这里
          FIELD_LINE_AGE_SQL.set(sqlText)
        
        FIELD_LINE_AGE_SQL_COULD_SET.remove()
      
    
    delegate.parsePlan(sqlText)
  
  //调用原始的sqlparser
  override def parseExpression(sqlText: String): Expression = 
    delegate.parseExpression(sqlText)
  
  //调用原始的sqlparser
  override def parseTableIdentifier(sqlText: String): TableIdentifier = 
    delegate.parseTableIdentifier(sqlText)
  
  //调用原始的sqlparser
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = 
    delegate.parseFunctionIdentifier(sqlText)
  
  //调用原始的sqlparser
  override def parseTableSchema(sqlText: String): StructType = 
    delegate.parseTableSchema(sqlText)
  
  //调用原始的sqlparser
  override def parseDataType(sqlText: String): DataType = 
    delegate.parseDataType(sqlText)
  

3.3 扩展的规则类

case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) 

  val executor: ThreadPoolExecutor =
    ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)

  override def apply(plan: LogicalPlan): Unit = 
    val sql = FIELD_LINE_AGE_SQL.get
    FIELD_LINE_AGE_SQL.remove()
    if(sql != null)
      //这里我们拿到sql然后启动一个线程做剩余的解析任务
      val task = new FieldLineageRunnableV3(sparkSession,sql)
      executor.execute(task)
    

  

很简单,我们只是拿到了 SQL 然后便启动了一个线程去得到 SparkPlan,实际逻辑在

FieldLineageRunnableV3。

3.4 具体的实现方法

3.4.1 得到 SparkPlan

我们在 run 方法中得到 SparkPlan:

override def run(): Unit = 
  val parser = sparkSession.sessionState.sqlParser
  val analyzer = sparkSession.sessionState.analyzer
  val optimizer = sparkSession.sessionState.optimizer
  val planner = sparkSession.sessionState.planner
      ............
  val newPlan = parser.parsePlan(sql)
  PASS_TABLE_AUTH.set(true)
  val analyzedPlan = analyzer.executeAndCheck(newPlan)

  val optimizerPlan = optimizer.execute(analyzedPlan)
  //得到sparkPlan
  val sparkPlan = planner.plan(optimizerPlan).next()
  ...............
if(targetTable != null)
  val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
  val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
  //projection
  projectionLineAge(levelProject, sparkPlan.child)
  //predication
  predicationLineAge(predicates, sparkPlan.child)
  ...............

为什么要使用 SparkPlan 呢?当初我们考虑的时候,物理计划拿取字段关系的时候是比较准的,且链路比较短也更直接。

在这里补充一下 Spark SQL 解析的过程如下:

经过SqlParser后会得到逻辑计划,此时表名、函数等都没有解析,还不能执行;经过Analyzer会分析一些绑定信息,例如表验证、字段信息、函数信息;经过Optimizer 后逻辑计划会根据既定规则被优化,这里的规则是RBO,当然 Spark 还支持CBO的优化;经过SparkPlanner后就成了可执行的物理计划。

我们看一个逻辑计划与物理计划对比的例子:
一个 SQL 语句:


select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3

逻辑计划是这样的:

物理计划是这样的:

显然简化了很多。
得到 SparkPlan 后,我们就可以根据不同的SparkPlan节点做迭代处理。
我们将字段血缘分为两种类型:projection(select查询字段)、predication(wehre查询条件)。
这两种是一种点对点的关系,即从原始表的字段生成目标表的字段的对应关系。
想象一个查询是一棵树,那么迭代关系会如下从树的顶端开始迭代,直到树的叶子节点,叶子节点即为原始表:

那么我们迭代查询的结果应该为

id ->tab1.id ,

name->tab1.name,tabb2.name,

age→tabb2.age。

注意到有该变量

val levelProject = new ArrayBuffer

ArrayBuffer[NameExpressionHolder],通过projecti-onLineAge 迭代后 levelProject 存储了顶层id,name,age对应的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。

当然也不是简单的递归迭代,还需要考虑特殊情况例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都需要特殊考虑。
例子及效果:

SQL:

with A as (select id,name,age from tab1 where id > 100 ) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
   select C.id,concat(C.name,B.name) as name, B.age from
     B,C where C.id = B.id

效果:


  "edges": [
    
      "sources": [
        3
      ],
      "targets": [
        0
      ],
      "expression": "id",
      "edgeType": "PROJECTION"
    ,
    
      "sources": [
        4,
        7
      ],
      "targets": [
        1
      ],
      "expression": "name",
      "edgeType": "PROJECTION"
    ,
    
      "sources": [
        5
      ],
      "targets": [
        2
      ],
      "expression": "age",
      "edgeType": "PROJECTION"
    ,
    
      "sources": [
        6,
        3
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "INNER",
      "edgeType": "PREDICATE"
    ,
    
      "sources": [
        6,
        5
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
      "edgeType": "PREDICATE"
    ,
    
      "sources": [
        3
      ],
      "targets": [
        0,
        1,
        2
      ],
      "expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
      "edgeType": "PREDICATE"
    
  ],
  "vertices": [
    
      "id": 0,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.id"
    ,
    
      "id": 1,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.name"
    ,
    
      "id": 2,
      "vertexType": "COLUMN",
      "vertexId": "default.tab3.age"
    ,
    
      "id": 3,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.id"
    ,
    
      "id": 4,
      "vertexType": "COLUMN",
      "vertexId": "default.tab1.name"
    ,
    
      "id": 5,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.age"
    ,
    
      "id": 6,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.id"
    ,
    
      "id": 7,
      "vertexType": "COLUMN",
      "vertexId": "default.tabb2.name"
    
  ]


四、总结

在 Spark SQL 的字段血缘实现中,我们通过其自扩展,首先拿到了 insert 语句,在我们自己的检查规则中拿到

SQL 语句,通过SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最终得到了物理计划。

我们通过迭代物理计划,根据不同执行计划做对应的转换,然后就得到了字段之间的对应关系。当前的实现是比较简单的,字段之间是直线的对应关系,中间过程被忽略,如果想实现字段的转换的整个过程也是没有问题的。

sparksql字段血缘扩展实践!(代码片段)

目录一、背景二、前期调研三、SparkSQL扩展3.1Spark可扩展的内容3.2实现自己的扩展3.3扩展的规则类3.4具体的实现方法四、总结一、背景字段血缘是在表处理的过程中将字段的处理过程保留下来。为什么会需要字段血缘呢?有了... 查看详情

sparksql字段血缘在vivo互联网的实践

...间的转换关系,这样对数据的质量,治理有很大的帮助。SparkSQL相对于Hive来说通常情况下效率会比较高,对于运行时间、资源的使用上面等都会有较大的收益。平台计划将Hive任务迁移到Spar 查看详情

atlas初体验(代码片段)

介绍最近由于内部需要做sparksql的字段血缘关系,碰巧看到github有人提供了spark的atlas插件,准备调研一下看能否满足需求。介绍:Atlas是Hadoop的数据治理和元数据框架。Atlas是一组可扩展和可扩展的核心基础治理服务&#... 查看详情

第五周周二练习:实验5sparksql编程初级实践(代码片段)

1.题目:源码:importjava.util.Propertiesimportorg.apache.spark.sql.types._importorg.apache.spark.sql.Rowimportorg.apache.spark.sql.SparkSessionimportorg.apache.spark.sql.DataFrameReaderobjectTestMySQLdefma 查看详情

全链路数据血缘在满帮的实践

...数据血缘是数据资产的重要组成部分,用于分析表和字段从数据源到当前表的血缘路径,以及血缘字段之间存在的关系是否满足,并关注数据一致性以及表设计的合理。它描述了数据从收集,生产到服务的全链路... 查看详情

sparksparksql物化视图技术原理与实践(代码片段)

1.概述转载:SparkSQL物化视图技术原理与实践2.导言本文将基于SparkSQL(2.4.4)+Hive(2.3.6),介绍物化视图在SparkSQL中的实现及应用。3.什么是物化视图物化视图主要用于预先计算并保存表连接或聚合等耗时较多的操作的结果,这样... 查看详情

基于antlr-3.5.2+python实现一般hivesql血缘解析(代码片段)

目录 前言一、目标二、准备工作1.Hivegrammar语法文件获取2.antrl下载3.pyjnius下载 4.cython下载三、源码改写及实现1.修订HiveLexer.g2.HiveParser.g代码修订3.局限性 前言作为一个开发实践项目,实现对HiveSQL语句的解析可以很有效的作... 查看详情

spark列级血缘(字段级别血缘)开发与实现

...能一定程度上解决数据的依赖关系,但是对于精确到字段之间的关系识别则显得捉襟见肘。开发此项目的用意是为了能够加强spark在列级血缘上的追踪优势。知识铺垫datase 查看详情

sparksql下的parquet使用最佳实践和代码实战

一:SparkSQL下的Parquet使用最佳实践1,过去整个业界对大数据的分析的技术栈的Pipeline一般分为一下两种方式:A)DataSource->HDFS->MR/Hive/Spark(相当于ETL)->HDFSParquet->SparkSQL/impala->ResultService(可以放在DB 查看详情

实验5sparksql编程初级实践

SparkSQL基本操作 (1)查询所有数据; (2)查询所有数据,并去除重复的数据; (3)查询所有数据,打印时去除id字段; (4)筛选出age>30的记录; (5)将数据按age分组; (6)将数据按name升序排列; (7)取出前3行数... 查看详情

git工程开发实践——git工程实践扩展(代码片段)

Git工程开发实践(六)——Git工程实践扩展一、Git提交日志规范1、Git提交日志模板Git支持对每次提交的日志信息进行规范,可以通过设置提交模板实现。建立一个gitCommitTemplate文件,内容为:#commitmessage包含三部分,header,body和fo... 查看详情

sparksql下的parquet使用最佳实践和代码实战

...)DataSource->HDFS->MR/Hive/Spark(相当于ETL)->HDFSParquet->SparkSQL/impala->ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数 查看详情

sparksql实现自定义函数(代码片段)

Sparksql实现自定义函数文章目录一、为什么要自定义function?二、实现自定义的函数三、测试效果总结一、为什么要自定义function?有小伙伴可能会疑问:SparkSql提供了编写UDF和UDAF的接口扩展,为什么还有开发自定... 查看详情

sparksql实现自定义函数(代码片段)

Sparksql实现自定义函数文章目录一、为什么要自定义function?二、实现自定义的函数三、测试效果总结一、为什么要自定义function?有小伙伴可能会疑问:SparkSql提供了编写UDF和UDAF的接口扩展,为什么还有开发自定... 查看详情

通过udr扩展gbase8s查询行为的工程实践(代码片段)

​应用场景最近遇到一个有意思的需求,即某个字段里面存放着一些列数字,以逗号分割,数字两两一组,33.112,23.11,22.321,24.3333没错,每一组就是一个坐标,现在需要将他们处理成Json字符串 ["lnt":... 查看详情

dubbo源码实践-spi扩展-自适应扩展机制(代码片段)

目录1前提必备知识2术语定义3自适应扩展机制的特点4扩展点实践4.1用户自定义自适应扩展4.2dubbo生成自适应扩展4自适应扩展类的用途1前提必备知识具体的使用和原理就不说了,网上有人写的挺好的了。可以参考:DubboSPI... 查看详情

实践|sentinel扩展性设计(代码片段)

摘要:Sentinel提供多样的SPI接口用于提供扩展的能力。用户可以在用同一个sentinel-core的基础上自行扩展接口实现,从而可以方便地给Sentinel添加自定义的逻辑。初始化逻辑扩展机制为了统一初始化的流程,我们抽象出了InitFunc接... 查看详情

golang/go语言go语言代码实践——高复用易扩展性代码训练(代码片段)

...功能:对数据库中同一张表的所有数据的两个不同的字段完成赋值。为了给每个字段赋值,会分别设置两个定时任务,一个是正常的任务处理,5分钟执行一次,负责捞取最近30分钟的数据并完成赋值。另一个... 查看详情