hp下kafka的实践(代码片段)

author author     2022-11-24     751

关键词:

kafka

简介


Kafka 是一种高吞吐量的分布式发布订阅消息系统

kafka角色必知


producer:生产者。
consumer:消费者。
topic: 消息以topic为类别记录,Kafka将消息种子(Feed)分类, 每一类的消息称之为一个主题(Topic)。
broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic), 并从Broker拉数据,从而消费这些已发布的消息。

经典模型


1. 一个主题下的分区不能小于消费者数量,即一个主题下消费者数量不能大于分区属,大了就浪费了空闲了
2. 一个主题下的一个分区可以同时被不同消费组其中某一个消费者消费
3. 一个主题下的一个分区只能被同一个消费组的一个消费者消费

技术分享图片

常用参数说明

request.required.acks


Kafka producer的ack有3中机制,初始化producer时的producerconfig可以通过配置request.required.acks不同的值来实现。

0:这意味着生产者producer不等待来自broker同步完成的确认继续发送下一条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发生故障时某些数据会丢失,如leader已死,但producer并不知情,发出去的信息broker就收不到)。

1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。

-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。 
此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。

三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。

auto.offset.reset


1. earliest:自动将偏移重置为最早的偏移量
2. latest:自动将偏移量重置为最新的偏移量(默认)
3. none:如果consumer group没有发现先前的偏移量,则向consumer抛出异常。
4. 其他的参数:向consumer抛出异常(无效参数)

kafka安装和简单测试

安装kafka(不需要安装,解包即可)


# 官方下载地址:http://kafka.apache.org/downloads
# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgz
tar -xzf kafka_2.12-1.1.1.tgz
cd kafka_2.12-1.1.0

启动kafka server


# 需先启动zookeeper
# -daemon 可启动后台守护模式
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

启动kafka客户端测试


# 创建一个话题,test话题2个分区
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
Created topic "test".

# 显示所有话题
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

# 显示话题信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:2    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0


# 启动一个生产者(输入消息)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[等待输入自己的内容 出现>输入即可]
>i am a new msg !
>i am a good msg ?

# 启动一个消费者(等待消息) 
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?

安装kafka的php扩展


# 先安装rdkfka库文件
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka/
./configure 
make
sudo make install

git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install

vim [php]/php.ini
extension=rdkafka.so

php代码实践

生产者


<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) 
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
);
$conf->setErrorCb(function ($kafka, $err, $reason) 
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
);

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
// -1必须等所有brokers同步完成的确认 1当前服务器确认 0不确认,这里如果是0回调里的offset无返回,如果是1和-1会返回offset
// 我们可以利用该机制做消息生产的确认,不过还不是100%,因为有可能会中途kafka服务器挂掉
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);

$option = 'qkl';
for ($i = 0; $i < 20; $i++) 
    //RD_KAFKA_PARTITION_UA自动选择分区
    //$option可选
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);



$len = $rk->getOutQLen();
while ($len > 0) 
    $len = $rk->getOutQLen();
    var_dump($len);
    $rk->poll(50);

运行生产者


php producer.php
# output

int(20)
int(20)
int(20)
int(20)
int(0)

# 你可以查看你刚才上面启动的消费者shell应该会输出消息
qkl . 0
qkl . 1
qkl . 2
qkl . 3
qkl . 4
qkl . 5
qkl . 6
qkl . 7
qkl . 8
qkl . 9
qkl . 10
qkl . 11
qkl . 12
qkl . 13
qkl . 14
qkl . 15
qkl . 16
qkl . 17
qkl . 18
qkl . 19

Low Level 消费者


<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) 
    file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);
);
$conf->setErrorCb(function ($kafka, $err, $reason) 
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
);

//设置消费组
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
//在interval.ms的时间内自动提交确认、建议不要启动
//$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);

// 设置offset的存储为file
//$topicConf->set('offset.store.method', 'file');
// 设置offset的存储为broker
 $topicConf->set('offset.store.method', 'broker');
//$topicConf->set('offset.store.path', __DIR__);

//smallest:简单理解为从头开始消费,其实等价于上面的 earliest
//largest:简单理解为从最新的开始消费,其实等价于上面的 latest
//$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("test", $topicConf);

// 参数1消费分区0
// RD_KAFKA_OFFSET_BEGINNING 重头开始消费
// RD_KAFKA_OFFSET_STORED 最后一条消费的offset记录开始消费
// RD_KAFKA_OFFSET_END 最后一条消费
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); //
//$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) 
    //参数1表示消费分区,这里是分区0
    //参数2表示同步阻塞多久
    $message = $topic->consume(0, 12 * 1000);
    if (is_null($message)) 
        sleep(1);
        echo "No more messages\n";
        continue;
    
    switch ($message->err) 
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    

High LEVEL消费者


<?php
/**
 * Created by PhpStorm.
 * User: qkl
 * Date: 2018/8/22
 * Time: 17:58
 */
$conf = new \RdKafka\Conf();

function rebalance(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) 
    global $offset;
    switch ($err) 
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign();
//            $kafka->assign([new RdKafka\TopicPartition("qkl01", 0, 0)]);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    


// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function(\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) 
    rebalance($kafka, $err, $partitions);
);

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'test-110-g100');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '192.168.216.122');

$topicConf = new \RdKafka\TopicConf();

$topicConf->set('request.required.acks', -1);
//在interval.ms的时间内自动提交确认、建议不要启动
$topicConf->set('auto.commit.enable', 0);
//$topicConf->set('auto.commit.enable', 0);
$topicConf->set('auto.commit.interval.ms', 100);

// 设置offset的存储为file
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', __DIR__);
// 设置offset的存储为broker
// $topicConf->set('offset.store.method', 'broker');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new \RdKafka\KafkaConsumer($conf);

//$KafkaConsumerTopic = $consumer->newTopic('qkl01', $topicConf);

// Subscribe to topic 'test'
$consumer->subscribe(['qkl01']);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) 
    $message = $consumer->consume(120*1000);
    switch ($message->err) 
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
//            $consumer->commit($message);
//            $KafkaConsumerTopic->offsetStore(0, 20);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    

消费组特别说明


特别注意,High LEVEL消费者设置的消费组,kafka服务器才会记录, Low Level消费者设置的消费组,服务器不会记录

具体查看消费组信息,你可以翻阅本篇文章

查看服务器元数据(topic/partition/broker)


<?php

$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) 
    file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);
);
$conf->setErrorCb(function ($kafka, $err, $reason) 
    printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
);

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$allInfo = $rk->metadata(true, NULL, 60e3);

$topics = $allInfo->getTopics();

echo rd_kafka_offset_tail(100);
echo "--";

echo count($topics);
echo "--";


foreach ($topics as $topic) 

    $topicName = $topic->getTopic();
    if ($topicName == "__consumer_offsets") 
        continue ;
    

    $partitions = $topic->getPartitions();
    foreach ($partitions as $partition) 
//        $rf = new ReflectionClass(get_class($partition));
//        foreach ($rf->getMethods() as $f) 
//            var_dump($f);
//        
//        die();
        $topPartition = new RdKafka\TopicPartition($topicName, $partition->getId());
        echo  "当前的话题:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";
        echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;
    

如果需远端生产和消费


vim config/server.properties
advertised.listeners=PLAINTEXT://ip:9092
# ip 未你kafka的外网ip即可

分享一个打包好的php-rdkafka的类库

https://github.com/qkl9527/php-rdkafka-class

技术分享图片

参考文献

Kafka文档

原文地址:https://segmentfault.com/a/1190000015765348

聊聊kafka:kafka消息重复的场景以及最佳实践(代码片段)

...一篇我们讲了聊聊Kafka:Kafka消息丢失的场景以及最佳实践,这一篇我们来说一说Kafka消息重复的场景以及最佳实践。我们下面会从以下两个方面来说一下Kafka消息重复的场景以及最佳实践。生产者重复消息消费者重复消息... 查看详情

kafka的最佳实践(代码片段)

背景一个Python项目中要使用kafka去deliverlog,1个producer,5个consumer。单条消息的大小是100KB~500KB,producer要在1秒钟之内能够发送30个这样的message。然后每个consumer也要在1秒钟之内消费30个这样的message。另外,消息不需要持久化(... 查看详情

kafka最佳实践(代码片段)

注意事项kafka重平衡比较坑,当客户端收不到最新的消息时,大概率是kafka在重平衡,可以查看消费位点,查看kafka是否活跃,是否在重平衡。常用命令docker中查看消费位点:dockerexec-itkafka/bin/bashcd/opt/kafka/binkafka-... 查看详情

kafka集群监控安全机制与最佳实践(代码片段)

Kafka监控安装Kafka集群监控方案选择:Kafka只能依靠kafka-run-class.sh等命令进行管理KafkaManager(CMAK)是目前比较常用的监控工具,它有如下功能:管理多个集群轻松检查群集状态(主题,消费者,偏移,代理,副本分发,分区分发... 查看详情

kafka-connect实践(代码片段)

一、Kafka-Connect介绍  Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为集成其他系统和解耦应用,之前经常使用Producer来发送消息到Broker,并使用Consumer来消费Broker中... 查看详情

阿里云消息队列kafka-消息检索实践(代码片段)

...的排查办法,以及消息队列Kafka「检索组件」的场景实践,并对其关键技术进行解读。旨在帮助大家对消息队列Kafka「检索组件」的特点和使用方式更加熟悉,以更有效地解决消息排查过程中所遇到的问题。场景痛点... 查看详情

kafka新老集群平滑迁移实践(代码片段)

前言公司一直使用云上的kafka服务,随着业务规模和体量的增大,使用云上的服务成本相对比较高,所以考虑本地自建kafka集群对外提供服务。因此,需要把正在运行的还在使用云上kafka的业务服务迁移到本地自建... 查看详情

kafkaeagle监控mrskafka之操作实践(代码片段)

本文分享自华为云社区《KAFKAEAGLE监控MRSkafka之操作实践》,作者:啊喔YeYe。1.KafkaEagle简介Kafkaeagle 是一款分布式、高可用的kafka监控软件,提供丰富的kafka监控指标,例如:Kafka集群的Broker数、Topic数、Consumer数... 查看详情

kafka基本命令和实践(代码片段)

Kafka基本命令#启动server./bin/kafka-server-start.shconfig/server.properties#创建topic(主题)test./bin/kafka-topics.sh--create--zookeeperlocalhost:2181--replication-factor1-partitions1--topictest#删除主题./bin/kafka-topics.sh--zookeeperlocalhost:2181--delete--topictest#–注意:... 查看详情

聊聊kafka:kafka消息丢失的场景以及最佳实践(代码片段)

...会从以下三个方面来说一下Kafka消息丢失的场景以及最佳实践。生产者丢失消息KafkaBroker服务端丢失消息消费者丢失消息二、Kafka的三种消息语义先说Kafka消息丢失的场景之前,我们先来说下Kafka的三种消息语义,不会还有... 查看详情

中通消息平台kafka顺序消费线程模型的实践与优化(代码片段)

此文转载自:https://blog.csdn.net/zchdjb/article/details/110427195各类消息中间件对顺序消息实现的做法是将具有顺序性的一类消息发往相同的主题分区中,只需要将这类消息设置相同的Key即可,而Kafka会在任意时刻保证一个消费组同时只... 查看详情

centos7安装kafka+zookeeper实践(代码片段)

一、概述最近有项目要测试kafka连接可用性,本地搭建模拟环境测试一下,本文作为一个记录。二、部署2.1环境准备本次采用Centos7.9安装,因为kafka和zookeeper都需要用到jdk,这次安装的使用oraclejdk8https://www.oracle.com/java/technologies/ja... 查看详情

聊聊kafka:kafka消息重复的场景以及最佳实践(代码片段)

...一篇我们讲了聊聊Kafka:Kafka消息丢失的场景以及最佳实践,这一篇我们来说一说Kafka消息重复的场景以及最佳实践。我们下面会从以下两个方面来说一下Kafka消息重复的场景以及最佳实践。生产者重复消息消费者重复消息... 查看详情

夯实kafka知识体系及基本功「实践操作篇」单机部署实践手册(2.8.0)(代码片段)

来一段舞蹈前提回顾下载kafkawgethttps://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz或者curl-Ohttps://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz解压缩tar-vxfkafka_2.8.0-0.8.0.tar.gz修改 查看详情

zookeeper和kafka的sasl认证以及生产实践(代码片段)

一、什么是zookeeper?ZooKeeper是一个集中的服务,用于维护配置信息、命名、提供分布式同步以及提供组服务。所有这些类型的服务都以某种形式被分布式应用程序使用。每次它们被实现时,都有大量的工作需要去修复... 查看详情

kafka认证一:plainloginmodule认证及java连接测试(代码片段)

...;且支持安全认证模式。本文介绍Kafka帐号密码认证的完整实践流程,自己实践一遍,才能穿起各个概念。Kafka的Plain简单文本认证方式比较简单,只需要Kafka服务端维护用户列表,客户端同样的安全认证配置即可。... 查看详情

kafka认证一:plainloginmodule认证及java连接测试(代码片段)

...;且支持安全认证模式。本文介绍Kafka帐号密码认证的完整实践流程,自己实践一遍,才能穿起各个概念。Kafka的Plain简单文本认证方式比较简单,只需要Kafka服务端维护用户列表,客户端同样的安全认证配置即可。... 查看详情

kafka安装(代码片段)

...务、三高架构(高性能、高并发、高可用)有过实践架构经验。博主:java_wxid社区:幕后大佬文章目录一、安装JDK二、安装Zookeeper三、安装Kafka四、启动并验证kafka启动kafka进入zookeeper目录通过zookeeper客户端查看下z... 查看详情