flinksql--时态表或版本表(temporaltables或versionedtables)(代码片段)

宝哥大数据 宝哥大数据     2022-12-09     590

关键词:

文章目录

一、概念

时态表(Temporal Table)是一张随时间变化的表 – 在 Flink 中称为动态表,时态表中的每条记录都关联了一个或多个时间段,所有的 Flink 表都是时态的(动态的)。

时态表包含表的一个或多个有版本的表快照,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。

版本: 时态表可以划分成一系列带版本的表快照集合,表快照中的版本代表了快照中所有记录的有效区间,有效区间的开始时间和结束时间可以通过用户指定,根据时态表是否可以追踪自身的历史版本与否,时态表可以分为 版本表普通表

版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。

普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。

二、设计初衷

2.1、关联一张版本表

以订单流关联产品表这个场景举例,orders 表包含了来自 Kafka 的实时订单流,product_changelog 表来自数据库表 products 的 changelog , 产品的价格在数据库表 products 中是随时间实时变化的。

SELECT * FROM product_changelog;

(changelog kind)  update_time  product_id product_name price
================= ===========  ========== ============ ===== 
+(INSERT)         00:01:00     p_001      scooter      11.11
+(INSERT)         00:02:00     p_002      basketball   23.11
-(UPDATE_BEFORE)  12:00:00     p_001      scooter      11.11
+(UPDATE_AFTER)   12:00:00     p_001      scooter      12.99
-(UPDATE_BEFORE)  12:00:00     p_002      basketball   23.11 
+(UPDATE_AFTER)   12:00:00     p_002      basketball   19.99
-(DELETE)         18:00:00     p_001      scooter      12.99 

表 product_changelog 表示数据库表 products不断增长的 changelog, 比如,产品 scooter 在时间点 00:01:00的初始价格是 11.11, 在 12:00:00 的时候涨价到了 12.99, 在 18:00:00 的时候这条产品价格记录被删除。

如果我们想输出 product_changelog 表在 10:00:00 对应的版本,表的内容如下所示:

update_time  product_id product_name price
===========  ========== ============ ===== 
00:01:00     p_001      scooter      11.11
00:02:00     p_002      basketball   23.11

如果我们想输出 product_changelog 表在 13:00:00 对应的版本,表的内容如下所示:

update_time  product_id product_name price
===========  ========== ============ ===== 
12:00:00     p_001      scooter      12.99
12:00:00     p_002      basketball   19.99

2.2、关联一张普通表

另一方面,某些用户案列需要连接变化的维表,该表是外部数据库表。

假设 LatestRates 是一个物化的最新汇率表 (比如:一张 HBase 表),LatestRates 总是表示 HBase 表 Rates 的最新内容。

我们在 10:15:00 时查询到的内容如下所示:

10:15:00 > SELECT * FROM LatestRates;

currency  rate
========= ====
US Dollar 102
Euro      114
Yen       1

我们在 11:00:00 时查询到的内容如下所示:

11:00:00 > SELECT * FROM LatestRates;

currency  rate
========= ====
US Dollar 102
Euro      116
Yen       1

三、时态表

注意 仅 Blink planner 支持此功能。

Flink 使用主键约束和事件时间来定义一张版本表版本视图

3.1、声明版本表

create table currency_rates ( 
 currency STRING, 
 conversion_rate DECIMAL(32, 2), 
 update_time TIMESTAMP(3), 
 PRIMARY KEY (currency) NOT ENFORCED, 	-- 1、定义主键
 WATERMARK FOR update_time AS update_time  -- 2、事件时间
) WITH ( 
 'connector.type' = 'filesystem', 
 'connector.path' = '/tmp/ratesHistory.csv', 
 'format.type' = 'csv' 
)

3.2、声明版本视图

-- 定义一张 append-only 表
create table RatesHistory  ( 
 currency STRING, 
 conversion_rate DECIMAL(32, 2), 
 update_time TIMESTAMP(3), 
 -- PRIMARY KEY (currency) NOT ENFORCED, 
 WATERMARK FOR update_time AS update_time  
) WITH ( 
 'connector.type' = 'filesystem', 
 'connector.path' = '/tmp/ratesHistory.csv', 
 'format.type' = 'csv' 
);

为了在 RatesHistory 上定义版本表,Flink 支持通过去重查询定义版本视图, 去重查询可以产出一个有序的 changelog 流,去重查询能够推断主键并保留原始数据流的事件时间属性。

create view versioned_rates AS
SELECT  currency,conversion_rate,update_time FROM  --  (1) `currency_time` 保留了事件时间
    (SELECT *,
         row_number() over(partition by currency -- (2) `currency` 是去重 query 的 unique key,可以作为主键
    ORDER BY  update_time DESC) AS row_num
    FROM currency_rates)
WHERE row_num=1;

(1) 保留了事件时间作为视图 versioned_rates 的事件时间,
(2) 使得视图 versioned_rates 有了主键, 因此视图 versioned_rates 是一个版本视图。

视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键约束和事件时间。

3.3、声明普通表

没有设置主键

 create table orders ( 
 order_id    STRING, 
 price       DECIMAL(32,2), 
 currency    STRING, 
 order_time  TIMESTAMP(3), 
 WATERMARK FOR order_time AS order_time  
) WITH ( 
 'connector.type' = 'filesystem', 
 'connector.path' = '/tmp/rateOrder.csv', 
 'format.type' = 'csv' 
);

四、时态表函数

4.1、定义时态表函数

五、案例

5.1、

如果流延迟过来的数据要跟之前的维表数据做关联,即根据流的事件时间,查找某个时间点的维度数据而不是当前维度表数据。
比如这样一个场景:用户的订单表和和商品维度表,将维度表设置成时态表,这样用户就可以根据订单表中的下单时间Join下单时的商品当时最新的维度数据

样例数据

数据源:ratesHistory.csv
RMB,114,2015-01-01 00:00:00
RMB,115,2015-01-03 00:00:00
RMB,116,2015-01-19 00:00:00
Euro,119,2015-01-03 00:00:00
USD,99,2015-01-03 00:00:00
USD,100,2015-01-03 00:00:00
Euro,118,2015-01-03 00:00:00


数据源:rateOrder.csv
1,29,RMB,2015-01-02 00:00:00
2,19,RMB,2015-01-03 00:00:00
3,33,RMB,2015-01-11 00:00:00
4,55,RMB,2015-01-21 00:00:00
 
 
 
create table currency_rates ( 
 currency STRING, 
 conversion_rate DECIMAL(32, 2), 
 update_time TIMESTAMP(3), 
 PRIMARY KEY (currency) NOT ENFORCED, 
 WATERMARK FOR update_time AS update_time  
) WITH ( 
 'connector.type' = 'filesystem', 
 'connector.path' = '/tmp/ratesHistory.csv', 
 'format.type' = 'csv' 
)

 create table orders ( 
 order_id    STRING, 
 price       DECIMAL(32,2), 
 currency    STRING, 
 order_time  TIMESTAMP(3), 
 WATERMARK FOR order_time AS order_time  
) WITH ( 
 'connector.type' = 'filesystem', 
 'connector.path' = '/tmp/rateOrder.csv', 
 'format.type' = 'csv' 
);

SELECT  order_id, price, orders.currency, conversion_rate, order_time 
FROM orders 
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time -- Event Time Temporal Join
ON orders.currency = currency_rates.currency;

参考:

https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/streaming/versioned_tables.html
https://www.jianshu.com/p/733a53bcb9b9

1.19.5.3.时态表关联一张版本表关联一张普通表时态表声明版本表声明版本视图声明普通表时态表函数等(代码片段)

1.19.5.3.时态表(TemporalTables)1.19.5.3.1.设计初衷1.19.5.3.1.1.关联一张版本表1.19.5.3.1.2.关联一张普通表1.19.5.3.2.时态表1.19.5.3.2.1.声明版本表1.19.5.3.2.2.声明版本视图1.19.5.3.2.3.声明普通表1.19.5.3.3.时态表函数1.19.5.3.3.1.定义时态表 查看详情

删除系统版本化时态表的过程

】删除系统版本化时态表的过程【英文标题】:proceduretodropsystem-versionedtemporaltables【发布时间】:2019-04-0417:56:34【问题描述】:我正在寻找,最好不使用动态SQL。我查看了Microsoft文档并弄清楚了如何获取自动生成的历史表名称,... 查看详情

如何将系统版本化的时态表与实体框架一起使用?

】如何将系统版本化的时态表与实体框架一起使用?【英文标题】:HowcanIuseSystem-VersionedTemporalTablewithEntityFramework?【发布时间】:2017-05-2117:50:16【问题描述】:我可以在SQLServer2016中使用临时表。不幸的是,EntityFramework6还不知道... 查看详情

flinkintervaljoin,temporaljoin,lookupjoin区别(代码片段)

...实现变更的变更维度表(例如包含最新快照的数据库表)。FlinkSQL流批一体的核心是:流表二象性。围绕这一核心有若干概念,例如,动态表(DynamicTab 查看详情

在 Entity Framework Core 中查询系统版本时态表中的数据

】在EntityFrameworkCore中查询系统版本时态表中的数据【英文标题】:QueryingDatainaSystem-VersionedTemporalTableinEntityFrameworkCore【发布时间】:2019-11-0906:02:27【问题描述】:我们正在实施一种查询临时表的解决方案。当在SQLServer上为任何表... 查看详情

如何使用 SQL Server - 带有 EntityFramework Core 的时态表

】如何使用SQLServer-带有EntityFrameworkCore的时态表【英文标题】:HowtouseSQLServer-TemporaltableswithEntityFrameworkCore【发布时间】:2021-11-1216:13:13【问题描述】:所以,首先我在SQLServer中创建了一个临时(系统版本)表。为简单起见:CREATE... 查看详情

Laravel 5.5 错误基表或视图已存在:1050 表“用户”已存在

】Laravel5.5错误基表或视图已存在:1050表“用户”已存在【英文标题】:Laravel5.5ErrorBasetableorviewalreadyexists:1050Table\'users\'alreadyexists【发布时间】:2018-02-1803:14:45【问题描述】:规格:Laravel版本:5.5.3PHP版本:7.1数据库驱动程序和... 查看详情

如何复制时态表

】如何复制时态表【英文标题】:HowdoIreplicateatemporaltable【发布时间】:2019-01-0702:37:00【问题描述】:我有一个临时表,我想使用事务复制来复制它。历史表不能有事务复制所需的主键。当我尝试复制当前表时,复制失败,因为... 查看详情

实体框架不使用时态表

】实体框架不使用时态表【英文标题】:EntityFrameworknotworkingwithtemporaltable【发布时间】:2017-04-0603:28:52【问题描述】:我正在使用数据库优先实体框架6。将架构中的一些表更改为临时表后,我在尝试插入新数据时开始收到以下... 查看详情

使用 CTE 优化时态表

】使用CTE优化时态表【英文标题】:OptimizetemporaltablewithCTE【发布时间】:2019-07-2603:29:56【问题描述】:我创建临时表以设置级别:CREATETABLE[#DesignLvl]([DesignKey]INT,[DesignLevel]INT);WITHRCTEAS(SELECT*,1AS[Lvl]FROM[Design]WHERE[ParentDesignKey]ISNULLUNIO... 查看详情

在 oracle 中加入时态表

】在oracle中加入时态表【英文标题】:joiningtemporaltablesinoracle【发布时间】:2021-01-0816:43:05【问题描述】:我正在寻找更好的解决方案来解决一个相当普遍的时态表问题。说我们有table_a(some_valueint,date_fromdate,date_todate)还有一系列... 查看详情

如何使用 JPA 实现时态表?

】如何使用JPA实现时态表?【英文标题】:HowtoimplementatemporaltableusingJPA?【发布时间】:2012-03-2111:47:54【问题描述】:我想知道如何使用EclipseLink在JPA2中实现temporaltables。我所说的时间是指定义有效期的表格。我面临的一个问题是... 查看详情

识别 Teradata Database 中的时态表

】识别TeradataDatabase中的时态表【英文标题】:IdentifyTemporalTableinTeradataDatabase【发布时间】:2021-02-2110:15:45【问题描述】:是否有可以从TeradataDatabase中获取所有TemporalTable列表的SQL查询?是否有任何临时表的特殊列来标识它们属于... 查看详情

在 EF6 中使用时态表 - PostgreSQL

】在EF6中使用时态表-PostgreSQL【英文标题】:UsingtemporaltableswithEF6-PostgreSQL【发布时间】:2022-01-1203:31:33【问题描述】:我将PostgreSQL用作我的EFCore6项目的关系数据库,但在运行迁移后,我无法弄清楚如何让时态表与PostgreSQL一起使... 查看详情

Entity Framework 6 不适用于时态表

】EntityFramework6不适用于时态表【英文标题】:EntityFramework6doesn\'tworkwithTemporaltable【发布时间】:2017-12-0906:11:00【问题描述】:我已更改现有表以使其成为临时表,然后我从数据库中更新了模型。我在表中添加了两个新列时遇到... 查看详情

95-910-150-源码-flinksql-flinksql的元数据管理(代码片段)

...务成为可能。3.Catalog和CatalogManager​在1.9版本发布之前,FlinkSQL完全借助于Calcite的Schema接口来管理注册的表,并且提供了ExternalCatalog接口,通过TableDescriptor定义外部系统 查看详情

NHibernate HQL 生成器支持 SQL Server 2016 时态表

】NHibernateHQL生成器支持SQLServer2016时态表【英文标题】:NHibernateHQLGeneratortosupportSQLServer2016temporaltables【发布时间】:2018-06-2503:12:45【问题描述】:我正在尝试在NHibernate4.x中实现对SQLServer2016时态表的基本支持。这个想法是从更改S... 查看详情

95-910-170-源码-flinksql-flinksql中的流和动态表

1.美图2.概述​SQL和关系代数在设计之初就针对的是静态的数据。静态数据是有界的,因此可以很容易地和表(关系)进行映射。但是对于一个不断变化的实时数据流而言,数据是无边界不断更新的,在将SQL应用在流上的时候,... 查看详情