springboot自定义kafka消费者配置containerfactory最佳实践

上帝爱吃苹果      2022-05-05     713

关键词:

Spring Boot 自定义kafka 消费者配置 ContainerFactory最佳实践

本篇博文主要提供一个在 SpringBoot 中自定义 kafka配置的实践,想象这样一个场景:你的系统需要监听多个不同集群的消息,在不同的集群中topic冲突了,所以你需要分别定义kafka消息配置。

此篇文章会在SpringBoot 提供的默认模板上提供扩展,不会因为你自定义了消费者配置,而导致原生SpringBoot的Kakfa模板配置失效。

引入 MAVEN 依赖

版本需要你自己指定

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>xxx</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>xxx</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>xxx</version>
</dependency>

引入Java配置类

/**
 * 手动自定义 kafka 消费者 ContainerFactory 配置demo
 */
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConsumerConfig {

    @Autowired
    private KafkaProperties properties;

    @Value("${监听服务地址}")
    private List<String> myServers;

    @Bean("myKafkaContainerFactory")
    @ConditionalOnBean(ConcurrentKafkaListenerContainerFactoryConfigurer.class)
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory());
        return factory;
    }

    //获得创建消费者工厂
    public ConsumerFactory<Object, Object> consumerFactory() {
        KafkaProperties myKafkaProperties = JSON.parseObject(JSON.toJSONString(this.properties), KafkaProperties.class);
        //对模板 properties 进行定制化
        //....
        //例如:定制servers
        myKafkaProperties.setBootstrapServers(myServers);
        return new DefaultKafkaConsumerFactory<>(myKafkaProperties.buildConsumerProperties());
    }

}

yml模板

#kafka配置,更多配置请参考:KafkaProperties
spring.kafka:
  #公共参数,其他的timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms保持默认值
  properties:
    #这个参数指定producer在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小(batch-size),到达时间后生产者也会发送批量消息到broker。默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息。设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟。
    linger.ms: 50 #默认值:0毫秒,当消息发送比较频繁时,增加一些延迟可增加吞吐量和性能。
    #这个参数指定producer在一个TCP connection可同时发送多少条消息到broker并且等待broker响应,设置此参数较高的值可以提高吞吐量,但同时也会增加内存消耗。另外,如果设置过高反而会降低吞吐量,因为批量消息效率降低。设置为1,可以保证发送到broker的顺序和调用send方法顺序一致,即便出现失败重试的情况也是如此。
    #注意:当前消息符合at-least-once,自kafka1.0.0以后,为保证消息有序以及exactly once,这个配置可适当调大为5。
    max.in.flight.requests.per.connection: 1 #默认值:5,设置为1即表示producer在connection上发送一条消息,至少要等到这条消息被broker确认收到才继续发送下一条,因此是有序的。

  #生产者的配置,可参考org.apache.kafka.clients.producer.ProducerConfig
  producer:
    #这个参数可以是任意字符串,它是broker用来识别消息是来自哪个客户端的。在broker进行打印日志、衡量指标或者配额限制时会用到。
    clientId: ${spring.application.name} #方便kafkaserver打印日志定位请求来源
    bootstrap-servers: 127.0.0.1:8080 #kafka服务器地址,多个以逗号隔开
    #acks=0:生产者把消息发送到broker即认为成功,不等待broker的处理结果。这种方式的吞吐最高,但也是最容易丢失消息的。
    #acks=1:生产者会在该分区的leader写入消息并返回成功后,认为消息发送成功。如果群首写入消息失败,生产者会收到错误响应并进行重试。这种方式能够一定程度避免消息丢失,但如果leader宕机时该消息没有复制到其他副本,那么该消息还是会丢失。另外,如果我们使用同步方式来发送,延迟会比前一种方式大大增加(至少增加一个网络往返时间);如果使用异步方式,应用感知不到延迟,吞吐量则会受异步正在发送中的数量限制。
    #acks=all:生产者会等待所有副本成功写入该消息,这种方式是最安全的,能够保证消息不丢失,但是延迟也是最大的。
    #如果是发送日志之类的,允许部分丢失,可指定acks=0,如果想不丢失消息,可配置为all,但需密切关注性能和吞吐量。
    acks: all #默认值:1
    #当生产者发送消息收到一个可恢复异常时,会进行重试,这个参数指定了重试的次数。在实际情况中,这个参数需要结合retry.backoff.ms(重试等待间隔)来使用,建议总的重试时间比集群重新选举leader的时间长,这样可以避免生产者过早结束重试导致失败。
    #另外需注意,当开启重试时,若未设置max.in.flight.requests.per.connection=1,则可能出现发往同一个分区的两批消息的顺序出错,比如,第一批发送失败了,第二批成功了,然后第一批重试成功了,此时两者的顺序就颠倒了。
    retries: 2  #发送失败时重试多少次,0=禁用重试(默认值)
    #默认情况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy压缩算法由Google研发,这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
    compressionType: "none" #如果不开启压缩,可设置为none(默认值),比较大的消息可开启。
    #当多条消息发送到一个分区时,Producer会进行批量发送,这个参数指定了批量消息大小的上限(以字节为单位)。当批量消息达到这个大小时,Producer会一起发送到broker;但即使没有达到这个大小,生产者也会有定时机制来发送消息,避免消息延迟过大。
    batch-size: 16384 #默认16K,值越小延迟越低,但是吞吐量和性能会降低。0=禁用批量发送
    #这个参数设置Producer暂存待发送消息的缓冲区内存的大小,如果应用调用send方法的速度大于Producer发送的速度,那么调用会阻塞一定(max.block.ms)时间后抛出异常。
    buffer-memory: 33554432 #缓冲区默认大小32M
  #消费者的配置,可参考:org.apache.kafka.clients.consumer.ConsumerConfig
  consumer:
    #这个参数可以为任意值,用来指明消息从哪个客户端发出,一般会在打印日志、衡量指标、分配配额时使用。
    #暂不用提供clientId,2.x版本可放出来,1.x有多个topic且concurrency>1会出现JMX注册时异常
    #clientId: ${spring.application.name} #方便kafkaserver打印日志定位请求来源
    # 签中kafka集群
    bootstrap-servers: 127.0.0.1:8080 #kafka服务器地址,多个以逗号隔开
    #这个参数指定了当消费者第一次读取分区或者无offset时拉取那个位置的消息,可以取值为latest(从最新的消息开始消费),earliest(从最老的消息开始消费),none(如果无offset就抛出异常)
    autoOffsetReset: latest #默认值:latest
    #这个参数指定了消费者是否自动提交消费位移,默认为true。如果需要减少重复消费或者数据丢失,你可以设置为false,然后手动提交。如果为true,你可能需要关注自动提交的时间间隔,该间隔由auto.commit.interval.ms设置。
    enable-auto-commit: false
    #周期性自动提交的间隔,单位毫秒
    auto-commit-interval: 2000 #默认值:5000
    #这个参数允许消费者指定从broker读取消息时最小的Payload的字节数。当消费者从broker读取消息时,如果数据字节数小于这个阈值,broker会等待直到有足够的数据,然后才返回给消费者。对于写入量不高的主题来说,这个参数可以减少broker和消费者的压力,因为减少了往返的时间。而对于有大量消费者的主题来说,则可以明显减轻broker压力。
    fetchMinSize: 1 #默认值: 1
    #上面的fetch.min.bytes参数指定了消费者读取的最小数据量,而这个参数则指定了消费者读取时最长等待时间,从而避免长时间阻塞。这个参数默认为500ms。
    fetchMaxWait: 500 #默认值:500毫秒
    #这个参数控制一个poll()调用返回的记录数,即consumer每次批量拉多少条数据。
    maxPollRecords: 500 #默认值:500
  listener:
    #创建多少个consumer,值必须小于等于Kafk Topic的分区数。
    ack-mode: MANUAL_IMMEDIATE
    concurrency: 1  #推荐设置为topic的分区数

配置释义

点开 KafkaProperties 这个类,可以看到这个是SpringBoot 自动配置kafka的配置类,引入这个实例,就相当于你拿到了SpringBoot kafka配置模板的参数,就是上述贴的配置,然后再此基础上重新定义你需要改变的配置,这里主要讲消费者配置。

代码中举了个重写监听servers的例子:

//例如:定制servers
myKafkaProperties.setBootstrapServers(myServers);

@KafkaListener 使用 containerFactory

@Slf4j
@Component
public class ConsumerDemo {

    //声明consumerID为demo,监听topicName为topic.quick.demo的Topic
    //这个消费者的 containerFactory 是SpringBoot 提供的 kafkaListenerContainerFactory 这个bean
    @KafkaListener(id = "demo", topics = "topic.quick.demo")
    public void listen(String msgData) {
        log.info("demo receive : " + msgData);
    }

    @KafkaListener(topics = "k010", containerFactory = "myKafkaContainerFactory")
    public void listen(String msgData, Acknowledgment ack) {
        log.info("demo receive : " + msgData);
        //手动提交
        //enable.auto.commit参数设置成false。那么就是Spring来替为我们做人工提交,从而简化了人工提交的方式。
        //所以kafka和springboot结合中的enable.auto.commit为false为spring的人工提交模式。
        //enable.auto.commit为true是采用kafka的默认提交模式。
        ack.acknowledge();
    }
}

如果在@KafkaListener属性中没有指定 containerFactory 那么Spring Boot 会默认注入 name 为“kafkaListenerContainerFactory” 的 containerFactory。具体源码可跟踪:KafkaListenerAnnotationBeanPostProcessor中的常量:

public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

具有自定义消费者组名称的 Kafka Sink 连接器

】具有自定义消费者组名称的KafkaSink连接器【英文标题】:KafkaSinkConnectorwithcustomconsumer-groupname【发布时间】:2021-10-2615:12:11【问题描述】:在kafkaconnect中,所有的sink连接器都会使用不同的组,命名转换为connect-connector_name。但我... 查看详情

kafka参数在线修改

...t为topic名称#retention.ms保留时间24小时这样就可以保证当前消费者不会再消费阻塞的数据了首先是可以在配置文件中设置全局性的topic配置参数,其次是可以在创建topic时使用–confi设置一个或多个自定义的配置。自定义的配置... 查看详情

kafka--06---springboot中使⽤kafka(代码片段)

...自动生成,如何生成可参考右边的帮助文档文章目录Springboot中使⽤Kafka1.引⼊依赖2.编写配置⽂件-----yml3.消息⽣产者4.消费者5.消费者中配置消费主题、分区和偏移量Springboot中使⽤Kafka1.引⼊依赖<dependency><groupId>org.spri... 查看详情

springboot整合kafka(代码片段)

目录前言一、SpringBoot怎么整合Kafka?二、使用步骤1.YML文件的配置2.消费者的config3.如何取出kafka数据4.pom文件的配置总结前言SpringBoot整合Kafka监听拿取数据提示:以下是本篇文章正文内容,下面案例可供参考一、SpringBo... 查看详情

flink自定义metrics监控kafka消费(代码片段)

...实现记录一下。有部分代码引用了:Flink监控:自定义消费延迟Metrics二实现1. CustomerJsonDeserializationimportorg.apache.commons.lang3.ThreadU 查看详情

springboot集成kafka(代码片段)

SpringBoot集成Kafka知识索引SpringBoot集成Kafka工程搭建SpringBoot集成Kafka配置SpringBoot集成Kafka生产消息SpringBoot集成Kafka消费消息  1生产者SpringBoot集成Kafka,无非就是生产者和消费者,但首先得实现SpringBoot集成,流程如下&... 查看详情

在 Spring Boot 中控制启用/禁用 Kafka 消费者

】在SpringBoot中控制启用/禁用Kafka消费者【英文标题】:Controlenabling/disablingKafkaconsumersinSpringBoot【发布时间】:2019-06-2221:52:44【问题描述】:我在SpringBoot中配置了几个Kafka消费者。这就是kafka.properties的样子(这里只列出一个消费... 查看详情

@StreamListener 为 kafka 定义 groupId-如何为同一主题设置多个消费者

...-01-2313:53:56【问题描述】:我在我的应用程序中使用java、springboot和Kafka。我想在Kafka中为同一个主题定义多个消费者。现在,我 查看详情

flink1.14.0消费kafka数据自定义反序列化类

...还是非常大的,那在使用新的API消费kafka数据的时候如何自定义序列化类呢?KafkaSourceKafkaSource<String> source = KafkaSource.<String>b 查看详情

flink实战系列flink1.14.0消费kafka数据自定义反序列化器(代码片段)

Flink1.14.0消费kafka数据自定义反序列类在最近刚发布的Flink1.14.0版本中Source接口进行了重构,API的变化还是非常大的,那在新的接口下消费kafka的时候如何自定义反序列类呢?KafkaSource使用Kafkasource提供了一个构建类来构造KafkaSource的实... 查看详情

springboot实现自定义国际化

...建目录login.properties,login_zh_CN.properties,login_en_US.properties2.SpringBoot自动配置好了管理国际化资源文件的组件;配置前缀spring.messages.basename=i18n.login@Co 查看详情

结合源码讲解:kafka消费者参数配置(解释定义引用注意事项)

原创:石头哥@大数据架构师 2021年8月2日 微信:nevian668899个人背景 电商大数据存储工程师6K+存储集群调优实战者百P存储数据最佳实践精读HDFS源码60%、glusterfs70%,熟悉存储生态技术CephZFSlustre等参数配置源码解读bootstrap.serversbro... 查看详情

kafka2.5.0自定义数据序列化类(代码片段)

...高。小知识:Kafka支持Avro序列化器,比较适用于生产者和消费者在版本升级差距拉大时使用,但同时要注意性能。参考文章《使用kafka中提供的Avro序列化框架实现序列化》1)自定义 查看详情

springboot整合kafka(代码片段)

目录前言一、SpringBoot怎么整合Kafka?二、使用步骤1.YML文件的配置2.消费者的config3.如何取出kafka数据4.pom文件的配置总结前言SpringBoot整合Kafka监听拿取数据提示:以下是本篇文章正文内容,下面案例可供参考一、SpringBo... 查看详情

springboot整合kafka(代码片段)

目录前言一、SpringBoot怎么整合Kafka?二、使用步骤1.YML文件的配置2.消费者的config3.如何取出kafka数据4.pom文件的配置总结前言SpringBoot整合Kafka监听拿取数据提示:以下是本篇文章正文内容,下面案例可供参考一、SpringBo... 查看详情

springboot整合kafka实现手动提交offset(代码片段)

...提交offset嘛。那么具体代码该怎么写呢?本文就基于springboot来进行消费者手动提交offset的试验。配置application.ymlspring:kafka:#指定kafk 查看详情

springboot中级教程之springboot自定义配置(代码片段)

SpringBoot中级教程之SpringBoot自定义配置(十一)前言首先力推下我的开源项目http://git.oschina.net/wangkang_daydayup/swagger-doc这是一个完全利用springboot开发的项目,拯救了springfox-swagger污染代码的问题,完全利用java-doc来实现,... 查看详情

springboot整合kafka实现消息推送

参考技术A本篇文章主要介绍的是springboot整合kafka。1.使用docker安装kafka,移步https://www.jianshu.com/p/89b19f5b28ec创建一个名为springboot-kafka-common的微服务,打包方式为jar,存放一些公共配置和公共类,如util等1.配置pom文件pom文件中以父... 查看详情