I have some code that needs to run against several other systems that may hang or develop problems outside my control. I would like to use Python's multiprocessing to spawn child processes that run independently of the main program, and then kill them when they hang or misbehave, but I'm not sure of the best way to go about this.
Calling terminate does kill the child process, but it then becomes a defunct zombie that is not released until the process object is gone. The example code below, where the loop never ends, works to kill it and allow a respawn when called again, but it doesn't seem like a good way to handle this (i.e. multiprocessing.Process() would be better placed in __init__()).
Does anyone have a suggestion?
    import multiprocessing
    import time

    class Process(object):
        def __init__(self):
            self.thing = Thing()
            self.running_flag = multiprocessing.Value("i", 1)

        def run(self):
            self.process = multiprocessing.Process(target=self.thing.worker,
                                                   args=(self.running_flag,))
            self.process.start()
            print(self.process.pid)

        def pause_resume(self):
            self.running_flag.value = not self.running_flag.value

        def terminate(self):
            self.process.terminate()

    class Thing(object):
        def __init__(self):
            self.count = 1

        def worker(self, running_flag):
            while True:
                if running_flag.value:
                    self.do_work()

        def do_work(self):
            print("working {0} ...".format(self.count))
            self.count += 1
            time.sleep(1)
You can run the child process as a background daemon.
process.daemon = True
Any errors and hangs (or infinite loops) in a daemon process will not affect the main process, and the daemon is only terminated once the main process exits.
This works for simple problems, but breaks down once you have many child daemon processes that keep consuming memory from the parent without any explicit control.
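A minimal sketch of the daemon approach described above (the `worker` function here is illustrative, not from the question):

```python
import multiprocessing as mp
import time

def worker():
    # stand-in for a task that might hang indefinitely
    while True:
        time.sleep(1)

if __name__ == "__main__":
    p = mp.Process(target=worker)
    p.daemon = True   # must be set before start()
    p.start()
    print(p.daemon)   # -> True
    # no join/terminate needed: daemon children are killed
    # automatically when the main process exits
```

Note that `daemon` must be assigned before `start()`; setting it afterwards raises an error.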
The best approach is to set up a Queue so that all child processes communicate with the parent; that way we can join them and clean up nicely. Here is some simple code that checks whether a child process is hanging (a.k.a. time.sleep(1000)) and sends a message to the queue for the main process to act on:
    import multiprocessing as mp
    import time
    import queue

    running_flag = mp.Value("i", 1)

    def worker(running_flag, q):
        count = 1
        while True:
            if running_flag.value:
                print(f"working {count} ...")
                count += 1
                q.put(count)
                time.sleep(1)
                if count > 3:
                    # Simulate hanging with sleep
                    print("hanging...")
                    time.sleep(1000)

    def watchdog(q):
        """
        This checks the queue for updates and sends a signal to it
        when the child process isn't sending anything for too long
        """
        while True:
            try:
                msg = q.get(timeout=10.0)
            except queue.Empty as e:
                print("[WATCHDOG]: Maybe WORKER is slacking")
                q.put("KILL WORKER")

    def main():
        """The main process"""
        q = mp.Queue()

        workr = mp.Process(target=worker, args=(running_flag, q))
        wdog = mp.Process(target=watchdog, args=(q,))

        # run the watchdog as daemon so it terminates with the main process
        wdog.daemon = True

        workr.start()
        print("[MAIN]: starting process P1")
        wdog.start()

        # Poll the queue
        while True:
            msg = q.get()
            if msg == "KILL WORKER":
                print("[MAIN]: Terminating slacking WORKER")
                workr.terminate()
                time.sleep(0.1)
                if not workr.is_alive():
                    print("[MAIN]: WORKER is a goner")
                    workr.join(timeout=1.0)
                    print("[MAIN]: Joined WORKER successfully!")
                    q.close()
                    break  # watchdog process daemon gets terminated

    if __name__ == '__main__':
        main()
Without terminating the worker, an attempt to join() it into the main process would have blocked forever, since the worker never finishes.
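That blocking behavior can be seen in isolation. The sketch below (the `hang` function is illustrative) shows that join() with a timeout simply returns while a hung child keeps running, and only terminate() actually stops it:

```python
import multiprocessing as mp
import time

def hang():
    # stand-in for a worker that never finishes
    time.sleep(1000)

if __name__ == "__main__":
    p = mp.Process(target=hang)
    p.start()
    p.join(timeout=1.0)    # returns after 1s; a plain p.join() would block forever
    print(p.is_alive())    # -> True: joining alone does not stop a hung child
    p.terminate()
    p.join()               # now the join succeeds
    print(p.is_alive())    # -> False
```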