关键词:
最近开发网关服务的过程当中,需要用到kafka转发消息与保存日志,在进行压测的过程中由于是多线程并发操作kafka producer 进行异步send,发现send耗时有时会达到几十毫秒的阻塞,很大程度上上影响了并发的性能,而在后续的测试中发现单线程发送反而比多线程发送效率高出几倍。所以就对kafka API send 的源码进行了一下跟踪和分析,在此总结记录一下。
首先看springboot下 kafka producer 的使用
在config中进行配置,向IOC容器中注入DefaultKafkaProducerFactory生产者工厂的实例
@Bean public ProducerFactory<Object, Object> producerFactory() return new DefaultKafkaProducerFactory<>(producerConfigs());
创建producer
this.producer = producerFactory.createProducer();
大家都知道springboot下IOC容器管理的实例默认都是单例模式;而DefaultKafkaProducerFactory本身也是一个单例工厂
@Override public Producer<K, V> createProducer() if (this.transactionIdPrefix != null) return createTransactionalProducer(); if (this.producer == null) synchronized (this) if (this.producer == null) this.producer = new CloseSafeProducer<K, V>(createKafkaProducer()); return this.producer;
我们创建的producer也是个单例。
接下来就是具体的发送,用过kafka的小伙伴都知道producer.send是个异步操作,会返回一个Future<RecordMetadata> 类型的结果。那么为什么单线程和多线程send效率会较大的差距呢,我们进入KafkaProducer内部看下producer.send的具体源码实现来找下答案
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) TopicPartition tp = null; try //保证主题的元数据可用 ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; byte[] serializedKey; try //序列化key serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); catch (ClassCastException cce) throw new SerializationException("Can‘t convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); byte[] serializedValue; try //序列化Value serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); catch (ClassCastException cce) throw new SerializationException("Can‘t convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); //计算出具体的partition int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record with callback to topic partition ", record, callback, record.topic(), partition); // producer callback will make sure to call both ‘callback‘ and interceptor callback Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp); //向队列容器中添加数据 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); if (result.batchIsFull || result.newBatchCreated) log.trace("Waking up the sender since topic partition is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); return result.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly catch (ApiException e) log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); this.errors.record(); this.interceptors.onSendError(record, tp, e); return new FutureFailure(e); catch (InterruptedException e) this.errors.record(); this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); catch (BufferExhaustedException e) this.errors.record(); this.metrics.sensor("buffer-exhausted-records").record(); this.interceptors.onSendError(record, tp, e); throw e; catch (KafkaException e) this.errors.record(); this.interceptors.onSendError(record, tp, e); throw e; catch (Exception e) // we notify interceptor about all exceptions, since onSend is called before anything else in this method this.interceptors.onSendError(record, tp, e); throw e;
这里除了前面做的一些序列化操作和判断,最关键的就是向队列容器中执行添加数据操作
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
accumulator是RecordAccumulator这个类的一个实例,RecordAccumulator类是一个队列容器类;它的内部维护了一个ConcurrentMap,每一个TopicPartition都对应一个专属的消息队列。
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
我们进入accumulator.append内部看下具体的实现
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException // We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); ByteBuffer buffer = null; if (headers == null) headers = Record.EMPTY_HEADERS; try //根据TopicPartition拿到对应的批处理队列 Deque<ProducerBatch> dq = getOrCreateDeque(tp); //同步队列,保证线程安全 synchronized (dq) if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); //把序列化后的数据放入队列,并返回结果 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) return appendResult; // we don‘t have an in-progress record batch try to allocate a new batch byte maxUsableMagic = apiVersions.maxUsableProduceMagic(); int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); log.trace("Allocating a new byte message buffer for topic partition ", size, tp.topic(), tp.partition()); buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) // Somebody else found us a batch, return the one we waited for! Hopefully this doesn‘t happen often... return appendResult; MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); // Don‘t deallocate this buffer in the finally block as it‘s being used in the record batch buffer = null; return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); finally if (buffer != null) free.deallocate(buffer); appendsInProgress.decrementAndGet();
在getOrCreateDeque中我们根据TopicPartition从ConcurrentMap获取对应队列,没有的话就初始化一个。
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) Deque<ProducerBatch> d = this.batches.get(tp); if (d != null) return d; d = new ArrayDeque<>(); Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d); if (previous == null) return d; else return previous;
更关键的是为了保证并发时的线程安全,执行 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq)时,Deque<ProducerBatch>必然需要同步处理。
synchronized (dq) if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) return appendResult;
在这里我们可以看出,多线程高并发情况下,dq会处在比较大的资源竞争,虽然是基于内存的操作,每个线程持有锁的时间极短,但相比单线程情况,高并发情况下线程开辟较多,锁竞争和cpu上下文切换都比较频繁,会造成一定的性能损耗,产生阻塞耗时。
分析到这里你就会发现,其实KafkaProducer这个异步发送是建立在生产者和消费者模式上的,send的真正操作并不是直接异步发送,而是把数据放在一个中间队列中。那么既然有生产者在往内存队列中放入数据,那么必然会有一个专有的线程负责把这些数据真正发送出去。我们通过监控jvm线程信息可以看到,KafkaProducer创建后确实会启动一个守护线程用于消息的发送。
OK,我们再回到 KafkaProducer中,会看到里面有这样两个对象,Sender就是kafka发送数据的后台线程
private final Sender sender; private final Thread ioThread;
在KafkaProducer的构造函数中会启动Sender线程
this.sender = new Sender(logContext, client, this.metadata, this.accumulator, maxInflightRequests == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, retries, metricsRegistry.senderMetrics, Time.SYSTEM, this.requestTimeoutMs, config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start();
进入Sender内部可以看到这个线程的作用就是一直轮询发送数据。
public void run() log.debug("Starting Kafka producer I/O thread."); // main loop, runs until close is called while (running) try run(time.milliseconds()); catch (Exception e) log.error("Uncaught error in kafka producer I/O thread: ", e); log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records."); // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed. while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) try run(time.milliseconds()); catch (Exception e) log.error("Uncaught error in kafka producer I/O thread: ", e); if (forceClose) // We need to fail all the incomplete batches and wake up the threads waiting on // the futures. log.debug("Aborting incomplete batches due to forced shutdown"); this.accumulator.abortIncompleteBatches(); try this.client.close(); catch (Exception e) log.error("Failed to close network client", e); log.debug("Shutdown of Kafka producer I/O thread has completed."); /** * Run a single iteration of sending * * @param now The current POSIX time in milliseconds */ void run(long now) if (transactionManager != null) try if (transactionManager.shouldResetProducerStateAfterResolvingSequences()) // Check if the previous run expired batches which requires a reset of the producer state. transactionManager.resetProducerId(); if (!transactionManager.isTransactional()) // this is an idempotent producer, so make sure we have a producer id maybeWaitForProducerId(); else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) transactionManager.transitionToFatalError(new KafkaException("The client hasn‘t received acknowledgment for " + "some previously sent messages and can no longer retry them. It isn‘t safe to continue.")); else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) // as long as there are outstanding transactional requests, we simply wait for them to return client.poll(retryBackoffMs, now); return; // do not continue sending if the transaction manager is in a failed state or if there // is no producer id (for the idempotent case). if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) RuntimeException lastError = transactionManager.lastError(); if (lastError != null) maybeAbortBatches(lastError); client.poll(retryBackoffMs, now); return; else if (transactionManager.hasAbortableError()) accumulator.abortUndrainedBatches(transactionManager.lastError()); catch (AuthenticationException e) // This is already logged as error, but propagated here to perform any clean ups. log.trace("Authentication exception while processing transactional request: ", e); transactionManager.authenticationFailed(e); long pollTimeout = sendProducerData(now); client.poll(pollTimeout, now);
通过上面的分析我们可以看出producer.send操作本身其实是个基于内存的存储操作,耗时几乎可以忽略不计,但由于高并发情况下,线程同步会有一定的性能损耗,当然这个损耗在一般的应用场景下几乎是可以忽略不计的,但如果是数据量比较大,高并发的场景下会比较明显。
针对上面的问题分析,这里说下我个人的一些总结:
1、首先避免多线程操作producer发送数据,你可以采用生产者消费者模式把producer.send从你的多线程操作中解耦出来,维护一个你要发送的消息队列,单独开辟一个线程操作;
2、可能有的小伙伴会问,那么多创建几个producer的实例或者维护一个producer池可以吗,我原本也是这个想法,只是在测试中发现效果也不是很理想,我估计是由于创建producer实例过多,导致线程数量也跟着增加,本身的业务线程再加上kafka的线程,线程上下文切换比较频繁,CPU资源压力比较大,效率也不如单线程操作;
3、这个问题其实真是针对API操作来讲的,send操作并不是真正的数据发送,真正的数据发送由守护线程进行;按照kafka本身的设计思想,如果操作本身就成为了你性能的瓶颈,你应该考虑的是集群部署,负载均衡;
4、无锁才是真正的高性能;
高并发下springcloud hystrix的严重问题?
】高并发下springcloudhystrix的严重问题?【英文标题】:seriousissueofspringcloudhystrixunderhighconcurrency?【发布时间】:2018-04-0523:55:24【问题描述】:第1部分我使用HystrixCommand进行服务,并使用jmeter进行高并发测试。测试结果太差了,看... 查看详情
高并发下的static类成员可能存在安全隐患
有一个网友在高并发下使用下面的日期转换工具类时,遇到的问题publicclassDateUtil{privateDateUtil(){}privatestaticfinalDateFormatDATE_FORMAT=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss");publicstaticDateparse(Stringdate)throwsParseExcept 查看详情
高并发下减少锁竞争
1.减少锁的持有时间,将不需要锁的操作从同步代码块的移除。 //可以优化的代码 class AttributeStore{ private final Map<String,String> attributes=new HashMap& 查看详情
高并发下接口的并发问题
...会员,限制每个帐号只能领取一个有恶意用户刷接口,在高并发下越过限制。原因领取会员流程:1.后端先生成卡卷,将卡号放到消息队列中2.用户扫码请求领取会员接口2-1).先检查用户是否已经领取过该活动会员2-2).领取过return... 查看详情
1.网站高并发下的测试指标及优化泛谈
网站高并发下的测试指标:1.并发量:可以承接多少次请求。2.服务器负载:服务器的cpu/内存消耗。3.平均响应时间:处理一次请求花费的时间。测试高并发时,一般的测试标准是在服务器负载为70%的时候可以处理多少次请求还... 查看详情
redis实现高并发下的抢购/秒杀功能
...lhttps://www.cnblogs.com/TankXiao/p/4045439.html之前写过一篇文章,高并发的解决思路(点此进入查看),今天再次抽空整理下实际场景中的具体代码逻辑实现吧:抢购/秒杀 查看详情
高并发下线程安全的单例模式
...之一,而单例模式有很多种实现方式,你是否都了解呢?高并发下如何保证单例模式的线程安全性呢?如何保证序列化后的单例对象在反序列化后任然是单例的呢?这些问题在看了本文之后都会一一的告诉你答案,赶快来阅读吧... 查看详情
漫画:高并发下的hashmap(代码片段)
这一期我们来讲解高并发环境下,HashMap可能出现的致命问题。 HashMap的容量是有限的。当经过多次元素插入,使得HashMap达到一定饱和度时,Key映射位置发生冲突... 查看详情
面试实战考核:设计一个高并发下的下单功能(代码片段)
功能需求:设计一个秒杀系统初始方案商品表设计:热销商品提供给用户秒杀,有初始库存。@EntitypublicclassSecKillGoodsimplementsSerializable@IdprivateStringid;/***剩余库存*/privateIntegerremainNum;/***秒杀商品名称*/privateStringgoodsName;秒杀订单表... 查看详情
redis实现高并发下的抢购秒杀功能
...很常见的一个应用场景,主要需要解决的问题有两个:1高并发对数据库产生的压力2竞争状态下如何解决库存的正确减少("超卖"问题)对于第一个问题,已经很容易想到用缓存来处理抢购,避免直接操作数据库,例如使用Redis。... 查看详情
day783.网络通信优化之i/o模型:如何解决高并发下i/o瓶颈-java性能调优实战(代码片段)
...高并发下I/O瓶颈Hi,我是阿昌,今天学习记录的是关于网络通信优化之I/O模型:如何解决高并发下I/O瓶颈。提到JavaI/O,相信你一定不陌生。可能使用I/O操作读写文件,也可能使用它实现Socket的信息传输…这些... 查看详情
高并发下,hashmap会产生哪些问题?(代码片段)
HashMap在高并发环境下会产生的问题HashMap其实并不是线程安全的,在高并发的情况下,会产生并发引起的问题:比如:HashMap死循环,造成CPU100%负载触发fail-fast下面逐个分析下出现上述情况的原因:HashMap死循环的原因HashMap进行存... 查看详情
宜人贷系统架构——高并发下的进化之路
宜人贷系统版本的迭代1.0版本——简单的烦恼1.PNG迭代之前宜人贷的系统,其实就是一个前台,一个后台,一个DB,前台采用的是多机部署的方式。软件层也是跟最传统的软件一样分三层,第一层是Controller,第二层是Service... 查看详情
高并发下接口幂等性解决方案
一、幂等性概念在编程中.一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不... 查看详情
高并发下秒杀商品,这9个细节得知道(代码片段)
大家好,我是bigsai,又跟大家见面了。前言高并发下如何设计秒杀系统?这是一个高频面试题。这个问题看似简单,但是里面的水很深,它考查的是高并发场景下,从前端到后端多方面的知识。秒杀一般出... 查看详情
java中如何一次请求生成一个日志文件高并发下可用
参考技术Ajava是编程语言里比较难学的一门,如果有心从事编程方向的工作,最好到专业机构学习并有更多的项目实践,更贴近市场,这样更有利于将来的发展。 参考技术B日志框架,或写一个切面aop 查看详情
分布式事务,高并发下分布式事务的解决方案
我在上一期介绍了spring的事务原理(详情见《深入理解spring事务原理》),Spring事务本质是单机下的事务,是由数据库本身保证的。今天,我将介绍一种比较复杂的事务:分布式事务。1、什么是分布式事务分布式事务就是指事... 查看详情
高并发下缓存失效问题及解决方案
缓存穿透介绍:当查询一个不存在的数据,此时缓存是不命中的,就会去查询db,这将导致每次查询这个不存在的数据都要去访问db,缓存就没有意义了。如果不怀好意的人利用不存在的数据进行攻击,可能导致数据库崩溃解决... 查看详情