如何在kafka中实现多个生产者和多个消费者

     2023-04-18     272

关键词:

【中文标题】如何在kafka中实现多个生产者和多个消费者【英文标题】:How do I implement multiple producer and multiple consumer in kafka 【发布时间】:2016-01-19 09:19:55 【问题描述】:

我是 kafka 的新手,我有很多服务器会产生大量日志的要求,我想创建多个生产者和消费者。

我已经为单个生产者和消费者实现了,任何人都可以帮助我了解我将如何创建多个生产者和消费者。

这是我的生产者和消费者代码

制作人:-

public class MessageProducerExample 

    public static void main(String[] args) 

        System.out.println("Hello World");


         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

         Producer<String, String> producer = new KafkaProducer<String, String>(props);
         for(int i = 0; i < 100; i++)
             producer.send(new ProducerRecord<String, String>("test", "Msg"+Integer.toString(i),"Msg"+Integer.toString(i)));

         producer.close();

    

消费者:-

public class MessageConsumerExample 

    public static void main(String[] args) 

        String topicName = args[0];


         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test-consumer-group");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("session.timeout.ms", "30000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("auto.offset.reset","earliest");

         System.out.println("TopicName="+topicName);

         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
         consumer.subscribe(Arrays.asList(topicName));

         while (true) 
             ConsumerRecords<String, String> records = consumer.poll(100);
             System.out.println("records"+records.count());
             for (ConsumerRecord<String, String> record : records)
                 //System.out.println("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                 System.out.println("RECORD_OFFSET"+record.offset()+"RECORD_KEY" +record.key()+"RECORD_VALUE "+record.value());
         
        

提前致谢。

【问题讨论】:

【参考方案1】:

Kafka 生产者在异步模式下为每个代理使用一个 iothread。 Kafka Producer - By default supports Multithreading?

对于消费者,您可以创建一个线程池,每个线程都运行一个消费者实例。只要确保所有实例都具有相同的消费者组。这里给出了一些例子-Kafka consumer group

【讨论】:

kafka面试题

一、Kafka架构  Producer:消息生产者,就是向kafkabroker发消息的客户端;  Consumer:消息消费者,向kafkabroker取消息的客户端;  ConsumerGroup(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数... 查看详情

kafka知识总结

...者主动拉取数据。 kafka是一个生产-消费模型。 Producer:生产者,只负责数据生产,生产者的代码可以集成到任务系统中。 数据的分发策略由producer决定,默认是defaultPartitionUtils.abs(key.hashCode)%numPartitions Broker:当前服务器上的Kaf... 查看详情

kafka的三种客户端线程模型和一个小惊喜

...fka作为一个流式数据平台,对开发者提供了三种客户端:生产者/消费者、连接器、流处理。本文着重分析这三种客户端的线程模型。看到最后的通常都有惊喜。消费者的线程模型0.8版本以前的消费者客户端会创建一个基于ZK的消... 查看详情

kafka&nsq

...进入其中一个partition自己的实验也证实了这种猜测Kafka的生产者和消费者相对于服务端而言都是客户端,生产者客户端发布消息到服务端的指定主题,会指定消息所属的分区。生产者发布消息时根据消息是否有键,采用不同的分... 查看详情

消息队列-kafka消费者原理

一、消费者消费原理在实际生产过程中,每个topic都会对对应多个partitions,好处如下:能够对broker上的数据进行分片有效减少了消息的容量从而提升io性能为了提高消费端的消费能力,一般会通过多个consumer去消费同一个topicQ1.在... 查看详情

kafka是如何实现事务的

...关键特性是数据的一致性。具体到Kafka的领域中,也就是生产者生产的数据和消费者消费的数据之间一对一的一致性。在各种类型的失败普遍存在的分布式系统环境下,保证业务层面一个整体的消息集合被原子的发布和恰好一次... 查看详情

kafka文件存储结构和如何保证数据不丢失(代码片段)

...点,多个broker构成一个集群2-topic对消息进行归类3-producer生产者4-comsumer消费者5-consumerGroup消费组topic的组成:1-partition 物理上数据存储的概念,一个topic包含多个partition,每个partition内部是有序的;每个partition是一个目录;2-s... 查看详情

如何在 C 中实现无锁共享标志?

...nC?【发布时间】:2012-10-0819:45:50【问题描述】:我有一个生产者一消费者模型,我需要生产者在数据可用时设置一个标志。我怀疑我可以在没有锁定共享标志的情况下逃脱,因为:生产者在设置之前从不检查值偶尔错过标志更新... 查看详情

如何在 TPL 中实现连续运行的数据流块?

...-1814:30:24【问题描述】:我使用BufferBlock和ActionBlock设置了生产者/消费者数据流块,它在控制台应用程序中运行良好;将所有项目添加到BurfferBlock并将BufferBlock与其他ActionItems链接后;它 查看详情

kafka基本概念

...由一个键、一个值和一个时间戳组成kafka有四个核心API:生产者( ProducerAPI )    允许应用向一个或多个Kafka主题发布记录流。消费者( ConsumerAPI )    允许应用订阅一个或多个主题,并处理... 查看详情

如何在单一方法Springboot JPA中实现多个更新的事务

】如何在单一方法SpringbootJPA中实现多个更新的事务【英文标题】:HowtoimplementTransactionalformultipleupdatesinsinglemethodSpringbootJPA【发布时间】:2021-11-2120:47:33【问题描述】:我有一个使用SPringbootJPA对DB进行多次更新、删除和保存方法... 查看详情

一个生产者,多个消费者

】一个生产者,多个消费者【英文标题】:OneProducer,multipleConsumers【发布时间】:2015-03-2914:03:50【问题描述】:我一直在编写一些代码,但我需要帮助。我创建了一个生产者和一个消费者,但是我需要创建多个消费者,他们将从... 查看详情

真的,kafka入门一篇文章就够了(代码片段)

...主题中的分区有序,但是无法保证主题中所有的分区有序生产者:向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。消费者:订阅主题消息的客户端程序称为消费者(Consumer... 查看详情

kafka消息中间件(kafka与mqtt区别)(代码片段)

文章目录KafkaKafka重要原理Topic主题Partition分区Producer生产者Consumer消费者Broker中间件Offset偏移量Kafka与mqtt区别KafkaKafka是一个分布式流处理平台,它可以快速地处理大量的数据流。Kafka的核心原理是基于发布/订阅模式的消息队... 查看详情

消息中间件之kafka

参考技术AProducer:生产者,发送消息的一方。生产者负责创建消息,然后将其发送到Kafka服务器上。Consumer:消费者,接受消息的一方。消费者连接到Kafka服务器上并接收消息,进而进行相应的业务逻辑处理。ConsumerGroup:消费者... 查看详情

如何在 REXX 中实现多个条件?

】如何在REXX中实现多个条件?【英文标题】:HowdoIimplementmultipleconditionsinREXX?【发布时间】:2015-06-0210:13:46【问题描述】:我正在尝试整理一些REXX代码来实现以下目标。检查给定输入(0007145547162165876,0002734752467588968,55572987931854886... 查看详情

如何查看kafka消费者信息

...现了消息处理不管是producer还是consumer的高吞吐量。Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上 查看详情

kafkaconsumerlagmonitoring

...是卡夫卡消费者滞后?卡夫卡消费者滞后指标表明卡夫卡生产者和消费者之间存在多少滞后。人们谈论卡夫卡时,通常指的是卡夫卡经纪人。您可以将KafkaBroker视为Kafka服务器。代理实际上是存储和提供Kafka消息的对象。Kafka生产... 查看详情