一个明显的调整是使用mp.Manager.dict()
而不是集合,并使用任意值(例如,集合the_dict[result_int] = 1
以指示集合中的成员资格)。顺便说一句,这就是在 Python 添加之前“每个人”实现集合的方式set
类型,甚至现在字典和集合也是通过基本相同的代码在幕后实现的。
稍后添加:我承认我不明白为什么您在原始代码中同时使用集合和列表,因为集合的键与列表的内容相同。如果输入顺序并不重要,为什么不完全忘记该列表呢?然后,您还可以删除原始版本中所需的锁定层,以保持集合和列表同步。
通过 dict 建议来充实它,整个函数将变得像这样:
def worker(shared_dict):
# Do some processing and get an integer
result_int = some_other_code()
shared_dict[result_int] = 1
其他进程可以做shared_dict.pop()
然后一次获取一个值(虽然,不,他们等不及.pop()
就像他们对队列所做的那样.get()
).
还有一件事:考虑使用本地(进程本地)集吗?他们会跑得更快。那么每个worker就不会添加任何重复项it知道,但可能有重复across流程。您的代码没有给出任何关于什么的提示out_q
消费者确实如此,但如果只有一个,那么其中的本地集也可以清除跨进程重复项。或者也许内存负担变得太重了?无法从这里猜测;-)
BIG EDIT
我将建议一种不同的方法:不要使用mp.Manager
根本不。大多数时候我看到人们使用它,他们会后悔,因为它没有做他们想要的事情think它正在做。他们的想法是:它提供物理共享对象。它在做什么:它正在供应语义上共享对象。从物理上讲,它们生活在“又一个”中,在幕后,对对象的进程和操作被转发到后一个进程,由该进程在其自己的地址空间中执行。它不是身体上的完全共享。因此,虽然它非常方便,但即使是最简单的操作也会产生大量的进程间开销。
因此,我建议在一个进程中使用单个普通集,这将是与清除重复项有关的唯一代码。工作进程生成整数而不关心重复 - 它们只是传递整数。一个mp.Queue
对此很好(同样,不需要mp.Manager.Queue
).
像这样,这是一个完整的可执行程序:
N = 20
def worker(outq):
from random import randrange
from time import sleep
while True:
i = randrange(N)
outq.put(i)
sleep(0.1)
def uniqueifier(inq, outq):
seen = set()
while True:
i = inq.get()
if i not in seen:
seen.add(i)
outq.put(i)
def consumer(inq):
for _ in range(N):
i = inq.get()
print(i)
if __name__ == "__main__":
import multiprocessing as mp
q1 = mp.Queue()
q2 = mp.Queue()
consume = mp.Process(target=consumer, args=(q2,))
consume.start()
procs = [mp.Process(target=uniqueifier, args=(q1, q2))]
for _ in range(4):
procs.append(mp.Process(target=worker, args=(q1,)))
for p in procs:
p.start()
consume.join()
for p in procs:
p.terminate()
第二个队列传递给uniqueifier
扮演原始队列的角色:它仅提供唯一的整数。不会尝试“共享内存”,因此不会支付由此产生的任何费用。唯一的进程间通信是通过简单、显式的mp.Queue
运营。只有一组,并且由于它不以任何方式共享,因此它运行得尽可能快。
实际上,这只是设置了一个简单的管道,尽管有多个输入。