关键词:
目 录
1. 环境准备和创建项目
1.1 软件准备及版本
Java(JDK) 1.8
Flink 1.3.0
IDEA
CentOS 7 Or MacOS
Scala 2.12
sfl4j 1.7.30
1.2 IDEA下创建Java项目FlinkTutorial
利用IDEA
创建Java
的Maven
项目FlinkTutorial
,创建项目时的一些参数填写;
<name>FlinkTutorial</name>
<groupId>com.rowyet</groupId>
<artifactId>FlinkTutorial</artifactId>
<version>1.0-SNAPSHOT</version>
最终的项目如图1.1;
输入的样例文件
:项目目录下新建文件夹input
,新建一个txt文件word.txt
,内容如下:
hello world
hello flink
hello java
hello rowyet
maven配置文件
:pom.xml内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rowyet</groupId>
<artifactId>FlinkTutorial</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.banary.version>2.12</scala.banary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!--引入Flink相关的依赖-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>$flink.version</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_$scala.banary.version</artifactId>
<version>$flink.version</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_$scala.banary.version</artifactId>
<version>$flink.version</version>
</dependency>
<!--引入日志相关的依赖-->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>$slf4j.version</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>$slf4j.version</version>
<type>pom</type>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-to-slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>$slf4j.version</version>
</dependency>
</dependencies>
</project>
log日志格式
:在resources
下新建日志文件log4j.propertries
,内容如下:
### 设置###
log4j.rootLogger = error,stdout
### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %-4r [%t] %-5p %c %x -%m%n
- 最后,在
src/main/java
下新建Java
包com.rowyet.wc
,开始编写Flink的练手项目;
2. DataSet API 批处理实现word count
com.rowyet.wc
包下创建Java class文件BatchWorldCount
,内容如下:
package com.rowyet.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWorldCount
public static void main(String[] args) throws Exception
// 1. 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 从文件中读取数据
DataSource<String> lineDataSource = env.readTextFile("input/word.txt");
// 3. 将每行数据进行分词,转换成二元组类型,利用java lambda表达式实现flatMap
FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) ->
String[] words = line.split(" ");
for (String word : words)
out.collect(Tuple2.of(word, 1L));
).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 按照word进行分组,利用word的索引0,即第一个元素进行分组
UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);
// 5. 分组内进行聚合统计,根据word分组后的索引1,即第二个元素进行求和
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
// 6. 打印结果
sum.print();
运行结果,如图2.1;
3. DataSet API VS DataStream API
在Flink 1.12
版本开始,官方就推荐使用DataSteam API
,在提交任务时只需要通过以下shell参数指定模式为BATCH
即可;
bin/flink run -Dexecution.runtime-mode=BATCH BatchWorldCount.jar
如此一来,DataSet API就已经处于软弃用(soft deprecated)
的状态,而且实际应用中只需要维护一套DataStream API即可,真正的向流批一体
迈进。
4. DataStream API 流处理实现word count
4.1 有界的流处理
com.rowyet.wc
包下创建Java class文件BoundedStreamWordCount
,内容如下:
package com.rowyet.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class BoundedStreamWordCount
public static void main(String[] args) throws Exception
// 1. 创建流式的执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/word.txt");
// 3. 转化计算
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) ->
String[] words = line.split(" ");
for (String word : words)
out.collect(Tuple2.of(word, 1L));
).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
// 6. 打印
sum.print();
// 7. 启动执行
env.execute();
运行结果,如图2.2,发现跟之前图2.1的运行结果有些不一样,具体区别在哪呢?
- 数据出现无序了,而且是来一条处理一条,最终的结果才是准确的结果;
- 结果前面有一个序号,而且相同的word序号相同,这是因为
Flink
最终运行在分布式的集群
内,而这个序号是IDEA模拟分布式集群,代表你的CPU的核数的一个CPU序号,博主的CPU是8核的(可以理解为有CPU8个),所以序号不会大于8,以此类推自己的CPU总核数和运行结果,至于为什么相同的word序号是一样的,是因为相同的word作为分区的key,最终肯定要在同一个处理器上才可以进行后续的sum统计呢。
4.2 无界的流处理
这里利用linux的netcat
命令监听端口7777
的连续不断输入的word为例,实现无界的流处理word count的统计;
com.rowyet.wc
包下创建Java class文件StreamWordCount
,内容如下:
package com.rowyet.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount
public static void main(String[] args) throws Exception
// 1. 创建流式环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文本流
// DataStreamSource<String> lineDataSource = env.socketTextStream("127.0.0.1", 7777); //测试可以写死参数
//生产中一般,通过main函数后接参数实现
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
// 运行时在菜单栏Run—>Edit Configuration—>Program arguments文本框内填入 --host "127.0.0.1" --port 7777
DataStreamSource<String> lineDataSource = env.socketTextStream(host, port);
// 3. 转换处理
SingleOutputStreamOperator<Tuple2<String, Long>> wordOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) ->
String[] words = line.split(" ");
for (String word : words)
out.collect(Tuple2.of(word, 1L));
).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordOneTuple.keyBy(data -> data.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
// 6. 输出
sum.print();
// 7. 启动执行
env.execute();
写完代码后接下来的操作顺序很重要,要注意!!!
写完代码后接下来的操作顺序很重要,要注意!!!
写完代码后接下来的操作顺序很重要,要注意!!!
- 在某一台Linux或者MacOS开启
netcat
命令监听本地7777
端口,博主的是本地的MacOS终端,指令是:
nc -lk 7777
# 回车启动,先不要输入内容
- 启动刚刚写好的Java Class文件
StreamWordCount
,暂时看不到任何东西,一直等待输出的空白输出框,如图2.3;
- 如图2.4,在步骤1的MacOS终端启动的
netcat
环境内输入一些聊天消息;
- 最中在IDEA的运行结果内会实时得到运算结果,如图2.5
5. 试炼项目代码链接
以上就是以经典的大数据word count统计为例,讲述传统Apache Flink DataSet API(批处理API)和新的流式DataStream API的两种实现,从代码动手开始揭开Apache Flink的神秘面纱,整个试炼项目代码链接如下:
apacheflink从入门到放弃——快速上手(java版)(代码片段)
目录1.环境准备和创建项目1.1软件准备及版本1.2IDEA下创建Java项目FlinkTutorial2.DataSetAPI批处理实现wordcount3.DataSetAPIVSDataStreamAPI4.DataStreamAPI流处理实现wordcount4.1有界的流处理4.2无界的流处理5.试炼项目代码链接1.环境准备和创建项目1.1... 查看详情
apacheflink从入门到放弃——flink简介(代码片段)
目录1.计算引擎的发展历史2.什么是Flink2.1概念2.2什么是有界的数据流和无界数据流?什么是状态?2.3Fink的历史2.4Flink的特点2.5Flink的应用2.6流批架构的演变2.7Flink的分层API3.FlinkVSSpark4.FlinkOrSpark?1.计算引擎的发展历史... 查看详情
初识pytorch:从安装到入门,从入门到放弃(代码片段)
目录PyTorch安装配置安装验证PyTorchPyTorch是Facebook团队于2017年1月发布的一个深度学习框架,虽然晚于TensorFlow,也没有TensorFlow火,但目前已经与TensorFlow奇虎相当。而且PyTorch采用了Python语言的接口,可以说它才是Pytho... 查看详情
《java从入门到放弃》javase入门篇:集合
今天来讲讲Java中的集合和常见集合类型的使用。什么是集合呢?刚好最近学校里面军训,只听到教官一声喊:“集合!!!”各位小萌新们就屁颠屁颠的跑过来排列整齐了,这就是集合···650)this.width=650;"src="https://img.baidu.com/hi/... 查看详情
《java从入门到放弃》入门篇:hibernate查询——hql
不知不觉又到了hibernate的最后一篇了,只感觉时光飞逝~,岁月如梭~!转眼之间,我们就···························,好吧,想装个X,结果装不下去了,还是直接开始吧·650)this.width=650;"src="https://img.baidu.com/hi/jx2/j_00... 查看详情
云开发系列课程让你从入门到精通快速上手serverless和云开发技术
简介:云开发系列课程主要介绍了从入门到精通快速上手Serverless和云开发技术。学习内容涵盖云开发协同、云函数、云数据库、多媒体托管、前后端一体化框架等ServerlessWeb开发必备知识。希望通过云开发系列课程的学习与... 查看详情
初识pytorch:从安装到入门,从入门到放弃(代码片段)
目录PyTorch安装配置安装验证PyTorchPyTorch是Facebook团队于2017年1月发布的一个深度学习框架,虽然晚于TensorFlow,也没有TensorFlow火,但目前已经与TensorFlow奇虎相当。而且PyTorch采用了Python语言的接口,可以说它才是Pytho... 查看详情
spark从入门到上手实战
Spark从入门到上手实战课程学习地址:http://www.xuetuwuyou.com/course/186课程出自学途无忧网:http://www.xuetuwuyou.com讲师:轩宇老师课程简介:Spark属于新起的基于内存处理海量数据的框架,由于其快速被众公司所青睐。Spark生态栈框架... 查看详情
shiro从入门到放弃
ApacheShiro是Java的一个安全框架。目前,使用ApacheShiro的人越来越多,因为它相当简单,对比SpringSecurity,可能没有SpringSecurity做的功能强大,但是在实际工作时可能并不需要那么复杂的东西,所以使用小而简单的Shiro就足够了。对... 查看详情
《java从入门到放弃》入门篇:struts2的常用验证方式
感觉过了一个周末,人都懒得不要不要的,今天就来点简单的内容吧--,各位看官如果欲求不满的话,可以自行解决或再去宠幸其他“勃主”···650)this.width=650;"src="https://img.baidu.com/hi/jx2/j_0036.gif"alt="j_0036.gif"/>struts2的验证方式主... 查看详情
《java从入门到放弃》入门篇:hibernate中的多表对应关系
hibernate中的对应关系其实就是数据库中表的对应关系,就跟某些电影中的某些场景是一样一样滴。比如可以是一男一女,还可以是一男多女,更可以是多男一女,最后最后最后还可以是多男多女!!!650)this.width=650;"src="https://img... 查看详情
ldap从入门到放弃
OpenLDAP快速入门 一、Ldap简介 1、目录服务 目录是一个为查询、浏览和搜索而优化的专业分布式数据库,它呈树状结构组织数据,就好象Linux/Unix系统中的文件目录一样。 目录服务的组成: 1、目录... 查看详情
qml从入门到放弃第二卷
第二卷如何更快速的放弃,注重的是C++和QML的交互<1>记事本。。 (1)先测试下不在QML创建C++对象,仅仅在main.cpp添加一个属性函数供调用.TextStreamLoader.h#ifndefTEXTSTREAMLOADER_H#defineTEXTSTREAMLOADER_H#include<QObject>#include<QTextStr 查看详情
rn从上手到“放弃”(代码片段)
RN从上手到“放弃”前言:react-native,相对于最近??的飞起的flutter,不算是一个新技术,2015年Facebook开源,到现在已经45个年头,一直在维护当中,但是至今未发布v1版本,目前已经更新到0.59。该技术目标:跨平台实现原生应用... 查看详情
一篇搞懂springcloudgateway(从入门到放弃)(代码片段)
首先理解什么是网关传统的单体应用中,所有的请求都是由一个应用来处理的,所以没有网关的这个概念。但在微服务架构中,每个服务都有对外提供服务的地址和端口,都有一些共同的功能,比如:权... 查看详情
vue从入门到放弃(代码片段)
vue介绍Vue(读音/vjuː/,类似于view)是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是,Vue被设计为可以自底向上逐层应用。Vue的核心库只关注视图层,不仅易于上手,还便于与第三方库或既有项目整... 查看详情
如何学好spark大数据-从入门到上手
ApacheSpark是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UCBerkeleyAMPlab(加州大学伯克利分校的AMP实验室)所开源的类HadoopMapReduce的通用并行框架,Spark,拥有HadoopMapReduce所具有的优点;但不同于MapReduce的是Job中间输出结... 查看详情
《java从入门到放弃》javase入门篇:练习——单身狗租赁系统
今天,我们要玩个大的!!!我们把之前使用数组做的这个单身狗系统改版成数据库版本,并且使用面向对象里面的一些简单思想。如果有不知道这个系统的看官,请跳转到目录页,然后再选择单身狗系统(数组版)先围观五分钟... 查看详情