使用blinkcep实现差值聚合计算

author author     2022-12-09     171

关键词:

​​使用Blink SQL+UDAF实现差值聚合计算​​介绍了如何使用Blink SQL+UDAF实现实时流上的差值聚合计算,后来在与@付典就业务需求和具体实现方式进行探讨时,付典提出通过​​CEP​​实现的思路和方法。
本文介绍通过CEP实现实时流上的差值聚合计算。
感谢@付典在实现过程中的指导。笔者水平有限,若有纰漏,请批评指出。

使用Blink

一、客户需求

电网公司每天采集各个用户的电表数据(格式如下表),其中data_date为电表数据上报时间,cons_id为电表id,r1为电表度数,其他字段与计算逻辑无关,可忽略。为了后续演示方便,仅输入cons_id=100000002的数据。

no(string)

data_date(string)

cons_id(string)

org_no(string)

r1(double)

101

20190716

100000002

35401

13.76

101

20190717

100000002

35401

14.12

101

20190718

100000002

35401

16.59

101

20190719

100000002

35401

18.89

表1:输入数据
电网公司希望通过实时计算(Blink)对电表数据处理后,每天得到每个电表最近两天(当天和前一天)的差值数据,结果类似如下表:

cons_id(string)

data_date(string)

subDegreeR1(double)

100000002

20190717

0.36

100000002

20190718

2.47

100000002

20190719

2.3

表2:期望的输出数据

二、需求分析

根据业务需求以及CEP跨事件模式匹配的特性,定义两个CEP事件e1和e2,输出e2.r1-e1.r1即可得到差值。

三、CEP开发及测试结果

参考​​复杂事件处理(CEP)语句​​,CEP代码如下:

CREATE TABLE input_dh_e_mp_read_curve (
`no` VARCHAR,
data_date VARCHAR,
cons_id VARCHAR,
org_no VARCHAR,
r1 DOUBLE,
ts as TO_TIMESTAMP(concat(data_date,000000),yyyyMMddHHmmss)
,WATERMARK wk FOR ts as withOffset(ts, 2000)
) WITH (
type = datahub,
endPoint = http://dh-cn-shanghai.aliyun-inc.com,
roleArn=acs:ram::XXX:role/aliyunstreamdefaultrole,
project = jszc_datahub,
topic = input_dh_e_mp_read_curve
);

CREATE TABLE data_out(
cons_id varchar
,data_date varchar
,subDegreeR1 DOUBLE
)with(
type = print
);

insert into data_out
select
cons_id,
data_date,
subDegreeR1
from input_dh_e_mp_read_curve
MATCH_RECOGNIZE(
PARTITION BY cons_id
ORDER BY ts
MEASURES
e2.data_date as data_date,
e2.r1 - e1.r1 as subDegreeR1
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN(e1 e2)
DEFINE
e1 as TRUE,
e2 as TRUE
);

由于使用了print connector,从对应的sink的taskmanager.out日志中可以查看到输出如下:

task-1> (+)100000002,20190717,0.35999999999999943
task-1> (+)100000002,20190718,2.4700000000000006

对比期望输出(表2),20190717和20190718两个窗口的数据均正确,表明业务逻辑正确,但此输出与期望输出有少许差异:
(1)20190719的数据没有输出,这是因为我们设置了watermark,测试环境下20190719之后没有数据进来触发20190719对应的窗口的结束。


四、其他说明

1、对比​​使用Blink SQL+UDAF实现差值聚合计算(1)​​,我们可以看出使用CEP开发代码非常简洁,所以在跨事件处理的情况下CEP还是非常的合适。从另外一个方面讲,同样的需求有不同的实现方式,所以融会贯通Blink SQL中的各种语法,利用更合适的语法来实现业务需求,将可能大大提升工作效率和业务性能。
2、在实现本案例时,笔者发现使用CEP时有如下需要注意的地方:
(1)partiton by里的字段(如本案的cons_id),默认会带到输出里,若同时在MEASURES中定义,则可能会报类似如下错误:

使用Blink


(2)define及其内容必须定义,否则前端页面提示类似如下错误:

使用Blink

使用Blink

moment实现计算两个时间的差值

varm1=moment(‘2018-08-1411:00:00‘),m2=moment(‘2018-08-1412:10:00‘);console.log(m1)console.log(m2)console.log(m2.diff(m1,‘minute‘));minute为分钟,可更改为秒或毫秒,具体看官方说明 查看详情

求如何用mysql实现计算上下两条记录的差

...置函数(聚合函数),它是统计组合的。5.分组之后,可以使用聚合函数执行一系列查询操作,询问每个类中有多少个查询操作。6.组后面跟着过滤器,如下所示。参考技术A方法挺多的,很多是采用排序直接对等连接,这样对于主... 查看详情

使用 ClickHouse 实现最终聚合值(不是状态)

】使用ClickHouse实现最终聚合值(不是状态)【英文标题】:Materializingthefinalaggregationvalue(notstate)withClickHouse【发布时间】:2021-09-3016:13:24【问题描述】:我想在ClickHouse中创建一个物化视图,用于存储聚合函数的最终结果。最佳实... 查看详情

mysql中使用sql计算两个日期时间差值

...时间的秒数差值进行判断。代码部分:  说明:数据库使用的是Mysql,持久层框架使用的是Mybatis。代码如下:  FLOOR((SUM(UNIX_TIMESTAMP(开始时间)-UNIX_TIMESTAMP(结束时间))/C 查看详情

计算两个时间之间的差值

...时分钟,然后还要将结果进行转换,实在是麻烦。这里我使用了NSCalendar类。下面来看一下具体的实现:首先先拿到当前时间,并转换成字符串:同样也将要比较的时间(我们这里将其称为开始时间)转换成相同格式的字符串:... 查看详情

小松教你手游开发unity实用技能线性差值计算实现(代码片段)

其实这个unity本身就有的函数Mathf.Lerp(),为什么还要自己实现呢。有一个原因就是这个函数返回的是float型,float型如果数字非常大,转出int时会有精度丢失,也就是转出来的值不对。而且非常简单。看下公式publicintLerp(inta,intb,int... 查看详情

两个个数相同的数组,通过交换数组内容,实现数组之和之间的差值最小(代码片段)

这个思路比较简单:对两个数组排序,计算数组和计算当前差值和较大的数组从前到后,较小的数组从后到前,比较数值的差值,如果差值的2倍小于数组和的差值就进行交换,直到数组和差值不再减小//简单的插入排序voidsort_arr... 查看详情

计算几何---曲面三角形差值公式

...线方向开始进行差值,即从三个端点值,以及留个且向量使用Hermite差值完成。  对于曲面三角形的任一条边,如上图所示。如果向量定点v0处的法向量n0没有给出通过标签<normal>给出,则通过计算v0点的两个边的切向量的叉... 查看详情

spark通过combinebykey算子实现条件性聚合的方法

...据,满足条件的记录进行聚合,不满足条件的则不聚合。使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条... 查看详情

使用 MongoDB 聚合框架计算一阶导数

】使用MongoDB聚合框架计算一阶导数【英文标题】:ComputefirstorderderivativewithMongoDBaggregationframework【发布时间】:2016-12-2119:26:58【问题描述】:是否可以使用聚合框架计算一阶导数?例如,我有数据:time_series:[10,20,40,70,110]我正在... 查看详情

如何使用聚合计算运行总数?

】如何使用聚合计算运行总数?【英文标题】:Howtocalculatetherunningtotalusingaggregate?【发布时间】:2015-03-1517:09:01【问题描述】:我正在开发一个简单的财务应用程序来跟踪收入和结果。为了简单起见,假设这些是我的一些文档:d... 查看详情

使用 MongoDB 聚合计算计数和平均值

】使用MongoDB聚合计算计数和平均值【英文标题】:CalculatingcountandaveragewithMongoDBaggregation【发布时间】:2012-10-1100:32:04【问题描述】:我有一个像这样的简单数据库布局:clientidsex(male/female)birthday(date)clientidsex(male/female)birthday(date)(... 查看详情

pandas使用groupby函数agg函数获取每个分组聚合对应的均值(mean)实战:计算分组聚合单数据列的均值计算分组聚合多数据列的均值

pandas使用groupby函数、agg函数获取每个分组聚合对应的均值(mean)实战:计算分组聚合单数据列的均值、计算分组聚合多数据列的均值 目录 查看详情

使用 Django 从数据库中选择用于聚合计算的日期格式

】使用Django从数据库中选择用于聚合计算的日期格式【英文标题】:SelectingdateformatsforaggregatecalculationsfromdatabasewithDjango【发布时间】:2010-10-1815:59:14【问题描述】:我想根据月份对日期时间字段进行聚合计算。我目前正在使用ext... 查看详情

使用lambda函数计算数据帧中2列中值之间的差值(代码片段)

...算abs,并在此数据框中将结果newcolumn添加为'pdiff'。应该使用lambda函数来完成。这就是我的工作(dat-我的数据框:f=lambdax,y:np.abs(x-y)dat['pdiff']=dat.loc[:,['price','pbeach']].apply(f,axis=1)怎么了?f=lambdax,y:np.abs(x-y)dat['pdiff']=dat.loc[:,['price','pb... 查看详情

使用 Group by 进行多次聚合计算

】使用Groupby进行多次聚合计算【英文标题】:MultipleaggregatecalculationsusingGroupby【发布时间】:2021-03-1223:23:30【问题描述】:我有一个数据集df1,我想在其中:根据对TotalB列的分组取TotalB列的平均值。然后我想取这个新列并减去空... 查看详情

r语言difftime函数计算时间差值实战

R语言difftime函数计算时间差值实战目录R语言difftime函数计算时间差值实战#基础语法#获取difftime语法帮助 查看详情

hivesql核心技能之窗口计算

...理解(计算三日留存、七日留存、三十日留存等方式可以使用这个函数。)3)对2018年每个月的近三个月进行移动的求平均支付金额用法:这三个 查看详情