apacheflink从入门到放弃——快速上手(java版)(代码片段)

╭⌒若隐_RowYet——大数据 ╭⌒若隐_RowYet——大数据     2022-11-04     149

关键词:

目 录

1. 环境准备和创建项目

1.1 软件准备及版本

  • Java(JDK) 1.8
  • Flink 1.3.0
  • IDEA
  • CentOS 7 Or MacOS
  • Scala 2.12
  • sfl4j 1.7.30

1.2 IDEA下创建Java项目FlinkTutorial

  利用IDEA创建JavaMaven项目FlinkTutorial,创建项目时的一些参数填写;

    <name>FlinkTutorial</name>
    <groupId>com.rowyet</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

  最终的项目如图1.1;

图1.1 项目树结构
  • 输入的样例文件:项目目录下新建文件夹input,新建一个txt文件word.txt,内容如下:
hello world
hello flink
hello java
hello rowyet
  • maven配置文件:pom.xml内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.rowyet</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.banary.version>2.12</scala.banary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <!--引入Flink相关的依赖-->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>$flink.version</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_$scala.banary.version</artifactId>
            <version>$flink.version</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_$scala.banary.version</artifactId>
            <version>$flink.version</version>
        </dependency>

        <!--引入日志相关的依赖-->
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>$slf4j.version</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>$slf4j.version</version>
            <type>pom</type>
            <scope>test</scope>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-to-slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>$slf4j.version</version>
        </dependency>


    </dependencies>

</project>
  • log日志格式:在resources下新建日志文件log4j.propertries,内容如下:
### 设置###
log4j.rootLogger = error,stdout

### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %-4r [%t] %-5p %c %x -%m%n

  • 最后,在src/main/java下新建Javacom.rowyet.wc,开始编写Flink的练手项目;

2. DataSet API 批处理实现word count

  com.rowyet.wc包下创建Java class文件BatchWorldCount,内容如下:

package com.rowyet.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWorldCount 
    public static void main(String[] args) throws Exception 
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 从文件中读取数据
        DataSource<String> lineDataSource = env.readTextFile("input/word.txt");

        // 3. 将每行数据进行分词,转换成二元组类型,利用java lambda表达式实现flatMap
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) ->
        
            String[] words = line.split(" ");
            for (String word : words) 
                out.collect(Tuple2.of(word, 1L));
            
        ).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 按照word进行分组,利用word的索引0,即第一个元素进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        // 5. 分组内进行聚合统计,根据word分组后的索引1,即第二个元素进行求和
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        // 6. 打印结果
        sum.print();


    


  运行结果,如图2.1;

图2.1 DataSet API批处理运行结果

3. DataSet API VS DataStream API

  在Flink 1.12版本开始,官方就推荐使用DataSteam API,在提交任务时只需要通过以下shell参数指定模式为BATCH即可;

bin/flink run -Dexecution.runtime-mode=BATCH BatchWorldCount.jar

  如此一来,DataSet API就已经处于软弃用(soft deprecated)的状态,而且实际应用中只需要维护一套DataStream API即可,真正的向流批一体迈进。

4. DataStream API 流处理实现word count

4.1 有界的流处理

  com.rowyet.wc包下创建Java class文件BoundedStreamWordCount,内容如下:

package com.rowyet.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedStreamWordCount 
    public static void main(String[] args) throws Exception 
        // 1. 创建流式的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文件
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/word.txt");

        // 3. 转化计算
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> 
                    String[] words = line.split(" ");
                    for (String word : words) 
                        out.collect(Tuple2.of(word, 1L));
                    
                
        ).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);

        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        // 6. 打印
        sum.print();

        // 7. 启动执行
        env.execute();


    


  运行结果,如图2.2,发现跟之前图2.1的运行结果有些不一样,具体区别在哪呢?

  • 数据出现无序了,而且是来一条处理一条,最终的结果才是准确的结果;
  • 结果前面有一个序号,而且相同的word序号相同,这是因为Flink最终运行在分布式的集群内,而这个序号是IDEA模拟分布式集群,代表你的CPU的核数的一个CPU序号,博主的CPU是8核的(可以理解为有CPU8个),所以序号不会大于8,以此类推自己的CPU总核数和运行结果,至于为什么相同的word序号是一样的,是因为相同的word作为分区的key,最终肯定要在同一个处理器上才可以进行后续的sum统计呢。

图2.2 有界流处理运行结果

4.2 无界的流处理

  这里利用linux的netcat命令监听端口7777的连续不断输入的word为例,实现无界的流处理word count的统计;

  com.rowyet.wc包下创建Java class文件StreamWordCount,内容如下:

package com.rowyet.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class StreamWordCount 
    public static void main(String[] args) throws Exception 
        // 1. 创建流式环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文本流
       // DataStreamSource<String> lineDataSource = env.socketTextStream("127.0.0.1", 7777);  //测试可以写死参数

        //生产中一般,通过main函数后接参数实现
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        // 运行时在菜单栏Run—>Edit Configuration—>Program arguments文本框内填入  --host "127.0.0.1" --port 7777
        DataStreamSource<String> lineDataSource = env.socketTextStream(host, port);  


        // 3. 转换处理
        SingleOutputStreamOperator<Tuple2<String, Long>> wordOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) ->
                
                    String[] words = line.split(" ");
                    for (String word : words) 
                        out.collect(Tuple2.of(word, 1L));
                    
                
        ).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordOneTuple.keyBy(data -> data.f0);

        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        // 6. 输出
        sum.print();

        // 7. 启动执行
        env.execute();

    

  写完代码后接下来的操作顺序很重要,要注意!!!
  写完代码后接下来的操作顺序很重要,要注意!!!
  写完代码后接下来的操作顺序很重要,要注意!!!

  1. 在某一台Linux或者MacOS开启netcat命令监听本地7777端口,博主的是本地的MacOS终端,指令是:
nc -lk 7777   
# 回车启动,先不要输入内容
  1. 启动刚刚写好的Java Class文件StreamWordCount,暂时看不到任何东西,一直等待输出的空白输出框,如图2.3;
图2.3 无界流处理等待结果
  1. 如图2.4,在步骤1的MacOS终端启动的netcat环境内输入一些聊天消息;

图2.4 开启netcat命令监听7777端口
  1. 最中在IDEA的运行结果内会实时得到运算结果,如图2.5
图2.5 无界流计算得到word count

5. 试炼项目代码链接

  以上就是以经典的大数据word count统计为例,讲述传统Apache Flink DataSet API(批处理API)和新的流式DataStream API的两种实现,从代码动手开始揭开Apache Flink的神秘面纱,整个试炼项目代码链接如下:

apacheflink从入门到放弃——快速上手(java版)(代码片段)

目录1.环境准备和创建项目1.1软件准备及版本1.2IDEA下创建Java项目FlinkTutorial2.DataSetAPI批处理实现wordcount3.DataSetAPIVSDataStreamAPI4.DataStreamAPI流处理实现wordcount4.1有界的流处理4.2无界的流处理5.试炼项目代码链接1.环境准备和创建项目1.1... 查看详情

apacheflink从入门到放弃——flink简介(代码片段)

目录1.计算引擎的发展历史2.什么是Flink2.1概念2.2什么是有界的数据流和无界数据流?什么是状态?2.3Fink的历史2.4Flink的特点2.5Flink的应用2.6流批架构的演变2.7Flink的分层API3.FlinkVSSpark4.FlinkOrSpark?1.计算引擎的发展历史... 查看详情

初识pytorch:从安装到入门,从入门到放弃(代码片段)

目录PyTorch安装配置安装验证PyTorchPyTorch是Facebook团队于2017年1月发布的一个深度学习框架,虽然晚于TensorFlow,也没有TensorFlow火,但目前已经与TensorFlow奇虎相当。而且PyTorch采用了Python语言的接口,可以说它才是Pytho... 查看详情

《java从入门到放弃》javase入门篇:集合

今天来讲讲Java中的集合和常见集合类型的使用。什么是集合呢?刚好最近学校里面军训,只听到教官一声喊:“集合!!!”各位小萌新们就屁颠屁颠的跑过来排列整齐了,这就是集合···650)this.width=650;"src="https://img.baidu.com/hi/... 查看详情

《java从入门到放弃》入门篇:hibernate查询——hql

不知不觉又到了hibernate的最后一篇了,只感觉时光飞逝~,岁月如梭~!转眼之间,我们就···························,好吧,想装个X,结果装不下去了,还是直接开始吧·650)this.width=650;"src="https://img.baidu.com/hi/jx2/j_00... 查看详情

云开发系列课程让你从入门到精通快速上手serverless和云开发技术

简介:云开发系列课程主要介绍了从入门到精通快速上手Serverless和云开发技术。学习内容涵盖云开发协同、云函数、云数据库、多媒体托管、前后端一体化框架等ServerlessWeb开发必备知识。希望通过云开发系列课程的学习与... 查看详情

初识pytorch:从安装到入门,从入门到放弃(代码片段)

目录PyTorch安装配置安装验证PyTorchPyTorch是Facebook团队于2017年1月发布的一个深度学习框架,虽然晚于TensorFlow,也没有TensorFlow火,但目前已经与TensorFlow奇虎相当。而且PyTorch采用了Python语言的接口,可以说它才是Pytho... 查看详情

spark从入门到上手实战

Spark从入门到上手实战课程学习地址:http://www.xuetuwuyou.com/course/186课程出自学途无忧网:http://www.xuetuwuyou.com讲师:轩宇老师课程简介:Spark属于新起的基于内存处理海量数据的框架,由于其快速被众公司所青睐。Spark生态栈框架... 查看详情

shiro从入门到放弃

ApacheShiro是Java的一个安全框架。目前,使用ApacheShiro的人越来越多,因为它相当简单,对比SpringSecurity,可能没有SpringSecurity做的功能强大,但是在实际工作时可能并不需要那么复杂的东西,所以使用小而简单的Shiro就足够了。对... 查看详情

《java从入门到放弃》入门篇:struts2的常用验证方式

感觉过了一个周末,人都懒得不要不要的,今天就来点简单的内容吧--,各位看官如果欲求不满的话,可以自行解决或再去宠幸其他“勃主”···650)this.width=650;"src="https://img.baidu.com/hi/jx2/j_0036.gif"alt="j_0036.gif"/>struts2的验证方式主... 查看详情

《java从入门到放弃》入门篇:hibernate中的多表对应关系

hibernate中的对应关系其实就是数据库中表的对应关系,就跟某些电影中的某些场景是一样一样滴。比如可以是一男一女,还可以是一男多女,更可以是多男一女,最后最后最后还可以是多男多女!!!650)this.width=650;"src="https://img... 查看详情

ldap从入门到放弃

 OpenLDAP快速入门 一、Ldap简介  1、目录服务  目录是一个为查询、浏览和搜索而优化的专业分布式数据库,它呈树状结构组织数据,就好象Linux/Unix系统中的文件目录一样。  目录服务的组成:      1、目录... 查看详情

qml从入门到放弃第二卷

第二卷如何更快速的放弃,注重的是C++和QML的交互<1>记事本。。 (1)先测试下不在QML创建C++对象,仅仅在main.cpp添加一个属性函数供调用.TextStreamLoader.h#ifndefTEXTSTREAMLOADER_H#defineTEXTSTREAMLOADER_H#include<QObject>#include<QTextStr 查看详情

rn从上手到“放弃”(代码片段)

RN从上手到“放弃”前言:react-native,相对于最近??的飞起的flutter,不算是一个新技术,2015年Facebook开源,到现在已经45个年头,一直在维护当中,但是至今未发布v1版本,目前已经更新到0.59。该技术目标:跨平台实现原生应用... 查看详情

一篇搞懂springcloudgateway(从入门到放弃)(代码片段)

首先理解什么是网关​传统的单体应用中,所有的请求都是由一个应用来处理的,所以没有网关的这个概念。但在微服务架构中,每个服务都有对外提供服务的地址和端口,都有一些共同的功能,比如:权... 查看详情

vue从入门到放弃(代码片段)

vue介绍Vue(读音/vjuː/,类似于view)是一套用于构建用户界面的渐进式框架。与其它大型框架不同的是,Vue被设计为可以自底向上逐层应用。Vue的核心库只关注视图层,不仅易于上手,还便于与第三方库或既有项目整... 查看详情

如何学好spark大数据-从入门到上手

ApacheSpark是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UCBerkeleyAMPlab(加州大学伯克利分校的AMP实验室)所开源的类HadoopMapReduce的通用并行框架,Spark,拥有HadoopMapReduce所具有的优点;但不同于MapReduce的是Job中间输出结... 查看详情

《java从入门到放弃》javase入门篇:练习——单身狗租赁系统

今天,我们要玩个大的!!!我们把之前使用数组做的这个单身狗系统改版成数据库版本,并且使用面向对象里面的一些简单思想。如果有不知道这个系统的看官,请跳转到目录页,然后再选择单身狗系统(数组版)先围观五分钟... 查看详情