filebeat-2-通过kafka队列链接logstash

bronk bronk     2022-09-16     783

关键词:

filebeat 直接到logstash, 由于logstash的设计问题, 可能会出现阻塞问题, 因为中间使用消息队列分开

可以使用redis, 或者kafka, 这儿使用的是kafka

1, 安装

kafka的安装, 解压可用, 但需要zookeeper, 内置了一个zookeeper, 直接使用即可

1), 启动内置zookeeper

./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

2), 修改kafka的配置文件

vim ./conf/server.properties

############################# Server Basics #############################
broker.id=0
delete.topic.enable=true
 
############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
 
############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
 
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000
 
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
 
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

3), 启动kafkaserver

/bin/kafka-server-start.sh ./config/server.properties &

4),修改filebeat文件, 最终形态

cat ./elk/filebeat-5.5.2-linux-x86_64/filebeat.yml | grep -v # | grep -v ^$

 

filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/nginx/*.log
  encoding: utf-8
  document_type: my-nginx-log
  scan_frequency: 5s
  harvester_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
output.kafka:
  enabled: true
  hosts: ["www.wenbronk.com:9092"]
  topic: elk-%{[type]}
  worker: 2
  max_retries: 3
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  keep_alive: 60
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 0
  client_id: beats

5), 重新启动filebeat

./filebeat -c ./filebeat.yml &

6), 修改 logstash的input

input {
    kafka  {
      #codec => "json"
      topics_pattern => "elk-.*"
      bootstrap_servers => "127.0.0.1:9092"
      auto_offset_reset => "latest"
      group_id => "logstash-g1"
    }
}
output {
    elasticsearch {                                  #Logstash输出到elasticsearch;
      hosts => ["localhost:9200"]                    #elasticsearch为本地;
      index => "logstash-nginx-%{+YYYY.MM.dd}"       #创建索引;
      document_type => "nginx"                       #文档类型;
      workers => 1                                   #进程数量;
      user => elastic                                #elasticsearch的用户;
      password => changeme                           #elasticsearch的密码;
      flush_size => 20000
      idle_flush_time => 10
 }
}

7), 重启logstash

8 ), 页面访问 nginx, 可以查看消息队列中的消息

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic elk-log -m-beginning

 

 

 参考: http://www.ywnds.com/?p=9776

消息队列kafka「检索组件」重磅上线!

本文对消息队列Kafka「检索组件」进行详细介绍,首先通过对消息队列使用过程中的痛点问题进行介绍,然后针对痛点问题提出相应的解决办法,并对关键技术技术进行解读,旨在帮助大家对消息队列Kafka「检索组件」的特点及... 查看详情

kafka分布式消息队列

...段拆分开,两个阶段互相独立各自实现自己的处理逻辑,通过Kafka提供的消息写入和消费接口实现对消息的连接处理。降低开发复杂度,提高系统稳定性。高吞吐率:kafka通过顺序读写磁盘提供可以和内存随机读写相匹敌的读写... 查看详情

kafka消息队列(代码片段)

...的实现不够深刻,进而无法从测试角度去评估测试风险;通过调研,国内大厂使用kafka较多,对于这一基本技术应该有所了解;目标对kafka的基本概念,基本原理,常见应用有所了解。了解messagebus在kafka中的应用消息系统选型测... 查看详情

kafka消息队列大数据实战教程-第二篇(kafka集群搭建)

文章目录前言一、准备工作二、安装Zookeeper集群2.1.下载2.2解压2.3配置环境变量2.4修改conf文件夹配置2.5创建data、log文件夹2.6启动三、安装Kafka集群3.1.下载3.2解压3.3创建存储kafka数据地址3.4修改配置文件3.5启动3.6测试四、启动脚本4... 查看详情

kafka消息队列大数据实战教程-第二篇(kafka集群搭建)

文章目录前言一、准备工作二、安装Zookeeper集群2.1.下载2.2解压2.3配置环境变量2.4修改conf文件夹配置2.5创建data、log文件夹2.6启动三、安装Kafka集群3.1.下载3.2解压3.3创建存储kafka数据地址3.4修改配置文件3.5启动3.6测试四、启动脚本4... 查看详情

kafka消息队列大数据实战教程-第二篇(kafka集群搭建)

文章目录前言一、准备工作二、安装Zookeeper集群2.1.下载2.2解压2.3配置环境变量2.4修改conf文件夹配置2.5创建data、log文件夹2.6启动三、安装Kafka集群3.1.下载3.2解压3.3创建存储kafka数据地址3.4修改配置文件3.5启动3.6测试四、启动脚本4... 查看详情

kafka分布式消息队列介绍以及集群安装

...的通信采用tcp协议  4、kafka中不同业务系统的消息可以通过topic(主题)进行区 查看详情

kafka消息重新发送

Kafka消息重新发送 1、 使用kafka消息队列做消息的发布、订阅,如果consumer端消费出问题,导致数据并没有消费,此时不需要担心,数据并不会立刻丢失,kafka会把数据在服务器的磁盘上默认存储7天,或者自己指定有两种... 查看详情

kafka还是rabbitmq?

...念,消息从一处流向另一处,吞吐量比rabbit更高。接下来通过俩张图来理解他俩的设计与区别。首先来看rabbit,他通过broker来进行统一调配消息去向,生产者通过指定的规则将消息发送到broker,broker再按照规则发送给消费者进行... 查看详情

kafka-概述

...用场景日志收集:可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。消息系统:解耦和生产者和消费者、缓存消息等。用户活动跟踪:Kafka经常被用来记... 查看详情

分布式实时消息队列kafka(代码片段)

...机制实现数据缓存消息队列的优点是什么?实现解耦通过异步,提高性能消息队列的缺点是什么?架构更加复杂:如果消息队列出现故障,整个系统都会故 查看详情

canalserver发送binlog消息到kafka消息队列中

...章中,我们使用上篇文章的基础,将消息发送到kafka消息队列中。影响性能的几个参数:参考链接:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance参考文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStarthttps://gitee.com/huan1993/... 查看详情

jafka源码分析——logmanager

...负责管理broker上全部的Log(每个topic-partition为一个Log)。通过阅读源码可知其详细完毕的功能例如以下:1. 依照预设规则对消息队列进行清理。2. 依照预设规则对消息队列进行持久化(flush操作)。3. 连接ZooKeeper进行b... 查看详情

技术分享|消息队列kafka群集部署(代码片段)

...应用场景是日志收集:可以用kafka收集各种服务的日志,通过已统一接口的形式开放给各种消费 查看详情

kafka简单介绍和安装(代码片段)

...传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。**Kafka最新定义**:Kafka是一个开源的分布式事件流平台(EventStreamingPlatform),被数千家公司用于高性能数据管道、流... 查看详情

kafka总结

...日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式 查看详情

kafka消息系统

... 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完... 查看详情

kafka配置文件参数详解

...下不需要去做修改background.threads=4##等待IO线程处理的请求队列最大数,若是等待IO的请求 查看详情