elk笔记22.2--通过api快速创建索引

昕光xg 昕光xg     2022-12-02     714

关键词:


elk笔记22.2--通过api快速创建索引

1 简介

本文基于 ​​elk笔记22–通过api快速创建索引​​ 继续通过 api 快速创建索引。本节将追加一个db模块存储索引,并添加通过flask程序提供对外的api,后续会在此基础之上新增一个简单的UI界面,同时会追加一个索引定期删除功能。

2 功能实现

本文前置条件需要安装一套elk实例和mysql,具体安装方法可以参考笔者博文:
​​​elk笔记1–搭建elk集群​​​​elk笔记2–使用docker启一套elk实例​​​​docker笔记5–配置MySQL​

本文设计的主要模块包括如下内容:

  1. config.py,记录常见es、kibana、mysql 等基础配置
  2. db_helper.py,封装常见的mysql 操作
  3. es_helper.py,封装常见的elk操作
  4. create_index.py,flask主程序,用于实现创建和查看索引的主逻辑

2.1 源码

config.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-

ES_URL = "http://127.0.0.1:9200"
KIBANA_URL = "http://127.0.0.1:5601"
ES_USER = "elastic"
ES_PWD = "elastic"
KIBANA_USER = "elastic"
KIBANA_PWD = "elastic"

SQL_HOST = 127.0.0.1
SQL_USER = root
SQL_PWD = 111111
SQL_DBNAME = bigdata_sre_log

db_helper.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-

import pymysql
import traceback


class MysqlHelper:
def __init__(self, sql_host, sql_user, sql_pwd, db_name):
self.host = sql_host
self.usr = sql_user
self.pwd = sql_pwd
self.dbname = db_name
self.port = 3306
self.charset = utf8
self.db = pymysql.connect(host=self.host, user=self.usr, passwd=self.pwd, db=self.dbname, charset=self.charset)

def close_db(self):
self.db.close()

def show_version(self):
# 使用 cursor() 方法创建一个游标对象 cursor
cursor = self.db.cursor()
# 使用 execute() 方法执行 SQL 查询
cursor.execute("SELECT VERSION()")
# 使用 fetchone() 方法获取单条数据.
data = cursor.fetchone()
print(f"Database version : data")

def get_indices_list(self):
ret_list = []
instance = None
sql = f"select number, cluster, index_name,save_days, ilp_name, datetime from es_indices"
try:
cursor = self.db.cursor()
cursor.execute(sql)
ret = cursor.fetchall()
if ret is None:
return []
for item in ret:
instance = number: item[0], cluster: item[1], index_name: item[2], save_days: item[3],
ilp_name: item[4], datetime: item[5].strftime("%Y-%m-%d %H:%M:%S")
ret_list.append(instance)
print(f"get_indices_list succeed!")
return True, ret_list
except Exception as e:
self.db.rollback()
print(f"get_indices_list error:\\ne, traceback.print_exc()")
return False, f"get_indices_list error:\\ne, traceback.print_exc()"

def get_index_instance(self, cluster, index_name):
sql = f"select number, cluster, index_name, save_days, ilp_name, datetime from es_indices " \\
f"where cluster=cluster and index_name=index_name"
try:
cursor = self.db.cursor()
cursor.execute(sql)
item = cursor.fetchone()
if item is None:
return False, f"no cluster/index_name"
instance = number: item[0], cluster: item[1], index_name: item[2], save_days: item[3],
ilp_name: item[4], datetime: item[5].strftime("%Y-%m-%d %H:%M:%S")
print(f"get_index_instance succeed!")
return True, instance
except Exception as e:
print(f"get_index_instance error:\\ne, traceback.print_exc()")
return False, f"get_index_instance error:\\ne, traceback.print_exc()"

def judge_index_instance_exist(self, cluster, index_name):
tag, ret = self.get_index_instance(cluster, index_name)
if tag:
return True
return False

def add_index_instance(self, instance):
if self.judge_index_instance_exist(instance[cluster], instance[index_name]):
return False, f"instance[cluster]/instance[index_name] exist, please do not repeat!"
cursor = self.db.cursor()
sql = "INSERT INTO es_indices(cluster, index_name, save_days, ilp_name) VALUES ( %s, %s, %s, %s)"
try:
cursor.execute(sql, [instance[cluster], instance[index_name], instance[save_days],
instance[ilp_name]])
self.db.commit()
return True, f"add_index_instance succeed!"
except Exception as e:
self.db.rollback()
print(f"add_index_instance error:\\ne, traceback.print_exc()")
return False, f"add_index_instance error:\\ne, traceback.print_exc()"


if __name__ == "__main__":
sql_host = 127.0.0.1
sql_user = root
sql_pwd = 111111
sql_dbname = bigdata_sre_log

sql = MysqlHelper(sql_host, sql_user, sql_pwd, sql_dbname)
sql.show_version()

instance = "index_name": "test-elk", "cluster": "sre-elk",
"save_days": 10, "ilp_name": "ilp-default"
print(sql.add_index_instance(instance))

print(sql.get_indices_list())

sql.close_db()

es_helper.py

#!/usr/bin/python
# -*- coding:utf-8 -*-

import requests
from base64 import b64encode


def get_base64_str(str_text):
str_user_pwd = b64encode(str_text.encode(ascii))
return str_user_pwd.decode(ascii)


class ElasticHelper:
def __init__(self, es_url, es_user, es_pwd, es_index_name, es_ilp_name):
self.url = es_url
self.user = es_user
self.pwd = es_pwd
self.index_name = es_index_name
self.ilp_name = es_ilp_name

def get_index_template(self):
print(f"get_index_template: self.index_name")
url_full = self.url + f"/_template/self.index_name"
re = requests.get(url_full, auth=(self.user, self.pwd))
if re.status_code == 200:
return re.json()
else:
print("request error, not 200")
return None

def put_index_template(self):
body = order: 0,
index_patterns: [f"self.index_name-*"],
settings:
index:
number_of_shards: 2,
number_of_replicas: 0

,
mappings:
"properties":
"@timestamp":
"type": "date"


,
aliases:

url_full = self.url + f"/_template/self.index_name"
re = requests.put(url_full, json=body, auth=(self.user, self.pwd))
if re.status_code == 200:
print(re.json())
else:
print("request error, not 200")

def update_index_template_with_ilp(self):
ret = self.get_index_template()
if ret is None:
print(f"index template self.index_name-* not exist, please create one!")
body = ret[self.index_name]
body[settings][index][lifecycle] = name: fself.ilp_name,
rollover_alias: fself.index_name_write
re = requests.post(f"self.url/_template/self.index_name", json=body, auth=(elastic, elastic))
if re.status_code == 200:
print(f"put_index_template(index=self.index_name,ilp=self.ilp_name) succeed:\\ninfo re.status_code, "
f"re.text")
else:
print(f"put_index_template(index=self.index_name,ilp=self.ilp_name) failed:\\ninfo re.status_code, "
f"re.text")

def create_rollover_index(self):
body =
"aliases":
f"self.index_name_write":


re = requests.put(url=self.url+f"/%3Cself.index_name-%7Bnow%2Fd%7D-000001%3E", json=body, auth=(self.user,
self.pwd))
if re.status_code == 200:
print(re.json())
else:
print(f"create rollover index self.index_name failed!\\nre.status_code,\\nre.text")

def judge_alias_exist(self):
re = requests.get(f"self.url/_cat/aliases", auth=(elastic, elastic))
if re.status_code == 200:
ret_str = re.text
if ret_str.find(f"self.index_name_write") != -1:
return True
else:
False
else:
print(f"judge_alias_exist self.index_name_write failed!\\nre.status_code,\\nre.text")
return False


class KibanaHelper:
def __init__(self, kibana_url, kibana_user, kibana_pwd, kibana_index_pattern_name):
self.url = kibana_url
self.user = kibana_user
self.pwd = kibana_pwd
self.index_pattern_name = kibana_index_pattern_name

def get_index_pattern_id(self):
url = fself.url/api/saved_objects/_find?fields=title&fields=type&per_page=10000&type=index-pattern
headers = Authorization: Basic + get_base64_str(self.user + : + self.pwd)
ret = requests.get(url=url, headers=headers)
for item in (ret.json())[saved_objects]:
if item[attributes][title] == self.index_pattern_name:
return item[id]
return None

def get_kibana_index_pattern(self):
print(f"get_kibana_index_pattern index_pattern_name-*")
headers = Authorization: Basic + get_base64_str(self.user + : + self.pwd), kbn-xsrf: reporting
id = self.get_index_pattern_id()
url_full = self.url + f"/api/saved_objects/_bulk_get"
body = ["id": id, "type": "index-pattern"]
re = requests.post(url=url_full, json=body, headers=headers)
if re.status_code == 200:
print(re.text)
else:
print(f"error:\\n re.text")

def create_kibana_index_pattern(self):
print(f"index_pattern=self.index_pattern_name")
ret = self.get_index_pattern_id()
if ret is not None:
print(f"self.index_pattern_name has been existed, please do not repeat")
exit(0)
headers = Authorization: Basic + get_base64_str(self.user + : + self.pwd), kbn-xsrf: reporting
url_full = self.url + "/api/saved_objects/index-pattern"
body =
"attributes":
"title": self.index_pattern_name,
"timeFieldName": "@timestamp"


re = requests.post(url_full, json=body, headers=headers)
if re.status_code == 200:
print(re.text)
else:
print(f"error:\\n re.text")

def delete_index_pattern_id(self, index_pattern_id):
print(f"delete index_pattern_id=index_pattern_id")
headers = Authorization: Basic + get_base64_str(self.user + : + self.pwd), kbn-xsrf: reporting
url_full = self.url + f"/api/saved_objects/index-pattern/index_pattern_id?force=false"
re = requests.delete(url_full, headers=headers)
if re.status_code == 200:
print(f"delete_index_pattern_id(index_pattern_id) succeed:\\ninfo re.status_code, re.text")
else:
print(f"delete_index_pattern_id(index_pattern_id) error:\\ninfo re.status_code, re.text")

def get_ilm(self):
url_full = self.url+"/api/index_lifecycle_management/policies?withIndices=true"
headers = Authorization: Basic + get_base64_str(self.user + : + self.pwd), kbn-xsrf: reporting
re = requests.get(url_full, headers=headers)
if re.status_code == 200:
print(f"get_ilm succeed")
return re.json()
else:
print(f"get_ilm failed\\ninfo re.status_code,re.text")
return None

def judge_ilp_exist(self, ilp_name):
ilp_list = self.get_ilm()
if ilp_list is None:
return False
else:
for item in ilp_list:
if ilp_name == item["name"]:
return True
return False


if __name__ == "__main__":
es_url = "http://127.0.0.1:9200"
es_user = "elastic"
es_pwd = "elastic"
kibana_url = "http://127.0.0.1:5601"
kibana_user = "elastic"
kibana_pwd = "elastic"
index_name = "test-elk"
index_pattern_name = index_name + "-*"
ilp_name = "ilp-default"
es = ElasticHelper(es_url, es_user, es_pwd, index_name, ilp_name)
kibana = KibanaHelper(kibana_url, kibana_user, kibana_pwd, index_pattern_name)
es.put_index_template()
if not kibana.judge_ilp_exist(ilp_name):
print(f"ilp_name=ilp_name not exist, please change another one")
exit(0)
es.update_index_template_with_ilp()
if es.judge_alias_exist():
print(f"alias_name=index_name_write exists, please change another one")
exit(0)
es.create_rollover_index()
kibana.create_kibana_index_pattern()

create_index.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-


add_index
list/<cluster>/<index_name>


import json
from flask import Flask
from flask import request

from db_helper import MysqlHelper
from es_helper import ElasticHelper, KibanaHelper
from config import *

app = Flask(__name__)


@app.route(/)
def voice_root():
return Hello, this web service is for create es index!\\n


@app.route(/add_index/, methods=[GET, POST])
def add_index():
if request.method == POST:
post_data = request.data
post_dict = json.loads(post_data)
# print(f"add_index post_data:\\npost_dict")
# post_json_str = json.dumps(post_dict)
# print(f"add_index post_json_str:\\npost_json_str")
instance = index_name: post_dict[index_name], "cluster": post_dict[cluster],
"save_days": post_dict[save_days], "ilp_name": post_dict[ilp_name]
sql = MysqlHelper(SQL_HOST, SQL_USER, SQL_PWD, SQL_DBNAME)
ret = sql.judge_index_instance_exist(post_dict[cluster], post_dict[index_name])

if ret:
return f"index post_dict[cluster]/post_dict[index_name] exists, please do not repeat"

index_name = post_dict[index_name]
index_pattern_name = index_name + "-*"
ilp_name = post_dict[ilp_name]
es = ElasticHelper(ES_URL, ES_USER, ES_PWD, index_name, ilp_name)
kibana = KibanaHelper(KIBANA_URL, KIBANA_USER, KIBANA_PWD, index_pattern_name)
if not kibana.judge_ilp_exist(ilp_name):
print(f"ilp_name=ilp_name not exist, please change another one")
return f"ilp_name=ilp_name not exist, please change another one"
es.put_index_template()
es.update_index_template_with_ilp()
if es.judge_alias_exist():
print(f"alias_name=index_name_write exists, please change another one")
return f"alias_name=index_name_write exists, please change another one"
es.create_rollover_index()
kibana.create_kibana_index_pattern()

tag, ret = sql.add_index_instance(instance)
return f"tag\\nret"
else:
return add_index, please post data!\\n


@app.route(/list/)
@app.route(/list/<cluster>/<index_name>/)
def list_index(cluster=None, index_name=None):
print(f"1,cluster,2index_name")
if (cluster is None) and (index_name is None):
sql = MysqlHelper(SQL_HOST, SQL_USER, SQL_PWD, SQL_DBNAME)
tag, ret = sql.get_indices_list()
sql.close_db()
if tag:
print(ret)
return json.dumps(ret)
else:
return ret
else:
sql = MysqlHelper(SQL_HOST, SQL_USER, SQL_PWD, SQL_DBNAME)
tag, ret = sql.get_index_instance(cluster, index_name)
sql.close_db()
if tag:
print(ret)
return json.dumps(ret)
else:
return ret


if __name__ == __main__:
app.run(host="0.0.0.0", port=8080)

2.2 测试

新建数据库:

数据库名称:bigdata_sre_log
表名称:es_indices
字段:
number INT,自增
cluster VARCHAR
index_name VARCHAR
save_days INT
ilp_name VARCHAR
datetime DATETIME, 默认值 CURRENT_TIMESTAMP

post 如下内容:

POST http://127.0.0.1:8080/add_index
"index_name": "test-elk", "cluster": "sre-elk", "save_days": 10, "ilp_name": "ilp-default"

elk笔记22.2--通过api快速创建索引_mysql


kibana 查看数据(先写入1条数据):

POST test-elk_write/_doc

"@timestamp": "2021-08-30T23:00:00+08:00",
"name": "test001"

elk笔记22.2--通过api快速创建索引_通过api快速创建索引_02


调试结果(正常输入相关日志):

查看详情

elk笔记9--跨集群搜索

elk笔记9--跨集群搜索​​1.跨集群搜索简介​​​​2.跨集群搜索配置​​​​3跨集群使用案例​​​​4说明​​1.跨集群搜索简介跨集群允许我们在一个或者多个远程集群上执行搜索任务,通常我们可以用跨集群搜索来过滤或... 查看详情

elk学习笔记之kibana入门使用

Kibana入门使用: 第一次导入索引:修改展示时间,不然查不到数据:点Discover,查阅数据:如果要添加新的index:点击Visualize,创建chart:点击Dashboard,布局:DashBoard完工啦!!!嘿嘿嘿  查看详情

elk笔记11--快照的使用

elk笔记11--快照的使用​​1快照介绍​​​​2快照使用​​​​2.1nfs作为存储仓库​​​​2.2hdfs作为存储仓库​​​​3使用技巧​​​​4说明​​1快照介绍快照是运行中es集群的一个备份,进行快照时候既可以全集群所有索引... 查看详情

elk学习笔记之elasticsearchshard和segment概念

Shard和segment概念: 转载: http://blog.csdn.net/likui1314159/article/details/53217750Shard(分片)一个Shard就是一个Lucene实例,是一个完整的搜索引擎。一个索引可以只包含一个Shard,只是一般情况下会用多个分片,可以拆分索引到不... 查看详情

linux快速搭建elk日志分析平台

...elastic.co/下载指定版本的安装包:elasticsearch-7.9.1.tar.gz通过XFTP上传安装包到指定目录3.解压安装包至es包下tar-zxvfelastics 查看详情

linux快速搭建elk日志分析平台

...elastic.co/下载指定版本的安装包:elasticsearch-7.9.1.tar.gz通过XFTP上传安装包到指定目录3.解压安装包至es包下tar-zxvfelasticsearch-7.9.1.tar.gzTODO 查看详情

linux快速搭建elk日志分析平台

...elastic.co/下载指定版本的安装包:elasticsearch-7.9.1.tar.gz通过XFTP上传安装包到指定目录3.解压安装包至es包下tar-zxvfelasticsearch-7.9.1.tar.gzTODO 查看详情

elk学习笔记之elasticsearch环境搭建

ELK概述:ElasticSearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等Logstash是一个完全开源的工具,它可以对你的日志进行收集、... 查看详情

elk日志服务器的快速搭建并收集nginx日志

    今天给大家带来的是开源实时日志分析ELK,ELK由ElasticSearch、Logstash和Kiabana三个开源工具组成。官方网站:https://www.elastic.co其中的3个软件是:  Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,... 查看详情

elk专栏之es索引-04(代码片段)

ELK专栏之ES索引-04索引Index入门为什么我们要手动创建索引?索引管理创建索引查询索引修改索引删除索引定制分词器默认分词器修改分词器的位置定制自己的分词器type底层结构及弃用原因type是什么?ES中不同的type存储... 查看详情

通过beego快速创建一个restful风格api项目及api文档自动化(转)

通过beego快速创建一个Restful风格API项目及API文档自动化本文演示如何快速(一分钟内,不写一行代码)的根据数据库及表创建一个Restful风格的API项目,及提供便于在线测试API的界面。一、创建数据库及数据表(MySQL)#db--jeedev--------------... 查看详情

2021年大数据elk(二十五):添加elasticsearch数据源

...ibana索引模式添加Elasticsearch数据源一、Kibana索引模式可以通过定义索引模式(IndexPatterns)来对应匹配Elasticsearch索引。在第一次访问Kibana的时候,系统会提示我们定义一个索引模式。或者我们可以通过点击按钮,... 查看详情

《高性能mysql》读书笔记之创建高性能的索引

索引是存储引擎用于快速找到记录的一种数据结构。索引优化是对查询性能优化的最有效手段。索引能够轻易将查询性能提高几个数量级。创建一个最优的索引经常需要重写查询。5.1 索引基础   在MySQL中,存储引擎首先在索... 查看详情

elk专栏之es快速入门-01(代码片段)

ELK专栏之ES快速入门-01ELK简介ElasticStack简介ElasticStack简介ElasticStack的特色ElasticStack组件介绍ElasticSearch是什么?搜索是什么?数据库做搜索的弊端站内搜索(垂直搜索)互联网搜索全文检索、倒排索引和Lucene全文检... 查看详情

mysql学习笔记(十五)——索引的创建和设计原则(代码片段)

1.为什么使用索引1.1不加索引没有索引,整张表读取数据,然后利用数据来比较条件,捞出符合条件的数据,表有很多数据,这些数据都会通过磁盘IO来读取,很耗时。1.2加索引加索引后,通过索引可以... 查看详情

windows10下elk环境快速搭建实践

...、应用程序日志和安全日志等。系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及故障发生的原因。分析日志可以了解服务器的负荷,性能安全性,从而及时采取措施纠正错误,因此日志的重要... 查看详情

dubbo--系统学习笔记--快速启动

快速启动服务提供者服务消费者快速启动  Dubbo采用全Spring配置方式,透明化接入应用,对应用没有任何API侵入,只需用Spring加载Dubbo的配置即可,Dubbo基于Spring的Schema扩展进行加载。  如果不想使用Spring配置,而希望通过API... 查看详情