如何利用开源插件?又快又好地搞好数据接口开发,连通不同应用系统(代码片段)

java李杨勇 java李杨勇     2022-12-05     134

关键词:

目录

前言介绍:

开源插件 Tapdata PDK

快速开始目标数据库接入

准备环境

下载源码并编译

创建目标数据库的Connector工程

开发完成之后通过 TDD 进行测试验证

如何提交到 PDK 开源项目

彩蛋


前言介绍:

毫不夸张地说,没有开发者还没踢过“应用数据不互通”这块铁板——平台不同、技术不同、存储和部署方式不同的情况下,又缺少必要的接口,应用系统之间难以互通。而随着业务需求的不断扩展,应用也在不断向多元化、个性化发展,未来业务与陈旧技术栈间矛盾也日益凸显,需要的接口数量也越来越多。

如何简单快速地搞定接口开发,也就成了一个需要我们考虑的问题。最近,我挖到了一个很香的开源插件,仔细研究了技术文档之后,决定安利给大家:

开源插件 Tapdata PDK

GitHub 链接:https://github.com/tapdata/idaas-pdk

这个项目的发布者是国内一个专攻实时数据服务平台的创业团队 Tapdata,据官方透露,这次开源的这个小组件,也是其核心产品开源的投路石,背靠的是这个团队在数据实时同步方面相当成熟的实力。

PDK 是其数据接口技术抽象化而来的一个开源插件开发框架,通过 Source Plugin 接口或者 Target Plugin 接口,可以快速实现新数据库作为 Tapdata 的源或目标的适配兼容,从而通过 Tapdata Cloud产品和即将开源的 Tapdata,免费获得各种异构数据源到目标数据库或平台的实时数据对接能力。

按照 PDK 连接器的开发规范进行数据源和目标端的开发,可以简化数据链路的开发流程,通过详细的开发规划和内置的 TDD 测试,可简单、快速地完成新数据源和目标端的开发工作。

支持类型包括:

  • 接入数据库: MySQL、Oracle、PostgreSQL 等
  • 接入 SaaS 产品: Salesforce、vika 维格表、金数据表单、Zoho CRM 等
  • 接入自定义数据源: 可对接私有协议数据源

快速开始目标数据库接入

目前,PDK 团队已将技术文档公开,大家可以前往 GitHub(https://github.com/tapdata/idaas-pdk) 具体了解。

准备环境

  • Java 8
  • Maven
  • Git
  • IntelliJ IDEA

下载源码并编译

git clone https://github.com/tapdata/idaas-pdk.git

cd idaas-pdk

mvn clean install

创建目标数据库的Connector工程

例如 group 为 io.tapdata, 数据库 name 为 XDB, 版本 version 为 0.0.1, 通过以下命令创建 Connector 工程

  • 目标数据库无需建表时

./bin/tap template --type target --group io.tapdata --name XDB --version 0.0.1 --output ./connectors

用 ItelliJ IDEA 打开 idaas-pdk, 在 idaas-pdk/connectors 下就能看见 xdb-connector 工程。

  • 在 spec.json 里填写 configOptions

configOptions 集成到 Tapdata 站点之后, 配置给用户在使用该Connector的时候的输入项, 例如数据库的连接地址, 用户名, 密码等等



   ...

   "configOptions":

      "connection":

         "type":"object",

         "properties":

            "host":

               "type": "string",

               "title": "Host",

               "x-decorator": "FormItem",

               "x-component": "Input"

            , 

            "port":

               "type": "number",

               "title": "Port",

               "x-decorator": "FormItem",

               "x-component": "Input"

            

            

         

      

   


  • 编写接入目标数据库的代码
@TapConnectorClass("spec.json")

public class XDBConnector extends ConnectorBase implements TapConnector 

 @Override

    public void discoverSchema(TapConnectionContext connectionContext, Consumer<List<TapTable>> consumer) 

        //TODO Load tables from database, connection information in connectionContext#getConnectionConfig

        //Sample code shows how to define tables.

        consumer.accept(list(

                //Define first table

                table("empty-table1"),

                //Define second table

                table("empty-table2"))

        ));

    


 @Override

    public void connectionTest(TapConnectionContext connectionContext, Consumer<TestItem> consumer) 

        //Assume below tests are successfully, below tests are recommended, but not required.

        //Connection test

        //TODO execute connection test here

        consumer.accept(testItem(TestItem.ITEM_CONNECTION, TestItem.RESULT_SUCCESSFULLY));

        //Login test

        //TODO execute login test here

        consumer.accept(testItem(TestItem.ITEM_LOGIN, TestItem.RESULT_SUCCESSFULLY));

        //Read test

        //TODO execute read test by checking role permission

        consumer.accept(testItem(TestItem.ITEM_READ, TestItem.RESULT_SUCCESSFULLY));

        //Write test

        //TODO execute write test by checking role permission

        consumer.accept(testItem(TestItem.ITEM_WRITE, TestItem.RESULT_SUCCESSFULLY));

    


 private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult<TapRecordEvent>> writeListResultConsumer) 

        //TODO write records into database

        //Below is sample code to print received events which suppose to write to database.

        AtomicLong inserted = new AtomicLong(0); //insert count

        AtomicLong updated = new AtomicLong(0); //update count

        AtomicLong deleted = new AtomicLong(0); //delete count

        for(TapRecordEvent recordEvent : tapRecordEvents) 

            if(recordEvent instanceof TapInsertRecordEvent) 

                //TODO insert record

                inserted.incrementAndGet();

             else if(recordEvent instanceof TapUpdateRecordEvent) 

                //TODO update record

                updated.incrementAndGet();

             else if(recordEvent instanceof TapDeleteRecordEvent) 

                //TODO delete record

                deleted.incrementAndGet();

            

        

        //Need to tell incremental engine the write result

        writeListResultConsumer.accept(writeListResult()

                .insertedCount(inserted.get())

                .modifiedCount(updated.get())

                .removedCount(deleted.get()));

    

 private void queryByFilter(TapConnectorContext connectorContext, List<TapFilter> filters, Consumer<List<FilterResult>> listConsumer)

        //Filter is exactly match.

        //If query by the filter, no value is in database, please still create a FitlerResult with null value in it. So that flow engine can understand the filter has no value.

    


  • 目标数据库须建表时

./bin/tap template --type targetNeedTable --group io.tapdata --name XDB --version 0.0.1 --output ./connectors

用 ItelliJ IDEA 打开 idaas-pdk, 在 idaas-pdk/connectors 下就能看见 xdb-connector 工程。

  • 在 spec.json 里填写 configOptions

configOptions 集成到 Tapdata 站点之后, 配置给用户在使用该 Connector 的时候的输入项, 例如数据库的连接地址、用户名、密码等


   ...

   "configOptions":

      "connection":

         "type":"object",

         "properties":

            "host":

               "type": "string",

               "title": "Host",

               "x-decorator": "FormItem",

               "x-component": "Input"

            , 

            "port":

               "type": "number",

               "title": "Port",

               "x-decorator": "FormItem",

               "x-component": "Input"

                 
         
      
   

  • 在 spec.json 里填写 dataTypes(类型表达式)

dataTypes 用于描述该 Connector 接入数据库的所有字段的范围,以及转换到对应的 TapType。 源端数据库也会提供相同的 dataTypes 描述, 这样当源端数据流入到 Tapdata 里时, 会结合源端 dataTypes 的字段描述信息结合源端库表的字段信息, 通过 Tapdata 的中立数据结构进入到 Tapdata 的数据流中, 当数据要流入到目标数据库之前,Tapdata 会根据这些信息, 在目标库的 dataTypes 中找到最佳的存储字段, 通过 TapField 的 originType 告知给 PDK 开发者, 用以建表。



   ...

   "dataTypes":

      "boolean":"bit":8, "unsigned":"", "to":"TapNumber",

      "tinyint":"bit":8, "to":"TapNumber",

      "smallint":"bit":16, "to":"TapNumber",

      "int":"bit":32, "to":"TapNumber",

      "bigint":"bit":64, "to":"TapNumber",

      "largeint":"bit":128, "to":"TapNumber",

      "float":"bit":32, "to":"TapNumber",

      "double":"bit":64, "to":"TapNumber",

      "decimal[($precision,$scale)]":"bit": 128, "precision": [1, 27], "defaultPrecision": 10, "scale": [0, 9], "defaultScale": 0, "to": "TapNumber",

      "date":"byte":3, "range":["0000-01-01", "9999-12-31"], "to":"TapDate",

      "datetime":"byte":8, "range":["0000-01-01 00:00:00","9999-12-31 23:59:59"],"to":"TapDateTime",

      "char[($byte)]":"byte":255, "to": "TapString", "defaultByte": 1,

      "varchar[($byte)]":"byte":"65535", "to":"TapString",

      "string":"byte":"2147483643", "to":"TapString",

      "HLL":"byte":"16385", "to":"TapNumber", "queryOnly":true

   


  • 编写接入目标数据库的代码
 @TapConnectorClass("spec.json")

public class XDBConnector extends ConnectorBase implements TapConnector 
 @Override

 public void discoverSchema(TapConnectionContext connectionContext, Consumer<List<TapTable>> consumer) 
 //TODO Load schema from database, connection information in connectionContext#getConnectionConfig
 //Sample code shows how to define tables with specified fields
 consumer.accept(list(
 //Define first table
 table("empty-table1")
 //Define a field named "id", origin field type, whether is primary key and primary key position

 .add(field("id", "varchar").isPrimaryKey(true).partitionKeyPos(1))

 .add(field("description", "string"))

 .add(field("name", "varchar"))

 .add(field("age", "int")))

 ));

 

 @Override

 public void connectionTest(TapConnectionContext connectionContext, Consumer<TestItem> consumer) 

 //Assume below tests are successfully, below tests are recommended, but not required.
 //Connection test
 //TODO execute connection test here
 consumer.accept(testItem(TestItem.ITEM_CONNECTION, TestItem.RESULT_SUCCESSFULLY));
 //Login test

 //TODO execute login test here

 consumer.accept(testItem(TestItem.ITEM_LOGIN, TestItem.RESULT_SUCCESSFULLY));

 //Read test

 //TODO execute read test by checking role permission
 consumer.accept(testItem(TestItem.ITEM_READ, TestItem.RESULT_SUCCESSFULLY));

 //Write test
 //TODO execute write test by checking role permission
 consumer.accept(testItem(TestItem.ITEM_WRITE, TestItem.RESULT_SUCCESSFULLY));

 

 @Override

 public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodecRegistry codecRegistry) 

 connectorFunctions.supportWriteRecord(this::writeRecord);
 connectorFunctions.supportQueryByFilter(this::queryByFilter);

 //If database need insert record before table created, then please implement the below two methods.
 connectorFunctions.supportCreateTable(this::createTable);
 connectorFunctions.supportDropTable(this::dropTable);

 //If database need insert record before table created, please implement the custom codec for the TapValue that data types in spec.json didn't cover.

 //TapTimeValue, TapMapValue, TapDateValue, TapArrayValue, TapYearValue, TapNumberValue, TapBooleanValue, TapDateTimeValue, TapBinaryValue, TapRawValue, TapStringValue

 codecRegistry.registerFromTapValue(TapRawValue.class, "text", tapRawValue -> 

 if (tapRawValue != null && tapRawValue.getValue() != null)

 return toJson(tapRawValue.getValue());

 return "null";

 );

 


 private void writeRecord(TapConnectorContext connectorContext, List<TapRecordEvent> tapRecordEvents, Consumer<WriteListResult<TapRecordEvent>> writeListResultConsumer) 

 //TODO write records into database

 //Below is sample code to print received events which suppose to write to database.

 AtomicLong inserted = new AtomicLong(0); //insert count

 AtomicLong updated = new AtomicLong(0); //update count

 AtomicLong deleted = new AtomicLong(0); //delete count

 for(TapRecordEvent recordEvent : tapRecordEvents) 

 if(recordEvent instanceof TapInsertRecordEvent) 

 //TODO insert record

 inserted.incrementAndGet();

 PDKLogger.info(TAG, "Record Write TapInsertRecordEvent ", toJson(recordEvent));

  else if(recordEvent instanceof TapUpdateRecordEvent) 

 //TODO update record

 updated.incrementAndGet();

 PDKLogger.info(TAG, "Record Write TapUpdateRecordEvent ", toJson(recordEvent));

  else if(recordEvent instanceof TapDeleteRecordEvent) 

 //TODO delete record

 deleted.incrementAndGet();

 PDKLogger.info(TAG, "Record Write TapDeleteRecordEvent ", toJson(recordEvent));

 

 

 //Need to tell incremental engine the write result

 writeListResultConsumer.accept(writeListResult()

 .insertedCount(inserted.get())

 .modifiedCount(updated.get())

 .removedCount(deleted.get()));

 

 private void queryByFilter(TapConnectorContext connectorContext, List<TapFilter> filters, Consumer<List<FilterResult>> listConsumer)

 //Filter is exactly match.

 //If query by the filter, no value is in database, please still create a FitlerResult with null value in it. So that flow engine can understand the filter has no value.

 

 private void dropTable(TapConnectorContext connectorContext, TapDropTableEvent dropTableEvent) 

 TapTable table = connectorContext.getTable();

 //TODO implement drop table

 

 private void createTable(TapConnectorContext connectorContext, TapCreateTableEvent createTableEvent) 

 //TODO implement create table.

 TapTable table = connectorContext.getTable();

 LinkedHashMap<String, TapField> nameFieldMap = table.getNameFieldMap();

 for(Map.Entry<String, TapField> entry : nameFieldMap.entrySet()) 

 TapField field = entry.getValue();

 String originType = field.getOriginType();

 //originType is the data types defined in spec.json

 //TODO use the generated originType to create table.

 

 

 

开发完成之后通过 TDD 进行测试验证

提供 configOptions 里需要用户填写的内容的 json 文件, 例如上述 configOptions 里要求用户填写的是数据库的 Host 和 Port, 那么 tdd 的 xdb_tdd.json 文件内容如下:


    "connection": 

      "host": "192.168.153.132",

      "port": 9030,

    

执行 TDD 测试命令:

./bin/tap tdd --testConfig xdb_tdd.json ./connectors/xdb-connector

当 TDD 测试没有通过, 请根据错误提示修改对应错误,直至通过 TDD测试;

当 TDD 测试通过后, PDK Connector 就处于可以提交 Pull Request 的状态。

如何提交到 PDK 开源项目

① fork idaas-pdk, 基于远程的 main 分支建立本地分支

② 根据要接入数据库名称, 在 idaas-pdk/connectors 目录下新建模块, 命名规范为 数据库小写名称-connector, 例如接入数据库的名称为 XDB, 模块名称为 xdb-connector

③ 开发者根据官方 API 文档完成接入数据库的开发实现

④ 通过 TDD 测试后, 提交 PR 到 idaas-pdk

⑤ 官方团队 Review 提交的 PR 之后合并代码

彩蛋

感兴趣的同学先别急着开发,据了解,官方推出的免费版本,已陆续实现30个常见数据源/目标间的实时数据对接,如果其中已经包含了你想要接入的数据库,完全可以直接使用 Tapdata Cloud(Tapdata Cloud | 免费的异构数据库实时同步云平台 - Tapdata)进行免费数据实时同步。当然,如果你的需求现阶段还不被支持,就可以通过Tapdata PDK 自助开发,快速接入。

Tapdata Cloud 现阶段支持的数据连接类型

目前,Tapdata 开放了面向开发者的插件生态共建群,开发过程中可以提供技术交流和支持,感兴趣的同学可以扫码关注,拉你入群:

CSDN 社区图书馆,开张营业! 深读计划,写书评领图书福利~

又快又好,行人检测和人脸检测和人脸关键点检测(c++/android源码)(代码片段)

又快又好,行人检测(人体检测)和人脸检测和人脸关键点检测(C++/Android)目录又快又好,行人检测(人体检测)和人脸检测和人脸关键点检测(C++/Android)1.前言2.项目说明... 查看详情

怎么让word编辑公式又快又好

...几个简单的公式。那这篇文章就告诉Word公式怎么编辑才又快又好。想要在Word中编辑出完美的公式,那MathType是一个非常有必要的工具。如 查看详情

重建大师姿态角辅助功能,“又快又好”实现倾斜空三处理

...为观测值参与影像定向,但现有建模软件在处理过程中仅利用了位置参数,姿态角元素信息没有得到有效利用。对此,我们在重建大师中加入了姿态角辅助空三的相应功能,将获取的每张像片的外方位元素作为带权观测值参与摄... 查看详情

电商搜索如何让你买得又快又好「整流程」(五)

参考技术A其它相关文章整理:https://zhuanlan.zhihu.com/p/51015148本文介绍用户感知较弱的召回和排序模块,主要以技术方案和实现为主进行介绍(主要为下图中,搜索服务的一些工作)。这一过程和推荐非常类似,区别主要为召回源更多... 查看详情

〖产品思维训练白宝书-核心竞争力篇⑯〗-产品经理核心竞争力解读之如何学习的又快又好并学以致用(代码片段)

大家好,我是哈士奇,一位工作了十年的"技术混子",致力于为开发者赋能的UP主,目前正在运营着TFS_CLUB社区。💬人生格言:优于别人,并不高贵,真正的高贵应该是优于过去的自己。💬📫如果文... 查看详情

yolov6又快又准的目标检测框架已开源

siou精度是最高的,其次是yoloe,但是没开源:即插即用|SIoU实现50.3AP+7.6ms检测速度精度、速度完美超越YoloV5、YoloX_AI视觉网奇的博客-CSDN博客先看YOLOv6精度:ModelSizemAPval0.5:0.95SpeedV100fp16b32(ms)SpeedV100fp32b32(ms)Spe 查看详情

最终选型blazor.server:又快又稳!

书接上文,昨天我们快速的走了一遍wasm的开发流程(我的『MVP.Blazor』快速创建与部署),总体来说还是很不错的,无论是从技术上,还是从开发上,重点是用C#来开启前端时代,可以开发SPA单页面... 查看详情

刷脸认证如何实现人脸又快又准完成校验?

...快速检票等。与此同时也伴随了很多安全问题,首要就是如何判断用户的真实性。HMSCore机器学习服务(MLKit)的人脸比对和 查看详情

sass

Sass的作用是,帮助我们更快地(又快)写出具有高可维护性(又好)的CSS代码。直白点说就是,用了sass,写起样式来,脚步带风,效率提升,另一方面,sass在保证你“车速”的同时,还尽量减少你“翻车”的概率,你的样式代... 查看详情

长达1.7万字的explain关键字指南!(代码片段)

...语句,最后会输出一系列的指标告诉我们这条语句的性能如何,如下图所示。mysql>explainselect*fromstudentwhereid=1\\G******************************************************id:1select_type:SIMPLEtable:subjectpartitions:NULLtype:constpossible_keys:PRIMARYkey:PRIMARYkey_le... 查看详情

行人检测和人脸检测和人脸关键点检测(c++/android源码)(代码片段)

又快又好,行人检测(人体检测)和人脸检测和人脸关键点检测(C++/Android)目录又快又好,行人检测(人体检测)和人脸检测和人脸关键点检测(C++/Android)1.前言2.项目说明... 查看详情

yolov6:又快又准的目标检测框架开源啦

...平台的部署,极大简化工程部署时的适配工作。特此开源,希望能帮助到更多的同学。1.概述YOLOv6是美团视觉智能部研发的一款目标检测框架,致力于工业应用。本框架同时专注于检测的精度和推理效率,在工业... 查看详情

华为云技术分享介绍一个又快又准的截图骚操作

...屏,有时候需要截个网页屏,方式有很多,各种快捷键和插件也都能够办到。但下面这个情况不知道大家会怎么来做。需求切入有一天,我在电脑上看到了一条微博,或者一篇文章。比如微博像这样:比如文章像这样:这时候我... 查看详情

代码写的又好又快的秘诀

好与快的区别就是代码质量的区别,在保证一定代码质量的前提下,去追求快。代码时间花在 分析,测试,调试 上面代码阅读法:不管你的调试技巧怎么样,都没有一次性写好来的高效常见方法:第一遍:检查语法、代... 查看详情

微信小游戏爆发式增长,如何保证小游戏的版本迭代又快又稳(代码片段)

...。微信小游戏市场一直都充满着希望与竞争,开发者如何在爆品争霸中脱颖而出呢?在小游戏开发中有哪些传统开发经验可以借鉴与学习呢?我们特邀腾讯云TVP、计算机作家/讲师李艺老师,在他新书《微信小游戏... 查看详情

别乱用,这样打日志定位bug又快又准!(代码片段)

点击关注公众号,实用技术文章及时了解来源:blog.csdn.net/linsongbin1/article/details/90349661概述日常工作中,程序员需要经常处理线上的各种大小故障,如果业务代码没打印日志或者日志打印的不好,会极大的加大... 查看详情

用敏捷的devops拳打研发低效脚踢管控不足

...个运行环境缺乏统一管理,资源申请和获取周期长,资源利用率低也是当前运行管理中频繁出现的问题。问题1研发过程无法清晰度量、查看和 查看详情

软件工程第二周作业

...能提高就有可能伴随着程序的稳定性的降低,这两者应该如何权衡呢?2.关于5.3.5老板驱动的流程,这种开发流程模式存在着一些问题,那要如何解决这些问题呢?这种模式当然也有它的问题。领导对许多技术细节是外行。领导... 查看详情