关键词:
引言
Kafka是一款很棒的消息系统,今天我们就来深入了解一下它的实现细节,首先关注Producer这一方。
要使用kafka首先要实例化一个KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、batch.size等非必要Properties,通过这个简单的接口可以控制Producer大部分行为,实例化后就可以调用send方法发送消息了。
核心实现是这个方法:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//①
return doSend(interceptedRecord, callback);//②
通过不同的模式可以实现发送即忘(忽略返回结果)、同步发送(获取返回的future对象,回调函数置为null)、异步发送(设置回调函数)三种消息模式。
我们来看看消息类ProducerRecord有哪些属性:
private final String topic;//主题
private final Integer partition;//分区
private final Headers headers;//头
private final K key;//键
private final V value;//值
private final Long timestamp;//时间戳
它有多个构造函数,可以适应不同的消息类型:比如有无分区、有无key等。
①中ProducerInterceptors(有0 ~ 无穷多个,形成一个拦截链)对ProducerRecord进行拦截处理(比如打上时间戳,进行审计与统计等操作)
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
ProducerRecord<K, V> interceptRecord = record;
for (ProducerInterceptor<K, V> interceptor : this.interceptors)
try
interceptRecord = interceptor.onSend(interceptRecord);
catch (Exception e)
// 不抛出异常,继续执行下一个拦截器
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: , partition: ", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
return interceptRecord;
如果用户有定义就进行处理并返回处理后的ProducerRecord,否则直接返回本身。
然后②中doSend真正发送消息,并且是异步的(源码太长只保留关键):
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback)
TopicPartition tp = null;
try
// 序列化 key 和 value
byte[] serializedKey;
try
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
catch (ClassCastException cce)
byte[] serializedValue;
try
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
catch (ClassCastException cce)
// 计算分区获得主题与分区
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// 回调与事务处理省略。
Header[] headers = record.headers().toArray();
// 消息追加到RecordAccumulator中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
// 该批次满了或者创建了新的批次就要唤醒IO线程发送该批次了,也就是sender的wakeup方法
if (result.batchIsFull || result.newBatchCreated)
log.trace("Waking up the sender since topic partition is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
return result.future;
catch (Exception e)
// 拦截异常并抛出
this.interceptors.onSendError(record, tp, e);
throw e;
下面是计算分区的方法:
private int partition(ProducerRecord<K, V> record,
byte[] serializedKey, byte[] serializedValue, Cluster cluster)
Integer partition = record.partition();
// 消息有分区就直接使用,否则就使用分区器计算
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey,
record.value(), serializedValue, cluster);
默认的分区器DefaultPartitioner实现方式是如果partition存在就直接使用,否则根据key计算partition,如果key也不存在就使用round robin算法分配partition。
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
public class DefaultPartitioner implements Partitioner
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) //key为空
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分区
if (availablePartitions.size() > 0) //有分区,取模就行
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
else // 无分区,
return Utils.toPositive(nextValue) % numPartitions;
else // key 不为空,计算key的hash并取模获得分区
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
private int nextValue(String topic)
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter)
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null)
counter = currentCounter;
return counter.getAndIncrement();//返回并加一,在取模的配合下就是round robin
以上就是发送消息的逻辑处理,接下来我们再看看消息发送的物理处理。
Sender(是一个Runnable,被包含在一个IO线程ioThread中,该线程不断从RecordAccumulator队列中的读取消息并通过Selector将数据发送给Broker)的wakeup方法,实际上是KafkaClient接口的wakeup方法,由NetworkClient类实现,采用了NIO,也就是java.nio.channels.Selector.wakeup()方法实现。
Sender的run中主要逻辑是不停执行准备消息和等待消息:
long pollTimeout = sendProducerData(now);//③
client.poll(pollTimeout, now);//④
③完成消息设置并保存到信道中,然后监听感兴趣的key,由KafkaChannel实现。
public void setSend(Send send)
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
// transportLayer的一种实现中的相关方法
public void addInterestOps(int ops)
key.interestOps(key.interestOps() | ops);
④主要是Selector的poll,其select被wakeup唤醒:
public void poll(long timeout) throws IOException
/* check ready keys */
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);//wakeup使其停止阻塞
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers)
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers)
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are included in the ready count for the next select
readyKeys.clear();
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();
else
madeReadProgressLastPoll = true; //no work is also "progress"
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
其中pollSelectionKeys方法会调用如下方法完成消息发送:
public Send write() throws IOException
Send result = null;
if (send != null && send(send))
result = send;
send = null;
return result;
private boolean send(Send send) throws IOException
send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return send.completed();
Send是一次数据发包,一般由ByteBufferSend或者MultiRecordsSend实现,其writeTo调用transportLayer的write方法,一般由PlaintextTransportLayer或者SslTransportLayer实现,区分是否使用ssl:
public long writeTo(GatheringByteChannel channel) throws IOException
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn‘t happen.");
remaining -= written;
pending = TransportLayers.hasPendingWrites(channel);
return written;
public int write(ByteBuffer src) throws IOException
return socketChannel.write(src);
到此就把Producer的业务相关逻辑处理和非业务相关的网络 2方面的主要流程梳理清楚了。其他额外的功能是通过一些配置保证的。
比如顺序保证就是max.in.flight.requests.per.connection,InFlightRequests的doSend会进行判断(由NetworkClient的canSendRequest调用),只要该参数设为1即可保证当前包未确认就不能发送下一个包从而实现有序性
public boolean canSendMore(String node)
Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
再比如可靠性,通过设置acks,Sender中sendProduceRequest的clientRequest加入了回调函数:
RequestCompletionHandler callback = new RequestCompletionHandler()
public void onComplete(ClientResponse response)
handleProduceResponse(response, recordsByPartition, time.milliseconds());//调用completeBatch
;
/**
* 完成或者重试投递,这里如果acks不对就会重试
*
* @param batch The record batch
* @param response The produce response
* @param correlationId The correlation id for the request
* @param now The current POSIX timestamp in milliseconds
*/
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
long now, long throttleUntilTimeMs)
public class ProduceResponse extends AbstractResponse
/**
* Possible error code:
* INVALID_REQUIRED_ACKS (21)
*/
kafka源码一层一层包装很多,错综复杂,如有错误请大家不吝赐教。
kafka介绍(代码片段)
...高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。接下来先介绍下消息系统的基本理念,然后再介绍Kafka。消息系统介绍一个消息系统... 查看详情
初识kafka
一、kafka介绍Kafka是一种高吞吐量的分布式发布订阅消息系统,使用scala编写。Kafka拥有作为一个消息系统应该具备的功能,有着独特的设计。它借鉴的JMS的规范思想,但未完全遵循JMS的规范。Kafka是一个分布式的,分区的消息(co... 查看详情
kafka
...建实时数据管道和数据流的应用。具有实时横向扩展、高吞吐量、支持大量堆积具有容错性和速度快等特点。它是一个高性能分布式消息系统。通常一个分布式流数据平台它具有三个特点:发布和订阅功能,类似于消息系统以容... 查看详情
kafka数据可靠性深度解读
...分布式系统,易于向外扩展;它同时为发布和订阅提供高吞吐量;它支持多订阅者, 查看详情
kafka入门:一个开源的、轻量级、高吞吐、高可用的分布式消息系统
...然非常流行的消息系统。Kafka是一款开源的、轻量级的、分布式、可分区和具有复制备份的(Replicated)、基于ZooKeeper协调管理的分布式流平台的功能强大的消息系统。作为一个流式处理平台,必须具备以下3个关键特性:1)能够允... 查看详情
linux12elk-->01elk概述(代码片段)
...中间件)3.2.1点对点模式3.2.2发布订阅模式3.2.3kafka高吞吐量的分布式发布订阅消息系统3.2.3kafka高吞吐量的分布式发布订阅消息系统1.架构图简版:位于各个节点上的filebeat将收集到的 查看详情
javaweb项目架构之kafka分布式日志队列
...的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的... 查看详情
kafka
一、概念Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由LinkedIn公司开发,使用Scala语言编写,目前是Apache的开源项目。broker:Kafka服务器,负责消息存储和转发;topic:消息类别,Kafka按照topic来分类消息;partit... 查看详情
kafka介绍
...p;Kafka介绍 Kafka是由Java和Scala编写的是一个分布式、高吞吐量、分区的、多副本的、多订阅者,基于zookeeper协调的分布式发布订阅消息系统(也可以当做MQ系统) 分布式:所有的producer、broker和consumer都会有多个,均匀... 查看详情
原kafka简介
...在apache开源,基于发布订阅的分布式消息系统。2.特点高吞吐量:单机每秒几百MB的读写 消息持久化 高扩展性 高可靠性 支持多消费者(这个是比较重要的特点)3.拓扑结构Broker:Kafka集群包含一个或多个服务器,... 查看详情
kafka删除节点怎么删除
参考技术AKafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成... 查看详情
如何构建安全的kafka集群
Kafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系统都支持与Kafka集成,其中Spar... 查看详情
一.kafka入门
一、基本概念 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据.这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素.Kafka有... 查看详情
附录e安装kafka
...afkaE.1.1 下载KafkaKafka是由LinkedIn设计的一个高吞吐量、分布式、基于发布订阅模式的消息系统,使用Scala编写,它以可水平扩展、可靠性、异步通信和高吞吐率等特性而被广泛使用。目前越来越多的开源分布式处理系... 查看详情
kafka简介
kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中所有动作流数据。这种动作(网页浏览、搜索和其他用户的行动)是现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要... 查看详情
kafka简介
...高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、 查看详情
kafka入门教程(代码片段)
...f0c;支持不同消费组分别消费。在性能方面,它具有高吞吐量、低时延的特点。 查看详情
kafka学习笔记
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消费。 查看详情