python selenium redis队列提高效率

发布时间:2019-04-21 22:24:04编辑:Run阅读(5336)

    爬虫架构流程图(基于生产者消费模型)


    爬虫架构二.png



    整个程序被解耦成两部分:(先进先出,后进先出)

    1  数据生产者  页面解析(获取url,title,状态码,图片地址,文本信息等等....) 使用(lpush或rpush,看需求而定)往reids队列中存数据

    2  数据消费者  图片下载,计算图片dhash值  使用(blpop或brpop阻塞等待,如果redis队列中没有值,会一直阻塞) 或 lpop,rpop取redis队列中的数据,看需求而定


    整个项目代码有点长,就不一一贴了,写关于reids的关键部分


    数据生产者

    def connect_redis_pool(self, url_dict):
        try:
            conn_pool = redis.ConnectionPool(host='ip地址', port=端口, db=0, password='redis密码')
            re_pool = redis.Redis(connection_pool=conn_pool)
            if re_pool:
                self.log.info("redis connect success")
                re_pool.lpush("img_info", json.dumps(url_dict))
                self.log.info("redis insert data img_info success")
        except Exception as e:
            self.log.error(e)


    协程池开启多个协程往redis队列中存数据

    from gevent import monkey;monkey.patch_all()
    import gevent.pool
    from link_spider import LinkSpider
    import json
    
    
    if __name__ == '__main__':
        url_list = []
        with open('linkurl.json', encoding='utf-8', mode='r') as f:
            all_content = json.loads(f.read())
            for i in all_content:
                url_list.append(i['link_url'])
        res_l = []
        p = gevent.pool.Pool(10)
        st = LinkSpider()
        for i in url_list:
            res_l.append(p.spawn(st.setup_chrome, i))
        gevent.joinall(res_l)



    数据消费者

    def connect_redis(redis_connection_number):
        try:
            conn_pool = redis.ConnectionPool(host='ip地址', port=端口, db=0, password='redis密码')
            re_pool = redis.Redis(connection_pool=conn_pool)
            result = re_pool.rpop("img_info")
            url_dict = json.loads(result)
            log.info("redis task:{} --> 执行任务:{}".format(redis_connection_number, url_dict['source_url']))
            all_url_dict = SaveData().download_img_and_hashlib(url_dict)
            SaveMysql().run(all_url_dict)
        except Exception as e:
            log.error(e)


    异步线程池开启多个线程来下载图片,将最终结果保存到数据库中

    import redis
    import json
    import time
    from save_mysql import SaveMysql
    from mylog import log
    from multiprocessing import Pool
    from get_redis_data import SaveData
    
    
    if __name__ == '__main__':
        conn_pool = redis.ConnectionPool(host='ip地址', port=端口, db=0, password='redis密码')
        re_pool = redis.Redis(connection_pool=conn_pool)
        st = SaveData()
        p = Pool(10)
        res_l = []
        while True:
            time.sleep(3)
            # 查看redis队列数
            result = re_pool.llen("img_info")
            log.info("当前redis队列数: {}".format(result))
            if result >= 1:
                for i in range(result):
                    res = p.apply_async(connect_redis, args=(i,))
                    res_l.append(res)
                for res in res_l:
                    res.get()
            else:
                continue

    每隔3秒检测redis队列是否存在数据,有多少数据,就开启多少任务,然后调用线程池中的线程去执行,直到所有数据执行完毕,保存到mysql中


    总结:通过对程序的解耦,将耗时的图片下载部分拿出来,另外去执行。

关键字