kafka2.5.0生产者与消费者配置详解(代码片段)

zhuwenjoyce zhuwenjoyce     2022-11-30     661

关键词:

1)引入maven依赖

我这里使用的是springboot 2.1.3.RELEASE 版本:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

会引入一对的kafka包:

技术图片

 

 2)生产者配置:

所有配置参考kafka-clients-2.5.0.jar包里的org.apache.kafka.clients.producer.ProducerConfig类,并且在该类中可以查看所有配置项的默认值: CONFIG = (new ConfigDef()).define(  这里的define方法的第三个参数就是默认值

application.properties里可以这样配置:

#####################  重要配置  ######################
spring.kafka.producer.bootstrap.servers=192.168.2.60:9092,192.168.2.62:9092
spring.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# acks=0  如果设置为0,生产者将不等待任何来自服务器的确认。每个记录返回的偏移量将始终设置为-1。
# acks=1  这意味着leader确认消息即可,但不等待所有副本的完全确认的情况下进行响应。在这种情况下,如果leader在确认记录后立即失败,但是在副本复制它之前,那么记录将丢失。
# acks=all  不仅需要leader确认收到消息,还将等待全部的副本确认。这保证了只要至少有一个副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于ack =-1设置。
# acks=-1   跟集群有关
# 默认 1
spring.kafka.producer.acks=1
# 一个批次发送的大小,默认16KB,超过这个大小就会发送数据
spring.kafka.producer.batch.size=16384
# 一个批次最长等待多久就发送数据,默认0,即马上发送
spring.kafka.producer.linger.ms=5000
# 控制生产者最大发送大小,默认 1MB。这个值必须小于kafka服务器server.properties配置文件里的最大可接收数据大小配置:socket.request.max.bytes=104857600 (默认104857600 = 100MB)
spring.kafka.producer.max.request.size=1048576
 
#####################  非重要配置  ######################
# 生产者内存缓冲区大小。默认33554432bytes=32MB
spring.kafka.producer.buffer.memory=33554432
# 发送重试次数,默认 2147483647,接近无限大
spring.kafka.producer.retries=3
# 请求超时时间,默认30秒
spring.kafka.producer.request.timeout.ms=30000
# 默认值5。并发状态下,kafka生产者允许存在最大的kafka服务端未确认接收的消息个数最大值。
# 注意,如果该值设置为1,并且开启重试机制,则会在允许的重试次数内,阻塞其他消息发送到kafka Server端。并且为1的话,会严重影响生产者的吞吐量。仅适用于对数据有严格顺序要求的场景。
spring.kafka.producer.max.in.flight.requests.per.connection=5
# 最大阻塞时间,超过则抛出异常。默认60秒
spring.kafka.max.block.ms=60000
# 数据压缩类型:none、gzip、snappy、lz4、zstd,默认none什么都不做
spring.kafka.compression.type=none

在springboot框架里,手动封装 @bean对象:

@Configuration
@EnableKafka
public class KafkaProducerConfig 
    @Value("$kafka.producer.servers")
    private String servers;
    @Value("$kafka.producer.retries")
    private int retries;
    @Value("$kafka.producer.batch.size")
    private int batchSize;
    @Value("$kafka.producer.linger")
    private int linger;
    @Value("$kafka.producer.buffer.memory")
    private int bufferMemory;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() 
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate kafkaTemplate
                = new KafkaTemplate<String, String>(factory) ;
        //kafkaTemplate.setProducerListener();
        return kafkaTemplate;
    

  

 

 

3)消费者配置:

 

 

 

end.

kafka2.5.0自定义数据序列化类(代码片段)

...,效率很高。小知识:Kafka支持Avro序列化器,比较适用于生产者和消费者在版本升级差距拉大时使用,但同时要注意性能。参考文章《使用kafka中提供的Avro序列化框架实现序列化》1)自定义 查看详情

kafka2.5.0详解核心配置文件server.properties(代码片段)

$cat-nconfig/server.propertiesbroker.id=0    //brokerID, 集群模式下该ID必须唯一,且永恒不变 listeners=PLAINTEXT://your_host_name:9092    // 配置你的应用所在IP地址,我理解为访问白名 查看详情

kafka2.5.0基本命令(代码片段)

1)启动zookeeper演示用的话,直接启动kafka自带的zookeeper即可:cdkafkaDirectory/kafka_2.12-2.5.0bin/zookeeper-server-start.shconfig/zookeeper.properties生产上建议连接到zookeeper集群,需更改配置文件config/server.properties 里更改zookeepe 查看详情

spring集成rabbitmq配置文件详解(生产者和消费者)(代码片段)

1,首先引入配置文件org.springframework.amqp,如下:<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>1.7.1.RELEASE</versio 查看详情

“全栈2019”java多线程第二十五章:生产者与消费者线程详解

...DEAv2018.3文章原文链接“全栈2019”Java多线程第二十五章:生产者与消费者线程详解下一章“全栈2019”Java多线程第二十六章:同步方法生产者与消费者线程学习小组加入同步学习小组,共同交流与进步。方式一:关注头条号Gorhaf... 查看详情

生产者消费者模型详解(代码片段)

生产者消费者模型文章目录生产者消费者模型什么是生产者消费者模型基于BlockingQueue的生产者消费者模型单生产者单消费者模型多生产者多消费者模型什么是生产者消费者模型生产者消费者模式就是通过一个容器来解决生产者... 查看详情

生产者-消费者问题详解(代码片段)

1.前言  生产者-消费者问题是经典的线程同步问题(我会用java和c分别实现),主要牵扯到三个点: 一:能否互斥访问共享资源(不能同时访问共享数据); 二:当公共容器满时,生产者能否继续生产(生产者应阻塞并... 查看详情

java面试笔试题代码,先收藏了

...基本概念2.安装与配置3.生产与消费4.服务端参数配置二、生产者1.客户端开发必要的参数配置消息的发送序列化分区器生产者拦截器2.原理分析整体架构元数据的更新3.重要的生产者参数三、消费者1.消费者与消费组2.客户端开发... 查看详情

rabbitmq部署及配置详解(单机)

一、RabbitMQ核心概念1.生产者和消费者Producer:消息的生产者,用于发布消息;Consumer:消息的消费者,用于从队列中获取消息.消费者只需关注队列即可,不需要关注交换机和路由键。消费者可以通过basicConsume(订阅模式可... 查看详情

线程内容详解(代码片段)

...道+锁因为没有锁,所以管道进程不安全管道:socket+pickle生产者与消费者模型解决数据的获取与处理之间时间不对等的情况能够自由的添加生产者与消费者的数量,以达到在相同的事件中,数据的处理速度能达到一个最大值queue=Q... 查看详情

java生产消费模型—arrayblockingqueue详解(代码片段)

...上,服务员负责取走(消费)美食。这里,厨师就扮演着生产者的身份,美食是生产的内容,服务员就扮演 查看详情

rabbitmq基础概念详解——环境配置及模拟生产者和消费者简单消息发送(代码片段)

一、简介:RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件。消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取完成通信。而作为中间件的Rabbit... 查看详情

kafka2.5.0硬件集群架构图topic主题与partitions分区架构图

 kafkaTopic主题与Patitions分区架构图:  硬件架构图: end. 查看详情

深入了解androidhandler机制原理详解(代码片段)

...ff1a;子线程handler主线程其实构成了线程模型中的经典问题生产者-消费者模型。生产者-消费者模型:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中 查看详情

高并发编程学习——线程通信详解(代码片段)

...m/2019/11/26/gao-bing-fa-bian-cheng-xue-xi-1-bing-fa-ji-chu/一、经典的生产者消费者案例上一篇文章我们提到一个应用可以创建多个线程去执行不同的任务,如果这些任务之间有着某种关系,那么线程之间必须能够通信来协调完成工作。生产... 查看详情

kafka2.5.0主题topic(代码片段)

kafka基本命令查看博客《kafka2.5.0基本命令》本博文所使用kafka版本2.5.0,操作系统centos8.1)创建主题创建my-topic主题,该主题有1个副本,8个分区:$bin/kafka-topics.sh--create--bootstrap-serverlocalhost:9092--replication-factor1--partitions8--topicmy-top 查看详情

kafka分区与消费者的关系(代码片段)

1. 前言我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名义订阅),而主题下是分区,消息是存储在分区中的,所以事实上生产者发送消息到分区,消费者则从分区读取消息,那么,这里问题来了,生产... 查看详情

生产者消费者模型详解(代码片段)

生产者消费者模型文章目录生产者消费者模型什么是生产者消费者模型基于BlockingQueue的生产者消费者模型单生产者单消费者模型多生产者多消费者模型什么是生产者消费者模型生产者消费者模式就是通过一个容器来解决生产者... 查看详情