multiprocessing中的多进程Process的基本使用
在python中,进程是通过 multiprocessing
多进程模块来管理的,multiprocessing模块提供了一个Process类来创建进程对象。
创建子进程:
Process(group, target, name, args, kwargs)
- 【group】指定进程组,⼤多数情况下⽤不到
- 【target】表示调用对象,即子进程要执行的任务
- 【name】子进程的名称,可以不设定
- 【args】给target指定的函数传递的参数,以元组的⽅式传递。当只有一个入参时注意添加逗号,如args=(a,)
- 【kwargs】给target指定的函数传递命名参数,比较少用
Process常用方法
-
t.start()
,主进程和子进行都会正常开启运行,当只使用直到所有进程运行结束后,代码运行结束。
t.join(timeout)
可以理解为1个闸门,让程序等待该子进程运行结束后 再向下运行
timeout 没有数值时:当前子进程运行时,主进程在join()位置处于等待状态;当前子进程结束后,再进入主进程运行接下来的代码,直到结束
timeout=a,当前子进程运行后a时间前,主进程在join()位置处于等待状态;当前子进程运行后a时间后,主进程开始运行
t.daemon = True
只要主进程运行结束后,立即结束子进程。必须用在t.start()前
t.terminate()
不管任务是否完成,在当前代码位置时立即结束子进程。可放在主进程的代码最后,代替t.daemon = True的使用
t.is_alive()
:判断进程⼦进程是否还存活
import multiprocessing
import time
import os
def f0(a1):
time.sleep(10)
print(a1)
# print("子进程name", multiprocessing.current_process())
# print("子进程pid", multiprocessing.current_process().pid)
# print("父进程pid", os.getppid())
def f1(a1):
time.sleep(3)
print(a1)
def test_Process():
## ===========================================================
t1 = multiprocessing.Process(target=f0,args=(12,))
# t1.daemon=True #将daemon设置为True,则主线程不用等待子进程结束,主线程结束则所有结束
t1.start() ## 子进程开始运行
# t1.join() ## 该命令让主进程等待。当前子进程结束后,主进程再继续
## ===========================================================
t2 = multiprocessing.Process(target=f1, args=(13,))
# t2.daemon = True
t2.start()
# t2.join()
print('end')
# t1.terminate() ## 该命令使没有结束的子进程立即结束
# t2.terminate()
if __name__ == '__main__': # windows下必须加这句
test_Process()
多进程Process公用内存的方法
import multiprocessing
import time
import os
import threading
def f1(data, i):
data[i]=i
time.sleep(0.5)
# print('xixihaha', data.values())
def test_Process_dist():
"""
线程之间共用内存。进程之间默认是不能共用内存的; 如果想要公用内存 使用进程里面的manager
"""
# DATA = {} ## 测试一般的字典通过多进程修改的效果
manager = multiprocessing.Manager()
DATA = manager.dict()
p = [None]*10
for i in range(10):
p[i] = multiprocessing.Process(target=f1,args=(DATA, i))
p[i].start()
# p[i].join()
for i in range(10):
p[i].join() ## 可以测试 join() 的不同位置带来的运行效果
# time.sleep(1)
print(DATA)
def test_Thread_dist():
"""
线程之间共用内存
"""
DATA = {}
for i in range(10):
p = threading.Thread(target=f1,args=(DATA,i))
p.start()
time.sleep(1)
print(DATA)
if __name__ == '__main__': # windows下必须加这句
test_Process_dist()
# test_Thread_dist()
multiprocessing中的进程池
当需要创建的⼦进程数量不多时, 可以直接利⽤multiprocessing.Process动态生成多个进程, 但如果要创建很多进程时,⼿动创建的话⼯作量会非常大,此时就可以⽤到multiprocessing模块提供的Pool去创建一个进程池。
multiprocessing.Pool常⽤函数
apply_async(func, args, kwds)
:使⽤⾮阻塞⽅式调⽤func(任务并⾏执⾏),args为传递给func的参数列表,kwds为传递给func的关键字参数列表
apply(func, args, kwds)
:使⽤阻塞⽅式调⽤func,必须等待上⼀个进程执行完任务后才能执⾏下⼀个进程,了解即可,几乎不用
close()
:关闭Pool,使其不再接受新的任务
terminate()
:不管任务是否完成,⽴即终⽌
join()
:主进程阻塞,等待⼦进程的退出,必须在close或terminate之后使⽤
初始化Pool时,可以指定⼀个最⼤进程数,当有新的任务提交到Pool中时,如果进程池还没有满,那么就会创建⼀个新的进程⽤来执⾏该任务,但如果进程池已满(池中的进程数已经达到指定的最⼤值),那么该任务就会等待,直到池中有进程结束才会创建新的进程来执⾏。
import multiprocessing
import time
import os
def f11(a,q):
q.put(a)
return a+100
def Bar(arg):
print(arg)
def test_pool():
manager = multiprocessing.Manager()
q = manager.Queue(30)
# q = multiprocessing.Queue(30) ### 这种队列对进行池无效
pool = multiprocessing.Pool(5)
for i in range(5):
# pool.apply(func=f11, args=(i,)) ## #apply里面是每个进程执行完毕了才执行下一个进程, 基本不使用这种方式
pool.apply_async(func=f11, args=(i,q), callback=Bar) ## 这里的func 是调用的函数,args是入参,Bar是回调函数,回调函数的入参就是func的返回值
pool.close()#执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
pool.join()#等待进程运行完毕,先调用close函数,否则会出错
print('=======================')
while 1:
if q.empty(): break
print(q.get())
if __name__ == '__main__': # windows下必须加这句
print('=======================')
test_pool()