kafka源码分析-序列2-producer

author author     2022-08-06     580

关键词:

  在上一篇,我们从使用方式和策略上,对消息队列做了一个宏观描述。从本篇开始,我们将深入到源码内部,仔细分析Kafka到底是如何实现一个分布式消息队列。我们的分析将从Producer端开始。
  
  从Kafka 0.8.2开始,发布了一套新的Java版的client api, KafkaProducer/KafkaConsumer,替代之前的scala版的api。本系列的分析将只针对这套Java版的api。
  
  多线程异步发送模型
  
  下图是经过源码分析之后,整理出来的Producer端的架构图:
  
  这里写图片描述
  
  在上一篇我们讲过,Producer有同步发送和异步发送2种策略。在以前的Kafka client api实现中,同步和异步是分开实现的。而在0.9中,同步发送其实是通过异步发送间接实现,其接口如下:
  
  public class KafkaProducer<K, V> implements Producer<K, V> {
  
  ...
  
  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) //异步发送接口
  
  {
  
  ...
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  要实现同步发送,只要在拿到返回的Future对象之后,直接调用get()就可以了。
  
  基本思路
  
  从上图我们可以看出,异步发送的基本思路就是:send的时候,KafkaProducer把消息放到本地的消息队列RecordAccumulator,然后一个后台线程Sender不断循环,把消息发给Kafka集群。
  
  要实现这个,还得有一个前提条件:就是KafkaProducer/Sender都需要获取集群的配置信息Metadata。所谓Metadata,也就是在上一篇所讲的,Topic/Partion与broker的映射关系:每一个Topic的每一个Partion,得知道其对应的broker列表是什么,其中leader是谁,follower是谁。
  
  2个数据流
  
  所以在上图中,有2个数据流:
  
  Metadata流(A1,A2,A3):Sender从集群获取信息,然后更新Metadata; KafkaProducer先读取Metadata,然后把消息放入队列。
  
  消息流(B1, B2, B3):这个很好理解,不再详述。
  
  本篇着重讲述Metadata流,消息流,将在后续详细讲述。
  
  Metadata的线程安全性
  
  从上图可以看出,Metadata是多个producer线程读,一个sender线程更新,因此它必须是线程安全的。
  
  Kafka的官方文档上也有说明,KafkaProducer是线程安全的,可以在多线程中调用:
  
  The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
  
  从下面代码也可以看出,它的所有public方法都是synchronized:
  
  public final class Metadata {
  
  。。。
  
  public synchronized Cluster fetch() {
  
  return this.cluster;
  
  }
  
  public synchronized long timeToNextUpdate(long nowMs) {
  
  。。。
  
  }
  
  public synchronized int requestUpdate() {
  
  。。。
  
  }
  
  。。。
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  Metadata的数据结构
  
  下面代码列举了Metadata的主要数据结构:一个Cluster对象 + 1堆状态变量。前者记录了集群的配置信息,后者用于控制Metadata的更新策略。
  
  public final class www.tyff688.com Metadata {
  
  ...
  
  private final long refreshBackoffMs; //更新失败的情况下,下1次更新的补偿时间(这个变量在代码中意义不是太大)
  
  private final long metadataExpireMs; //关键值:每隔多久,更新一次。缺省是600*1000,也就是10分种
  
  private int version; //每更新成功1次,version递增1。这个变量主要用于在while循环,wait的时候,作为循环判断条件
  
  private long lastRefreshMs; //上一次更新时间(也包含更新失败的情况)
  
  private long lastSuccessfulRefreshMs; //上一次成功更新的时间(如果每次都成功的话,则2者相等。否则,lastSuccessulRefreshMs < lastRefreshMs)
  
  private Cluster cluster; //集群配置信息
  
  private boolean needUpdate; //是否强制刷新
  
  、
  
  ...
  
  }
  
  public final class Cluster {
  
  ...
  
  private final List<Node> nodes; //www.senta77.com Node也就是Broker
  
  private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; //Topic/Partion和broker list的映射关系
  
  private final Map<String, List<PartitionInfo>> partitionsByTopic;
  
  private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
  
  private final Map<Integer, List<PartitionInfo>> partitionsByNode;
  
  private final Map<Integer, Node> nodesById;
  
  }
  
  public class PartitionInfo {
  
  private final String topic;
  
  private final int partition;
  
  private final Node www.xbyl688.com leader;
  
  private final Node[] replicas;
  
  private final Node[] www.honqili66.com inSyncReplicas;
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  producer读取Metadata
  
  下面是send函数的源码,可以看到,在send之前,会先读取metadata。如果metadata读不到,会一直阻塞在那,直到超时,抛出TimeoutException
  
  //KafkaProducer
  
  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  
  try {
  
  long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); //拿不到topic的配置信息,会一直阻塞在这,直到抛异常
  
  ... //拿到了,执行下面的send逻辑
  
  } catch()
  
  {}
  
  }
  
  //KafkaProducer
  
  private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
  
  if (!this.metadata.containsTopic(topic))
  
  this.metadata.add(topic);
  
  if (metadata.fetch().partitionsForTopic(topic) != null)
  
  return 0; //取到topic的配置信息,直接返回
  
  long begin = time.milliseconds();
  
  long remainingWaitMs = maxWaitMs;
  
  while (metadata.fetch().partitionsForTopic(topic) == null) { //取不到topic的配置信息,一直死循环wait,直到超时,抛TimeoutException
  
  log.trace("Requesting metadata update for topic {}.", topic);
  
  int version = metadata.requestUpdate(); //把needUpdate置为true
  
  sender.wakeup(); //唤起sender
  
  metadata.awaitUpdate(version, remainingWaitMs); //metadata的关键函数
  
  long elapsed = time.milliseconds() - begin;
  
  if (elapsed >= maxWaitMs)
  
  throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
  
  if (metadata.fetch().unauthorizedTopics().contains(topic))
  
  throw new TopicAuthorizationException(topic);
  
  remainingWaitMs = maxWaitMs - elapsed;
  
  }
  
  return time.milliseconds() - begin;
  
  }
  
  //Metadata
  
  public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
  
  if (maxWaitMs < 0) {
  
  throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
  
  }
  
  long begin = System.currentTimeMillis();
  
  long remainingWaitMs = maxWaitMs;
  
  while (this.version <= lastVersion) { //当Sender成功更新meatadata之后,version加1。否则会循环,一直wait
  
  if (remainingWaitMs != 0
  
  wait(remainingWaitMs); //线程的wait机制,wait和synchronized的配合使用
  
  long elapsed = System.currentTimeMillis() - begin;
  
  if (elapsed >= maxWaitMs) //wait时间超出了最长等待时间
  
  throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
  
  remainingWaitMs = maxWaitMs - elapsed;
  
  }
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  总结:从上面代码可以看出,producer wait metadata的时候,有2个条件:
  
  (1) while (metadata.fetch().partitionsForTopic(topic) == null)
  
  (2)while (this.version <= lastVersion)
  
  有wait就会有notify,notify在Sender更新Metadata的时候发出。
  
  Sender更新Metadata
  
  Sender的创建
  
  下面是KafkaProducer的构造函数,从代码可以看出,Sender就是KafkaProducer中创建的一个Thread.
  
  private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
  
  try {
  
  ...
  
  this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); //构造metadata
  
  this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); //往metadata中,填入初始的,配置的node列表
  
  ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
  
  NetworkClient client = new NetworkClient(
  
  new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
  
  this.metadata,
  
  clientId,
  
  config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
  
  config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
  
  config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
  
  config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
  
  this.sender = new Sender(client, //构造一个sender。sender本身实现的是Runnable接口
  
  this.metadata,
  
  this.accumulator,
  
  config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
  
  (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
  
  config.getInt(ProducerConfig.RETRIES_CONFIG),
  
  this.metrics,
  
  new SystemTime(),
  
  clientId,
  
  this.requestTimeoutMs);
  
  String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
  
  this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
  
  this.ioThread.start(); //一个线程,开启sender
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  Metadata的更新机制 – Sender的run方法
  
  public void run() {
  
  // main loop, runs until close is called
  
  while (running) {
  
  try {
  
  run(time.milliseconds());
  
  } catch (Exception e) {
  
  log.error("Uncaught error in kafka producer I/O thread: ", e);
  
  }
  
  }
  
  。。。
  
  }
  
  public void run(long now) {
  
  Cluster cluster = metadata.fetch();
  
  。。。
  
  RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); //遍历消息队列中所有的消息,找出对应的,已经ready的Node
  
  if (result.unknownLeadersExist) //如果一个ready的node都没有,请求更新metadata
  
  this.metadata.requestUpdate();
  
  。。。
  
  //client的2个关键函数,一个发送ClientRequest,一个接收ClientResponse。底层调用的是NIO的poll。关于nio, 后面会详细介绍
  
  for (ClientRequest request : requests)
  
  client.send(request, now);
  
  this.client.poll(pollTimeout, now);
  
  }
  
  //NetworkClient
  
  public List<ClientResponse> poll(long timeout, long now) {
  
  long metadataTimeout = metadataUpdater.maybeUpdate(now); //判断是否要更新metadata
  
  try {
  
  this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
  
  } catch (IOException e) {
  
  log.error("Unexpected error during I/O", e);
  
  }
  
  // process completed actions
  
  long updatedNow = this.time.milliseconds();
  
  List<ClientResponse> responses = new ArrayList<>();
  
  handleCompletedSends(responses, updatedNow);
  
  handleCompletedReceives(responses, updatedNow); //在返回的handler中,会处理metadata的更新
  
  handleDisconnections(responses, updatedNow);
  
  handleConnections();
  
  handleTimedOutRequests(responses, updatedNow);
  
  // invoke callbacks
  
  for (ClientResponse response : responses) {
  
  if (response.request().hasCallback()) {
  
  try {
  
  response.request().callback().onComplete(response);
  
  } catch (Exception e) {
  
  log.error("Uncaught error in request completion:", e);
  
  }
  
  }
  
  }
  
  return responses;
  
  }
  
  //DefaultMetadataUpdater
  
  @Override
  
  public long maybeUpdate(long now) {
  
  // should we update our metadata?
  
  long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
  
  long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
  
  long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
  
  // if there is no node available to connect, back off refreshing metadata
  
  long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
  
  waitForMetadataFetch);
  
  if (metadataTimeout == 0) {
  
  // highly dependent on the behavior of leastLoadedNode.
  
  Node node = leastLoadedNode(now); //找到负载最小的Node
  
  maybeUpdate(now, node); //把更新Metadata的请求,发给这个Node
  
  }
  
  return metadataTimeout;
  
  }
  
  private void maybeUpdate(long now, Node node) {
  
  if (node == null) {
  
  log.debug("Give up sending metadata request since no node is available");
  
  // mark the timestamp for no node available to connect
  
  this.lastNoNodeAvailableMs = now;
  
  return;
  
  }
  
  String nodeConnectionId = node.idString();
  
  if (canSendRequest(nodeConnectionId)) {
  
  Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
  
  this.metadataFetchInProgress = true;
  
  ClientRequest metadataRequest = request(now, nodeConnectionId, topics); //关键点:发送更新Metadata的Request
  
  log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
  
  doSend(metadataRequest, now); //这里只是异步发送,返回的response在上面的handleCompletedReceives里面处理
  
  } else if (connectionStates.canConnect(nodeConnectionId, now)) {
  
  log.debug("Initialize connection to node {} for sending metadata request", node.id());
  
  initiateConnect(node, now);
  
  } else { // connected, but can‘t send more OR connecting
  
  this.lastNoNodeAvailableMs = now;
  
  }
  
  }
  
  private void handleCompletedReceives(List<ClientResponse> responses, long now) {
  
  for (NetworkReceive receive : this.selector.completedReceives()) {
  
  String source = receive.source();
  
  ClientRequest req = inFlightRequests.completeNext(source);
  
  ResponseHeader header = ResponseHeader.parse(receive.payload());
  
  // Always expect the response version id to be the same as the request version id
  
  short apiKey = req.request().header().apiKey();
  
  short apiVer = req.request().header().apiVersion();
  
  Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
  
  correlate(req.request().header(), header);
  
  if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
  
  responses.add(new ClientResponse(req, now, false, body));
  
  }
  
  }
  
  @Override
  
  public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
  
  short apiKey = req.request().header().apiKey();
  
  if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
  
  handleResponse(req.request().header(), body, now);
  
  return true;
  
  }
  
  return false;
  
  }
  
  //关键函数
  
  private void handleResponse(RequestHeader header, Struct body, long now) {
  
  this.metadataFetchInProgress = false;
  
  MetadataResponse response = new MetadataResponse(body);
  
  Cluster cluster = response.cluster(); //从response中,拿到一个新的cluster对象
  
  if (response.errors().size() > 0) {
  
  log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
  
  }
  
  if (cluster.nodes().size() > 0) {
  
  this.metadata.update(cluster, now); //更新metadata,用新的cluster覆盖旧的cluster
  
  } else {
  
  log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
  
  this.metadata.failedUpdate(now); //更新metadata失败,做失败处理逻辑
  
  }
  
  }
  
  //更新成功,version+1, 同时更新其它字段
  
  public synchronized void update(Cluster cluster, long now) {
  
  this.needUpdate = false;
  
  this.lastRefreshMs = now;
  
  this.lastSuccessfulRefreshMs = now;
  
  this.version += 1;
  
  for (Listener listener: listeners)
  
  listener.onMetadataUpdate(cluster); //如果有人监听了metadata的更新,通知他们
  
  this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster; //新的cluster覆盖旧的cluster
  
  notifyAll(); //通知所有的阻塞的producer线程
  
  log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
  
  }
  
  //更新失败,只更新lastRefreshMs
  
  public synchronized void failedUpdate(long now) {
  
  this.lastRefreshMs = now;
  
  }
  
  1
  
  2
  
  3
  
  4
  
  5
  
  6
  
  7
  
  8
  
  9
  
  10
  
  11
  
  12
  
  13
  
  14
  
  15
  
  16
  
  17
  
  18
  
  19
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  27
  
  28
  
  29
  
  30
  
  31
  
  32
  
  33
  
  34
  
  35
  
  36
  
  37
  
  38
  
  39
  
  40
  
  41
  
  42
  
  43
  
  44
  
  45
  
  46
  
  47
  
  48
  
  49
  
  50
  
  51
  
  52
  
  53
  
  54
  
  55
  
  56
  
  57
  
  58
  
  59
  
  60
  
  61
  
  62
  
  63
  
  64
  
  65
  
  66
  
  67
  
  68
  
  69
  
  70
  
  71
  
  72
  
  73
  
  74
  
  75
  
  76
  
  77
  
  78
  
  79
  
  80
  
  81
  
  82
  
  83
  
  84
  
  85
  
  86
  
  87
  
  88
  
  89
  
  90
  
  91
  
  92
  
  93
  
  94
  
  95
  
  96
  
  97
  
  98
  
  99
  
  100
  
  101
  
  102
  
  103
  
  104
  
  105
  
  106
  
  107
  
  108
  
  109
  
  110
  
  111
  
  112
  
  113
  
  114
  
  115
  
  116
  
  117
  
  118
  
  119
  
  120
  
  121
  
  122
  
  123
  
  124
  
  125
  
  126
  
  127
  
  128
  
  129
  
  130
  
  131
  
  132
  
  133
  
  134
  
  135
  
  136
  
  137
  
  138
  
  139
  
  140
  
  141
  
  142
  
  143
  
  144
  
  145
  
  146
  
  147
  
  148
  
  149
  
  150
  
  151
  
  152
  
  153
  
  154
  
  155
  
  156
  
  157
  
  158
  
  159
  
  160
  
  161
  
  162
  
  163
  
  164
  
  165
  
  166
  
  167
  
  168
  
  169
  
  170
  
  171
  
  172
  
  总结
  
  最后做一个总结:
  
  (1) Metadata的更新,是在while循环,每次调用client.poll()的时候更新的。在这个while循环中,通过记录当前时间,来实现各种超时机制。
  
  (2) 更新机制有2个:
  
  机制1:每隔一段时间更新一次,这个通过 Metadata的lastRefreshMs, lastSuccessfulRefreshMs 这2个字段来实现
  
  机制2:强制更新, 通过Metadata的needUpdate字段来实现。 requestUpdate()函数里面其实什么都没做,就是把needUpdate置成了false
  
  每次poll的时候,都检查这2个条件,达到了,就触发更新
  
  (3) 更新请求MetadataRequest是nio异步发送的,在poll的返回中,处理MetadataResponse的时候,才真正更新Metadata。
  
  这里有个关键点:Metadata的cluster对象,每次是整个覆盖的,而不是局部更新。所以cluster内部不用加锁。
  
  (4) 更新的时候,是从metadata保存的所有Node,或者说Broker中,选负载最小的那个,也就是当前接收请求最少的那个。向其发送MetadataRequest请求,获取新的Cluster对象。

源码分析kafka之producer

Kafka是一款很棒的消息系统,可以看看我之前写的后端好书阅读与推荐来了解一下它的整体设计。今天我们就来深入了解一下它的实现细节(我fork了一份代码),首先关注Producer这一方。要使用kafka首先要实例化一个KafkaProducer,... 查看详情

源码分析kafka之producer

Kafka是一款很棒的消息系统,可以看看我之前写的后端好书阅读与推荐来了解一下它的整体设计。今天我们就来深入了解一下它的实现细节(我fork了一份代码),首先关注Producer这一方。要使用kafka首先要实例化一个KafkaProducer,... 查看详情

高吞吐量的分布式发布订阅消息系统kafka之producer源码分析(代码片段)

引言Kafka是一款很棒的消息系统,今天我们就来深入了解一下它的实现细节,首先关注Producer这一方。要使用kafka首先要实例化一个KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、batch.size... 查看详情

kafka源码分析之product(代码片段)

引言Kafka是一款很棒的消息系统,今天我们就来深入了解一下它的实现细节,首先关注Producer这一方。要使用kafka首先要实例化一个KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、batch.size... 查看详情

聊聊kafka:producer源码解析(代码片段)

...关于Kafka的基础架构以及搭建,从这篇开始我们就来源码分析一波。我们这用的Kafka版本是2.7.0,其Client端是由Java实现,Server端是由Scala来实现的,在使用Kafka时,Client是用户最先接触到的部分,因此,... 查看详情

聊聊kafka:producer源码解析(代码片段)

...关于Kafka的基础架构以及搭建,从这篇开始我们就来源码分析一波。我们这用的Kafka版本是2.7.0,其Client端是由Java实现,Server端是由Scala来实现的,在使用Kafka时,Client是用户最先接触到的部分,因此,... 查看详情

聊聊kafka:producer的网络模型(代码片段)

...、Producer的网络模型我们前面几篇有说Producer发送流程的源码分析,但那个是大的轮廓,涉及到发送很多相关的内容,比如:获取topic的metadata信息key和value的序列化获取该record要发送到的partition向RecordAccmulator中追... 查看详情

聊聊kafka:producer的网络模型(代码片段)

...、Producer的网络模型我们前面几篇有说Producer发送流程的源码分析,但那个是大的轮廓,涉及到发送很多相关的内容,比如:获取topic的metadata信息key和value的序列化获取该record要发送到的partition向RecordAccmulator中追... 查看详情

spark kafka producer 可序列化

】sparkkafkaproducer可序列化【英文标题】:sparkkafkaproducerserializable【发布时间】:2017-03-2221:55:42【问题描述】:我想出了一个例外:ERRORyarn.ApplicationMaster:用户类抛出异常:org.apache.spark.SparkException:任务不可序列化org.apache.spark.Spa... 查看详情

kafka学习总结008---生产者生产数据流程(参照源码)

前一篇总结了下生产者JavaAPI,本篇参照源码总结下生产数据的具体流程,先上图:   1.Producer创建时,会创建一个Sender线程并设置为守护线程2.生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器... 查看详情

kafka一些问题点的分析

kakfka架构图:理解kafka需要理解三个问题。1.producer,broker,consumer,ZK的工作模式。broker,ZK是作为一个后台服务,而producer和consumer是作为一个SDK提供给开发者进行开发用。2.producer和consumer的交互类型。一般的队列模式是采用push模式,... 查看详情

从源码分析如何优雅的使用kafka生产者(代码片段)

...保证消息的高效及一致性呢?正好以这个问题结合Kakfa的源码讨论下如何正确、高效的发送消息。内容较多,对源码感兴趣的朋友请系好安全带??(源码基于v0.10.0.0版本分析)。同时最好是有一定的Kafka使用经验,知晓基本的用法。... 查看详情

kafka源码reassignpartitionscommand分区副本重分配源码原理分析(附配套教学视频)

...臻臻的杂货铺​​文章目录​​1.脚本的使用​​​​2.源码解析​​​​2.1`--generate`生成分配策略分析​​​​2.2`--execute`执行阶段分析​​​​2.2.1已有任务,尝试限流​​​​2.2.2当前未有执行任务,开始执行副本重分配任务​... 查看详情

我凭借这份pdf的复习思路,深入分析

Kafka源码篇——Kafka快速入门1.1Kafka简介1.2以Kafka为中心的解决方案1.3Kafka核心概念1.4搭建Kafka源码环境Kafka源码篇——生产者2.1KafkaProducer使用示例2.2KafkaProducer分析2.3RecordAccumulator分析2.4Sender分析Kafka源码篇——消费者3.1KafkaConsumer使... 查看详情

kafkaproducer原理(scala版同步producer)

...tp://www.cnblogs.com/huxi2b/p/4583249.html   供参考本文分析的Kafka代码为kafka-0.8.2.1。另外,由于Kafka目前提供了两套Producer代码,一套是Scala版的旧版本;一套是Java版的新版本。虽然Kafka社区极力推荐大家使用Java版本的producer,... 查看详情

kafka详解------producer生产者(代码片段)

...。  ③、因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的key和value对象序列化成字节数组。  ④、接下来数据传到分区器,如果之间的ProducerRecord对象指定了分区,那么分区器将不再做任... 查看详情

sparksql怎么使用kafkaavro序列化器

...个Kafka生产者实例。在创建生产者时需要指定使用KafkaAvro序列化器。importorg.apache.kafka.clients.producer.ProducerRecordimportorg.apache.kafka.clients.producer.KafkaProducerimportio.confluent.kafka.serializers.KafkaAvroSerializerimportjava.util.Properties valprops=newProp... 查看详情

夯实kafka知识体系及基本功分析一下生产者(producer)实现原理分析「原理篇」(代码片段)

kafka概述定义Kafka是一个分布式的基于发布/订阅模式的消息队列(messagequeue),主要应用于大数据的实时处理领域。消息队列传统的消息队列&新式的消息队列的模式上面是传统的消息队列,比如一个用户要注册... 查看详情