我在哪里编写 kafka 连接接收器自定义分区器的代码?

     2023-03-23     153

关键词:

【中文标题】我在哪里编写 kafka 连接接收器自定义分区器的代码?【英文标题】:Where do I write the code for kafka connect sink custom partitioner? 【发布时间】:2021-01-27 16:38:58 【问题描述】:

这可能是一个非常简单的问题,所以我会提前道歉。 我正在为一个 kafka 主题添加一个 s3 sink 连接器,conf 文件在这里:


  "schemas.enable": "false",
  "name": "my-s3-sink",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "my-topic-name"
  ],
  "errors.deadletterqueue.context.headers.enable": "true",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "flush.size": "2000",
  "rotate.schedule.interval.ms": "600000",
  "s3.bucket.name": "my-bucket-name",
  "s3.object.tagging": "true",
  "s3.region": "region",
  "s3.part.size": "5242880",
  "aws.access.key.id": "****",
  "aws.secret.access.key": "****",
  "s3.ssea.name": "AES256",
  "s3.compression.type": "gzip",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "topics.dir": "",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "partition.duration.ms": "3600000",
  "path.format": "YYYY/MM/dd/HH",
  "locale": "en-GB",
  "timezone": "UTC"

这会以topic_name/YYYY/MM/DD/HH/message 格式输出消息,我希望密钥为YYYY/MM/DD/HH/message。经过一些研究,我发现为了从键中删除主题名称,我必须编写一个自定义分区器来扩展和覆盖 TimeBasedPartitioner 的部分内容。 (这里是一个例子https://github.com/confluentinc/kafka-connect-storage-cloud/issues/321)

我的问题是我现在不知道在哪里为该分区程序编写实际代码,它应该放在哪里?基于时间的分区器似乎链接到 confluent 拥有的某种注册表,但是自定义分区器会去哪里以及如何在连接器的 conf 文件中引用该代码?

【问题讨论】:

【参考方案1】:

您在单独的项目中编写代码,将其编译为 JAR,然后将其放在每个连接工作程序的类路径中。

那你可以参考partitioner.class

【讨论】:

编写自定义 Kafka 序列化器

...KafkaSerializer【发布时间】:2014-07-0811:44:45【问题描述】:我在Kafka消息中使用我自己的类,它有一堆字符串数据类型。因此,我不能使用默认的序列化程序类或Kafka库附带的StringSerializer。我想我需要编写自己的序列化程序并将其... 查看详情

Apache Kafka 的自定义连接器

...时间】:2022-01-0908:27:24【问题描述】:我希望为ApacheKafka编写一个自定义连接器,以连接到SQL数据库以获取CDC数据。我想编写一个自定义连接器,这样我就可以使用一个连接器连接到多个数据库,因为所有市场连接器只为每个连... 查看详情

具有自定义消费者组名称的 Kafka Sink 连接器

...ctor_name。但我想使用自定义名称作为前缀。(我们可以在接收器配置中做-nameproperties,但要寻找 查看详情

kafka自定义分区器

packagecn.xiaojf.kafka.producer;importorg.apache.kafka.clients.producer.Partitioner;importorg.apache.kafka.common.Cluster;importorg.apache.kafka.common.PartitionInfo;importorg.apache.kafka.common.util 查看详情

kafka自定义分区规则

(一)生产者生成数据的数据,按自定义key做hashcode进行分区1.Kafka中可以将Topic从物理上划分成一个或多个分区(Partition) 每个分区在物理上对应一个文件夹,以"topicName_partitionIndex"方式命名 文件夹下存储这个分区... 查看详情

我在哪里放置 C++ 自定义异常代码?

】我在哪里放置C++自定义异常代码?【英文标题】:WheredoIputC++CustomExceptionCode?【发布时间】:2018-09-2720:16:27【问题描述】:我正在为我编写的生成随机短语的类编写自定义异常。我是C++新手,我想知道是否应该将异常放在Classes... 查看详情

Kafka Connect S3 sink 连接器与自定义 Partitioner 奇怪行为

】KafkaConnectS3sink连接器与自定义Partitioner奇怪行为【英文标题】:KafkaConnectS3sinkconnectorwithcustomPartitionerstrangebehavior【发布时间】:2020-10-2922:26:01【问题描述】:我计划使用自定义字段和基于时间的分区器在s3中对我的数据进行分... 查看详情

kafka2.5.0自定义分区器(代码片段)

自定义分区器:importorg.apache.kafka.clients.producer.Partitioner;importorg.apache.kafka.common.Cluster;importorg.apache.kafka.common.PartitionInfo;importjava.util.List;importjava.util.Map;/***@authorKing老师* 查看详情

调试自定义 Kafka 连接器的简单有效方法是啥?

...18-01-2420:07:44【问题描述】:我正在使用几个Kafka连接器,我在控制台输出中看不到它们的创建/部署有任何错误,但是我没有得到我正在寻找的结果(没有任何结果),希望或其他)。我基于Kaf 查看详情

重置为 Kafka 分区中的自定义偏移量

】重置为Kafka分区中的自定义偏移量【英文标题】:ResettocustomoffsetinKafkapartition【发布时间】:2020-02-2407:13:53【问题描述】:我正在针对我正在研究的特定用例研究Kafka。我有一个正在流动的数据流,我想对其进行处理并将其发布... 查看详情

断开/重新连接后自定义接收器不调用 onLoad

】断开/重新连接后自定义接收器不调用onLoad【英文标题】:Customreceiverdoesn\'tcallonLoadafterdisconnect/reconnect【发布时间】:2017-03-2021:36:19【问题描述】:当我在没有关闭Android应用程序的接收器的情况下断开连接并重新连接时,投射... 查看详情

SMT 通过连接器配置创建 kafka 连接器字符串分区键

】SMT通过连接器配置创建kafka连接器字符串分区键【英文标题】:SMT\'stocreatekafkaconnectorstringpartitionkeythroughconnectorconfig【发布时间】:2021-09-0601:40:51【问题描述】:我一直在为PostgreSQL实现一个kafka连接器(我正在使用debeziumkafka连... 查看详情

使用自定义目标接收器将日志导出到 BigQuery(表分区)

】使用自定义目标接收器将日志导出到BigQuery(表分区)【英文标题】:ExportingLogtoBigQuery(tablepartitioned)usingcustomdestinationsink【发布时间】:2020-07-0505:48:15【问题描述】:大家好。我想创建一个“接收器”(日志记录服务)来将日... 查看详情

如何使用sparkstreaming接收kafka中发送的自定义对象

参考技术A若将Spark作业以yarncluster模式提交到Yarn,由Yarn启动Spark作业,在某个子节点的Executor会监听该端口,接收数据。 查看详情

Confluent 云 S3 接收器连接器 - S3 对象的自定义对象名称

】Confluent云S3接收器连接器-S3对象的自定义对象名称【英文标题】:ConfluentcloudS3sinkconnector-customobjectnameforS3Objects【发布时间】:2021-10-1908:22:30【问题描述】:我的Kafka消息将包含UUID,它将成为所有未来通信的标识符。我为我的主... 查看详情

在 k8s 上使用 Strimzi 部署远程调试自定义 Kafka 连接器

...:41【问题描述】:我想远程调试我的自定义连接器,它是我在Kubernetes上部署的StrimziKafka操作员部署的一部分。在本地(例如使用docker映像),这可以通过添加JAVA_TOOL 查看详情

spark自定义分区器

...器:  Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。只有Key-Value类型的RDD才有分区器... 查看详情

在 Amazon QuickSight 中使用自定义 SQL 时,连接在哪里执行?

...azonQuickSight?【发布时间】:2021-01-0919:33:01【问题描述】:我在AmazonQuickSight中使用自定义SQL来连接来自RedShift的多个表。我想知道连接发生在哪里,QuickSight是否将查询发送到RedShift 查看详情