python消息队列Queue

发布时间:2019-09-19 08:04:45编辑:auto阅读(1491)

    实例1:消息队列Queue,不要将文件命名为“queue.py”,否则会报异常“ImportError: cannot import name 'Queue'”

    #coding=utf-8
    from multiprocessing import Queue 
    
    q = Queue(3)#初始化一个Queue对象,最多可接收三条put消息
    q.put('message-1')
    q.put('message-2')
    print(q.full())#False,是否满了
    q.put('message-3')
    print(q.full())#True
    
    #因为消息队列已满,下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个try会立即抛出异常
    try:
    	q.put('message-4',True,2)
    except:
    	print('except1,消息队列已满,现有消息数量:%s'%q.qsize())
    
    try:
    	q.put_nowait('message-4')
    except:
    	print('except2,消息队列已满,现有消息数量:%s'%q.qsize())
    
    #判断队列是否已满
    if not q.full():
    	q.put_nowait('message-4')
    
    #读取消息时,先判断消息队列是否为空,在读取
    if not q.empty():
    	for i in range(q.qsize()):
    		print(q.get())#q.get会阻塞,q.get_nowait()不阻塞,但会抛异常 					

    False

    True

    except1,消息队列已满,现有消息数量:3

    except2,消息队列已满,现有消息数量:3

    message-1

    message-2

    message-3


    实例二:通过Process进程间通信

    from multiprocessing import Process,Queue
    import os,time,random 
    
    #写数据
    def write(q):
    	for value in ['A','B','C']:
    		print('Put %s to queue...'%value)
    		q.put(value)
    		time.sleep(random.random())
    
    #读数据
    def read(q):
    	while True:
    		if not q.empty():
    			value = q.get(True)
    			print('Get %s from queue...'%value)
    			time.sleep(random.random())
    		else:
    			break
    
    if __name__ == '__main__':
    	print('start...')
    	q = Queue()
    	#父进程的queue传递给子进程
    	pw = Process(target=write,args=(q,))
    	pr = Process(target=read,args=(q,))				
    	#写进程
    	pw.start()
    	pw.join()
    	#读进程
    	pr.start()
    	pr.join()
    	print('done...')

    start...

    Put A to queue...

    Put B to queue...

    Put C to queue...

    Get A from queue...

    Get B from queue...

    Get C from queue...

    done...


    实例三:通过Manager进程间通信

    from multiprocessing import Manager,Pool
    import os,time,random 
    
    #写数据
    def writer(q):
    	print('writer启动(%s),父进程为(%s)'%(os.getpid(),os.getppid()))
    	for i in 'chaoge':
    		q.put(i)
    
    #读数据
    def reader(q):
    	print('reader启动(%s),父进程为(%s)'%(os.getpid(),os.getppid()))
    	for i in range(q.qsize()):
    		print('reader 从Queue获取到消息:%s'%q.get())
    
    
    if __name__ == '__main__':
    	print('(%s) start'%os.getpid())
    	q = Manager().Queue()#使用Manager中的Queue来初始化
    	po=Pool()
    	#使用阻塞模式创建进程,这样就不需要再reader中使用死循环了,可以等write执行完成后,再用reader
    	po.apply(writer,(q,))
    	po.apply(reader,(q,))
    	#写进程
    	po.close()
    	po.join()
    	print('(%s) End'%os.getpid())

    (7720) start

    writer启动(7284),父进程为(7720)

    reader启动(8712),父进程为(7720)

    reader 从Queue获取到消息:c

    reader 从Queue获取到消息:h

    reader 从Queue获取到消息:a

    reader 从Queue获取到消息:o

    reader 从Queue获取到消息:g

    reader 从Queue获取到消息:e

    (7720) End


关键字