关键词:
filebeat 直接到logstash, 由于logstash的设计问题, 可能会出现阻塞问题, 因为中间使用消息队列分开
可以使用redis, 或者kafka, 这儿使用的是kafka
1, 安装
kafka的安装, 解压可用, 但需要zookeeper, 内置了一个zookeeper, 直接使用即可
1), 启动内置zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
2), 修改kafka的配置文件
vim ./conf/server.properties
############################# Server Basics ############################# broker.id=0 delete.topic.enable=true ############################# Socket Server Settings ############################# listeners=PLAINTEXT://0.0.0.0:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 ############################# Log Basics ############################# log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 ############################# Log Flush Policy ############################# log.flush.interval.messages=10000 log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000
3), 启动kafkaserver
/bin/kafka-server-start.sh ./config/server.properties &
4),修改filebeat文件, 最终形态
cat ./elk/filebeat-5.5.2-linux-x86_64/filebeat.yml | grep -v ‘#‘ | grep -v ‘^$‘
filebeat.prospectors: - input_type: log paths: - /var/log/nginx/*.log encoding: utf-8 document_type: my-nginx-log scan_frequency: 5s harvester_buffer_size: 16384 max_bytes: 10485760 tail_files: true output.kafka: enabled: true hosts: ["www.wenbronk.com:9092"] topic: elk-%{[type]} worker: 2 max_retries: 3 bulk_max_size: 2048 timeout: 30s broker_timeout: 10s channel_buffer_size: 256 keep_alive: 60 compression: gzip max_message_bytes: 1000000 required_acks: 0 client_id: beats
5), 重新启动filebeat
./filebeat -c ./filebeat.yml &
6), 修改 logstash的input
input { kafka { #codec => "json" topics_pattern => "elk-.*" bootstrap_servers => "127.0.0.1:9092" auto_offset_reset => "latest" group_id => "logstash-g1" } } output { elasticsearch { #Logstash输出到elasticsearch; hosts => ["localhost:9200"] #elasticsearch为本地; index => "logstash-nginx-%{+YYYY.MM.dd}" #创建索引; document_type => "nginx" #文档类型; workers => 1 #进程数量; user => elastic #elasticsearch的用户; password => changeme #elasticsearch的密码; flush_size => 20000 idle_flush_time => 10 } }
7), 重启logstash
8 ), 页面访问 nginx, 可以查看消息队列中的消息
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic elk-log -m-beginning
参考: http://www.ywnds.com/?p=9776
消息队列kafka「检索组件」重磅上线!
本文对消息队列Kafka「检索组件」进行详细介绍,首先通过对消息队列使用过程中的痛点问题进行介绍,然后针对痛点问题提出相应的解决办法,并对关键技术技术进行解读,旨在帮助大家对消息队列Kafka「检索组件」的特点及... 查看详情
kafka分布式消息队列
...段拆分开,两个阶段互相独立各自实现自己的处理逻辑,通过Kafka提供的消息写入和消费接口实现对消息的连接处理。降低开发复杂度,提高系统稳定性。高吞吐率:kafka通过顺序读写磁盘提供可以和内存随机读写相匹敌的读写... 查看详情
kafka消息队列(代码片段)
...的实现不够深刻,进而无法从测试角度去评估测试风险;通过调研,国内大厂使用kafka较多,对于这一基本技术应该有所了解;目标对kafka的基本概念,基本原理,常见应用有所了解。了解messagebus在kafka中的应用消息系统选型测... 查看详情
kafka消息队列大数据实战教程-第二篇(kafka集群搭建)
文章目录前言一、准备工作二、安装Zookeeper集群2.1.下载2.2解压2.3配置环境变量2.4修改conf文件夹配置2.5创建data、log文件夹2.6启动三、安装Kafka集群3.1.下载3.2解压3.3创建存储kafka数据地址3.4修改配置文件3.5启动3.6测试四、启动脚本4... 查看详情
kafka消息队列大数据实战教程-第二篇(kafka集群搭建)
文章目录前言一、准备工作二、安装Zookeeper集群2.1.下载2.2解压2.3配置环境变量2.4修改conf文件夹配置2.5创建data、log文件夹2.6启动三、安装Kafka集群3.1.下载3.2解压3.3创建存储kafka数据地址3.4修改配置文件3.5启动3.6测试四、启动脚本4... 查看详情
kafka消息队列大数据实战教程-第二篇(kafka集群搭建)
文章目录前言一、准备工作二、安装Zookeeper集群2.1.下载2.2解压2.3配置环境变量2.4修改conf文件夹配置2.5创建data、log文件夹2.6启动三、安装Kafka集群3.1.下载3.2解压3.3创建存储kafka数据地址3.4修改配置文件3.5启动3.6测试四、启动脚本4... 查看详情
kafka分布式消息队列介绍以及集群安装
...的通信采用tcp协议 4、kafka中不同业务系统的消息可以通过topic(主题)进行区 查看详情
kafka消息重新发送
Kafka消息重新发送 1、 使用kafka消息队列做消息的发布、订阅,如果consumer端消费出问题,导致数据并没有消费,此时不需要担心,数据并不会立刻丢失,kafka会把数据在服务器的磁盘上默认存储7天,或者自己指定有两种... 查看详情
kafka还是rabbitmq?
...念,消息从一处流向另一处,吞吐量比rabbit更高。接下来通过俩张图来理解他俩的设计与区别。首先来看rabbit,他通过broker来进行统一调配消息去向,生产者通过指定的规则将消息发送到broker,broker再按照规则发送给消费者进行... 查看详情
kafka-概述
...用场景日志收集:可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。消息系统:解耦和生产者和消费者、缓存消息等。用户活动跟踪:Kafka经常被用来记... 查看详情
分布式实时消息队列kafka(代码片段)
...机制实现数据缓存消息队列的优点是什么?实现解耦通过异步,提高性能消息队列的缺点是什么?架构更加复杂:如果消息队列出现故障,整个系统都会故 查看详情
canalserver发送binlog消息到kafka消息队列中
...章中,我们使用上篇文章的基础,将消息发送到kafka消息队列中。影响性能的几个参数:参考链接:https://github.com/alibaba/canal/wiki/Canal-MQ-Performance参考文档:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStarthttps://gitee.com/huan1993/... 查看详情
jafka源码分析——logmanager
...负责管理broker上全部的Log(每个topic-partition为一个Log)。通过阅读源码可知其详细完毕的功能例如以下:1. 依照预设规则对消息队列进行清理。2. 依照预设规则对消息队列进行持久化(flush操作)。3. 连接ZooKeeper进行b... 查看详情
技术分享|消息队列kafka群集部署(代码片段)
...应用场景是日志收集:可以用kafka收集各种服务的日志,通过已统一接口的形式开放给各种消费 查看详情
kafka简单介绍和安装(代码片段)
...传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。**Kafka最新定义**:Kafka是一个开源的分布式事件流平台(EventStreamingPlatform),被数千家公司用于高性能数据管道、流... 查看详情
kafka总结
...日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式 查看详情
kafka消息系统
... 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完... 查看详情
kafka配置文件参数详解
...下不需要去做修改background.threads=4##等待IO线程处理的请求队列最大数,若是等待IO的请求 查看详情