关键词:
一、下载kafka_2.12-2.4.0.tgz并解压至/home/kafka_2.12-2.4.0
二、配置kafka
2.1 创建kafka日志文件夹:/home/kafka_2.12-2.4.0/logs
2.2 创建zookeeper数据目录:/tmp/zookeeper
2.3 配置/home/kafka_2.12-2.4.0/config/server.properties 内容如下(SSL证书在下面介绍):
ssl.keystore.location=/home/ca/server/server.keystore.jks ssl.keystore.password=mima123 ssl.key.password=mima123 ssl.truststore.location=/home/ca/trust/server.truststore.jks ssl.truststore.password=mima123 ssl.client.auth=required ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.keystore.type=JKS ssl.truststore.type=JKS ssl.endpoint.identification.algorithm= #security.inter.broker.protocol=SSL inter.broker.listener.name=SSL ############################# Server Basics ############################# broker.id=0 listeners=SSL://阿里云内网IP:9093 advertised.listeners=SSL://阿里云外网IP:9093 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/home/kafka_2.12-2.4.0/logs num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# log.retention.hours=168 #log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# zookeeper.connect=阿里云内网IP:2181 zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# group.initial.rebalance.delay.ms=0 delete.topic.enble=true
2.4 配置 /home/kafka_2.12-2.4.0/config/zookeeper.properties
dataDir=/tmp/zookeeper clientPort=2181 maxClientCnxns=0 admin.enableServer=false
2.5 配置/etc/hosts文件,增加红色行,IP为阿里云内网IP
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
172.18.54.18 iZwz9gq8vhwxtgpg21yonsZ iZwz9gq8vhwxtgpg21yonsZ
172.18.54.18 kafka-single
三、生成SSL相关证书文件
3.1、创建四个文件夹 /home/ca/root、/home/ca/trust、/home/ca/server、/home/ca/client
3.2、签发相关证书
第一步:生成server.keystore.jks文件(即:生成服务端的keystore文件)
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -keyalg RSA -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -storepass mima123 -ext SAN=DNS:kafka-single
第二步:生成CA认证证书(为了保证整个证书的安全性,需要使用CA进行证书的签名保证)
openssl req -new -x509 -keyout /home/ca/root/ca-key -out /home/ca/root/ca-cert -days 10000 -passout pass:mima123 -subj "/C=cn/ST=beijing/L=beijing/O=qmx/OU=qmx/CN=kafka-single"
第三步:通过CA证书创建一个客户端信任证书
keytool -keystore /home/ca/trust/client.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第四步:通过CA证书创建一个服务端器端信任证书
keytool -keystore /home/ca/trust/server.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第五步:服务器证书的签名处理
第1小步:导出服务器端证书server.cert-file
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/server/server.cert-file -storepass mima123
第2小步:用CA给服务器端证书进行签名处理
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/server/server.cert-file -out /home/ca/server/server.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
第3小步:将CA证书导入到服务器端keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第4小步:将已签名的服务器证书导入到服务器keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -import -file /home/ca/server/server.cert-signed -storepass mima123
客户端SSL证书签发
第一步:生成client.keystore.jks文件
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123-dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -ext SAN=DNS:kafka-single -storepass mima123
第二步:导出客户端证书client.cert-file
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/client/client.cert-file -storepass mima123
第三步:用CA给客户端证书进行签名处理
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/client/client.cert-file -out /home/ca/client/client.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
第四步:将CA证书导入到客户端keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第五步:将已签名的证书导入到客户端keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -import -file /home/ca/client/client.cert-signed -storepass mima123
四、启动和停止kafka和zookeeper服务
cd /home/kafka_2.12-2.4.0/bin
启动zookeeper:
./zookeeper-server-start.sh /home/kafka_2.12-2.4.0/config/zookeeper.properties &
启动kafka:
./kafka-server-start.sh /home/kafka_2.12-2.4.0/config/server.properties &
查看topic情况:
./kafka-topics.sh --list --zookeeper localhost:2181
关闭kafka:
./kafka-server-stop.sh
关闭zookeeper:
./zookeeper-server-stop.sh
查看 kafka 的 topic 情况:
./kafka-topics.sh --list --zookeeper 172.18.54.18:2181
查看topic详细信息:
./kafka-topics.sh --describe --zookeeper 172.18.54.18:2181 --topic topic1
生产者客户端命令:
./kafka-console-producer.sh --broker-list 172.18.54.18:9092 --topic topic1
消费者客户端命令:
./kafka-console-consumer.sh --bootstrap-server 172.18.54.18:9092 --topic topic1 --from-beginning
五、JAVA客户端对接
5.1 Producer
package com.xrh.extend.kafka; import java.util.Properties; import java.util.Random; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; public class Producer { public static String topic = "topic2";//定义主题 public static void main(String[] args) throws InterruptedException { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云外网IP:9093");//kafka地址,多个地址用逗号分割 // acks:消息的确认机制,默认值是0。 // acks=0:如果设置为0,生产者不会等待kafka的响应。 // acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。 // acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。 props.put("acks", "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put("security.protocol", "SSL"); props.put("ssl.truststore.location", "D:caclient.truststore.jks"); props.put("ssl.truststore.password", "mima123"); props.put("ssl.keystore.location", "D:caclient.keystore.jks"); props.put("ssl.keystore.password", "mima123"); props.setProperty("ssl.endpoint.identification.algorithm", ""); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); try { int i = 1; while (i < 20) { String msg = "测试 Hello," + new Random().nextInt(100); ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic , "key1", msg); kafkaProducer.send(record, new MyProducerCallBack()); System.out.println("消息发送成功:" + msg); ++ i; Thread.sleep(500); } } finally { kafkaProducer.close(); } } private static class MyProducerCallBack implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(null != e){ e.printStackTrace(); return; } System.out.println("时间戳,主题,分区,位移: " + recordMetadata.timestamp() + ", " + recordMetadata.topic() + "," + recordMetadata.partition() + " " + recordMetadata.offset()); } }; // acks = 1 // batch.size = 16384 //当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。 // bootstrap.servers = [39.108.124.173:9092] // buffer.memory = 33554432 // client.dns.lookup = default // client.id = // compression.type = none // connections.max.idle.ms = 540000 // delivery.timeout.ms = 120000 // enable.idempotence = false // interceptor.classes = [] // key.serializer = class org.apache.kafka.common.serialization.StringSerializer // linger.ms = 0 // max.block.ms = 60000 // max.in.flight.requests.per.connection = 5 // max.request.size = 1048576 // metadata.max.age.ms = 300000 // metric.reporters = [] // metrics.num.samples = 2 // metrics.recording.level = INFO // metrics.sample.window.ms = 30000 // partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner // receive.buffer.bytes = 32768 // reconnect.backoff.max.ms = 1000 // reconnect.backoff.ms = 50 // request.timeout.ms = 30000 // retries = 2147483647 //配置为大于0的值的话,客户端会在消息发送失败时重新发送。 // retry.backoff.ms = 100 // sasl.client.callback.handler.class = null // sasl.jaas.config = null // sasl.kerberos.kinit.cmd = /usr/bin/kinit // sasl.kerberos.min.time.before.relogin = 60000 // sasl.kerberos.service.name = null // sasl.kerberos.ticket.renew.jitter = 0.05 // sasl.kerberos.ticket.renew.window.factor = 0.8 // sasl.login.callback.handler.class = null // sasl.login.class = null // sasl.login.refresh.buffer.seconds = 300 // sasl.login.refresh.min.period.seconds = 60 // sasl.login.refresh.window.factor = 0.8 // sasl.login.refresh.window.jitter = 0.05 // sasl.mechanism = GSSAPI // security.protocol = PLAINTEXT // security.providers = null // send.buffer.bytes = 131072 // ssl.cipher.suites = null // ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] // ssl.endpoint.identification.algorithm = https // ssl.key.password = null // ssl.keymanager.algorithm = SunX509 // ssl.keystore.location = null // ssl.keystore.password = null // ssl.keystore.type = JKS // ssl.protocol = TLS // ssl.provider = null // ssl.secure.random.implementation = null // ssl.trustmanager.algorithm = PKIX // ssl.truststore.location = null // ssl.truststore.password = null // ssl.truststore.type = JKS // transaction.timeout.ms = 60000 // transactional.id = null // value.serializer = class org.apache.kafka.common.serialization.StringSerializer }
5.2 Consumer
package com.xrh.extend.kafka; import java.util.Collections; import java.util.Iterator; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import javafx.util.Duration; public class Consumer { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云外网IP:9093"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); props.put("security.protocol", "SSL"); props.put("ssl.truststore.location", "D:caclient.truststore.jks"); props.put("ssl.truststore.password", "mima123"); props.put("ssl.keystore.location", "D:caclient.keystore.jks"); props.put("ssl.keystore.password", "mima123"); props.setProperty("ssl.endpoint.identification.algorithm", ""); // p.put("auto.offset.reset", "latest"); // bootstrap.servers: kafka的地址。 // group.id:组名 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了。 // enable.auto.commit:是否自动提交,默认为true。 // auto.commit.interval.ms: 从poll(拉)的回话处理时长。 // session.timeout.ms:超时时间。 // max.poll.records:一次最大拉取的条数。 // auto.offset.reset:消费规则,默认earliest 。 // earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。 // latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。 // none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。 // key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。 // value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Collections.singletonList(Producer.topic));// 订阅消息 while(true){ ConsumerRecords<String, String> consumerDatas = consumer.poll(100); if( consumerDatas.count() > 0 ){ Iterator<ConsumerRecord<String, String>> consumerIter = consumerDatas.iterator(); while(consumerIter.hasNext()){ ConsumerRecord<String, String> consumerData = consumerIter.next(); System.out.printf("offset = %d, key = %s, value = %s%n", consumerData.offset(), consumerData.key(), consumerData.value()); } }else{ System.out.println("KafkaConsumer1 is waiting message..."); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
linux基础内容学习一:linux下的分区及安装
...或者存数据扩展分区最多有一个,可以没有扩展分区linux下的分区方案五个分区1p+1e( 查看详情
shell自动部署linux环境下的组件安装配置(代码片段)
为了业务部署的便利,将手动在linux中执行的组件部署任务,系统配置任务,编写shell实现自动安装配置本文shell参考的就是附件链接中文档的“4Linux课程镜像的创建”章节编码,同时该链接也上传了该脚本阿里云... 查看详情
tvm在linux环境下的安装与编译及vscode如何配置tvm的远程连接调试环境(代码片段)
...FFINavigator结束语前言 本篇文章介绍一下tvm在linux环境下的安装与编译,以及如何使用vscode来配置tvm的远程连接调试环境。 所需软硬件环境:环境版本localsystemwindows10services 查看详情
vmware虚拟机安装及部署
Linux系统安装及网络配置 这篇文章介绍关于Linux系统的安装以及网络配置,关于虚拟机配置中网络的几个模式区别进行详细讲解。学习Linux对于后端开发人员来说是很有必要的,结合实际开发,Linux服务器是小组共享的,... 查看详情
部署kvm虚拟化平台及功能管理
...在Linux内核中主机要求采用CentOS6.5x86_64开启CPU虚拟化支持安装方式全新安装,选择虚拟化选项设置KVM网络采用桥接模式KVM图形化管理创建存储池创建存储卷创建虚拟机安装 查看详情
elasticsearch在centos6.5下的部署
安装jdk 安装jdkde要点主要就是环境变量 导出JAVA_HOME JRE_HOME 导出可执行程序到PATH变量tar-zxvfjdk-8u73-linux-x64.tar.gztar-zxfjdk-8u73-linux-x64.tar.gzmvjdk1.8.0_73/usr/local/jdkecho"JAVA_HOME=/usr/local/jdk 查看详情
[linux]linux下redis的安装及配置.
...词器详细实例. 我们已经将redis所需tar包拷贝到了linux下的root根目录下,接着我们只需要解压就可以了. 先将Redis的tar包拷贝到Linux下的根目录然后解压到redis文件夹下:(先使用mkdir创建redis文件夹)接下来就是解压tar包到redis目... 查看详情
linux/centos下安装部署phantomjs及使用
PhantomJS 是一个基于 WebKit 的服务器端 JavaScript API。它全面支持web而不需浏览器支持,其快速,原生支持各种Web标准:DOM处理,CSS选择器,JSON,Canvas,和SVG。 PhantomJS 可以用于 页面自动化 , 网络监... 查看详情
tvm在linux环境下的安装与编译及vscode如何配置tvm的远程连接调试环境(代码片段)
...FFINavigator结束语前言 本篇文章介绍一下tvm在linux环境下的安装与编译,以及如何使用vscode来配置tvm的远程连接调试环境。 所需软硬件环境:环境版本localsystemwindows10servicesystemubuntu18.04tvmlatest(0.9.dev0)python(conda)python3.... 查看详情
python❀软件安装与环境部署(代码片段)
文章目录1、运行Python代码2、Python下的第一个程序2.1在不同操作系统下的Python编译环境2.1.1在Linux系统中安装Python2.1.2在Windows系统中安装Python2.2在不同操作系统下调用Python程序2.2.1在Linux系统中调用2.2.2在Windows系统中调用当前的编... 查看详情
python❀软件安装与环境部署(代码片段)
文章目录1、运行Python代码2、Python下的第一个程序2.1在不同操作系统下的Python编译环境2.1.1在Linux系统中安装Python2.1.2在Windows系统中安装Python2.2在不同操作系统下调用Python程序2.2.1在Linux系统中调用2.2.2在Windows系统中调用当前的编... 查看详情
linux虚拟机怎么配置安装jdk和tomcat
...,然后执行tar命令解压安装。2、解压完毕后,在etc目录下的profile文件中配置下环境变量,分别建立JAVA_HOME、PATH、CLASSPATH,并正确指定值。3、下载tomcat的二进制安装包,通过ssh客户端放到linux的某个目录下,同样通过tar命令解压... 查看详情
11.elk部署安装
(1) 安装Logstash依赖包JDKLogstash的运行依赖于Java运行环境,Logstash1.5以上版本不低于java7推荐使用最新版本的Java。由于我们只是运行Java程序,而不是开发,下载JRE即可。下载linux-64的版本。如果使用Linux下载执行如下命令下... 查看详情
linux系统上部署一个web项目
...并且部署一个web项目。先从基本安装开始,可别小看linux下的文件安装,那可不是windows下点击next就可以完成,但也并不复杂,重要的是我们学会怎么用快速理解和掌握它,那么一切就变得容易多了,开始吧,当然在安装部署tomca... 查看详情
hudson安装配置部署应用及分析
...境:apache-tomcat-7.0.27 ,JDK二、环境搭建1、下载hudson,安装部署http://hudson-ci.org/点击下载2、下载完成,需找一台linux的机器,上传文件。后台解压运行#java-jarhudson- 查看详情
hive安装部署及测试(代码片段)
...关联,进行创建表,加载数据测试2)在Linux下安装MySQL数据库3)配置Hive元数据存储在MySQL中,查看相关元数据表信息4)熟悉基本的DML和DDL语句(创建数据库、表及加载数 查看详情
promethus普罗米修斯介绍及linux系统下的安装与配置(代码片段)
普罗米修斯概述Prometheus(是由go语言(golang)开发)是一套开源的监控&报警&时间序列数据库的组合。适合监控docker容器。Prometheus是最初在SoundCloud上构建的开源系统监视和警报工具包。自2012年成立以来,许多公司和组织都... 查看详情
新手小白linux(centos6.5)部署javaweb项目(mongodb4.0.2安装及相关操作)
红帽企业或CentOS的Linux上安装MongoDB的社区版:https://docs.mongodb.com/manual/tutorial/install-mongodb-on-red-hat/ 一、安装 1、配置yum源,在yum源目录下创建一个文件mongodb-org-4.0.repovi/etc/yum.repos.d/mongodb-org-4. 查看详情