flink+kafka实现wordcount实时计算(代码片段)

技术即艺术 技术即艺术     2022-11-12     601

关键词:

Flink介绍:

Flink 是一个针对流数据和批数据的分布式处理引擎。它主要是由 Java 代码实现。目前主要还是依靠开源社区的贡献而发展。对 Flink 而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。再换句话说,Flink 会把所有任务当成流来处理,这也是其最大的特点。Flink 可以支持本地的快速迭代,以及一些环形的迭代任务。

Flink的特性:

Flink是个分布式流处理开源框架:
1>. 即使数据源是无序的或者晚到达的数据,也能保持结果准确性
2>. 有状态并且容错,可以无缝的从失败中恢复,并可以保持exactly-once
3>. 大规模分布式
4>. 实时计算场景的广泛应用(阿里双十一实时交易额使用的Blink就是根据Flink改造而来)

Flink可以确保仅一次语义状态计算;Flink有状态意味着,程序可以保持已经处理过的数据;
Flink支持流处理和窗口事件时间语义,Flink支持灵活的基于时间窗口,计数,或会话数据驱动的窗户;
Flink容错是轻量级和在同一时间允许系统维持高吞吐率和提供仅一次的一致性保证,Flink从失败中恢复,零数据丢失;
Flink能够高吞吐量和低延迟;
Flink保存点提供版本控制机制,从而能够更新应用程序或再加工历史数据没有丢失并在最小的停机时间。

2. Kafka

Kafka介绍

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

Kafka特性

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
1>. 通过磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
2>. 高吞吐量即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
3>. 支持通过Kafka服务器和消费机集群来分区消息。
4>. 支持Hadoop并行数据加载。

Kafka的安装配置及基础使用

因为此篇博客是本地Flink消费Kafka的数据实现WordCount,所以Kafka不需要做过多配置,从Apache官网下载安装包直接解压即可使用
这里我们创建一个名为test的topic
在producer输入数据流:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在consumer监控从producer输入的数据流:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

1>. 创建maven project

<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>1.0.0</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>1.0.0</version>
			<scope>provided</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>1.0.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
			<version>1.0.0</version>
		</dependency>
	</dependencies>
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

4>. 设置监控数据流时间间隔(官方叫状态与检查点)

env.enableCheckpointing(1000);

5>. 配置kafka和zookeeper的ip和端口

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),properties);

7>. 将Kafka的数据转成flink的DataStream类型

DataStream<String> stream = env.addSource(myConsumer);

8>. 实施计算模型并输出结果

DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

counts.print();

计算模型具体逻辑代码

public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> 
		private static final long serialVersionUID = 1L;

		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) 
			String[] tokens = value.toLowerCase().split("\\\\W+");
			for (String token : tokens) 
				if (token.length() > 0) 
					out.collect(new Tuple2<String, Integer>(token, 1));
				
			
		
	

4. 验证

1>. Kafka producer输入

2>. Flink客户端立刻得出结果

完整代码

package com.scn;

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class FilnkCostKafka 
	public static void main(String[] args) throws Exception 
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(1000);

		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "192.168.1.20:9092");
		properties.setProperty("zookeeper.connect", "192.168.1.20:2181");
		properties.setProperty("group.id", "test");

		FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),
				properties);

		DataStream<String> stream = env.addSource(myConsumer);

		DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

		counts.print();

		env.execute("WordCount from Kafka data");
	

	public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> 
		private static final long serialVersionUID = 1L;

		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) 
			String[] tokens = value.toLowerCase().split("\\\\W+");
			for (String token : tokens) 
				if (token.length() > 0) 
					out.collect(new Tuple2<String, Integer>(token, 1));
				
			
		
	



flink基于java的wordcount,根据滑动窗口动态排序实现(代码片段)

背景刚学习Flink没多久,之前用Spark做WordCount实现排序很简单,因为Spark做的WC是简单的批处理,直接排序就完事了,但是Flink的流处理需要考虑到状态(Stateful),并且时间语义我选择的是ProcessingTimeÿ... 查看详情

flink系列之:基于scala语言实现flink实时消费kafkatopic中的数据(代码片段)

Flink系列之:基于scala语言实现flink实时消费KafkaTopic中的数据一、引入flink相关依赖二、properties保存连接kafka的配置三、构建flink实时消费环境四、添加Kafka源和处理数据五、完整代码六、执行程序查看消费到的数据一、引入fli... 查看详情

大数据——flink入门程序(wordcount)(代码片段)

目录 一、编程模型二、编程步骤三、DataStream实时wordcount​四、DataSet离线wordcount 一、编程模型 Flink提供了不同级别的编程抽象,通过调用抽象的数据集调用算子构建DataFlow就可以实现对分布式的数据进行流式计算和离线计... 查看详情

实时即未来,大数据项目车联网之创建flink实时计算子工程(代码片段)

文章目录写在前面车联网项目全新升级创建Flink实时计算子工程1在原工程下创建实时分析子模块2导入实时分析子模块pom依赖3配置实时分析子模块资源文件写在前面车联网项目全新升级更全8-》21+篇更细-》图文并茂、部分代码... 查看详情

实时数仓系列-网易云音乐基于flink+kafka的实时数仓建设实践

简介:本文由网易云音乐实时计算平台研发工程师岳猛分享,主要从以下四个部分将为大家介绍Flink+Kafka在网易云音乐的应用实战:背景Flink+Kafka平台化设计Kafka在实时数仓中的应用问题&改进直播回放:https://developer.aliyun.com/li... 查看详情

flink消费kafka到hdfs实现及详解(代码片段)

...版本如下:组件版本Kafka2.4.0Flink1.10.0Hadoop2.10.0 2.2代码实现Flink消费Kafka集群中的数据,需要依赖Flink包,依赖如下:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.12</artifactId><version... 查看详情

实时即未来,大数据项目车联网之创建flink实时计算子工程(代码片段)

文章目录写在前面车联网项目全新升级创建Flink实时计算子工程1在原工程下创建实时分析子模块2导入实时分析子模块pom依赖3配置实时分析子模块资源文件写在前面车联网项目全新升级更全8-》21+篇更细-》图文并茂、部分代码... 查看详情

flink+kafka实时统计本地环境搭建与代码实战(代码片段)

1.搭建zookeeper与kafka本地环境flink经常用来消费上游kafka的数据,而kafka又依赖zookeeper进行。所以在进行测试之前,先要准备好本地的zookeeper与kafka环境。关于准备zk与kafka环境,具体可以参考SparkStreamingkafkazookeeper本地环... 查看详情

flink+kafka实时统计本地环境搭建与代码实战(代码片段)

1.搭建zookeeper与kafka本地环境flink经常用来消费上游kafka的数据,而kafka又依赖zookeeper进行。所以在进行测试之前,先要准备好本地的zookeeper与kafka环境。关于准备zk与kafka环境,具体可以参考SparkStreamingkafkazookeeper本地环... 查看详情

flink实战系列flinksql实时同步kafka数据到hudi(parquet+snappy)(代码片段)

FlinkSQL实时同步Kafka数据到Hudi(parquet+snappy)版本信息Flink1.15.1Hadoop2.9.0Hudi0.12.0Kafka2.4.1需要注意的是Flink1.15.x版本对应的Hudi必须使用0.12.0及以上版本,因为Hudi0.12.0版本才开始支持Flink1.15.0版本。jar包依赖把hudi-flink1.15-bundle-0.12.0.jar 查看详情

flink从kafka获取数据写入mysql的实现(代码片段)

需求获取实时热门商品1、按1分钟的窗口大小,每3秒统计一次,做滑动窗口聚合2、每个窗口聚合,输出每个窗口中点击量前5名的商品3、水印延迟容忍时间为3秒Kafka生产数据packageexam0714;importcom.alibaba.fastjson.JSON;importlom... 查看详情

指标统计:基于流计算oceanus(flink)实现实时uvpv统计

...ff0c;腾讯CSIG高级工程师导语|最近梳理了一下如何用Flink来实现实时的UV、PV指标的统计,并和公司内微视部门的同事交流。然后针对该场景做了简化,并发现使用FlinkSQL来实现这些指标的统计会更加便捷。一、解决方案描... 查看详情

实时监控:基于流计算oceanus(flink)实现系统和应用级实时监控(代码片段)

...如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其App应用的CPU和内存等资源消耗数据,以短信、电话、微信消息等方式实时反馈监控告警信息,高效地保... 查看详情

基于kafka的实时计算引擎:flink能否替代spark?

...欺诈检测、出租车预订、患者监控等场景处理时,需要对实时数据进行实时处理,以便做出快速可行的决策。目前业界有开源不少实时计算引擎,以Apache基金会的两款开源实时计算引擎最受欢迎,它们分 查看详情

flink从kafka获取数据写入mysql的实现(代码片段)

需求获取实时热门商品1、按1分钟的窗口大小,每3秒统计一次,做滑动窗口聚合2、每个窗口聚合,输出每个窗口中点击量前5名的商品3、水印延迟容忍时间为3秒Kafka生产数据packageexam0714;importcom.alibaba.fastjson.JSON;importlom... 查看详情

flink从kafka获取数据写入mysql的实现(代码片段)

需求获取实时热门商品1、按1分钟的窗口大小,每3秒统计一次,做滑动窗口聚合2、每个窗口聚合,输出每个窗口中点击量前5名的商品3、水印延迟容忍时间为3秒Kafka生产数据packageexam0714;importcom.alibaba.fastjson.JSON;importlom... 查看详情

flink实战系列flinksql实时同步kafka数据到hudi(parquet+snappy)并且自动同步数据到hive

FlinkSQL实时同步Kafka数据到Hudi(parquet+snappy)并且自动同步数据到Hive今天这篇文章主要来介绍一下Flink流式写入Hudi,并把数据加载到Hive表里做查询的两种方式,为了方便演示,下文采用sqlclient的方式提交任务,不写一行代码,纯... 查看详情

实时数仓flink生产环境部署+提交作业步骤(代码片段)

文章目录1、基础环境2、开发环境2.1、pom.xml2.2、log4j.properties2.3、测试用的代码2.3.1、Flink执行环境工具2.3.2、Kafka工具2.3.3、测试Flink读写Kafka2.3.4、测试FlinkSQL读写Kafka2.4、打包后上传到服务器3、生产环境3.1、Flink安装3.2、FlinkonYARN... 查看详情