用 Python 实现的线程池

发布时间:2019-07-06 10:47:13编辑:auto阅读(1501)

    为了提高程序的效率,经常要用到多线程,尤其是IO等需要等待外部响应的部分。线程的创建、销毁和调度本身是有代价的,如果一个线程的任务相对简单,那这些时间和空间开销就不容忽视了,此时用线程池就是更好的选择,即创建一些线程然后反复利用它们,而不是在完成单个任务后就结束。

    下面是用Python实现的通用的线程池代码:

    1. import Queuethreadingsys  
    2. from threading import Thread  
    3. import time,urllib  
    4.   
    5. # working thread  
    6. class Worker(Thread):  
    7.     worker_count = 0  
    8.     def __init__self, workQueue, resultQueue, timeout = 0, **kwds):  
    9.         Thread.__init__self, **kwds )  
    10.         self.id = Worker.worker_count  
    11.         Worker.worker_count += 1  
    12.         self.setDaemon( True )  
    13.         self.workQueue = workQueue  
    14.         self.resultQueue = resultQueue  
    15.         self.timeout = timeout  
    16.   
    17.     def run( self ):  
    18.         ''' the get-some-work, do-some-work main loop of worker threads '''  
    19.         while True:  
    20.             try:  
    21.                 callable, args, kwds = self.workQueue.get(timeout=self.timeout)  
    22.                 res = callable(*args, **kwds)  
    23.                 print "worker[%2d]: %s" % (self.idstr(res) )  
    24.                 self.resultQueue.put( res )  
    25.             except Queue.Empty:  
    26.                 break  
    27.             except :  
    28.                 print 'worker[%2d]' % self.idsys.exc_info()[:2]  
    29.                   
    30. class WorkerManager:  
    31.     def __init__self, num_of_workers=10, timeout = 1):  
    32.         self.workQueue = Queue.Queue()  
    33.         self.resultQueue = Queue.Queue()  
    34.         self.workers = []  
    35.         self.timeout = timeout  
    36.         self._recruitThreads( num_of_workers )  
    37.   
    38.     def _recruitThreads( self, num_of_workers ):  
    39.         for i in range( num_of_workers ):  
    40.             worker = Worker( self.workQueue, self.resultQueue, self.timeout )  
    41.             self.workers.append(worker)  
    42.   
    43.     def start(self):  
    44.         for w in self.workers:  
    45.             w.start()  
    46.   
    47.     def wait_for_complete( self):  
    48.         # ...then, wait for each of them to terminate:  
    49.         while len(self.workers):  
    50.             worker = self.workers.pop()  
    51.             worker.join( )  
    52.             if worker.isAlive() and not self.workQueue.empty():  
    53.                 self.workers.append( worker )  
    54.         print "All jobs are are completed."  
    55.   
    56.     def add_job( selfcallable, *args, **kwds ):  
    57.         self.workQueue.put( (callable, args, kwds) )  
    58.   
    59.     def get_result( self, *args, **kwds ):  
    60.         return self.resultQueue.get( *args, **kwds )  

    Worker类是一个工作线程,不断地从workQueue队列中获取需要执行的任务,执行之,并将结果写入到resultQueue中,这里的workQueue和resultQueue都是现成安全的,其内部对各个线程的操作做了互斥。当从workQueue中获取任务超时,则线程结束。

    WorkerManager负责初始化Worker线程,提供将任务加入队列和获取结果的接口,并能等待所有任务完成。

    一个典型的测试例子如下,它用10个线程去下载一个固定页面的内容,实际应用时应该是执行不同的任务。

    1. def test_job(id, sleep = 0.001 ):  
    2.     try:  
    3.         urllib.urlopen('https://www.gmail.com/').read()  
    4.     except:  
    5.         print '[%4d]' % idsys.exc_info()[:2]  
    6.     return  id  
    7.   
    8. def test():  
    9.     import socket  
    10.     socket.setdefaulttimeout(10)  
    11.     print 'start testing'  
    12.     wm = WorkerManager(10)  
    13.     for i in range(500):  
    14.         wm.add_job( test_job, i, i*0.001 )  
    15.     wm.start()  
    16.     wm.wait_for_complete()  
    17.     print 'end testing' 

关键字

上一篇: python format函数

下一篇: 常用的python随机数