一文带你入门flinksql(代码片段)

犀牛饲养员 犀牛饲养员     2022-12-15     274

关键词:

一文带你入门flink sql

写在前面

本次实战主要是通过Flink SQL Client消费kafka的实时消息,再用各种SQL操作对数据进行查询统计。

环境准备

具体的环境安装过程就不在这里写了,网上很多资料,大家自己查阅按照就好了。我说下我本地的环境:

  • flink 1.12.4
  • mysql 8.0.25
  • kafka 2.8.0

另外就是,本次示例需要用到以下几个jar包:

flink-sql-connector-kafka_2.11-1.12.4.jar
flink-connector-jdbc_2.11-1.12.4.jar
mysql-connector-java-5.1.48.jar

把他们拷贝到flink安装目录lib目录下。

flink输出的结果,会落到一张mysql的表,也就是我们的sink表,这个表要提前建好。

CREATE TABLE `pvuv_sink` (
  `dt` varchar(100) DEFAULT NULL,
  `pv` bigint DEFAULT NULL,
  `uv` bigint DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3

三个字段分别表示时间,pv值和uv值。

正文

先启动flink以及flink sql的客户端。

$ ./bin/start-cluster.sh
$ .bin/sql-client.sh embedded

这样就开启了一个sql client的客户端。

接着在客户端执行下面这段sql,这相当于启动了一个source table进行监听我们的输入数据流。

CREATE TABLE user_log (
     user_id VARCHAR,
     item_id VARCHAR,
     category_id VARCHAR,
     behavior VARCHAR,
     ts TIMESTAMP(3)
 ) WITH (
     'connector.type' = 'kafka', -- 使用 kafka connector
     'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
     'connector.topic' = 'user',  -- kafka topic
     'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
     'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
     'connector.properties.0.value' = 'localhost:2181',
     'connector.properties.1.key' = 'bootstrap.servers',
     'connector.properties.1.value' = 'localhost:9092',
     'update-mode' = 'append',
     'format.type' = 'json',  -- 数据源格式为 json
     'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
 );

执行成功的话,会返回:

[INFO] Table has been created

解释下这段sql,flink会帮我们创建一张表,这个表的数据来源于kafka的消息,对应的topic是user,数据的格式是json。其它的信息都好理解,不做过多解释了。执行成功后,就开启监听了。

我们可以select下,看看表的情况:

因为还没有输入数据,所以表是空的。

然后执行sink sql,也就是输出数据的表,这个表前面我们提前建好了,在flink sql这里配置下:

CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
    'connector.table' = 'pvuv_sink',
    'connector.username' = 'root',
    'connector.password' = '11111111',
    'connector.write.flush.max-rows' = '1'
);

然后编写计算逻辑,逻辑比较简单,统计每个小时的pv和uv。

INSERT INTO pvuv_sink(dt, pv, uv)
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');

执行后,flink就会启动一个job在后台执行。

我们可以通过

http://localhost:8081/#/overview

这个地址看到任务的详细情况。

然后我们在本地启动一个kafka的服务,然后再启动一个producer模拟发送数据。

kafka是基于zookeeper的,启动kafka之前,需要先启动zookeeper

/usr/local/Cellar/kafka/2.8.0/bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &

启动kafka

/usr/local/Cellar/kafka/2.8.0/bin/kafka-server-start /usr/local/etc/kafka/server.properties &

查看启动是否成功

创建topic,注意和上面source table的配置保持一致。

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user

启动一个控制台的生产者,

kafka-console-producer --broker-list localhost:9092 --topic user

发送两条消息试试:

"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"
"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"

去mysql看下pvuv_sink表,发现已经有数据了。

遇到的一些问题

在运行flink sql的时候踩过一些坑,这里列举下帮大家避坑。

错误一

java.lang.NoSuchMethodError: 'boolean org.apache.flink.table.api.TableColumn.isGenerated()'

这个是因为flink-jdbc的版本搞错了导致的。

错误二

Flink SQL> INSERT INTO pvuv_sink(dt, pv, uv)
> SELECT
>   DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
>   COUNT(*) AS pv,
>   COUNT(DISTINCT user_id) AS uv
> FROM user_log
> GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer

这个是因为我一开始用错了lib,应该是

flink-sql-connector-kafka_2.11-1.12.4.jar

而不是

flink-connector-kafka_2.12-1.12.4.jar

错误三

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode cannot be cast to class org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode (org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode and org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode are in unnamed module of loader 'app')

参考

  • https://blog.csdn.net/boling_cavalry/article/details/106038219
  • https://issues.apache.org/jira/browse/FLINK-19995

英雄哪里出来一文带你吃透算法(代码片段)

文章目录前言一、语言基础1、「光天化日学C语言」二、刷题必读1、「LeetCode零基础指南」三、语言入门1、「C语言入门100例」四、算法入门1、「算法零基础100讲」五、算法进阶1、「画解数据结构」2、「算法进阶50讲」3、「LeetC... 查看详情

一文快速入门docker(代码片段)

Docker提供一种安全、可重复的环境中自动部署软件的方式,拉开了基于与计算平台发展方式的变革序幕。如今Docker在互联网公司使用已经非常普遍。本文用十分钟时间,带你快速入门Docker。Docker是什么Docker是什么?官网首页的介... 查看详情

小白都能看懂的sql零基础入门,一文带你轻松学会增删改查!(代码片段)

作者简介作者:LuciferLiu,中国DBA联盟(ACDU)成员。目前从事OracleDBA工作,曾从事Oracle数据库开发工作,主要服务于生产制造,汽车金融等行业。现拥有OracleOCP,OceanBaseOBCA认证,擅长Oracle数据库运维开发&#... 查看详情

还不会ts?一文带你打开ts的大门(代码片段)

一文带你打开ts的大门序言一、什么是TypeScript?1、编程语言的类型2、TypeScript究竟是什么?二、为什么要学习TypeScript?1、程序更容易理解2、效率更高3、更少的错误4、非常好的包容性5、一点小缺点三、typescript入门1... 查看详情

多个岗位需要的sql语言你掌握了吗?简单例子+详细代码带你一文掌握(代码片段)

...要的sql语言你掌握了吗?简单例子+详细代码带你一文掌握带你熟练掌握–进阶版:什么样的程度才是“熟练掌握sql”–MySQL进阶版简单例子+详细代码带你一文掌握关于数据库三大范式关于SQL语言SQL语句分类DDL(... 查看详情

web前端一文带你吃透html(上篇)(代码片段)

前端学习路线小总结:基础入门:HTMLCSSJavaScript三大主流框架:VUEREACTAngular深入学习:小程序NodejQueryTypeScript前端工程化🍁开始前端之旅吧!一.HTML简介1.什么是HTML?2.HTML标签3.HTML元素4.HTML版本5.Web浏览器6.HTM... 查看详情

一文带你掌握java开发利器:maven(代码片段)

Maven如果作为一个Java程序员,那么在日常的开发过程中,maven是很常见的项目构建工具。maven可以极大的提高我们的开发效率,帮助我们简化开发过程中一些解决依赖和项目部署的相关问题,所以学习掌握maven的相... 查看详情

一文读懂:python爬虫超详细讲解带你实战爬知乎(零基础入门,男女老少都看的懂)(代码片段)

爬虫的基本流程网络爬虫的基本工作流程如下:首先选取一部分精心挑选的种子URL将种子URL加入任务队列从待抓取URL队列中取出待抓取的URL,解析DNS,并且得到主机的ip,并将URL对应的网页下载下来,存储进已... 查看详情

英雄哪里出来一文带你吃透算法(代码片段)

文章目录前言一、语言基础1、「光天化日学C语言」二、刷题必读1、「LeetCode零基础指南」三、语言入门1、「C语言入门100例」四、算法入门1、「算法零基础100讲」五、算法进阶1、「画解数据结构」2、「算法进阶50讲」3、「LeetC... 查看详情

spring入门到精通,一文带你轻松搞定spring!

Spring是一个开放源代码的设计层面框架,他解决的是业务逻辑层和其他各层的松耦合问题,因此它将面向接口的编程思想贯穿整个系统应用。本文章将深入浅出讲解Spring的核心技术IoC、AOP,剖析框架的源代码。让大家快速掌握框... 查看详情

多图:一文带你入门掌握jvm所有知识点(代码片段)

本JVM系列属于本人学习过程当中总结的一些知识点,目的是想让读者更快地掌握JVM相关的知识要点,难免会有所侧重,若想要更加系统更加详细的学习JVM知识,还是需要去阅读专业的书籍和文档。本文主题内容:JVM内存区域概览... 查看详情

一文带你搭建rocketmq源码调试环境(代码片段)

tothetargetVM,address:\'127.0.0.1:52279\',transport:\'socket\'Thebroker[broker-a,192.168.10.197:10911]bootsuccess.serializeType=JSONandnameserveris127.0.0.1:98764.3查看启动日志另外我们到logs目录看下启动的详细日志,打开broker. 查看详情

python爬虫-35-scrapy实操入门,一文带你入门,保姆级教程

1、安装​​scrapy​​相关组件pipinstallscrapy-ihttps://pypi.tuna.tsinghua.edu.cn/simple如果在​​windows​​​系统下,提示这个错误​​ModuleNotFoundError:Nomodulenamedwin32api​​​,那么使用以下命令可以解决:​​pipinstallpypiwin32​​。2、创... 查看详情

node进阶一文带你快速入门koa框架(代码片段)

✅作者简介:一名普通本科大三的学生,致力于提高前端开发能力✨个人主页:前端小白在前进的主页🔥系列专栏:node.js学习专栏⭐️个人社区:个人交流社区🍀学习格言:☀️打不倒你的会使你更强!... 查看详情

一文带你吃透java中的继承(代码片段)

继承继承的概念继承的格式定义父类的格式:(一个普通的类定义)publicclass父类名称 //...定义子类的格式:publicclass子类名称extends父类名称 //...举例配合理解:继承中成员变量的访问特点举例配合理解:区分子类方法中重名的三种变... 查看详情

一文带你认识springaop(代码片段)

SpringAOP简介AOP(Aspect-OrientedProgramming:面向切面编程)是对OOP(Object-OrientedProgramming:面向对象编程)的补充和完善。OOP引入封装、继承和多态等概念来建立一种对象层次结构,用于模拟公共行为的一个集合。封装就要求将功能分散到... 查看详情

一文带你入门javastream流,太强了

两个星期以前,就有读者强烈要求我写一篇JavaStream流的文章,我说市面上不是已经有很多了吗,结果你猜他怎么说:“就想看你写的啊!”你看你看,多么苍白的喜欢啊。那就“勉为其难”写一篇吧,嘻嘻。 &nbs... 查看详情

一文带你快速掌握fastjson的使用(代码片段)

1.FastJson序列化API方法:JSON.toJSONString序列化:是指将Java对象转成json格式字符串的过程。JavaBean对象、List集合对象、Map集合为应用最广泛的。1.1序列化Java对象Java中的Student对象序列化为JSON格式字符串@TestpublicvoidobjectToJson()Studentstude... 查看详情