Python Elasticsearch api

发布时间:2020-02-17 21:37:18编辑:admin阅读(2947)

    一、介绍

    ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。

    安装API

    pip3 install elasticsearch

     

    建立es连接

    无用户名密码状态

    from elasticsearch import Elasticsearch
    es = Elasticsearch([{'host':'10.10.13.12','port':9200}])

     

    默认的超时时间是10秒,如果数据量很大,时间设置更长一些。如果端口是9200,直接写IP即可。代码如下:

    es = Elasticsearch(['10.10.13.12'], timeout=3600)

     

    用户名密码状态

    如果Elasticsearch开启了验证,需要用户名和密码

    es = Elasticsearch(['10.10.13.12'], http_auth=('xiao', '123456'), timeout=3600)

     

    数据检索功能

    es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')

     

    常用参数

    • index - 索引名

    • q - 查询指定匹配 使用Lucene查询语法

    • from_ - 查询起始点  默认0

    • doc_type - 文档类型

    • size - 指定查询条数 默认10

    • field - 指定字段 逗号分隔

    • sort - 排序  字段:asc/desc

    • body - 使用Query DSL

    • scroll - 滚动查询

     

    统计查询功能

    语法同search大致一样,但只输出统计值

    es.count(index='logstash-2015.08.21', q='http_status_code:500')

    输出:

    {'_shards':{'failed':0, 'successful':5, 'total':5}, 'count':17042}

     

    17042 就是统计值!

     

    知识扩展

    滚动demo

    # Initialize the scroll
    page = es.search(
        index ='yourIndex',
        doc_type ='yourType',
        scroll ='2m',
        search_type ='scan',
        size =1000,
        body ={
        # Your query's body
    })
     
    sid = page['_scroll_id']
    scroll_size = page['hits']['total']
     
    # Start scrolling
    while(scroll_size >0):
        print "Scrolling..."
        page = es.scroll(scroll_id = sid, scroll ='2m')
        # Update the scroll ID
        sid = page['_scroll_id']
        # Get the number of results that we returned in the last scroll
        scroll_size = len(page['hits']['hits'])
        print "scroll size: "+ str(scroll_size)
        # Do something with the obtained page

    以上demo实现了一次取若干数据,数据取完之后结束,不会获取到最新更新的数据。我们滚动完之后想获取最新数据怎么办?滚动的时候会有一个统计值,如total: 5。跳出循环之后,我们可以用_from参数定位到5开始滚动之后的数据。

     

    但是我用的不是这个,用的是以下方法,链接如下:

    https://www.cnblogs.com/blue163/p/8126156.html

     

    在下面的内容中,我会详细介绍此代码如何使用!

     

    二、Query DSL

    range过滤器查询范围

    gt: > 大于
    lt: < 小于
    gte: >= 大于或等于
    lte: <= 小于或等于

     

    示例代码1

    "range":{
        "money":{
            "gt":20,
            "lt":40
        }
    }


     

    时间范围

    最近时间段

    比如我要查询最近1分钟的

    "range": {
        '@timestamp': {'gt': 'now-1m'}
    }

     

    最新1小时

    "range": {
        '@timestamp': {'gt': 'now-1h'}
    }

     

    最新1天的

    "range": {
        '@timestamp': {'gt': 'now-1d'}
    }

     

    指定时间段

    那么问题来了,它是根据当前时间来计算最近的时间。但是有些情况下,我需要制定时间范围,精确到分钟

    假设需要查询早上8点到9点的数据,可以这样

    "range": {
        '@timestamp': {
            "gt" : "{}T{}:00:00".format("2018-12-17","08"),
            "lt": "{}T{}:59:59".format("2018-12-17","09"),
            "time_zone": "Asia/Shanghai"
        }
    }


     

    注意:日期和小时之间,有一个字母T来间隔。不能用空格!

    time_zone 表示时区,如果默认的时区不会,可能会影响查询结果!

     

    bool组合过滤器

    must:所有分句都必须匹配,与 AND 相同。
    must_not:所有分句都必须不匹配,与 NOT 相同。
    should:至少有一个分句匹配,与 OR 相同。

     

    示例代码

    {
        "bool":{
          "must":[],
          "should":[],
          "must_not":[],
        }
    }


     

    term过滤器

    term单过滤

    {    "terms":{
          "money":20
        }
    }

     

    表示money包含20的记录

     

    terms复数版本

    允许多个匹配条件

    {    "terms":{
          "money":20
        }
    }

     

    表示money包含20或者30的记录

     

    结合bool+term来举一个实际的例子:

    查询path字段中包含applogs最近1分钟的记录

    "bool": {
        "must": [
            {
                "terms": {
                    "path": [
                        "applogs",
                    ]
                }
            },
            {
                "range": {
                    '@timestamp': {'gt': 'now-1m'}
                }
            }
        ]
    }

     

    这里使用了terms复数版本,可以随时添加多个条件!

    正则查询 

    {
        "regexp": {
            "http_status_code": "5.*"
        }
    }

    match查询

    match 精确匹配

    {
        "match":{
          "email":"123456@qq.com"
        }
    }

     

    multi_match 多字段搜索

    {
        "multi_match":{
          "query":"11",
          "fields":["Tr","Tq"]
        }
    }


     

    demo

    获取最近一小时的数据

    {'query':
        {'filtered':
            {'filter':
                {'range':
                    {'@timestamp':{'gt':'now-1h'}}
                }
            }
        }
    }


    条件过滤查询

    {
        "query":{
            "filtered":{
                "query":{"match":{"http_status_code":500}},
                "filter":{"term":{"server_name":"vip03"}}
            }
        }
    }

     

    Terms Facet 单字段统计

    {'facets':
        {'stat':
            {'terms':
                {'field':'http_status_code',
                  'order':'count',
            'size':50}
            }
        },
        'size':0
    }

     

    一次统计多个字段

    {'facets':
        {'cip':
            {'terms':
                {'fields':['client_ip']}},
                  'status_facets':{'terms':{'fields':['http_status_code'],
                  'order':'term',
                  'size':50}}},
            'query':{'query_string':{'query':'*'}},
        'size':0
    }

     

    多个字段一起统计

    {'facets':
        {'tag':
            {'terms':
                {'fields':['http_status_code','client_ip'],
                  'size':10
               }
            }
        },
        'query':
            {'match_all':{}},
        'size':0
    }

     

    数据组装

    以下是kibana首页的demo,用来统计一段时间内的日志数量

    {
      "facets": {
        "0": {
          "date_histogram": {
            "field": "@timestamp",
            "interval": "5m"
          },
          "facet_filter": {
            "fquery": {
              "query": {
                "filtered": {
                  "query": {
                    "query_string": {
                      "query": "*"
                    }
                  },
                  "filter": {
                    "bool": {
                      "must": [
                        {
                          "range": {
                            "@timestamp": {
                              'gt': 'now-1h'
                            }
                          }
                        },
                        {
                          "exists": {
                            "field": "http_status_code.raw"
                          }
                        },
                        # --------------- -------
                        # 此处加匹配条件
                      ]
                    }
                  }
                }
              }
            }
          }
        }
      },
      "size": 0
    }

     

    如果想添加匹配条件,在以上代码标识部分加上过滤条件,按照以下代码格式即可

    {
    "query": {
        "query_string": {"query": "backend_name:baidu.com"}
        }
    },

     

    先介绍到这里,后续会有Query DSL API介绍。

     

    三、需求分析

    需求

    下面是kibana展示的日志

    1.png

    需要统计某一天的日志,统计每一个小时用户数,要求用户id不能重复。一个用户id就是一个用户,也称之为一个PV。

    看一段message字段信息

    2018-12-17 12:00:00,533 l=INFO [r=9538381535][s=2] [t=http-xxx-543] [APP=user] [Class=o.s.u.c.AccountController:1189] [Method=findCustomerByLoid]- Operation=find customer by loid,Params=loid:001,Content=start


    其中有一个[r=9538381535],这个9538381535就是用户id。那么用户登录手机APP操作,都会带着这个id,产生一条日志。

    比如user项目,那么最终要的数据格式如下:

    "user":{
        "00":1,
        "01":0,
        ...
        "22":3245,
        "23":765
    }


     

    这里使用24小时制来表示每一个时间段,有多个个用户访问了。注意:已经去重了用户id,统计用户数!

     

    四、相关技术点

    在放出最终代码之前,先来介绍相关技术点,便于理解代码。按照代码从上到下原则,分别来介绍!

    项目列表

    project_list = ['user',...]

    实际的项目是user,但是存储到elasticsearch中,是userlogs,加了一个logs后缀。这个是java后端代码定义的,便于识别!

     

    判断日期是否合法

    def isVaildDate(self, date):
        try:
            if ":" in date:
                time.strptime(date, "%Y-%m-%d %H:%M:%S")
            else:
                time.strptime(date, "%Y-%m-%d")
            return True
        except:
            return False

    因为需要统计一周的数据,所以脚本执行时,需要传一个日期参数。那么日期参数,传给程序是否合法呢?需要有一个函数来判断!

     

    记录日志

    def write_log(self, content):
            """
            写入日志文件
            :param path:
            :param content:
            :return:
            """
            path = "print.log"
            with open(path, mode='a+', encoding='utf-8') as f:
                content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content + "\n"
                print(content)
                f.write(content)


    为啥不用Python的日志模块呢?因为测试发现,它写入一些,我不想要的信息,太占用磁盘空间了。所以,我单独写了一个记录日志方法。

     

    获取elasticsearch数据

    def Get_Data_By_Body(self, project, fixed_date, hour):
        """
        获取数据
        :param project: 项目名
        :param fixed_date: 指定日期
        :param hour: 24小时制中的某一个小时
        :return: object
        """
        # 查询条件,查询项目最近1小时的数据。
        doc = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "terms": {
                                "path": [
                                    project + "logs",
                                ]
                            }
                        },
                        {
                            "range": {
                                '@timestamp': {
                                    "gt": "{}T{}:00:00".format(fixed_date, hour),
                                    "lt": "{}T{}:59:59".format(fixed_date, hour),
                                    "time_zone": "Asia/Shanghai"
                                }
                            }
                        }
                    ]
                }
            }
        }


    由于线上数据量过大,因此直接查询一天的数据,会卡死。所以是切分为每一个小时查询!

    上面的query表示查询语句,大概就是查询指定项目(项目名+logs),1小时范围内的数据

     

    scroll获取数据

    由于1小时内的数据量,也很大。不能直接返回!默认不指定size,是返回10条数据!

    size = 1000  # 指定返回1000条
    queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', )


    参数解释:

    size 指定返回的条数,默认返回10条

    index 指定索引名

    body 查询语句

    scroll 告诉 Elasticsearch 把搜索上下文再保持一分钟。1m表示1分钟

     

    返回结果

    mdata = queryData.get("hits").get("hits")  # 返回数据,它是一个列表类型
    if not mdata:
        self.write_log('%s mdata is empty!' % project)


    queryData 返回一个字典,那么真正的查询结果在queryData['hits']['hits']中,如果这个值没有,表示没有查询到数据!

    注意:它并不是返回所有的结果,而是一页的数据,是一个列表类型。因为我们使用了scroll获取数据,只返回一页!

     

    分页数据

    上面只是返回了1页,我要所有数据,怎么办?需要使用分页,先来看一下分页公式

    divmod(总条数, 每页大小)

     

    注意:divmod返回一个元祖,第一个元素,就是要分页数

     

    总条数,使用

    total = queryData['hits']['total']  # 返回数据的总条数

     

    每页大小,就是上面指定的size

    size = 1000  # 指定返回1000条

     

    那么遍历每一页数据,需要这样

    scroll_id = queryData['_scroll_id']  # 获取scrollID
    total = queryData['hits']['total']  # 返回数据的总条数
    
    # 使用divmod设置分页查询
    # divmod(total,1000)[0]+1 表示总条数除以1000,结果取整数加1
    for i in range(divmod(total, size)[0] + 1):
        res = self.es.scroll(scroll_id=scroll_id, scroll='1m')  # scroll参数必须指定否则会报错
        mdata += res["hits"]["hits"]  # 扩展列表


     

    scroll_id给es.scroll获取数据使用,这个参数必须要有。

    由于Python中的range是顾头不顾尾,所以需要加1。使用for循环,就可以遍历每一个分页数

    es.scroll(scroll_id=scroll_id, scroll='1m') 才是真正查询每一页的数据,必须要指定这2个参数。它的返回结果,就是查询结果!返回一个列表

    上面的mdata是一个列表,res也是列表。因此使用+=就可以扩展列表,得到所有数据!

     

    创建年月日目录

    def create_folder(self, fixed_date):
        """
        创建年/月/日 文件夹
        :return: path
        """
    
        # 系统当前时间年份
        # year = time.strftime('%Y', time.localtime(time.time()))
        # # 月份
        # month = time.strftime('%m', time.localtime(time.time()))
        # # 日期
        # day = time.strftime('%d', time.localtime(time.time()))
    
        # 年月日
        year, month, day = fixed_date.split("-")
    
        # 具体时间 小时分钟毫秒
        # mdhms = time.strftime('%m%d%H%M%S', time.localtime(time.time()))
    
        # 判断基础目录是否存在
        if not os.path.exists(os.path.join(self.BASE_DIR, 'data_files')):
            os.mkdir(os.path.join(self.BASE_DIR, 'data_files'))
    
        # 年月日
        fileYear = os.path.join(self.BASE_DIR, 'data_files', year)
        fileMonth = os.path.join(fileYear, month)
        fileDay = os.path.join(fileMonth, day)
    
        # 判断目录是否存在,否则创建
        try:
            if not os.path.exists(fileYear):
                os.mkdir(fileYear)
                os.mkdir(fileMonth)
                os.mkdir(fileDay)
            else:
                if not os.path.exists(fileMonth):
                    os.mkdir(fileMonth)
                    os.mkdir(fileDay)
                else:
                    if not os.path.exists(fileDay):
                        os.mkdir(fileDay)
    
            return fileDay
        except Exception as e:
            print(e)
            return False


    统计结果是最终写入到一个txt里面,那么如何存储呢?使用年月日目录在区分,可以知道这个txt文件,是属于哪一天的。到了一定时间后,可以定期清理,非常方便!

    这里使用的传参方式,传入一个日期。所以使用"-"就可以切割出年月日

    # 年月日year, month, day = fixed_date.split("-")

     

    输出24小时

    使用以下代码就可以实现

    hour_list = ['{num:02d}'.format(num=i) for i in range(24)]

    输出:

    ['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23']

     

    项目统计字典

    需要统计每一个项目的每一个小时的用户id,用户id必须去重。既然要去重,我们首先会想到用集合。

    但是还有一个办法,使用字典,也可以去重。因为字典的key是唯一的。

    构造24小时字典

    先来构造项目user的数据,格式如下:

    "basebusiness": {
        "00": {},
        "01": {},
        "02": {},
        "03": {},
        "04": {},
        "05": {},
        "06": {},
        "07": {},
        "08": {},
        "09": {},
        "10": {},
        "11": {},
        "12": {},
        "13": {},
        "14": {},
        "15": {},
        "16": {},
        "17": {},
        "18": {},
        "19": {},
        "20": {},
        "21": {},
        "22": {},
        "23": {},
    }

     

    这只是一个项目,实际有很多项目。所以每一个字典,都有这样的24小时数据。相关代码如下:

    project_dic = {}  # 项目统计字典
    # 24小时
    hour_list = ['{num:02d}'.format(num=i) for i in range(24)]
    
    for hour in hour_list:  # 遍历24小时
        # print("查询{}点的数据###############".format(hour))
        self.write_log("查询{}点的数据###############".format(hour))
        for project in project_list:  # 遍历项目列表
            if not project_dic.get(project):
                project_dic[project] = {}  # 初始化项目字典
    
            if not project_dic[project].get(hour):
                project_dic[project][hour] = {}  # 初始化项目小时字典

     

    这里的每一个小时,都是空字典。还没有添加数据,需要添加用户id,下面会讲到!

     

    正则匹配用户id

    看这一点字符串

    2018-12-17 12:00:00,533 l=INFO [r=9538381535][s=2] [t=http-xxx-543] [APP=user]

     

    需要提取出9538381535,思路就是:匹配中括号内容-->提取以r=开头的内容-->使用等号切割,获取用户id

    匹配中括号内容

    p1 = re.compile(r'[[](.*?)[]]', re.S)  # 最小匹配,匹配中括号的内容

     

    注意:这里要使用最小匹配,不能使用贪婪匹配。这一段正则,我是用网上找的,测试ok

     

    提取和切割,就比较简单了。使用startswith和split方法,就可以了!

    使用字典去重

    接下来,需要将用户id写入到字典中,需要去重,否则字典添加时,会报错!

    那么如何使用字典去重呢?只需要遵循一个原则即可! 有则忽略,无则添加

    # 判断字典中rid不存在时,避免字典键值重复
    if not project_dic[project][hour].get(rid):
        project_dic[project][hour][rid] = True  # 添加值


    生成器

    这里主要在2个方法中,使用了生成器。生成器的优点,就是节省内容。

    一处在是Get_Data_By_Body方法中,它需要返回所有查询的数据。数据量非常大,因此必须要生成器,否则服务器内容就溢出!

    还有一处,就main方法。它是返回项目的统计结果。注意,它不是最终结果。它里面保存了每一个项目,每一个小时中的用户id,是已经去重的用户id。

    数据量也是比较大,当然,没有Get_Data_By_Body方法返回的结果大。

     

    统计每一个小时用户数

    main方法,返回的字典格式如下:

    "user":{
        "00":{
            "242412":True,
        }
        "01":{
            "":True,
        },
        ...
        "22":{
            "457577":True,
            "546583":True,
        },
        "23":{
            "457577":True,
            "546583":True,
            "765743":True,
        }
    }


     

    我需要知道,每一个小时的用户数。怎么统计呢?用2个方法

    1. 遍历字典的每一个小时,使用计数器

    2. 使用len方法(推荐)

     

    最简单的方法,就是使用len方法,就可以知道每一个小时有多少个key

    for i in dic:  # 遍历数据
        if not final_dic.get(i):
            final_dic[i] = {}  # 初始化字典
    
        for h in sorted(dic[i]):  # 遍历项目的每一个小时
            # 统计字典的长度
            final_dic[i][h] = len(dic[i][h])


     

    有序字典

    看下面的数据

    1.png

    可以发现,24小时,排序是乱的。这样给领导看时,不太美观。所以需要对24小时进行排序!

    在Python 3.6之前,字典的key是无序的。因此,需要定义一个有序字典,在写入之前,要对字典的key做一次排序。

    这样顺序写入到有序字典之后,之后再次调用,依然是有序的!

    order_dic = OrderedDict()  # 实例化一个有序字典
        final_dic = {}  # 最终统计结果
        for dic in data:  # 遍历生成器
            for i in dic:  # 遍历数据
                if not final_dic.get(i):
                    final_dic[i] = order_dic  # 初始化字典
    
                # 有序字典必须先对普通字典key做排序
                for h in sorted(dic[i]):  # 遍历项目的每一个小时
                    # 统计字典的长度
                    final_dic[i][h] = len(dic[i][h])


     

    完整代码

    #!/usr/bin/env python3
    # coding: utf-8
    
    
    import re
    import os
    import sys
    import json
    import time
    from collections import OrderedDict
    from elasticsearch import Elasticsearch
    
    # 项目列表
    project_list = ['usercenter', ['login']]
    
    
    # yesterday = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
    # today = datetime.datetime.now().strftime("%Y-%m-%d")
    
    
    class ElasticObj:
        def __init__(self, index_name, ip, fixed_date, timeout=3600):
            '''
            :param index_name: 索引名称
            :param ip: elasticsearch地址
            :param timeout: 设置超时间,默认是10秒的,如果数据量很大,时间要设置更长一些
            '''
            self.index_name = index_name
            self.ip = ip
            self.timeout = timeout
            # 无用户名密码状态
            # self.es = Elasticsearch([self.ip], timeout=self.timeout)
            # 用户名密码状态
            # self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)
            self.es = Elasticsearch([self.ip], http_auth=('elastic', '123456'), timeout=self.timeout)
    
            self.fixed_date = fixed_date  # 指定日期
    
            # 当前py文件所在的文件夹
            self.BASE_DIR = os.path.dirname(os.path.abspath(__file__))
            self.fileDay = self.create_folder()  # 创建日志和数据目录
    
        @staticmethod
        def isVaildDate(date):
            """
            判断日期是否合法
            :param date: 日期,比如: 2018-03-30
            :return: bool
            """
            try:
                if ":" in date:
                    time.strptime(date, "%Y-%m-%d %H:%M:%S")
                else:
                    time.strptime(date, "%Y-%m-%d")
                return True
            except:
                return False
    
        def write_log(self, content):
            """
            写入日志文件
            :param content: 写入内容
            :return:
            """
            path = os.path.join(self.fileDay,"output_%s.log" %self.fixed_date)
            # path = "output_{}.log".format(self.fixed_date)
            with open(path, mode='a+', encoding='utf-8') as f:
                content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content + "\n"
                print(content)
                f.write(content)
    
        def Get_Data_By_Body(self, project, hour):
            """
            获取数据
            :param project: 项目名
            :param hour: 24小时制中的某一个小时
            :return: 生成器
            """
            # doc = {'query': {'match_all': {}}}
            # 查询条件,查询项目最近1小时的数据。now-1h表示最近1小时
            # print(type(fixed_date))
            # print("{date}T00:00:00".format(date=fixed_date))
            # 24小时
    
            doc = {
                "query": {
                    "bool": {
                        "must": [
                            {
                                "terms": {
                                    "path": [
                                        project + "logs",
                                    ]
                                }
                            },
                            {
                                # "range": {
                                #     '@timestamp': {'gt': 'now-1m'}
                                # }
                                "range": {
                                    '@timestamp': {
                                        "gt": "{}T{}:00:00".format(self.fixed_date, hour),
                                        "lt": "{}T{}:59:59".format(self.fixed_date, hour),
                                        "time_zone": "Asia/Shanghai"
                                    }
                                }
                            }
                        ]
                    }
                }
            }
            # queryData = self.es.search(index=self.index_name, body=doc)
            # scroll 参数告诉 Elasticsearch 把搜索上下文再保持一分钟,1m表示1分钟
            # size 参数允许我们配置没匹配结果返回的最大命中数。每次调用 scroll API 都会返回下一批结果,直到不再有可以返回的结果,即命中数组为空。
            size = 1000  # 指定返回1000条
            queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', )
            # print(queryData['hits']['total'])
    
            mdata = queryData.get("hits").get("hits")  # 返回查询的数据,不是所有数据,而是一页的数据,它是一个列表类型
            if not mdata:
                self.write_log('%s mdata is empty!' % project)
    
            # scroll_id 的值就是上一个请求中返回的 _scroll_id 的值
            scroll_id = queryData['_scroll_id']  # 获取scrollID
            total = queryData['hits']['total']  # 返回数据的总条数
            # print("查询项目{} {}点的数据,总共有{}条".format(project,hour,total))
            self.write_log("查询项目{} {}点的数据,总共有{}条".format(project, hour, total))
    
            # 使用divmod设置分页查询
            # divmod(total,1000)[0]+1 表示总条数除以1000,结果取整数加1
            for i in range(divmod(total, size)[0] + 1):
                res = self.es.scroll(scroll_id=scroll_id, scroll='1m')  # scroll参数必须指定否则会报错
                mdata += res["hits"]["hits"]  # 扩展列表
                # yield mdata
    
            # print(mdata)
            # return mdata
            yield mdata
    
        def create_folder(self):
            """
            创建年/月/日 文件夹
            :return: path
            """
    
            # 系统当前时间年份
            # year = time.strftime('%Y', time.localtime(time.time()))
            # # 月份
            # month = time.strftime('%m', time.localtime(time.time()))
            # # 日期
            # day = time.strftime('%d', time.localtime(time.time()))
    
            # 年月日
            year, month, day = self.fixed_date.split("-")
    
            # 具体时间 小时分钟毫秒
            # mdhms = time.strftime('%m%d%H%M%S', time.localtime(time.time()))
    
            # 判断基础目录是否存在
            if not os.path.exists(os.path.join(self.BASE_DIR, 'data_files')):
                os.mkdir(os.path.join(self.BASE_DIR, 'data_files'))
    
            # 年月日
            fileYear = os.path.join(self.BASE_DIR, 'data_files', year)
            fileMonth = os.path.join(fileYear, month)
            fileDay = os.path.join(fileMonth, day)
    
            # 判断目录是否存在,否则创建
            try:
                if not os.path.exists(fileYear):
                    os.mkdir(fileYear)
                    os.mkdir(fileMonth)
                    os.mkdir(fileDay)
                else:
                    if not os.path.exists(fileMonth):
                        os.mkdir(fileMonth)
                        os.mkdir(fileDay)
                    else:
                        if not os.path.exists(fileDay):
                            os.mkdir(fileDay)
    
                return fileDay
            except Exception as e:
                print(e)
                return False
    
        def main(self):
            """
            主要处理逻辑
            :return: 生成器
            """
            project_dic = {}  # 项目统计字典
            # fixed_date = datetime.datetime.strptime(fixed_date, "%Y-%m-%d")
            # strftime("%Y-%m-%d")
            # conv_date = fixed_date.strftime("%Y-%m-%d")
            # print(conv_date, type(conv_date))
            # exit()
            # now_hour = fixed_date.strftime('%H')  # 当前时间的小时
            # print(now_hour)
            # 24小时
            hour_list = ['{num:02d}'.format(num=i) for i in range(24)]
            # hour_list = ['{num:02d}'.format(num=i) for i in range(2)]
    
            # project="usercenter"
            # project_dic[project] = {now_hour: {}}  # 初始化字典
            for hour in hour_list:  # 遍历24小时
                # print("查询{}点的数据###############".format(hour))
                self.write_log("查询{}点的数据###############".format(hour))
                for project in project_list:  # 遍历项目列表
                    if not project_dic.get(project):
                        project_dic[project] = {}  # 初始化项目字典
    
                    if not project_dic[project].get(hour):
                        project_dic[project][hour] = {}  # 初始化项目小时字典
    
                    mdata = self.Get_Data_By_Body(project, hour)  # 获取数据
                    for item in mdata:  # 遍历生成器
                        for hit in item:  # 遍历返回数据
                            # hit是一个字典
                            str1 = hit['_source']['message']  # 查询指定字典
                            p1 = re.compile(r'[[](.*?)[]]', re.S)  # 最小匹配,匹配中括号的内容
                            for i in re.findall(p1, str1):  # 遍历结果
                                if i.startswith('r='):  # 判断以r=开头的
                                    rid = i.split("=")[1]  # 获取rid
                                    # print("rid",rid)
                                    # 判断字典中rid不存在时,避免字典键值重复
                                    if not project_dic[project][hour].get(rid):
                                        project_dic[project][hour][rid] = True  # 添加值
    
                time.sleep(1)  # 休眠1秒钟
    
            # return project_dic
            yield project_dic
    
    
    if __name__ == '__main__':
        # fixed_date = "2018-12-16"
        fixed_date = sys.argv[1]  # 日期参数
        if not ElasticObj.isVaildDate(fixed_date):
            print("日期不合法!")
            exit()
    
        startime = time.time()  # 开始时间
    
        index_name = "common-*"
        es_server = "192.168.92.131"
    
        obj = ElasticObj(index_name, es_server, fixed_date)  # 连接elasticsearch
        print("正在查询日期%s这一天的数据" % fixed_date)
        obj.write_log("###########################################")
        obj.write_log("正在查询日期%s这一天的数据" % fixed_date)
    
        data = obj.main()
        # print("初步结果",data)
    
        # fileDay = obj.create_folder()  # 创建目录
        # if not fileDay:
        #     # print("创建目录失败!")
        #     obj.write_log("创建目录失败!")
        #     exit()
    
        order_dic = OrderedDict()  # 实例化一个有序字典
        final_dic = {}  # 最终统计结果
        for dic in data:  # 遍历生成器
            for i in dic:  # 遍历数据
                if not final_dic.get(i):
                    final_dic[i] = order_dic  # 初始化字典
    
                # 有序字典必须先对普通字典key做排序
                for h in sorted(dic[i]):  # 遍历项目的每一个小时
                    # 统计字典的长度
                    final_dic[i][h] = len(dic[i][h])
    
        # print("最终结果",final_dic)  # 统计结果
        obj.write_log("最终结果执行完毕!")
    
        # 写入文件
        with open(os.path.join(obj.fileDay, 'access_data.txt'), encoding='utf-8', mode='a') as f:
            f.write(json.dumps(final_dic) + "\n")
    
        endtime = time.time()
        take_time = endtime - startime
    
        if take_time < 1:  # 判断不足1秒时
            take_time = 1  # 设置为1秒
        # 计算花费时间
        m, s = divmod(take_time, 60)
        h, m = divmod(m, 60)
    
        # print("本次花费时间 %02d:%02d:%02d" % (h, m, s))
        obj.write_log("统计日期%s这一天的数据完成!请查阅data_files目录的日志和数据文件" % fixed_date)
        obj.write_log("本次花费时间 %02d:%02d:%02d" % (h, m, s))

     

    日志文件和数据文件,都在年月日目录里面!

     

    本文参考链接:

    http://www.cnblogs.com/letong/p/4749234.html

    http://www.linuxyw.com/790.html

    https://www.cnblogs.com/blue163/p/8126156.html


关键字