生产者与消费者模型

生产者与消费者模型

基于加锁能带来效率低和需要自己手动加锁的问题,我们最好寻找一种解决方案能够兼顾:

1.效率高(多个进程共享一块内存的数据)

2.帮我们处理好锁的问题,这就是multiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

3.队列和管道都是将数据存放于内存中

4.队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可扩展性。

一.队列

1.进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

创建队列的类(底层就是以管道和锁定的方式实现)  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#创建队列的类
from multiprocessing import Queue
import time,random
#
q=Queue(3) #实例化一个队列对象,用于多进程之间的数据传递,3是队列中允许的最大项数,省略则无大小限制
# q.put() #q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
# q.get() #方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
# q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
# q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
# q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
q.put([1,2,3],block=True,timeout=3)
q.put({'x':2},block=True,timeout=3)
q.put(2,block=True,timeout=3)
q.put((1,3,2,),block=True,timeout=3)
""""
Traceback (most recent call last):
File "F:/python practice/11.8 day36 互斥锁,守护进程/01.课堂练习.py", line 102, in <module>
q.put((1,3,2,),block=True,timeout=3)
File "E:\python 3.7\lib\multiprocessing\queues.py", line 83, in put
raise Full
queue.Full
"""

二.生产者消费者模型:

1.在并发编程中使用生产者和消费者模型能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

2.为什么要使用生产者和消费者模式: 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据,反之亦然。

3.生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
for i in range(10):
time.sleep(random.randint(1,3))
res='包子%s' %i
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
q=Queue()
#生产者们:即厨师们
p1=Process(target=producer,args=(q,))

#消费者们:即吃货们
c1=Process(target=consumer,args=(q,))

#开始
p1.start()
c1.start()
print('主')

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

解决此问题的最好办法:JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

#方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了

def producer(name,q):
for i in range(10):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
q.join()


if __name__ == '__main__':
q=JoinableQueue()
#生产者们:即厨师们
p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨头',q))
p3=Process(target=producer,args=('泔水',q))

#消费者们:即吃货们
c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
c1.daemon=True
c2.daemon=True

#开始
p_l=[p1,p2,p3,c1,c2]
for p in p_l:
p.start()

p1.join()
p2.join()
p3.join()
q.join() # 主进程等q结束,即q内数据被取干净了
print('主')

#主进程等--->p1,p2,p3等---->c1,c2
#p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
#因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程
图灵python大海老师 wechat
python分享公众号
坚持原创技术分享,您的支持将鼓励我继续创作!