这是我最近的问题的延伸避免 Python 3 的多处理队列中的竞争条件 https://stackoverflow.com/questions/10607747/avoiding-race-conditions-in-python-3s-multiprocessing-queues。希望这个版本的问题更加具体。
TL;DR:在多处理模型中,工作进程从队列中使用multiprocessing.Queue
,为什么我的工作进程如此空闲?每个进程都有自己的输入队列,因此它们不会相互争夺共享队列的锁,但队列实际上花费了大量时间为空。主进程正在运行一个 I/O 密集型线程——这是否会减慢 CPU 密集型输入队列的填充速度?
我试图在一定的约束下找到 N 个集合的笛卡尔积的最大元素,每个集合都有 M_i 个元素(对于 0 is_feasible回报True
。在我的问题中,我试图找到其元素权重最大的组合:sum(element.weight for element in combination)
.
我的问题规模很大,但我公司的服务器也很大。我正在尝试将以下串行算法重写为并行算法。
from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
"""Return the largest (total-weight, combination) tuple from all
possible combinations of the elements in the several sets, subject
to the constraint that is_feasible(combo) returns True."""
return max(
map(
lambda combination: (
sum(element.weight for element in combination),
combination
),
filter(
is_feasible, # Returns True if combo meets constraint
product(*sets)
)
),
key=itemgetter(0) # Only maximize based on sum of weight
)
我当前的多处理方法是创建工作进程并为它们提供输入队列的组合。当工人们收到一份毒丸 https://stackoverflow.com/questions/10607747/avoiding-race-conditions-in-python-3s-multiprocessing-queues他们将见过的最佳组合放入输出队列并退出。我从主进程的主线程填充输入队列。这种技术的一个优点是我可以从主进程生成一个辅助线程来运行监视工具(我可以使用一个 REPL 来查看到目前为止已处理的组合数量以及队列的满度)。
+-----------+
in_q0 | worker0 |----\
/-------+-----------+ \
+-----------+ in_q1 +-----------+ \ out_q +-----------+
| main |-----------| worker1 |-----------| main |
+-----------+ +-----------+ / +-----------+
\-------+-----------+ /
in_q2 | worker2 |----/
+-----------+
我最初让所有工作人员从一个输入队列中读取数据,但发现它们都没有占用 CPU。考虑到他们把所有的时间都花在等待queue.get()解除阻塞上,我给了他们自己的队列。这增加了 CPU 的压力,因此我认为工作人员的活动频率更高。然而,队列大部分时间都是空的! (我从我提到的监控 REPL 中知道这一点)。这对我来说表明主循环填充队列的速度很慢。这是那个循环:
from itertools import cycle
main():
# (Create workers, each with its own input queue)
# Cycle through each worker's queue and add a combination to that queue
for combo, worker in zip(product(*sets), cycle(workers)):
worker.in_q.put(combo)
# (Collect results and return)
我猜瓶颈是worker.in_q.put()
。我怎样才能让它更快?我的第一反应是让工作线程变慢,但这没有意义……问题是监视器线程过于频繁地停止循环吗?我怎么能知道呢?
或者,是否有另一种方法来实现这一点,而不需要太多的等待锁?