企业日志大数据分析系统elk+kafka实现

hch的随笔成功的秘诀在于恒心—迪斯雷利      2022-02-14     244

关键词:

背景:

最近线上上了ELK,但是只用了一台Redis在中间作为消息队列,以减轻前端es集群的压力,Redis的集群解决方案暂时没有接触过,并且Redis作为消息队列并不是它的强项;所以最近将Redis换成了专业的消息信息发布订阅系统Kafka, Kafka的更多介绍大家可以看这里:http://blog.csdn.net/lizhitao/article/details/39499283  ,关于ELK的知识网上有很多的哦, 此篇博客主要是总结一下目前线上这个平台的实施步骤,ELK是怎么跟Kafka结合起来的。好吧,动手!

ELK架构拓扑:

然而我这里的整个日志收集平台就是这样的拓扑:

技术分享

1,使用一台Nginx代理访问kibana的请求;

2,两台es组成es集群,并且在两台es上面都安装kibana;(以下对elasticsearch简称es)

3,中间三台服务器就是我的kafka(zookeeper)集群啦; 上面写的消费者/生产者这是kafka(zookeeper)中的概念;

4,最后面的就是一大堆的生产服务器啦,上面使用的是logstash,当然除了logstash也可以使用其他的工具来收集你的应用程序的日志,例如:Flume,Scribe,Rsyslog,Scripts……

角色:

技术分享

软件选用:

elasticsearch-1.7.3.tar.gz #这里需要说明一下,前几天使用了最新的elasticsearch2.0,java-1.8.0报错,目前未找到原因,故这里使用1.7.3版本

Logstash-2.0.0.tar.gz

kibana-4.1.2-linux-x64.tar.gz

以上软件都可以从官网下载:https://www.elastic.co/downloads

java-1.8.0,nginx采用yum安装

部署步骤:

1.ES集群安装配置;

2.Logstash客户端配置(直接写入数据到ES集群,写入系统messages日志);

3.Kafka(zookeeper)集群配置;(Logstash写入数据到Kafka消息系统);

4.Kibana部署;

5.Nginx负载均衡Kibana请求;

6.案例:nginx日志收集以及MySQL慢日志收集;

7.Kibana报表基本使用;

ES集群安装配置;

es1.example.com:

1.安装java-1.8.0以及依赖包

yum install -y epel-release

yum install -y java-1.8.0 git wget lrzsz

2.获取es软件包

wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz

tar -xf elasticsearch-1.7.3.tar.gz -C /usr/local

ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch

3.修改配置文件

[[email protected] ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml

32 cluster.name: es-cluster #组播的名称地址

40 node.name: "es-node1 " #节点名称,不能和其他节点重复

47 node.master: true #节点能否被选举为master

51 node.data: true #节点是否存储数据

107 index.number_of_shards: 5 #索引分片的个数

111 index.number_of_replicas: 1 #分片的副本个数

145 path.conf: /usr/local/elasticsearch/config/ #配置文件的路径

149 path.data: /data/es/data #数据目录路径

159 path.work: /data/es/worker #工作目录路径

163 path.logs: /usr/local/elasticsearch/logs/ #日志文件路径

167 path.plugins: /data/es/plugins #插件路径

184 bootstrap.mlockall: true #内存不向swap交换

232 http.enabled: true #启用http

4.创建相关目录

mkdir /data/es/{data,worker,plugins} -p

5.获取es服务管理脚本

[[email protected] ~]# git clone https://github.com/elastic/elasticsearch-servicewrapper.git

[[email protected] ~]# mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/

[[email protected] ~]# /usr/local/elasticsearch/bin/service/elasticsearch install

Detected RHEL or Fedora:

Installing the Elasticsearch daemon..

[[email protected] ~]#

#这时就会在/etc/init.d/目录下安装上es的管理脚本啦

#修改其配置:

[[email protected] ~]#

set.default.ES_HOME=/usr/local/elasticsearch #安装路径

set.default.ES_HEAP_SIZE=1024 #jvm内存大小,根据实际环境调整即可

6.启动es ,并检查其服务是否正常

[[email protected] ~]# netstat -nlpt | grep -E "9200|"9300

tcp 0 0 0.0.0.0:9200 0.0.0.0:* LISTEN 1684/java

tcp 0 0 0.0.0.0:9300 0.0.0.0:* LISTEN 1684/java

访问http://192.168.2.18:9200/ 如果出现以下提示信息说明安装配置完成啦,

技术分享

7.es1节点好啦,我们直接把目录复制到es2

[[email protected] local]# scp -r elasticsearch-1.7.3 192.168.12.19:/usr/local/

[[email protected] local]# ln -sv elasticsearch-1.7.3 elasticsearch

[[email protected] local]# elasticsearch/bin/service/elasticsearch install

#es2只需要修改node.name即可,其他都与es1相同配置

8.安装es的管理插件

es官方提供一个用于管理es的插件,可清晰直观看到es集群的状态,以及对集群的操作管理,安装方法如下:

[[email protected] local]# /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head

安装好之后,访问方式为: http://192.168.2.18:9200/_plugin/head,由于集群中现在暂时没有数据,所以显示为空,

技术分享

此时,es集群的部署完成。

Logstash客户端安装配置;

在webserve1上面安装Logstassh

1.downloads  软件包 ,这里注意,Logstash是需要依赖java环境的,所以这里还是需要yum install -y java-1.8.0.

[[email protected] ~]# wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz

[[email protected] ~]# tar -xf logstash-2.0.0.tar.gz -C /usr/local

[[email protected] ~]# cd /usr/local/

[[email protected] local]# ln -sv logstash-2.0.0 logstash

[[email protected] local]# mkdir logs etc

2.提供logstash管理脚本,其中里面的配置路径可根据实际情况修改

#!/bin/bash

#chkconfig: 2345 55 24

#description: logstash service manager

#auto: Maoqiu Guo

FILE=‘/usr/local/logstash/etc/*.conf‘ #logstash配置文件

LOGBIN=‘/usr/local/logstash/bin/logstash agent --verbose --config‘ #指定logstash配置文件的命令

LOCK=‘/usr/local/logstash/locks‘ #用锁文件配合服务启动与关闭

LOGLOG=‘--log /usr/local/logstash/logs/stdou.log‘ #日志

START() {

    if [ -f $LOCK ];then

        echo -e "Logstash is already 33[32mrunning33[0m, do nothing."

    else

        echo -e "Start logstash service.33[32mdone33[m"

        nohup ${LOGBIN} ${FILE} ${LOGLOG} &

        touch $LOCK

    fi

}

STOP() {

    if [ ! -f $LOCK ];then

        echo -e "Logstash is already stop, do nothing."

    else

        echo -e "Stop logstash serivce 33[32mdone33[m"

        rm -rf $LOCK

        ps -ef | grep logstash | grep -v "grep" | awk ‘{print $2}‘ | xargs kill -s 9 >/dev/null

    fi

}

STATUS() {

    ps aux | grep logstash | grep -v "grep" >/dev/null

    if [ -f $LOCK ] && [ $? -eq 0 ]; then

        echo -e "Logstash is: 33[32mrunning33[0m..."

    else

        echo -e "Logstash is: 33[31mstopped33[0m..."

    fi

}

TEST(){

    ${LOGBIN} ${FILE} --configtest

}

case "$1" in

start)

    START

    ;;

stop)

    STOP

    ;;

status)

    STATUS

    ;;

restart)

    STOP

sleep 2

START

    ;;

test)

    TEST

    ;;

*)

    echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)"

    ;;

esac

3.Logstash 向es集群写数据

(1)编写一个logstash配置文件

[[email protected] etc]# cat logstash.conf

input { #数据的输入从标准输入

stdin {}

}

output { #数据的输出我们指向了es集群

elasticsearch {

hosts => ["192.168.2.18:9200","192.168.2.19:9200"]   #es主机的ip及端口

}

}

[[email protected] etc]#

(2)检查配置文件是否有语法错

[[email protected] etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose

Configuration OK

[[email protected] etc]#

(3)既然配置ok我们手动启动它,然后写点东西看能否写到es

技术分享

ok.上图已经看到logstash已经可以正常的工作啦.

4.下面演示一下如何收集系统日志

将之前的配置文件修改如下所示内容,然后启动logstash服务就可以在web页面中看到messages的日志写入es,并且创建了一条索引

[[email protected] etc]# cat logstash.conf

input {       #这里的输入使用的文件,即日志文件messsages

file {   

path => "/var/log/messages"   #这是日志文件的绝对路径

start_position => "beginning" #这个表示从messages的第一行读取,即文件开始处

}

}

output {    #输出到es

elasticsearch {

hosts => ["192.168.2.18:9200","192.168.2.19:9200"]

index => "system-messages-%{+YYYY-MM}"  #这里将按照这个索引格式来创建索引

}

}

[[email protected] etc]#

启动logstash后,我们来看head这个插件的web页面

技术分享

ok,系统日志我们已经成功的收集,并且已经写入到es集群中,那上面的演示是logstash直接将日志写入到es集群中的,这种场合我觉得如果量不是很大的话直接像上面已将将输出output定义到es集群即可,如果量大的话需要加上消息队列来缓解es集群的压力。前面已经提到了我这边之前使用的是单台redis作为消息队列,但是redis不能作为list类型的集群,也就是redis单点的问题没法解决,所以这里我选用了kafka ;下面就在三台server上面安装kafka集群

Kafka集群安装配置;

在搭建kafka集群时,需要提前安装zookeeper集群,当然kafka已经自带zookeeper程序只需要解压并且安装配置就行了

kafka1上面的配置:

1.获取软件包.官网:http://kafka.apache.org

[[email protected] ~]# wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz

[[email protected] ~]# tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/

[[email protected] ~]# cd /usr/local/

[[email protected] local]# ln -sv kafka_2.11-0.8.2.1 kafka

2.配置zookeeper集群,修改配置文件

[[email protected] ~]# vim /usr/local/kafka/config/zookeeper.propertie

dataDir=/data/zookeeper

clienrtPort=2181

tickTime=2000

initLimit=20

syncLimit=10

server.2=192.168.2.22:2888:3888

server.3=192.168.2.23:2888:3888

server.4=192.168.2.24:2888:3888

#说明:

tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

2888端口:表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;

3888端口:表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

3.创建zookeeper所需要的目录

[[email protected] ~]# mkdir /data/zookeeper

4.在/data/zookeeper目录下创建myid文件,里面的内容为数字,用于标识主机,如果这个文件没有的话,zookeeper是没法启动的哦

[[email protected] ~]# echo 2 > /data/zookeeper/myid

以上就是zookeeper集群的配置,下面等我配置好kafka之后直接复制到其他两个节点即可

5.kafka配置

[[email protected] ~]# vim /usr/local/kafka/config/server.properties

broker.id=2      # 唯一,填数字,本文中分别为2/3/4

prot=9092        # 这个broker监听的端口 

host.name=192.168.2.22  # 唯一,填服务器IP

log.dir=/data/kafka-logs # 该目录可以不用提前创建,在启动时自己会创建

zookeeper.connect=192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181  #这个就是zookeeper的ip及端口

num.partitions=16 # 需要配置较大 分片影响读写速度

log.dirs=/data/kafka-logs # 数据目录也要单独配置磁盘较大的地方

log.retention.hours=168 # 时间按需求保留过期时间 避免磁盘满

6.将kafka(zookeeper)的程序目录全部拷贝至其他两个节点

[[email protected] ~]# scp -r /usr/local/kafka 192.168.2.23:/usr/local/

[[email protected] ~]# scp -r /usr/local/kafka 192.168.2.24:/usr/local/

7.修改两个借点的配置,注意这里除了以下两点不同外,都是相同的配置

(1)zookeeper的配置

mkdir /data/zookeeper

echo "x" > /data/zookeeper/myid

(2)kafka的配置

broker.id=2

host.name=192.168.2.22

8.修改完毕配置之后我们就可以启动了,这里先要启动zookeeper集群,才能启动kafka

我们按照顺序来,kafka1 –> kafka2 –>kafka3

[[email protected] ~]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & #zookeeper启动命令

[[email protected] ~]# /usr/local/kafka/bin/zookeeper-server-stop.sh #zookeeper停止的命令

注意,如果zookeeper有问题 nohup的日志文件会非常大,把磁盘占满,这个zookeeper服务可以通过自己些服务脚本来管理服务的启动与关闭。

后面两台执行相同操作,在启动过程当中会出现以下报错信息

[2015-11-13 19:18:04,225] WARN Cannot open channel to 3 at election address /192.168.2.23:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)

java.net.ConnectException: Connection refused

    at java.net.PlainSocketImpl.socketConnect(Native Method)

    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

    at java.net.Socket.connect(Socket.java:589)

    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)

    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)

    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)

    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)

[2015-11-13 19:18:04,232] WARN Cannot open channel to 4 at election address /192.168.2.24:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)

java.net.ConnectException: Connection refused

    at java.net.PlainSocketImpl.socketConnect(Native Method)

    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

    at java.net.Socket.connect(Socket.java:589)

    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)

    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)

    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)

    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)

[2015-11-13 19:18:04,233] INFO Notification time out: 6400 (org.apache.zookeeper.server.quorum.FastLeaderElection)

由于zookeeper集群在启动的时候,每个结点都试图去连接集群中的其它结点,先启动的肯定连不上后面还没启动的,所以上面日志前面部分的异常是可以忽略的。通过后面部分可以看到,集群在选出一个Leader后,最后稳定了。

其他节点也可能会出现类似的情况,属于正常。

9.zookeeper服务检查

[[email protected]~]# netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 1959/java

tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1959/java

[[email protected] ~]# netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.23:3888 0.0.0.0:* LISTEN 1723/java

tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1723/java

[[email protected] ~]# netstat -nlpt | grep -E "2181|2888|3888"

tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 950/java

tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 950/java

tcp 0 0 192.168.2.24:2888 0.0.0.0:* LISTEN 950/java

#可以看出,如果哪台是Leader,那么它就拥有2888这个端口

ok.  这时候zookeeper集群已经启动起来了,下面启动kafka,也是依次按照顺序启动

[[email protected] ~]# nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & #kafka启动的命令

[[email protected] ~]# /usr/local/kafka/bin/kafka-server-stop.sh #kafka停止的命令

注意,跟zookeeper服务一样,如果kafka有问题 nohup的日志文件会非常大,把磁盘占满,这个kafka服务同样可以通过自己些服务脚本来管理服务的启动与关闭。

此时三台上面的zookeeper及kafka都已经启动完毕,来检测以下吧

(1)建立一个主题

[[email protected] ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer

#注意:factor大小不能超过broker数

(2)查看有哪些主题已经创建

[[email protected] ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181 #列出集群中所有的topic

summer #已经创建成功

(3)查看summer这个主题的详情

[[email protected] ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer

Topic:summer    PartitionCount:1    ReplicationFactor:3    Configs:

    Topic: summer    Partition: 0    Leader: 2    Replicas: 2,4,3    Isr: 2,4,3

#主题名称:summer

#Partition:只有一个,从0开始

#leader :id为2的broker

#Replicas 副本存在于broker id为2,3,4的上面

#Isr:活跃状态的broker

(4)发送消息,这里使用的是生产者角色

[[email protected] ~]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer

This is a messages

welcome to kafka

(5)接收消息,这里使用的是消费者角色

[[email protected] ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.2.24:2181 --topic summer --from-beginning

This is a messages

welcome to kafka

如果能够像上面一样能够接收到生产者发过来的消息,那说明基于kafka的zookeeper集群就成功啦。

10,下面我们将webserver1上面的logstash的输出改到kafka上面,将数据写入到kafka中

(1)修改webserver1上面的logstash配置,如下所示:各个参数可以到官网查询.

[email protected] etc]# cat logstash.conf

input { #这里的输入还是定义的是从日志文件输入

file {

type => "system-message"

path => "/var/log/messages"

start_position => "beginning"

}

}

output {

#stdout { codec => rubydebug } #这是标准输出到终端,可以用于调试看有没有输出,注意输出的方向可以有多个

kafka { #输出到kafka

bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092" #他们就是生产者

topic_id => "system-messages" #这个将作为主题的名称,将会自动创建

compression_type => "snappy" #压缩类型

}

}

[[email protected] etc]#

(2)配置检测

[[email protected] etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose

Configuration OK

[[email protected] etc]#

(2)启动Logstash,这里我直接在命令行执行即可

[[email protected] etc]# /usr/local/logstash/bin/logstash -f logstash.conf

(3)验证数据是否写入到kafka,这里我们检查是否生成了一个叫system-messages的主题

[[email protected] ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181

summer

system-messages #可以看到这个主题已经生成了

#再看看这个主题的详情:

[[email protected] ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages

Topic:system-messages    PartitionCount:16    ReplicationFactor:1    Configs:

    Topic: system-messages    Partition: 0    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 1    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 2    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 3    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 4    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 5    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 6    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 7    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 8    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 9    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 10    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 11    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 12    Leader: 2    Replicas: 2    Isr: 2

    Topic: system-messages    Partition: 13    Leader: 3    Replicas: 3    Isr: 3

    Topic: system-messages    Partition: 14    Leader: 4    Replicas: 4    Isr: 4

    Topic: system-messages    Partition: 15    Leader: 2    Replicas: 2    Isr: 2

[[email protected] ~]#

可以看出,这个主题生成了16个分区,每个分区都有对应自己的Leader,但是我想要有10个分区,3个副本如何办?还是跟我们上面一样命令行来创建主题就行,当然对于logstash输出的我们也可以提前先定义主题,然后启动logstash 直接往定义好的主题写数据就行啦,命令如下:

[[email protected] ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME

好了,我们将logstash收集到的数据写入到了kafka中了,在实验过程中我使用while脚本测试了如果不断的往kafka写数据的同时停掉两个节点,数据写入没有任何问题。

那如何将数据从kafka中读取然后给我们的es集群呢?那下面我们在kafka集群上安装Logstash,安装步骤不再赘述;三台上面的logstash 的配置如下,作用是将kafka集群的数据读取然后转交给es集群,这里为了测试我让他新建一个索引文件,注意这里的输入日志还是messages,主题名称还是“system-messages”

[[email protected] etc]# more logstash.conf

input {

kafka {

zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181" #消费者们

topic_id => "system-messages"

codec => plain

reset_beginning => false

consumer_threads => 5

decorate_events => true

}

}

output {

elasticsearch {

hosts => ["192.168.2.18:9200","192.168.2.19:9200"]

index => "test-system-messages-%{+YYYY-MM}" #为了区分之前实验,我这里新生成的所以名字为“test-system-messages-%{+YYYY-MM}”

}

}

在三台kafka上面启动Logstash,注意我这里是在命令行启动的;

[[email protected] etc]# pwd

/usr/local/logstash/etc

[[email protected] etc]# /usr/local/logstash/bin/logstash -f logstash.conf

[[email protected] etc]# pwd

/usr/local/logstash/etc

[[email protected] etc]# /usr/local/logstash/bin/logstash -f logstash.conf

[[email protected] etc]# pwd

/usr/local/logstash/etc

[[email protected] etc]# /usr/local/logstash/bin/logstash -f logstash.conf

在webserver1上写入测试内容,即webserver1上面利用message这个文件来测试,我先将其清空,然后启动

[[email protected] etc]# >/var/log/messages

[[email protected] etc]# echo "我将通过kafka集群达到es集群哦^0^" >> /var/log/messages

#启动logstash,让其读取messages中的内容

下图为我在客户端写入到kafka集群的同时也将其输入到终端,这里写入了三条内容

技术分享

而下面三张图侧可以看出,三台Logstash 很平均的从kafka集群当中读取出来了日志内容

技术分享

技术分享

技术分享

再来看看我们的es管理界面

技术分享

ok ,看到了吧,

流程差不多就是下面 酱紫咯

技术分享

由于篇幅较长,我将

4.Kibana部署;

5.Nginx负载均衡Kibana请求;

6.案例:nginx日志收集以及MySQL慢日志收集;

7.Kibana报表基本使用;

 

转自

企业日志大数据分析系统ELK+KAFKA实现
http://mp.weixin.qq.com/s?__biz=MzIyMDA1MzgyNw==&mid=2651969453&idx=1&sn=1c4d7d7b55deea7300f3b3701fbcdc89&chksm=8c349f81bb431697da71338a862c9528b5cd1c176129424a8b70548ae9c44017289650ac4d61&scene=0#rd

elk日志系统部署实现(代码片段)

文章目录1.前言2.ELK服务安装部署2.1Log4j配置2.2Filebeat配置2.2.1Filebeat数据流转2.2.2Filebeat部署运行2.3Kafka配置2.3.1Kafka与Zookeeper2.3.2Zookeeper的作用2.3.3Kafka数据流转2.3.4Kafka安装部署2.3.5Kafka优化2.4Logstash配置与数据流转2.4.1Logstash数据流转... 查看详情

elk+filebeat+kafka+zookeeper构建大数据日志分析平台三

安装并配置FilebeatFilebeat与filebeat比较Logstash缺点:依赖java、在数据量大的时候,Logstash进程会消耗过多的系统资源,严重影响业务系统的性能filebeat优点:基于Go语言,没有任何依赖配置文件简单,格式明了filebeat比logstash更加轻... 查看详情

elk日志系统部署实现(代码片段)

...据流转2.6.2Kibana部署运行1.前言ELK全家桶官网:elastic.co/cnELK日志系统的常见解决方案:    通常的产品或项目部署至服务器,运行服务后会生成日志文件。    通过Filebeat监控相关文件夹,当有新日志产生,就读... 查看详情

elk+kafka企业日志收集平台

背景:最近线上上了ELK,但是只用了一台Redis在中间作为消息队列,以减轻前端es集群的压力,Redis的集群解决方案暂时没有接触过,并且Redis作为消息队列并不是它的强项;所以最近将Redis换成了专业的消息信息发布订阅系统Kafka... 查看详情

「springcloud」(三十八)搭建elk日志采集与分析系统

参考技术A 一套好的日志分析系统可以详细记录系统的运行情况,方便我们定位分析系统性能瓶颈、查找定位系统问题。上一篇说明了日志的多种业务场景以及日志记录的实现方式,那么日志记录下来,相关人员就需要对日... 查看详情

部署elk+kafka+filebeat日志收集分析系统(代码片段)

ELK+Kafka+Filebeat日志系统文章目录ELK+Kafka+Filebeat日志系统1.环境规划2.部署elasticsearch集群2.1.配置es-1节点2.2.配置es-2节点2.3.配置es-3节点2.4.使用es-head插件查看集群状态3.部署kibana4.部署zookeeper4.1.配置zookeeper-1节点4.2.配置zo... 查看详情

elk企业级日志分析系统(代码片段)

ELK企业级日志分析系统一、ELK日志分析系统简介二、使用ELK的原因三、完整日志系统基本特征四、ELK的工作原理五、ELK日志分析系统集群部署5.1ELKElasticsearch集群部署(在Node1、Node2节点上操作)5.1ELKElasticsearch集群部署具... 查看详情

elk企业级日志分析系统(代码片段)

ELK企业级日志分析系统ELK概述ELK简介ELK的作用完整日志系统基本特征ELK的工作原理ELK集群部署ELKElasticsearch集群部署(在Node1、Node2节点上操作)ELKLogstash部署(在Apache节点上操作)ELKKiabana部署(在Node1节点上操... 查看详情

elk企业级日志分析系统(代码片段)

ELK企业级日志分析系统ELK概述ELK简介ELK的作用完整日志系统基本特征ELK的工作原理ELK集群部署ELKElasticsearch集群部署(在Node1、Node2节点上操作)ELKLogstash部署(在Apache节点上操作)ELKKiabana部署(在Node1节点上操... 查看详情

离线部署elk+kafka日志管理系统

1、简介对于日志来说,最常见的需求就是收集、查询、显示,正对应logstash、elasticsearch、kibana的功能。 ELK日志系统在系统中,主要可解决的问题:基于日志的数据挖掘问题排查,上线检查根据关键字查询日志详情异常数据... 查看详情

离线部署elk+kafka日志管理系统

1、简介对于日志来说,最常见的需求就是收集、查询、显示,正对应logstash、elasticsearch、kibana的功能。 ELK日志系统在系统中,主要可解决的问题:基于日志的数据挖掘问题排查,上线检查根据关键字查询日志详情异常数据... 查看详情

elk企业级日志分析系统(代码片段)

目录前言一、ELK概述1.1ELK日志分析系统1.2ELK中日志处理步骤1.3Elasticsearch概述1.4LogStash概述1.5Kibana概述1.6Filebeat完整日志系统基本特征ELK的工作原理:二、部署ELK日志分析系统2.1配置Elasticsearch环境2.2安装elasticsearch-head插件2.3安装Logst... 查看详情

eflfk——elk日志分析系统+kafka+filebeat架构(代码片段)

ELFK——ELK结合filebeat日志分析系统(2)_Evens7xxX的博客-CSDN博客紧接上期,在ELFK的基础上,添加kafka做数据缓冲附kafka消息队列 nginx服务器配置filebeat收集日志:192.168.116.40,修改配置将采集到的日志转发给k... 查看详情

elk企业级日志分析系统概述及部署(代码片段)

ELK企业级日志分析系统概述及部署一、ELK日志分析系统概述1、ELK简介2、为什么要使用ELK?3、完整日志系统基本特征4、ELK的工作原理5、ELK日志处理步骤二、Elasticsearch概述1、Elasticsearch特性三、LogStash概述1、LogStash主要组件四... 查看详情

elk日志分析系统(企业级)(代码片段)

目录一.ELK概述1.ELK简介2.使用ELK原因3.完整日志系统基本特征4.ELK的工作原理二.ELK日志分析系统部署节点部署信息1.ELKElasticsearch集群部署(==在Node1、Node2节点上操作==)①环境准备②部署Elasticsearch软件③安装Ela... 查看详情

elk日志分析系统(企业级)(代码片段)

目录一.ELK概述1.ELK简介2.使用ELK原因3.完整日志系统基本特征4.ELK的工作原理二.ELK日志分析系统部署节点部署信息1.ELKElasticsearch集群部署(==在Node1、Node2节点上操作==)①环境准备②部署Elasticsearch软件③安装Ela... 查看详情

elk企业级日志分析系统(代码片段)

文章目录ELK概述ELK简介ElasticSearchLogstashKiabanaFilebeat为什么要使用ELK完整日志系统基本特征ELK的工作原理环境介绍配置步骤配置Elasticsearch环境配置elasticsearch环境(node1、node2)安装elasticsearch-head插件安装logstashELK概述ELK简介... 查看详情

elk企业级日志分析系统(代码片段)

文章目录ELK概述ELK简介ElasticSearchLogstashKiabanaFilebeat为什么要使用ELK完整日志系统基本特征ELK的工作原理环境介绍配置步骤配置Elasticsearch环境配置elasticsearch环境(node1、node2)安装elasticsearch-head插件安装logstashELK概述ELK简介... 查看详情