flink流计算随笔(代码片段)

author author     2023-01-18     260

关键词:

Windows
聚合事件(例如计数、和)在流上的工作方式与批处理不同。例如,不可能计算流中的所有元素,因为流通常是无限的(×××的)。相反,流上的聚合(计数、和等)是由窗口 windows限定作用域的,例如“过去5分钟的计数”或“最后100个元素的总和”。

Windows可以是时间驱动(示例:每30秒)或数据驱动(示例:每100个元素)。一个典型的方法是区分不同类型的窗口,比如翻筋斗窗口(没有重叠)、滑动窗口(有重叠)和会话窗口(中间有一个不活跃的间隙)。

Time
当提到流程序中的时间(例如定义窗口)时,可以指不同的时间概念:

事件时间Event Time 是创建事件的时间。它通常由事件中的时间戳描述,例如由生产传感器或生产服务附加的时间戳。Flink通过timestamp assigners(时间戳指定人)访问事件时间戳。

摄入时间Ingestion time 是事件进入源操作符的Flink数据流的时间。

处理时间Processing Time是执行基于时间的操作的每个操作符的本地时间。

有状态操作(Stateful Operations)
虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器),但是一些操作记住了跨多个事件的信息(例如窗口操作符)。这些操作称为有状态操作。

有状态操作的状态被维护在可以认为是嵌入式键/值存储中。状态与有状态操作符读取的流一起被严格地分区和分布。因此,在keyBy()函数之后,只能在键控流上访问键/值状态,并且只能访问与当前事件的键相关联的值。对齐流和状态的键确保所有的状态更新都是本地操作,保证一致性而不增加事务开销。这种对齐还允许Flink透明地重新分配状态和调整流分区。
容错检查点Checkpoints for Fault Tolerance
Flink通过流回放和检查点的组合实现了容错。检查点与每个输入流中的特定点以及每个操作符的对应状态相关。通过恢复操作符的状态并从检查点重新播放事件,流数据流可以在检查点恢复,同时保持一致性(准确地说是一次处理语义)。

检查点间隔是在执行期间用恢复时间(需要重放的事件数量)来权衡容错开销的一种方法。

关于容错的内部描述提供了关于Flink如何管理检查点和相关主题的更多信息。有关启用和配置检查点的详细信息在检查点API文档中。

批处理流Batch on Streamin
Flink执行批处理程序作为流程序的特殊情况,其中流是有界的(有限的元素数量)。数据集在内部被视为数据流。因此,上述概念同样适用于批处理程序,也适用于流程序,但有少数例外:

批处理程序的容错不使用检查点。恢复通过完全重放流来实现。这是可能的,因为输入是有界的。这将使成本更多地用于恢复,但使常规处理更便宜,因为它避免了检查点。

数据集API中的有状态操作使用简化的内存/核心外数据结构,而不是键/值索引。

DataSet API引入了特殊的synchronized(基于超步的)迭代,这只能在有界的流上实现。
Flink中的DataStream程序是在数据流上实现转换的常规程序(例如,过滤、更新状态、定义窗口、聚合)。数据流最初是从各种来源(例如,消息队列、套接字流、文件)创建的。结果通过sink返回,它可以将数据写入文件或写入标准输出(例如命令行终端)。Flink程序在各种上下文中运行,独立运行,或嵌入到其他程序中。执行可以在本地JVM中执行,也可以在许多机器的集群中执行。
下面的程序是一个完整的流窗口单词计数应用程序的工作示例,它在5秒的窗口中对来自web套接字的单词进行计数。您可以复制并粘贴代码在本地运行它。

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount 
  def main(args: Array[String]) 

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap  _.toLowerCase.split("\W+") filter  _.nonEmpty  
      .map  (_, 1) 
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  

要运行示例程序,首先从终端启动netcat的输入流:
只需键入一些单词,然后按下回车键就可以得到一个新词。这些将是单词计数程序的输入。如果你想看到数大于1,输入相同的单词一遍又一遍地在5秒(如果你不能快速敲键盘,增加窗口大小的5秒内?)

Table API & SQL
Apache Flink具有两个用于统一流和批处理的关系API——Table API和SQL。Table API是Scala和Java的语言集成查询API,允许从关系操作符(如选择、筛选和以非常直观的方式连接)中组合查询。Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批输入(数据集)还是流输入(DataStream),任何接口中指定的查询都具有相同的语义并指定相同的结果。

Table API和SQL接口以及Flink的DataStream和DataSet API紧密集成在一起。您可以很容易地在所有api和基于这些api的库之间切换。例如,您可以使用CEP库从数据流中提取模式,然后使用表API分析模式,或者在对预处理数据运行Gelly图形算法之前,您可以使用SQL查询扫描、过滤和聚合批处理表。

请注意,Table API和SQL的特性还不完整,正在积极开发中。不是所有的操作都被[Table API, SQL]和[stream, batch]输入的每个组合所支持。

SQL标准的Apache Calcite

statement:
      setStatement
  |   resetStatement
  |   explain
  |   describe
  |   insert
  |   update
  |   merge
  |   delete
  |   query

setStatement:
      [ ALTER ( SYSTEM | SESSION ) ] SET identifier ‘=‘ expression

resetStatement:
      [ ALTER ( SYSTEM | SESSION ) ] RESET identifier
  |   [ ALTER ( SYSTEM | SESSION ) ] RESET ALL

explain:
      EXPLAIN PLAN
      [ WITH TYPE | WITH IMPLEMENTATION | WITHOUT IMPLEMENTATION ]
      [ EXCLUDING ATTRIBUTES | INCLUDING [ ALL ] ATTRIBUTES ]
      [ AS JSON | AS XML ]
      FOR ( query | insert | update | merge | delete )

describe:
      DESCRIBE DATABASE databaseName
   |  DESCRIBE CATALOG [ databaseName . ] catalogName
   |  DESCRIBE SCHEMA [ [ databaseName . ] catalogName ] . schemaName
   |  DESCRIBE [ TABLE ] [ [ [ databaseName . ] catalogName . ] schemaName . ] tableName [ columnName ]
   |  DESCRIBE [ STATEMENT ] ( query | insert | update | merge | delete )

insert:
      ( INSERT | UPSERT ) INTO tablePrimary
      [ ‘(‘ column [, column ]* ‘)‘ ]
      query

update:
      UPDATE tablePrimary
      SET assign [, assign ]*
      [ WHERE booleanExpression ]

assign:
      identifier ‘=‘ expression

merge:
      MERGE INTO tablePrimary [ [ AS ] alias ]
      USING tablePrimary
      ON booleanExpression
      [ WHEN MATCHED THEN UPDATE SET assign [, assign ]* ]
      [ WHEN NOT MATCHED THEN INSERT VALUES ‘(‘ value [ , value ]* ‘)‘ ]

delete:
      DELETE FROM tablePrimary [ [ AS ] alias ]
      [ WHERE booleanExpression ]

query:
      values
  |   WITH withItem [ , withItem ]* query
  |   
          select
      |   selectWithoutFrom
      |   query UNION [ ALL | DISTINCT ] query
      |   query EXCEPT [ ALL | DISTINCT ] query
      |   query MINUS [ ALL | DISTINCT ] query
      |   query INTERSECT [ ALL | DISTINCT ] query
      
      [ ORDER BY orderItem [, orderItem ]* ]
      [ LIMIT [ start, ]  count | ALL  ]
      [ OFFSET start  ROW | ROWS  ]
      [ FETCH  FIRST | NEXT  [ count ]  ROW | ROWS  ONLY ]

withItem:
      name
      [ ‘(‘ column [, column ]* ‘)‘ ]
      AS ‘(‘ query ‘)‘

orderItem:
      expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ]

select:
      SELECT [ STREAM ] [ ALL | DISTINCT ]
           * | projectItem [, projectItem ]* 
      FROM tableExpression
      [ WHERE booleanExpression ]
      [ GROUP BY  groupItem [, groupItem ]*  ]
      [ HAVING booleanExpression ]
      [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
      SELECT [ ALL | DISTINCT ]
           * | projectItem [, projectItem ]* 

projectItem:
      expression [ [ AS ] columnAlias ]
  |   tableAlias . *

tableExpression:
      tableReference [, tableReference ]*
  |   tableExpression [ NATURAL ] [ ( LEFT | RIGHT | FULL ) [ OUTER ] ] JOIN tableExpression [ joinCondition ]
  |   tableExpression CROSS JOIN tableExpression
  |   tableExpression [ CROSS | OUTER ] APPLY tableExpression

joinCondition:
      ON booleanExpression
  |   USING ‘(‘ column [, column ]* ‘)‘

tableReference:
      tablePrimary
      [ matchRecognize ]
      [ [ AS ] alias [ ‘(‘ columnAlias [, columnAlias ]* ‘)‘ ] ]

tablePrimary:
      [ [ catalogName . ] schemaName . ] tableName
      ‘(‘ TABLE [ [ catalogName . ] schemaName . ] tableName ‘)‘
  |   tablePrimary [ EXTEND ] ‘(‘ columnDecl [, columnDecl ]* ‘)‘
  |   [ LATERAL ] ‘(‘ query ‘)‘
  |   UNNEST ‘(‘ expression ‘)‘ [ WITH ORDINALITY ]
  |   [ LATERAL ] TABLE ‘(‘ [ SPECIFIC ] functionName ‘(‘ expression [, expression ]* ‘)‘ ‘)‘

columnDecl:
      column type [ NOT NULL ]

values:
      VALUES expression [, expression ]*

groupItem:
      expression
  |   ‘(‘ ‘)‘
  |   ‘(‘ expression [, expression ]* ‘)‘
  |   CUBE ‘(‘ expression [, expression ]* ‘)‘
  |   ROLLUP ‘(‘ expression [, expression ]* ‘)‘
  |   GROUPING SETS ‘(‘ groupItem [, groupItem ]* ‘)‘

windowRef:
      windowName
  |   windowSpec

windowSpec:
      [ windowName ]
      ‘(‘
      [ ORDER BY orderItem [, orderItem ]* ]
      [ PARTITION BY expression [, expression ]* ]
      [
          RANGE numericOrIntervalExpression  PRECEDING | FOLLOWING 
      |   ROWS numericExpression  PRECEDING | FOLLOWING 
      ]
      ‘)‘

在insert中,如果insert或UPSERT语句没有指定目标列的列表,查询的列数必须与目标表相同,除非是在某些一致性级别。

在merge中,至少有一个匹配时和未匹配时的子句必须出现。

tablePrimary可能只包含特定符合性级别的扩展子句;在这些相同的一致性级别中,insert中的任何列都可以被columnDecl替换,其效果类似于将其包含在EXTEND子句中。

在orderItem中,如果表达式是正整数n,它表示SELECT子句中的第n项。

在查询中,count和start可以是无符号整型字面值,也可以是值为整型的动态参数。
aggregate聚合查询是包含GROUP BY或HAVING子句或SELECT子句中的聚合函数的查询。在SELECT中,具有和ORDER BY子句的聚合查询中,所有表达式都必须是当前组中的常量(即,按照group BY子句或常量的定义对常量进行分组)、聚合函数或常量与聚合函数的组合。聚合和分组函数只能出现在聚合查询中,而且只能出现在SELECT、HAVING或ORDER BY子句中。
标量子查询是用作表达式的子查询。如果子查询不返回行,则该值为空;如果它返回多个行,则为错误。

IN、EXISTS和scalar子查询可以出现在表达式的任何地方(例如SELECT子句、where子句、ON子句连接或聚合函数的参数)。

一个IN、EXISTS或scalar子查询可能相互关联;也就是说,它可以引用包含查询的FROM子句中的表。

selectWithoutFrom等价于值,但不是标准SQL,只允许在某些符合级别中使用。

MINUS相当于EXCEPT,但不是标准SQL,只允许在某些一致性级别上使用。

交叉应用和外部应用只允许在某些符合级别。

“限制开始,计数”相当于“限制计数偏移开始”,但只允许在某些符合级别。“LIMIT start, count” is equivalent to “LIMIT count OFFSET start” but is only allowed in certain conformance levels.

flink流计算随笔(代码片段)

...数、和)在流上的工作方式与批处理不同。例如,不可能计算流中的所有元素,因为流通常是无限的(×××的)。相反,流上的聚合(计数、和等)是由窗口windows限定作用域的,例如“过去5分钟的计数”或“最后100个元素的总和”。Wi... 查看详情

flink流处理随笔(上)(代码片段)

文章目录Flink基本处理流程(上)数据读取直接读取文件从列表当中读取文件从socket读取网络数据从Kafka读取数据addSource自定义数据源数据处理map与flatmap的区别过滤分组处理(滚动聚合)reduce使用Flink基本处理流程... 查看详情

flink流处理随笔(下)(代码片段)

文章目录Flink流处理(下)流的切分split分流getSideOutput进行分流合流connect合流union联合Flink流处理(下)流的切分也就是分流操作,这个对应的是一套方法,分流合流。每一个方法又对应了两个步骤。然后... 查看详情

flink流计算随笔

StatefulComputationsoverDataStreams(在数据流的有状态计算)ApacheFlink是一个用于分布式流和批处理数据的开源平台。Flink的核心是一个流数据流引擎,它为数据流上的分布式计算提供数据分布、通信和容错能力。Flink在流引擎之上构建批... 查看详情

flink流计算随笔

...KafkaStream、Storm等,为什么阿里会选择Flink作为新一代流式计算引擎?前期经过了哪些调研和对比?大沙:我们是2015年开始调研新一代流计算引擎的。我们当时的目标就是要设计一款低延迟、exactlyonce、流和批统一的,能够支撑足... 查看详情

flink系列窗口随笔(代码片段)

...操作。(先分组的原因:分组数据流将你的window计算通过多任务并发执行ÿ 查看详情

flink流计算随笔

Flink中的程序本质上是并行的和分布式的。在执行期间,流有一个或多个流分区,每个操作符有一个或多个操作符子任务。操作符子任务相互独立,在不同的线程中执行,可能在不同的机器或容器上执行。运算符子任务的数量是... 查看详情

flink学习随笔(项目结构预览&hellodome)(代码片段)

文章目录环境准备阶段FLINK的HELLOWORLD块处理流处理基于有界数据的流处理无界的流处理对比优化环境系统:ubuntu20java:open-java11(为了支持vscode插件)IDE:IDEA2021.2设备:DELLG5-55908xinter16GBRAM准备阶段1.打开IDEA创建MAVEN... 查看详情

flink+kafka实现wordcount实时计算(代码片段)

1.FlinkFlink介绍:Flink是一个针对流数据和批数据的分布式处理引擎。它主要是由Java代码实现。目前主要还是依靠开源社区的贡献而发展。对Flink而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。... 查看详情

什么是flink(流处理框架)(代码片段)

...柏林工业大学的一个研究性项目。早期,Flink是做Batch计算的,但是在2014年,StratoSphere里面的核心成员孵化出Flink,同年将Flink捐赠Apache,并在后来成为Apache的顶级大数据项目,同时Flink计算的主流方向被定... 查看详情

大数据(9d)flink流处理核心编程练习:计算pv和uv(代码片段)

文章目录概述数据样本代码pom.xmllog4j.propertiesPV计算UV计算UV计算优化:使用键控状态概述本地开发环境(WIN10+IDEA)(本文代码可作为Flink之Transform练习模板,在#####################################之间修改业务逻辑&... 查看详情

大数据(9d)flink流处理核心编程练习-计算pv和uv(代码片段)

文章目录概述数据样本代码pom.xmllog4j.propertiesPV计算UV计算概述本地开发环境(WIN10+IDEA)(本文代码可作为Flink之Transform练习模板,在#####################################之间修改业务逻辑)计算PV:每个页面的... 查看详情

flink基础入门(含案例)(代码片段)

...gt;DAG框架(tez)--->Spark流批处理框架,内存计算(伪实时)-->flink流批处理,内存计算(真正的实时计算)flinkvsspark什么是flinkflink是一个分布式,高性能,随时可用的以及准确的流处理... 查看详情

flink学习(代码片段)

...eFlink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用的功能。现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们... 查看详情

flink基于1.15.2的java开发-实时流计算商品销售热榜(代码片段)

需求每一个商品被卖出去一条就以以下格式通过kafka发送过来,只对status=101的productId进行统计:#格式:productId,statusCodea1001,101a1001,102a1001,101a1003,101a1002,101假设每过60s有上述内容被发送过来,那么flink应该会形成... 查看详情

apache-flink深度解析-state(代码片段)

摘要:实际问题在流计算场景中,数据会源源不断的流入ApacheFlink系统,每条数据进入ApacheFlink系统都会触发计算。如果我们想进行一个Count聚合计算,那么每次触发计算是将历史上所有流入的数据重新新计算一次,还是每次计算... 查看详情

flink流式计算从入门到实战一(代码片段)

文章目录一、理解Flink与流计算1、初识Flink2、Flink的适用场景3、流式计算梳理二、Flink安装部署1、Flink的部署方式2、获取Flink3、实验环境与前置软件4、集群搭建5、Standalone模式启动6、Yarn模式提交任务6.1、首先在yarn上启动yarn-sess... 查看详情

实时监控:基于流计算oceanus(flink)实现系统和应用级实时监控(代码片段)

作者:吴云涛,腾讯 CSIG 高级工程师本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其App应用的CPU和内存等资源消耗数据,以短信、电话... 查看详情