python MySQL 插入Elasticsearch

发布时间:2020-02-22 11:46:39编辑:admin阅读(2466)

    一、需求分析

    注意: 本环境使用 elasticsearch 7.0版本开发,切勿低于此版本

    mysql 表结构

    有一张表,记录的数据特别的多,需要将7天前的记录,插入到Elasticsearch中,并删除原有表7天前的记录。

    表结构如下:

    CREATE TABLE `historic_records` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `user_id` varchar(50) NOT NULL DEFAULT '' COMMENT '用户id',
      `time` bigint(20) NOT NULL DEFAULT '0' COMMENT '上线/下线时间',
      `create_time` bigint(20) NOT NULL DEFAULT '0' COMMENT '创建时间',
      `update_time` bigint(20) NOT NULL DEFAULT '0' COMMENT '更新时间',
      `online_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '在线状态 默认1 0 离线 1 在线',
      `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '软删除标志:0-已删除;1-正常',
      PRIMARY KEY (`id`),
      KEY `user_id` (`user_id`),
      KEY `order_index` (`time`,`create_time`,`update_time`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='历史记录表';

     

    查询sql:

    select * from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000


    删除sql:

    delete from historic_records where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000


    ES中的一些概念

    index(索引)

    相当于mysql中的数据库

    type(类型)

    相当于mysql中的一张表

    document(文档)

    相当于mysql中的一行(一条记录)

    field(域)

    相当于mysql中的一列(一个字段)

    节点

    一个服务器,由一个名字来标识

    集群

    一个或多个节点组织在一起

    分片

    将一份数据划分为多小份的能力,允许水平分割和扩展容量。多个分片可以响应请求,提高性能和吞吐量。

    副本

    复制数据,一个节点出问题时,其余节点可以顶上。

    倒排索引

    可参考 https://www.elastic.co/guide/cn/elasticsearch/guide/current/inverted-index.html

     

    es数据结构

    设定映射,规定好各个字段及其数据类型,便于es更好地进行管理。根据mysql表结构,映射如下:

    # 创建映射
    _index_mappings = {
        "settings": {
            "index": {
                "number_of_shards": 3,
                "number_of_replicas": 1
            }
        },
        "mappings": {
            # self.index_type : {
                "properties": {
                    "id": {"type": "long"},
                    "loid": {"type": "keyword"},
                    "mac": {"type": "keyword"},
                    "time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "create_time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "update_time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "online_status": {"type": "short"},
                    "status": {"type": "short"}
                }
            # }
        }
    }

     

    解释:

    索引设置,都在 settings{...} 中

    number_of_shards
    每个索引的主分片数,默认值是 5 。这个配置在索引创建后不能修改。


    number_of_replicas
    每个主分片的副本数,默认值是 1 。对于活动的索引库,这个配置可以随时修改。

     

    映射配置,都在mappings{...} 中

    属性设置,都在 properties{...} 中

     

    Elasticsearch 支持 如下简单域类型:

    • 字符串: string

    • 整数 : byteshortintegerlong

    • 浮点数: floatdouble

    • 布尔型: boolean

    • 日期: date

     

    仔细看上面的mysql 表结构

    由于 id 的类型是 bigint(20),那么在es中就是 long,表示长整形。

    user_id 的类型是 varchar(50) ,在es中,有2中,分别是 text和 keyword。

    这2种,是有区别的。text 会创建全文索引,支持模糊搜索。而keyword则不会,必须精确搜索才行。

    由于 user_id不需要模糊搜索,因此 设置 keyword才是合理的。

     

    create_time 虽然类型是 bigint(20),但是它存储在mysql里面,表示时间戳。

    因此es中就是data,时间格式为:epoch_millis,表示微秒时间戳。

     

    online_status 的类型是tinyint(1),在es中是 short,表示短的数字

     

    三、elasticsearch和kibana搭建

    elasticsearch

    新建目录elasticsearch

    mkdir /opt/elasticsearch-7.1.1

    目录结构如下:

    ./
    ├── dockerfile
    ├── elasticsearch-7.1.1-amd64.deb
    ├── run.sh
    └── sources.list

     

    dockerfile

    FROM ubuntu:16.04
    # 修改更新源为阿里云
    ADD sources.list /etc/apt/sources.list
    ADD elasticsearch-7.1.1-amd64.deb ./
    # 安装jdk和elasticsearch
    RUN apt-get update && apt-get install -y openjdk-8-jdk --allow-unauthenticated && apt-get clean all && dpkg -i elasticsearch-7.1.1-amd64.deb && rm -rf elasticsearch-7.1.1-amd64.deb
    EXPOSE 9200
    # 添加启动脚本
    ADD run.sh .
    RUN chmod 755 run.sh
    ENTRYPOINT [ "/run.sh"]

     

    run.sh

    #!/bin/bash
    set -e
    
    # 添加时区
    TZ=Asia/Shanghai
    ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
    
    # 覆盖配置文件
    cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak
    echo "transport.host: localhost
    transport.tcp.port: 9300
    http.port: 9200
    network.host: 0.0.0.0" >> /etc/elasticsearch/elasticsearch.yml
    
    # 修改启动文件,去掉-d参数,避免后台运行
    sed -i 72's@-d -p $PID_FILE@-p $PID_FILE@g' /etc/init.d/elasticsearch
    
    # 启动elasticsearch,要hold住,否则容器启动就退出了!
    /etc/init.d/elasticsearch start

     

    sources.list

    deb http://mirrors.aliyun.com/ubuntu/ xenial main restricted
    deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main restricted
    deb http://mirrors.aliyun.com/ubuntu/ xenial universe
    deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
    deb http://mirrors.aliyun.com/ubuntu/ xenial multiverse
    deb http://mirrors.aliyun.com/ubuntu/ xenial-updates multiverse
    deb http://mirrors.aliyun.com/ubuntu/ xenial-backports main restricted universe multiverse
    deb http://mirrors.aliyun.com/ubuntu xenial-security main restricted
    deb http://mirrors.aliyun.com/ubuntu xenial-security universe
    deb http://mirrors.aliyun.com/ubuntu xenial-security multiverse


    生成镜像

    docker build -t elasticsearch-7.1.1 .

     

    启动容器

    docker run -d -it --restart=always -p 9200:9200 elasticsearch-7.1.1

     

    访问页面

    1.png

     

     

    kibana

    新建目录kibana

    mkdir /opt/kibana-7.1.1

    目录结构如下:

    ./
    ├── dockerfile
    ├── kibana-7.1.1-amd64.deb
    └── run.sh


    dockerfile

    FROM ubuntu:16.04
    ADD kibana-7.1.1-amd64.deb ./
    # 安装jdk和elasticsearch
    RUN dpkg -i kibana-7.1.1-amd64.deb && rm -rf kibana-7.1.1-amd64.deb
    EXPOSE 5601
    # 添加启动脚本
    ADD run.sh .
    RUN chmod 755 run.sh
    ENTRYPOINT [ "/run.sh"]

     

    run.sh

    #!/bin/bash
    
    # 添加时区
    TZ=Asia/Shanghai
    ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
    
    #elasticsearch="192.168.91.128"
    if [ -z $elasticsearch ];then
        echo "elasticsearch参数为空!比如: 192.168.91.128"
        exit
    fi
    
    # 修改配置文件
    # 修改监听地址
    sed -i '7s@#server.host: "localhost"@server.host: "0.0.0.0"@g' /etc/kibana/kibana.yml
    # 删除行,并添加一行内容
    sed -i '28d' /etc/kibana/kibana.yml
    sed -i "N;28 i elasticsearch.hosts: ["http://$elasticsearch:9200"]" /etc/kibana/kibana.yml
    
    # 启动
    /usr/share/kibana/bin/kibana "-c /etc/kibana/kibana.yml"

     

    生成镜像

    docker build -t kibana-7.1.1 .

     

    启动镜像

    docker run -d -it --restart=always -p 5601:5601 -e elasticsearch=192.168.10.104 kibana-7.1.1

     

    访问页面

    1.png

     

    二、查询mysql数据

    为了方便操作 mysql,封装了一个mysql工具类,用来查询和更新数据。

    mysql.py

    #!/usr/bin/env python3
    # coding: utf-8
    
    import pymysql
    
    class Mysql(object):
        # mysql 端口号,注意:必须是int类型
        def __init__(self,host,user,passwd,db_name,port=3306):
            self.host = host
            self.user = user
            self.passwd = passwd
            self.db_name = db_name
            self.port = port
    
        def select(self,sql):
            """
            执行sql命令
            :param sql: 命令
            :return: 元祖
            """
            try:
                conn = pymysql.connect(
                    host=self.host,
                    user=self.user,
                    passwd=self.passwd,
                    port=self.port,
                    database=self.db_name,
                    charset='utf8',
                    cursorclass=pymysql.cursors.DictCursor
                )
                cur = conn.cursor()  # 创建游标
                cur.execute(sql)  # 执行sql命令
                res = cur.fetchall()  # 获取执行的返回结果
                cur.close()
                conn.close()  # 关闭mysql 连接
                return res
            except Exception as e:
                print(e)
                return False
    
        def update(self,sql):
            """
            更新操作,比如insert, delete,update
            :param sql: sql命令
            :return: bool
            """
            try:
                conn = pymysql.connect(
                    host=self.host,
                    user=self.user,
                    passwd=self.passwd,
                    port=self.port,
                    database=self.db_name,
                )
                cur = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 创建游标
                # conn.cursor()
                # print("ip: {} insert 执行命令: {}".format(self.host,sql))
                sta = cur.execute(sql)  # 执行sql命令,返回影响的行数
                # print("sta",sta,type(sta))
                #res = cur.fetchall()  # 获取执行的返回结果
                if isinstance(sta,int):  # 判断返回结果, 是数字就是正常的
                    #print('插入记录 Done')
                    pass
                    # write_log('正常,远程执行sql: %s 成功'%sql, "green")
                else:
                    write_log('错误,远程执行sql: %s 失败'%sql, "red")
                    return False
    
                conn.commit()  # 主动提交,否则执行sql不生效
                cur.close()
                conn.close()  # 关闭mysql 连接
                return sta
            except Exception as e:
                print(e)
                # write_log('错误,远程mysql执行命令: {} 异常'.format(sql), "red")
                return False

     

    使用时,就简单了。导入这个类,调用相关方法。

    mysql_test.py

    from mysql import Mysql
    
    
    host = "192.168.0.179"
    user = "sdn_db"
    passwd = "Sdn@ujmyhn"
    db_name = "terminalservice"
    port = 3306
    
    sql = "select * from terminals_record_0 where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000"
    res = Mysql(host,user,passwd,db_name,port).select(sql)
    print(res)

     

     

    三、完整代码

    由于时间关系,代码不一一解释了。附上完整代码:

    ./
    ├── conf.py
    ├── es_bulk.py
    ├── README.md
    ├── requirements.txt
    └── utils
        ├── common.py
        └── mysql.py


    conf.py

    #!/usr/bin/env python3
    # coding: utf-8
    """
    配置文件,用于mysql和elasticsearch
    """
    import os
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # 项目根目录
    
    # mysql
    HOST = "192.168.0.136"
    USER = "root"
    PASSWD = "123456"
    DB_NAME = "terminal"
    PORT = 3306
    
    # elasticsearch
    INDEX_NAME = "historic_records"
    INDEX_TYPE = "_doc"
    ES_IP = "192.169.3.133"
    
    MAXIMUM = 100  # 一次性插入多少条

     

    es_bulk.py

    #!/usr/bin/env python3
    # coding: utf-8
    
    import time
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    
    import conf
    from utils.mysql import Mysql
    from utils.common import write_log,valid_ip,check_tcp
    
    
    class ElasticObj:
        def __init__(self,timeout=3600):
            '''
            :param timeout: 超时时间
            '''
            self.index_name = conf.INDEX_NAME  # 索引名称
            self.index_type = conf.INDEX_TYPE  # 索引类型
            self.es_ip = conf.ES_IP  # es ip
    
            # 无用户名密码状态
            self.es = Elasticsearch([self.es_ip], port=9200, timeout=timeout)
            # 用户名密码状态
            # self.es = Elasticsearch([self.es_ip], http_auth=('esadm', 'mdase123'), port=9200, timeout=timeout)
    
        def create_index(self):
            '''
            创建索引
            :return: bool
            '''
            # 创建映射
            _index_mappings = {
                # 索引配置
                "settings": {
                    "index": {
                        "number_of_shards": 3,  # 分片数
                        "number_of_replicas": 1  # 副本数
                    }
                },
                # 设置字段
                "mappings": {
                    "properties": {
                        "id": {"type": "long"},
                        "loid": {"type": "keyword"},
                        "mac": {"type": "keyword"},
                        "time": {
                            "type": "date",
                            "format": "epoch_millis"
                        },
                        "create_time": {
                            "type": "date",
                            "format": "epoch_millis"
                        },
                        "update_time": {
                            "type": "date",
                            "format": "epoch_millis"
                        },
                        "online_status": {"type": "short"},
                        "status": {"type": "short"}
                    }
                }
            }
            # 判断索引不存在时
            if self.es.indices.exists(index=self.index_name) is not True:
                # 创建索引
                res = self.es.indices.create(index=self.index_name, body=_index_mappings)
                # print(res)
                if not res:
                    write_log("错误,创建索引{}失败".format(self.index_name),"red")
                    return False
    
                write_log("正常,创建索引{}成功".format(self.index_name), "green")
                return True
            else:
                write_log("正常,索引{}已存在".format(self.index_name), "green")
                return True
    
        def bulk_insert(self,table,data_list):
            """
            批量写入数据
            :param table: 表名
            :param data_list: 数据列表
            [
                {
                    'online_status': 1,
                    'update_time': 1556073035327,
                    'create_time': 1556073035327,
                    'id': 1, 'status': 1,
                    'time': 1556073035327,
                    'loid': '100010000123',
                    'mac': '60:45:cb:87:c9:93'
                },
                ...
            ]
            :return: bool
            """
            # 批量插入
            start_time = time.time()  # 开始时间
            actions = []  # 临时数据列表
            i = 0  # 计数值
    
            try:
                # 循环数据列表
                for data in data_list:
                    action = {
                        "_index": self.index_name,
                        "_type": self.index_type,
                        #"_id": i,  #_id 也可以默认生成,不赋值
                        "_source": {
                            'id': data['id'],
                            'user_id': data['user_id'],
                            'time': data['time'],
                            'create_time': data['create_time'],
                            'online_status': data['online_status'],
                            'status': data['status'],
                        }
                    }
                    i += 1
                    actions.append(action)  # 添加到列表
                    if len(action) == conf.MAXIMUM:  # 列表数量达到100时
                        helpers.bulk(self.es, actions)  # 批量插入数据
                        del actions[0:len(action)]  # 删除列表元素
    
                if i > 0:  # 不足100时,插入剩余数据
                    helpers.bulk(self.es, actions)
    
                end_time = time.time()  # 结束时间
                t = round((end_time - start_time),2)  # 计算耗时
                # print('本次共写入{}条数据,用时{}s'.format(i, t))
                write_log("正常,{} 表写入ES {}条数据,用时{}s".format(table,i, t), "green")
                return True
            except Exception as e:
                print(e)
                return False
    
        def has_table(self,db_name,target_table):
            """
            远程表是否存在
            :return: bool
            """
            mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
            sql = "select count(1) from {}.{}".format(db_name, target_table)
            res = mysql_obj.select(sql)
            # print("表是否存在",res,type(res))
    
            if res is False:
                write_log("错误,远程表 {}.{} 不存在".format(db_name,target_table),"red")
                return False
            else:
                return True
    
        def has_conf(self):
            """
            判断配置文件中的mysql和es 端口是否正常
            :return:
            """
            if not valid_ip(conf.HOST):
                write_log("错误,MySQL IP配置不正确","red")
                return False
    
            if not valid_ip(conf.ES_IP):
                write_log("错误,ES IP配置不正确","red")
                return False
    
            if not check_tcp(conf.HOST,conf.PORT):
                write_log("错误,MySQL {} 端口不可达".format(conf.PORT),"red")
                return False
    
            if not check_tcp(conf.ES_IP,9200):
                write_log("错误,ES 9200 端口不可达","red")
                return False
    
            return True
    
        def read_mysql_es(self):
            """
            读取7天的记录,并写入es
            :return: bool
            """
            # 判断配置文件中的mysql和es 端口是否正常
            if not self.has_conf():
                # print(1)
                return False
    
            # 创建索引
            if self.create_index() is False:
                # print(2)
                return False
    
            max = conf.MAXIMUM  # 一次性查询多少条
            
            flag_list = []  # 标志位列表
            mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
            for i in range(64):  # 写入64张表
                # 判断表是否存在
                res = self.has_table(conf.DB_NAME,'historic_record_%s'%i)
                if not res:
                    flag_list.append(False)
                    return False
    
                id = 0  # 每一次查询后的最大id
                while True:
                    # 查询数据
                    sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                    i, id, max)
                    # print(sql)
                    data_list = mysql_obj.select(sql)
                    # print(data_list)
                    if not data_list:  # 当结果为空时,结束循环
                        write_log("警告,执行sql: %s 记录为空,无需写入es" %(sql), "yellow")
                        break  # 跳出循环
    
                    last_row = data_list[-1]  # 最后一行记录
                    # print(last_row)
                    id = last_row['id']  # 修改最大id
    
                    res = self.bulk_insert('historic_record_%s' % i, data_list)
                    if not res:
                        write_log("错误,historic_record_%s 写入ES 失败"%i,"red")
                        flag_list.append(False)
                        return False
    
            if False in flag_list:
                write_log("错误,historic_record 部分表写入ES错误,请查看上文","red")
                return False
    
            write_log("正常,historic_record 64张表全部写入ES成功", "green")
            return True
    
        def delete_record(self):
            """
            删除7天的表数据
            :return: bool
            """
            max = conf.MAXIMUM  # 一次性查询多少条
            flag_list = []
            mysql_obj = Mysql(conf.HOST, conf.USER, conf.PASSWD, conf.DB_NAME, conf.PORT)
            for i in range(64):  # 64张表
                # 判断表是否存在
                res = self.has_table(conf.DB_NAME, 'historic_record_%s' % i)
                if not res:
                    flag_list.append(False)
                    return False
    
                ### 先查询数据
                id = 0  # 每一次查询后的最大id
                while True:
                    # 查询数据
                    sql = "select * from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                        i, id, max)
                    # print(sql)
                    data_list = mysql_obj.select(sql)
                    # print(data_list)
                    if not data_list:  # 当结果为空时,结束循环
                        write_log("警告,执行sql: %s 记录为空,无需删除" % sql, "yellow")
                        break  # 跳出循环
    
                    ### 再删除数据
                    sql = "delete from historic_record_%s where create_time < unix_timestamp(date_add(cast(sysdate() as date), interval -7 day)) * 1000 and id > %s order by id limit %s" % (
                        i, id, max)
                    # print(sql)
                    res = mysql_obj.update(sql)
                    if res is False:
                        write_log("错误,删除 historic_record_%s 记录失败" % i, "red")
                        flag_list.append(False)
                        break
                    else:
                        write_log("正常,删除 historic_record_%s 记录成功" % i, "green")
                        
                    last_row = data_list[-1]  # 最后一行记录
                    # print(last_row)
                    id = last_row['id']  # 修改最大id
    
    
            if False in flag_list:
                write_log("错误,删除 historic_record 部分表失败,请查看上文", "red")
                return False
    
            write_log("正常,删除 historic_record 64张表记录全部成功", "green")
    
        def main(self):
            self.read_mysql_es()
            self.delete_record()
    
    ElasticObj().main()  # 执行主程序

     

    common.py

    #!/usr/bin/env python3
    # coding: utf-8
    """
    共有的方法
    """
    
    import sys
    import io
    
    def setup_io():  # 设置默认屏幕输出为utf-8编码
        sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
        sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
    setup_io()
    
    
    import os
    import time
    import conf
    import socket
    import subprocess
    import ipaddress
    from multiprocessing import cpu_count
    
    def write_log(content,colour='white',skip=False):
        """
        写入日志文件
        :param content: 写入内容
        :param colour: 颜色
        :param skip: 是否跳过打印时间
        :return:
        """
        # 颜色代码
        colour_dict = {
            'red': 31,  # 红色
            'green': 32,  # 绿色
            'yellow': 33,  # 黄色
            'blue': 34,  # 蓝色
            'purple_red': 35,  # 紫红色
            'bluish_blue': 36, # 浅蓝色
            'white': 37,  # 白色
        }
        choice = colour_dict.get(colour)  # 选择颜色
    
    
        path = os.path.join(conf.BASE_DIR,"output.log") # 日志文件
        with open(path, mode='a+', encoding='utf-8') as f:
            if skip is False:  # 不跳过打印时间时
                content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content
    
            info = "\033[1;{};1m{}\033[0m".format(choice, content)
            print(info)
            f.write(content+"\n")
            
    def execute_linux2(cmd, timeout=10, skip=False):
        """
        执行linux命令,返回list
        :param cmd: linux命令
        :param timeout: 超时时间,生产环境, 特别卡, 因此要3秒
        :param skip: 是否跳过超时
        :return: list
        """
        p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
        # print(p)
        # timeout = 1  # 超时时间
        t_beginning = time.time()  # 开始时间
        # seconds_passed = 0  # 执行时间
        while True:
            if p.poll() is not None:
                break
            seconds_passed = time.time() - t_beginning
            if timeout and seconds_passed > timeout:
                p.terminate()
                # raise TimeoutError(cmd, timeout)
                if not skip:
                    # self.res.code = 500
                    # print('命令: {},执行超时!'.format(cmd))
                    write_log('错误, 命令: {},本地执行超时!'.format(cmd),"red")
                    # return self.res.__dict__
                    return False
                    # return '命令: {},执行超时!'.format(cmd)
    
        # result = p.stdout.read().decode('utf-8').strip()  # 命令运行结果
        # print("result",result)
        # self.res.data = result
        # return self.res.__dict__
        result = p.stdout.readlines()
        return result
    
    def valid_ip(ip):
        """
        验证ip是否有效,比如192.168.1.256是一个不存在的ip
        :return: bool
        """
        try:
            # 判断 python 版本
            if sys.version_info[0] == 2:
                ipaddress.ip_address(ip.strip().decode("utf-8"))
            elif sys.version_info[0] == 3:
                # ipaddress.ip_address(bytes(ip.strip().encode("utf-8")))
                ipaddress.ip_address(ip)
    
            return True
        except Exception as e:
            print(e)
            return False
    
    def check_tcp(ip, port, timeout=1):
        """
        检测tcp端口
        :param ip: ip地址
        :param port: 端口号
        :param timeout: 超时时间
        :return: bool
        """
        flag = False
        try:
            socket.setdefaulttimeout(timeout)  # 整个socket层设置超时时间
            cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            address = (str(ip), int(port))
            status = cs.connect_ex((address))  # 开始连接
            cs.settimeout(timeout)
    
            if not status:
                flag = True
    
            return flag
        except Exception as e:
            print(e)
            return flag
    
    COROUTINE_NUMBER = cpu_count()  # 协程池数量,根据cpu核心数来开,避免cpu飙高

     

    mysql.py

    #!/usr/bin/env python3
    # coding: utf-8
    
    import pymysql
    from utils.common import write_log
    
    class Mysql(object):
        # mysql 端口号,注意:必须是int类型
        def __init__(self,host,user,passwd,db_name,port=3306):
            self.host = host
            self.user = user
            self.passwd = passwd
            self.db_name = db_name
            self.port = port
    
        def select(self,sql):
            """
            执行sql命令
            :param sql: 命令
            :return: 元祖
            """
            try:
                # print(host,self.user,self.passwd,self.port,self.db_name)
                conn = pymysql.connect(
                    host=self.host,
                    user=self.user,
                    passwd=self.passwd,
                    port=self.port,
                    database=self.db_name,
                    charset='utf8',
                    cursorclass=pymysql.cursors.DictCursor
                )
                cur = conn.cursor()  # 创建游标
                # conn.cursor()
                cur.execute(sql)  # 执行sql命令
                res = cur.fetchall()  # 获取执行的返回结果
                cur.close()
                conn.close()  # 关闭mysql 连接
                return res
            except Exception as e:
                print(e)
                return False
    
        def update(self,sql):
            """
            更新操作,比如insert, delete,update
            :param sql: sql命令
            :return: bool
            """
            try:
                conn = pymysql.connect(
                    host=self.host,
                    user=self.user,
                    passwd=self.passwd,
                    port=self.port,
                    database=self.db_name,
                )
                cur = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 创建游标
                # conn.cursor()
                # print("ip: {} insert 执行命令: {}".format(self.host,sql))
                sta = cur.execute(sql)  # 执行sql命令,返回影响的行数
                # print("sta",sta,type(sta))
                #res = cur.fetchall()  # 获取执行的返回结果
                if isinstance(sta,int):  # 判断返回结果, 是数字就是正常的
                    #print('插入记录 Done')
                    pass
                    # write_log('正常,远程执行sql: %s 成功'%sql, "green")
                else:
                    write_log('错误,远程执行sql: %s 失败'%sql, "red")
                    return False
    
                conn.commit()  # 主动提交,否则执行sql不生效
                cur.close()
                conn.close()  # 关闭mysql 连接
                #Migration.flag_list.append(True)
                return sta
            except Exception as e:
                print(e)
                # write_log('错误,远程mysql执行命令: {} 异常'.format(sql), "red")
                # Migration.flag_list.append(False)
                return False

     

    requirements.txt

    PyMySQL==0.9.2
    elasticsearch==6.3.1

     

    README.md

    ## 说明
    终端历史记录表,写入到elasticsearch中。
    
    主要将(terminal.historic_record_0~63) 这64张表的7天前数据写入到elasticsearch中
    
    并删除 64张表的7天前记录
    
    `注意: 本环境使用 elasticsearch 7.0版本开发,切勿低于此版本`
    
    
    ## 配置说明
    `conf.py` 是环境配置
    
    主要修改 以下信息
    ```python
    # mysql
    HOST = "192.168.0.136"
    USER = "root"
    PASSWD = "123456"
    DB_NAME = "terminal"
    PORT = 3306
    
    # elasticsearch
    INDEX_NAME = "historic_record"
    INDEX_TYPE = "_doc"
    ES_IP = "192.169.3.133"
    ```
    
    请根据实际情况修改以上变量
    
    ## 运行说明
    ## 一键执行,迁移相关所有表
    `python es_bulk.py`
    
    ## 查看结果
    结果会输出到`output.log`文件,直接查看即可!
    
    登录到`kibana`,查看数据是否存在
    
    <br/>
    <br/>
    Copyright (c) 2019-present, xiao You

     

    注意:如果是es 6.x的版本,创建索引,需要增加 index_type,否则会报错。

    比如:

    # 创建映射
    _index_mappings = {
        # 索引配置
        "settings": {
            "index": {
                "number_of_shards": 3,  # 分片数
                "number_of_replicas": 1  # 副本数
            }
        },
        # 设置字段
        "mappings": {
            self.index_type: {
                "properties": {
                    "id": {"type": "long"},
                    "loid": {"type": "keyword"},
                    "mac": {"type": "keyword"},
                    "time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "create_time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "update_time": {
                        "type": "date",
                        "format": "epoch_millis"
                    },
                    "online_status": {"type": "short"},
                    "status": {"type": "short"}
                }
            }
        }
    }

     

     

    本文参考链接:

    https://www.cnblogs.com/aaanthony/p/7380662.html

    https://blog.csdn.net/m0_37673307/article/details/81153700

     


关键字