关键词:
FlinkCDC从Mongodb同步数据至elasticsearch(ES)
DataStreamingAPI方式
网上挺多flinksql方式同步数据,但是遇到数据比较杂乱,会经常无缘无故报错,笔者被逼无奈,采用API方式处理数据后同步,不知为何API资料笔者找到的资料很少,还很不全,摸着石头过河总算完成任务,收获颇丰,以此分享给大家。
pom.xml
<modelVersion>4.0.0</modelVersion>
<groupId>com.cece</groupId>
<artifactId>Mongo-ES</artifactId>
<version>1.0</version>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>$java.version</maven.compiler.source>
<maven.compiler.target>$java.version</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_$scala.binary.version</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_$scala.version</artifactId>
<version>$flink.version</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.11.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<version>2.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>$flink.version</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. -->
<!-- 打包时不复制META-INF下的签名文件,避免报非法签名文件的SecurityExceptions异常-->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<!-- connector和format依赖的工厂类打包时会相互覆盖,需要使用ServicesResourceTransformer解决-->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
主程序
public class Config
public static final String MONGODB_URL = "1xxx";
public static final String MONGODB_USER = "sxx";
public static final String MONGODB_PWD = "xx";
public static final String MONGODB_DATABASE = "xx";
public static final String MONGODB_COLLECTION = "xx";
public static final String ES_URL = "xx";
public static final int ES_PORT = xx;
public static final String ES_USER = "xxx";
public static final String ES_PWD = "xxxx";
public static final String ES_INDEX = "xxxx";
public static final int BUFFER_TIMEOUT_MS =100;
public static final int CHECKPOINT_INTERVAL_MS =3*60*1000;
public static final int CHECKPOINT_TIMEOUT_MS = 1*60*1000;
public static final int CHECKPOINT_MIN_PAUSE_MS = 1*60*1000;
主程序
public class FlinkCdcSyn_API
public static void main(String[] args) throws Exception
//1.构建flink环境及配置checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
env.setBufferTimeout(BUFFER_TIMEOUT_MS);
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_MS);
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(10)));
//2.通过FlinkCDC构建SourceFunction
SourceFunction<String> mongoDBSourceFunction = MongoDBSource.<String>builder()
.hosts(MONGODB_URL)
.username(MONGODB_USER)
.password(MONGODB_PWD)
.databaseList(MONGODB_DATABASE)
.collectionList(MONGODB_COLLECTION)
.deserializer(new JsonDebeziumDeserializationSchema())
// .deserializer(new CoustomParse())
.build();
//3.数据初步处理,因为es的keyword最大不能超过32766
SingleOutputStreamOperator<String> stream = env.addSource(mongoDBSourceFunction)
.setParallelism(1)
.name("mongo_to_es")
.filter(new FilterFunction<String>()
@Override
public boolean filter(String s) throws Exception
try
//判断是否是json格式,不是过滤掉
JSONObject obj = JSON.parseObject(s);
return true;
catch (Exception e)
System.out.println("json格式错误:"+ s) ;
return false;
//不处理会报whose UTF8 encoding is longer than the max length 32766,将过大的字段过滤掉
).map(new MapFunction<String, String>()
@Override
public String map(String s) throws Exception
JSONObject obj = JSON.parseObject(s);
String str = obj.getString("operationType");
if("insert".equals(str) || "update".equals(str))
JSONObject obj1 = obj.getJSONObject("fullDocument");
if(obj1.toString().getBytes("utf-8").length > 36000)
Set<Map.Entry<String, Object>> entries = obj1.entrySet();
Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
while(iterator.hasNext())
// String s1 = iterator.next().toString();
// System.out.println("iterator含义:" + s1);
if(iterator.next().toString().getBytes("utf-8").length > 30000)
iterator.remove();
obj.fluentPut("fullDocument",obj1.toString());
return obj.toString();
);
List<HttpHost> httpHosts = new ArrayList<>();
//4.对insert/update/delete分别处理
httpHosts.add(new HttpHost(ES_URL, ES_PORT, "http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
httpHosts, new ElasticsearchSinkFunction<String>()
public ActionRequest createIndexRequest(String element)
JSONObject obj = JSON.parseObject(element);
// System.out.println("create:" + obj.toString());
String str = obj.getString("operationType");
// Map<String, String> json = new HashMap<>();
String id = null;
try
id = obj.getJSONObject("documentKey").getJSONObject("_id").getString("$oid");
catch (Exception e)
try
id = obj.getJSONObject("documentKey").getString("_id");
catch (Exception ex)
System.out.println("格式不对:" + obj);
flink系列之:基于flinkcdc2.0实现海量数据的实时同步和转换
Flink系列之:基于FlinkCDC2.0实现海量数据的实时同步和转换一、CDC技术二、FlinkCDC技术三、传统数据集成方案的痛点1.传统数据入仓架构1.02.传统数据入仓架构2.03.传统CDCETL分析四、基于FlinkCDC的海量数据的实时同步和转换1.FlinkCDC增... 查看详情
dbsync新增对mongodbes的支持
...步工具DBSync近日进行了升级,最新版本为V1.9,新增了对MongoDB、Elasticseach(ES)的支持,具体情况:1、支持同型库之间的同步,如:MongoDB至MongoDB,ES至ES。2、支持异型库之间的同步,能将SQL数据同步到NoSQL,也就是把row转换为docu... 查看详情
flinkcdc(更新中)(代码片段)
一、CDC是什么?CDC(ChangeDataCapture),变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等.用户可以在如下的场景使用cdc:实时数... 查看详情
flink实战系列flinkcdc实时同步mysql全量加增量数据到hudi(代码片段)
【Flink实战系列】FlinkCDC实时同步Mysql全量加增量数据到Hudi前言FlinkCDC是基于Flink开发的变化数据获取组件(Changedatacapture),简单的说就是来捕获变更的数据,ApacheHudi是一个数据湖平台,又支持对数据做增删改查操作,所以FlinkCD... 查看详情
flink实战系列flinkcdc实时同步mysql全量加增量数据到hudi(代码片段)
【Flink实战系列】FlinkCDC实时同步Mysql全量加增量数据到Hudi前言FlinkCDC是基于Flink开发的变化数据获取组件(Changedatacapture),简单的说就是来捕获变更的数据,ApacheHudi是一个数据湖平台,又支持对数据做增删改查操作,所以FlinkCD... 查看详情
ogg从oracle备库同步数据至kafka(代码片段)
OGG从Oracle备库同步数据至kafkaTableofContents1.目的2.环境及规划3.安装配置JDK3.1.安装jdk3.2.配置环境变量4.安装Dataguard4.1.安装备库软件4.2.配置dataguard4.2.1.主库4.2.2.备库4.3.完成操作4.4.启动实时复制5.zookeeper集群5.1.上传并... 查看详情
flinkcdc原理实践和优化(代码片段)
1.CDC是什么CDC是变更数据捕获(ChangeDataCapture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,... 查看详情
flinkcdc+kafka加速业务实时化
...,在9月24日ApacheFlinkMeetup的分享。主要内容包括:FlinkCDC技术对比与分析Flink+Kafka实时数据集成方案Demo:Flink+Kafka实现CDC数据的实时集成和实时分析一、FlinkCDC技术对比与分析1.1.变更数据捕获(CDC)技术广... 查看详情
如何在两个 MongoDB 数据库之间同步一些集合?
】如何在两个MongoDB数据库之间同步一些集合?【英文标题】:HowtosyncsomecollectionsbetweentwoMongoDBdatabases?【发布时间】:2019-12-1016:57:10【问题描述】:我正在寻找一些方法在两个MongoDB数据库之间同步一些集合,例如将一些集合从本... 查看详情
基于flinkcdc打通数据实时入湖
...rg两种技术,来解决业务数据实时入湖相关的问题。01FlinkCDC介绍CDC全称是ChangeDataCapture,捕获变更数据,是一个比较广泛的概念,只要是能够捕获 查看详情
flinkcdc2.0(代码片段)
1.Flinkcdc概念CDC的全称是ChangeDataCapture,在广义的概念上,只要能捕获数据变更的技术,我们都可以称为CDC。通常我们说的CDC技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。2.应用场景1. 数... 查看详情
【mongodb】mongodb主从从裁架构重启
参考技术AmongoDB主从数据同步主库数据小于从库,从库回滚到跟主库同步状态从上可知,假如原来主从数据同步出了问题,主库数据量远大于从库数据量,而你并没有察觉,某次主、从、仲裁进程全部挂了如果先起原来从库和仲... 查看详情
oracle数据怎么实时同步到mongodb|亲测干货分享建议收藏
...一种方式,可以非常方便地完成Oracle数据实时同步到MongoDB,跟大家分享一下,希望对你有帮助。Oracle数据实时同步到MongoDB等非关系型数据 查看详情
mongodb的主从复制和副本集
mongoDB的两个特性主从复制和副本集,实现了数据的同步备份一、主从复制主从复制是一个简单的数据库同步备份的集群技术.例如主服务器宕机了,可以直接使用从服务器,主服务器恢复后在进行同步,保证了业务的连续性 ... 查看详情
如何将数据从 MongoDB 迁移到 SQL-Server? [关闭]
】如何将数据从MongoDB迁移到SQL-Server?[关闭]【英文标题】:HowtomigratedatafromMongoDBtoSQL-Server?[closed]【发布时间】:2015-05-0111:53:34【问题描述】:我四处搜索,发现有一些方法可以将数据从sql-server传输/同步到mongodb。我也知道Mongodb... 查看详情
tidb-使用ticdc将数据同步至下游kafka中(代码片段)
...用TiCDC怎么将数据同步至下游Kafka中,以实现TIDB到ES、MongoDB、Redis等NoSql数据库的同步。上篇博客地址:https://blog.csdn.net/qq_43692950/article/details/121731278注意:使用TiCDC,需将TIDB版本上级至v4.0.6以上。二、TICDC配制数据... 查看详情
tidb-使用ticdc将数据同步至下游kafka中(代码片段)
...用TiCDC怎么将数据同步至下游Kafka中,以实现TIDB到ES、MongoDB、Redis等NoSql数据库的同步。上篇博客地址:https://blog.csdn.net/qq_43692950/article/details/121731278注意:使用TiCDC,需将TIDB版本上级至v4.0.6以上。二、TICDC配制数据... 查看详情
mysql到mongodb实时数据同步实操分享
MySQL数据怎么实时同步到MongoDB实践分享系列摘要:很多DBA同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据、表多、数据量大等情况就难以同步。我自己亲测了一种... 查看详情