Python Elasticsearc

发布时间:2019-09-23 17:10:00编辑:auto阅读(1571)

    环境

    • Centos 7.4
    • Python 2.7
    • Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1
    • Elasitcsearch6.3.2

    知识点

    • 调用Python Elasticsearh API
    • Python Mysqldb使用
    • DSL查询与聚合
    • Python 列表操作

    代码

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    #minyt 2018.9.1
    #获取24小时内出现的模块次数
    # 该程序通过elasticsearch python client 获取相关精简数据,可以计算请求数、超时数、错误数、正确率、错误率等等
    import MySQLdb
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    
    #定义elasticsearch集群索引名
    index_name = "logstash-nginxlog-*"
    
    #实例化Elasticsearch类,并设置超时间为180秒,默认是10秒的,如果数据量很大,时间设置更长一些
    es = Elasticsearch(['elasticsearch01','elasticsearch02','elasticsearch03'],timeout=180)
    
    #DSL(领域特定语言)查询语法,查询top50 sname的排列次数
    data_sname = {
      "aggs": {
        "2": {
          "terms": {
            "field": "apistatus.sname.keyword",
            "size": 100,
            "order": {
              "_count": "desc"
            }
          }
        }
      },
      "size": 0,
      "_source": {
        "excludes": []
      },
      "stored_fields": [
        "*"
      ],
      "script_fields": {},
      "docvalue_fields": [
        "@timestamp"
      ],
      "query": {
        "bool": {
          "must": [
            {
              "match_all": {}
            },
            {
              "range": {
                "@timestamp": {
                  "gte" : "now-24h/h",
                  "lt" :  "now/h"
                }
              }
            }
          ],
          "filter": [],
          "should": [],
          "must_not": []
        }
      }
    }
    
    #按照DSL(特定领域语言)语法查询获取数据
    def get_original_data():
        try:
            #根据上面条件搜索数据
            res = es.search(
                index=index_name,
                size=0,
                body=data_sname
            )
            return res
    
        except:
            print "get original data failure"
    
    #初始化数据库
    def init_mysql():
        # 打开数据库连接
        db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )
    
        # 使用cursor()方法获取操作游标 
        cursor = db.cursor()
    
        # SQL 更新语句
        sql = "update appname set count=0"
        try:
            # 执行SQL语句
            cursor.execute(sql)
            # 提交到数据库执行
            db.commit()
        except:
            # 发生错误时回滚
            db.rollback()
    
        # 关闭数据库连接
        db.close()
    
    def updata_mysql(sname_count,sname_list):
        # 打开数据库连接
           db = MySQLdb.connect("localhost", "myuser", "mypassword", "mydb", charset='utf8' )
    
        # 使用cursor()方法获取操作游标 
        cursor = db.cursor()
    
        # SQL 更新语句
        sql = "update appname set count=%d where sname = '%s'" % (sname_count,sname_list)
        try:
            # 执行SQL语句
            cursor.execute(sql)
            # 提交到数据库执行
            db.commit()
        except:
            # 发生错误时回滚
            db.rollback()
    
        # 关闭数据库连接
        db.close()
    
    #根据Index数据结构通过Elasticsearch Python Client上传数据到新的Index
    def import_process_data():
        try:
            #列表形式显示结果
            res = get_original_data()
            #print res
            res_list = res.get('aggregations').get('2').get('buckets')
            #print res_list
    
            #初始化数据库
            init_mysql()
    
            #获取24小时内出现的SNAME 
            for value in res_list:
                sname_list = value.get('key')
                sname_count = value.get('doc_count')
                print sname_list,sname_count
                #更新sname_status值
                updata_mysql(sname_count,sname_list)
    
        except Exception, e:
            print repr(e)
    
    if __name__ == "__main__":
        import_process_data()

    总结

    关键是DSL语法的编写涉及查询与聚合可以通过kibana的visualize或者devtool先测试出正确语法,然后结合python对列表、字典、除法、字符串等操作即可。下面汇总下各个算法:

    • 总请求
      http_host.keyword: api.mydomain.com

    • 超长请求
      http_host.keyword: api.mydomain.com AND request_time: [1 TO 600] NOT apistatus.status.keyword:*错误

    • 错误请求
      apistatus.status.keyword:*错误 AND (http_host.keyword: api.mydomain.com OR http_host.keyword: api.yourdomain.com )

    • 请求健康度
      域名与request_time聚合,域名请求时间小于3秒的次数除以总请求次数对应各个域名健康度

    • 请求正确率
      域名与http状态码聚合,域名http状态码为200的次数除以域名总请求数对应各个域名的请求正确率

关键字