aiomysql异步操作mysql

发布时间:2020-05-07 14:41:29编辑:admin阅读(2952)

    一、概述

    aiomysql是一个从asyncio(PEP-3156/tulip)框架访问MySQL数据库的库。它依赖并重用PyMySQL的大部分部分。aiomysql试图成为一个很棒的aiopg库,并保留相同的api、外观和感觉。

    在内部aimysql是PyMySQL的副本,底层io调用切换到async,基本上是等待并在适当的位置添加async def coroutine。从aiopg移植的sqlalchemy支持。

     

    安装模块

    pip3 install aiomysql

    简单示例

    import asyncio
    import aiomysql
    
    loop = asyncio.get_event_loop()
    
    
    async def test_example():
        conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                           user='root', password='', db='mysql',
                                           loop=loop)
    
        cur = await conn.cursor()
        await cur.execute("SELECT Host,User FROM user")
        print(cur.description)
        r = await cur.fetchall()
        print(r)
        await cur.close()
        conn.close()
    
    loop.run_until_complete(test_example())

     

    二、demo演示

    环境说明

    操作系统:centos 7.6

    mysql版本:5.7

    数据库名:test

    数据库默认编码:utf8mb4

    具体表结构以及数据,请参考链接:

    https://www.cnblogs.com/xiao987334176/p/12721498.html 

    这里面有2个表

     

    单次执行

     执行select和update

    #!/usr/bin/env python3
    # coding: utf-8
    """
    mysql 异步版本
    """
    import traceback
    import logging
    import aiomysql
    import asyncio
    import time
    
    logobj = logging.getLogger('mysql')
    
    
    class Pmysql:
        def __init__(self):
            self.coon = None
            self.pool = None
    
        async def initpool(self):
            try:
                logobj.debug("will connect mysql~")
                __pool = await aiomysql.create_pool(
                    minsize=5,  # 连接池最小值
                    maxsize=10,  # 连接池最大值
                    host='192.168.31.230',
                    port=3306,
                    user='root',
                    password='abcd1234',
                    db='test',
                    autocommit=True,  # 自动提交模式
                )
                return __pool
            except:
                logobj.error('connect error.', exc_info=True)
    
        async def getCurosr(self):
            conn = await self.pool.acquire()
            # 返回字典格式
            cur = await conn.cursor(aiomysql.DictCursor)
            return conn, cur
    
        async def query(self, query, param=None):
            """
            查询操作
            :param query: sql语句
            :param param: 参数
            :return:
            """
            conn, cur = await self.getCurosr()
            try:
                await cur.execute(query, param)
                return await cur.fetchall()
            except:
                logobj.error(traceback.format_exc())
            finally:
                if cur:
                    await cur.close()
                # 释放掉conn,将连接放回到连接池中
                await self.pool.release(conn)
    
        async def execute(self, query, param=None):
            """
            增删改 操作
            :param query: sql语句
            :param param: 参数
            :return:
            """
            conn, cur = await self.getCurosr()
            try:
                await cur.execute(query, param)
                if cur.rowcount == 0:
                    return False
                else:
                    return True
            except:
                logobj.error(traceback.format_exc())
            finally:
                if cur:
                    await cur.close()
                # 释放掉conn,将连接放回到连接池中
                await self.pool.release(conn)
    
    
    async def getAmysqlobj():
        mysqlobj = Pmysql()
        pool = await mysqlobj.initpool()
        mysqlobj.pool = pool
        return mysqlobj
    
    
    async def test_select():
        mysqlobj = await getAmysqlobj()
        # UPDATE `youku`.`person` SET `psName` = '张三丰' WHERE (`id` = '3');
        exeRtn = await mysqlobj.query("select * from users")
        # print("查询结果",exeRtn)
        return exeRtn
    
    
    async def test_update():
        mysqlobj = await getAmysqlobj()
        # UPDATE `youku`.`person` SET `psName` = '张三丰' WHERE (`id` = '3');
        exeRtn = await mysqlobj.execute("update users set username='xiao1' where id='1'")
        # print("exeRtn", exeRtn, type(exeRtn))
        if exeRtn:
            # print('操作成功')
            return '操作成功'
        else:
            # print('操作失败')
            return '操作失败'
    
    
    async def main():  # 调用方
        tasks = [test_select(), test_update()]  # 把所有任务添加到task中
        done, pending = await asyncio.wait(tasks)  # 子生成器
        for r in done:  # done和pending都是一个任务,所以返回结果需要逐个调用result()
            # print('协程无序返回值:'+r.result())
            print(r.result())
    
    
    if __name__ == '__main__':
        start = time.time()
        loop = asyncio.get_event_loop()  # 创建一个事件循环对象loop
        try:
            loop.run_until_complete(main())  # 完成事件循环,直到最后一个任务结束
        finally:
            loop.close()  # 结束事件循环
        print('所有IO任务总耗时%.5f秒' % float(time.time() - start))

    执行输出:

    操作成功
    [{'id': 1, 'username': 'xiao', 'password': '123', 'phone': '12345678910', 'email': '123@qq.com', 'create_time': datetime.datetime(2020, 4, 10, 1, 22, 7)}]
    所有IO任务总耗时0.03948秒

     

    批量插入

    批量插入使用executemany

    插入3万条数据

    #!/usr/bin/env python3
    # coding: utf-8
    
    import time
    import asyncio
    import aiomysql
    
    start = time.time()
    loop = asyncio.get_event_loop()
    
    async def test_example():
        conn = await aiomysql.connect(host='192.168.31.230', port=3306,
                                           user='root', password='abcd1234',
                                           db='test', loop=loop)
    
        # create default cursor
        cursor = await conn.cursor()
    
        # execute sql query
        data = []
        for i in range(1,30000):
            data.append(('xiao%s'%i, '123', '12345678910', '123@qq.com', '2020-04-10 01:22:07'),)
    
        stmt = "INSERT INTO users (username,password,phone,email,create_time) VALUES(%s,%s,%s,%s,%s);"
    
        await cursor.executemany(stmt, data)
        await conn.commit()
        # detach cursor from connection
        await cursor.close()
    
        # close connection
        conn.close()
    
    loop.run_until_complete(test_example())
    print('所有IO任务总耗时%.5f秒' % float(time.time() - start))

    执行输出:

    所有IO任务总耗时11.96885秒

     

    本文参考链接:

    https://www.cnblogs.com/ygy1997/p/11753335.html


关键字

上一篇: python3 asyncio

下一篇: Helm 从入门到实践