kafka分区与消费者的关系(代码片段)

cjsblog cjsblog     2023-01-01     215

关键词:

1.  前言


我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名义订阅),而主题下是分区,消息是存储在分区中的,所以事实上生产者发送消息到分区,消费者则从分区读取消息,那么,这里问题来了,生产者将消息投递到哪个分区?消费者组中的消费者实例之间是怎么分配分区的呢?接下来,就围绕着这两个问题一探究竟。

2.  主题的分区数设置


在server.properties配置文件中可以指定一个全局的分区数设置,这是对每个主题下的分区数的默认设置,默认是1。

技术分享图片

当然每个主题也可以自己设置分区数量,如果创建主题的时候没有指定分区数量,则会使用server.properties中的设置。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1

在创建主题的时候,可以使用--partitions选项指定主题的分区数量

[[email protected] kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
Topic:abc       PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: abc      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: abc      Partition: 1    Leader: 0       Replicas: 0     Isr: 0

3.  生产者与分区


首先提出一个问题:生产者将消息投递到分区有没有规律?如果有,那么它是如何决定一条消息该投递到哪个分区的呢?

3.1.  默认的分区策略

The default partitioning strategy:

  • If a partition is specified in the record, use it
  • If no partition is specified but a key is present choose a partition based on a hash of the key
  • If no partition or key is present choose a partition in a round-robin fashion

org.apache.kafka.clients.producer.internals.DefaultPartitioner

默认的分区策略是:

  • 如果在发消息的时候指定了分区,则消息投递到指定的分区
  • 如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区
  • 如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) 
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) 
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) 
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
         else 
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        
     else 
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    

通过源代码可以更加作证这一点

4.  分区与消费者


消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,那么消费者实例和分区之前的对应关系是怎样的呢?

换句话说,就是组中的每一个消费者负责那些分区,这个分配关系是如何确定的呢?

技术分享图片

同一时刻,一条消息只能被组中的一个消费者实例消费

消费者组订阅这个主题,意味着主题下的所有分区都会被组中的消费者消费到,如果按照从属关系来说的话就是,主题下的每个分区只从属于组中的一个消费者,不可能出现组中的两个消费者负责同一个分区。

那么,问题来了。如果分区数大于或者等于组中的消费者实例数,那自然没有什么问题,无非一个消费者会负责多个分区,(PS:当然,最理想的情况是二者数量相等,这样就相当于一个消费者负责一个分区);但是,如果消费者实例的数量大于分区数,那么按照默认的策略(PS:之所以强调默认策略是因为你也可以自定义策略),有一些消费者是多余的,一直接不到消息而处于空闲状态。

话又说回来,假设多个消费者负责同一个分区,那么会有什么问题呢?

我们知道,Kafka它在设计的时候就是要保证分区下消息的顺序,也就是说消息在一个分区中的顺序是怎样的,那么消费者在消费的时候看到的就是什么样的顺序,那么要做到这一点就首先要保证消息是由消费者主动拉取的(pull),其次还要保证一个分区只能由一个消费者负责。倘若,两个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,则会造成很多浪费,因为这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异。

4.1.  消费者分区分配策略

org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

如果是自定义分配策略的话可以继承AbstractPartitionAssignor这个类,它默认有3个实现

4.1.1.  range

range策略对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor

这是默认的分配策略

可以通过消费者配置中partition.assignment.strategy参数来指定分配策略,它的值是类的全路径,是一个数组

/**
 * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
 */

range策略是基于每个主题的

对于每个主题,我们以数字顺序排列可用分区,以字典顺序排列消费者。然后,将分区数量除以消费者总数,以确定分配给每个消费者的分区数量。如果没有平均划分(PS:除不尽),那么最初的几个消费者将有一个额外的分区。

简而言之,就是,

1、range分配策略针对的是主题(PS:也就是说,这里所说的分区指的某个主题的分区,消费者值的是订阅这个主题的消费者组中的消费者实例)

2、首先,将分区按数字顺序排行序,消费者按消费者名称的字典序排好序

3、然后,用分区总数除以消费者总数。如果能够除尽,则皆大欢喜,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区

例如,假设有两个消费者C0和C1,两个主题t0和t1,并且每个主题有3个分区,分区的情况是这样的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,基于以上信息,最终消费者分配分区的情况是这样的:

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]

为什么是这样的结果呢?

因为,对于主题t0,分配的结果是C0负责P0和P1,C1负责P2;对于主题t2,也是如此,综合起来就是这个结果

上面的过程用图形表示的话大概是这样的:

技术分享图片

阅读代码,更有助于理解:

public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) 
    //    主题与消费者的映射                                                            
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) 
        String topic = topicEntry.getKey();    //    主题
        List<String> consumersForTopic = topicEntry.getValue();    //    消费者列表

        //    partitionsPerTopic表示主题和分区数的映射
        //    获取主题下有多少个分区
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

        //    消费者按字典序排序
        Collections.sort(consumersForTopic);

        //    分区数量除以消费者数量
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        //    取模,余数就是额外的分区
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) 
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            //    分配分区
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        
    
    return assignment;

4.1.2.  roundrobin(轮询)

roundronbin分配策略的具体实现是org.apache.kafka.clients.consumer.RoundRobinAssignor

/**
 * The round robin assignor lays out all the available partitions and all the available consumers. It
 * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
 * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
 * will be within a delta of exactly one across all consumers.)
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 *
 * When subscriptions differ across consumer instances, the assignment process still considers each
 * consumer instance in round robin fashion but skips over an instance if it is not subscribed to
 * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
 * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,
 * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
 * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
 *
 * Tha assignment will be:
 * C0: [t0p0]
 * C1: [t1p0]
 * C2: [t1p1, t2p0, t2p1, t2p2]
 */

轮询分配策略是基于所有可用的消费者和所有可用的分区的

与前面的range策略最大的不同就是它不再局限于某个主题

如果所有的消费者实例的订阅都是相同的,那么这样最好了,可用统一分配,均衡分配

例如,假设有两个消费者C0和C1,两个主题t0和t1,每个主题有3个分区,分别是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,最终分配的结果是这样的:

C0: [t0p0, t0p2, t1p1]

C1: [t0p1, t1p0, t1p2]

用图形表示大概是这样的:

技术分享图片

假设,组中每个消费者订阅的主题不一样,分配过程仍然以轮询的方式考虑每个消费者实例,但是如果没有订阅主题,则跳过实例。当然,这样的话分配肯定不均衡。

什么意思呢?也就是说,消费者组是一个逻辑概念,同组意味着同一时刻分区只能被一个消费者实例消费,换句话说,同组意味着一个分区只能分配给组中的一个消费者。事实上,同组也可以不同订阅,这就是说虽然属于同一个组,但是它们订阅的主题可以是不一样的。

例如,假设有3个主题t0,t1,t2;其中,t0有1个分区p0,t1有2个分区p0和p1,t2有3个分区p0,p1和p2;有3个消费者C0,C1和C2;C0订阅t0,C1订阅t0和t1,C2订阅t0,t1和t2。那么,按照轮询分配的话,C0应该负责

首先,肯定是轮询的方式,其次,比如说有主题t0,t1,t2,它们分别有1,2,3个分区,也就是t0有1个分区,t1有2个分区,t2有3个分区;有3个消费者分别从属于3个组,C0订阅t0,C1订阅t0和t1,C2订阅t0,t1,t2;那么,按照轮询分配的话,C0应该负责t0p0,C1应该负责t1p0,其余均由C2负责。

上述过程用图形表示大概是这样的:

技术分享图片

为什么最后的结果是

C0: [t0p0]

C1: [t1p0]

C2: [t1p1, t2p0, t2p1, t2p2]

这是因为,按照轮询t0p1由C0负责,t1p0由C1负责,由于同组,C2只能负责t1p1,由于只有C2订阅了t2,所以t2所有分区由C2负责,综合起来就是这个结果

细想一下可以发现,这种情况下跟range分配的结果是一样的

5.  测试代码

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cjs.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-demo</name>
    <description></description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
package com.cjs.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class HelloProducer 

    public static void main(String[] args) 

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) 
            producer.send(new ProducerRecord<String, String>("abc", Integer.toString(i), Integer.toString(i)), new Callback() 
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) 
                    if (null != e) 
                        e.printStackTrace();
                    else 
                        System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());
                    
                
            );
        
        producer.close();

    
package com.cjs.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class HelloConsumer 

    public static void main(String[] args) 
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
//        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("foo", "bar", "abc"));
        while (true) 
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) 
                System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            
        
    

6.  参考


http://kafka.apache.org/documentation/#consumerconfigs

https://blog.csdn.net/feelwing1314/article/details/81097167

https://blog.csdn.net/OiteBody/article/details/80595971

https://blog.csdn.net/YChenFeng/article/details/74980531

 

kafka面试连环炮(代码片段)

...的有序性吗5.1为什么topic下多个分区不能保证有序6分区与消费者组间的关系7生产者分区策略8数据丢失8.1生产者保证数据不丢失8.8.2broker保证数据不丢失8.3customer保证数据不丢失9数据重复10kafka当中数据的查找过程11Kafkaa 查看详情

中通消息平台kafka顺序消费线程模型的实践与优化(代码片段)

...可,而Kafka会在任意时刻保证一个消费组同时只能有一个消费者监听消费,因此可在消费时按分区进行顺序消费,保证每个分区的消息具备局部顺序性。由于需要确保分区消息的顺序 查看详情

真的,kafka入门一篇文章就够了(代码片段)

...Producer),生产者用于持续不断的向某个主题发送消息。消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关... 查看详情

kafka学习笔记&面经分享(代码片段)

...答机制故障处理细节follower故障leader故障ExactlyOnce语义kafka消费者消费方式分区分配策略Range(默认策略)RoundRobinStickyAssignoroffset的维护Kafka高效读写数据顺序写磁盘零拷贝MemoryMappedFileszookeeper在kafka中的作用Controller节点broke... 查看详情

kafka集群监控安全机制与最佳实践(代码片段)

...它有如下功能:管理多个集群轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发)运行首选副本选举使用选项生成分区分配以选择要使用的代理运行分区重新分配(基于生成的分配)使用可选主题配置创建主... 查看详情

kafka分区机制详解

...基于发布-订阅模型而构建,生产者向主题发送消息,而消费者则通过订阅主题来消费消息。而主题里面又可以创建多个分区,新建的主题默认只有一个分区 查看详情

kafka消费组消费主题(代码片段)

消费组消费主题特征:同一个消费组里面消费者不能消费同一个分区不同消费组里面的消费者可以消费同一个分区,相互独立互不干扰当消费组中的消费者数量大于分区数量时,会存在浪费。因为有消费者分配不到partition进行消... 查看详情

kafka消费者开发方式小结(代码片段)

【README】1,本文总结了kafka消费者开发方式;2,本文使用的是最新的kafka版本3.0.0;【1】kafka消费则【1.1】消费者与消费者组1)消费者:应用程序需要创建消费者对象,订阅主题并开始接收消息;2&... 查看详情

kafka消费者开发方式小结(代码片段)

【README】1,本文总结了kafka消费者开发方式;2,本文使用的是最新的kafka版本3.0.0;【1】kafka消费则【1.1】消费者与消费者组1)消费者:应用程序需要创建消费者对象,订阅主题并开始接收消息;2&... 查看详情

kafka的生产者与消费者机制+分区策略你这还不懂?(代码片段)

...如Cloudera、Storm、Spark、Flink等都支持与Kafka集成生产者与消费者机制在Kafka中,生产者(producer)将消息发送给Broker,Broker将生产者发送的消息存储到磁盘当中,而消费者(Consumer)负责从Broker订阅并且消... 查看详情

kafka系列--多线程消费者实现(代码片段)

...据。用队列来存储要提交的offest,然后处理线程将其给回消费者提交。每个分区开一个处理线程来处理数据,分区与处理器的映射放在map中。当处理到一定的 查看详情

kafka消息中间件(kafka与mqtt区别)(代码片段)

...KafkaKafka重要原理Topic主题Partition分区Producer生产者Consumer消费者Broker中间件Offset偏移量Kafka与mqtt区别KafkaKafka是一个分布式流处理平台,它可以快速地处理大量的数据流。Kafka的核心原理是基于发布/订阅模式的消息队列。Kafka允... 查看详情

kafka消费者对应的分配partition分区策略(代码片段)

...存储到多个broker服务器里的partition分区中,消费组里面的消费者又可以同时消费不同的partition分区。解决问题:1.实现了负载均衡(分布在多个broker上,可以同时对外提供服务)。2.多消费者并发消费,提升kafka的吞吐量。 查看详情

kafka中的再均衡(代码片段)

在《Kafka消费者的使用和原理》中已经提到过“再均衡”的概念,我们先回顾下,一个主题可以有多个分区,而订阅该主题的消费组中可以有多个消费者。每一个分区只能被消费组中的一个消费者消费,可认为每个分区的消费权... 查看详情

kafka(代码片段)

...在不同的主机上面建了不同目录Producer:生产者Consumer:消费者Replica:就是kafka的副本,kafka的partition为了保证数据安全,所以每个分区partit 查看详情

kafka分区分配策略分析——重点:stickyassignor(代码片段)

...,默认情况下一个分区只能被一个消费组下面的一个消费者消费&# 查看详情

kafka-语义&重复消费&丢失数据&分区有序(代码片段)

...确一次语义。重复消费这是一个很常见的问题,如果保证消费者不重复消费数据,博客上有很多的方法,简单罗列几条如下。(1)给每条消息加一个独一无二的key,如uuid,消费数据的时候同时记录这些key,下次消费数据时需要... 查看详情

kafka监控工具之kafkamanager(代码片段)

...供了如下功能:管理多个集群。轻松检查群集状态(Topic,消费者,偏移量,Broker,副本分发,分区分发)。运行首选副本选举。使用选项生成分区分配以选择要使用的Broker。分区重新分配。使用可选Topic配 查看详情