一文讲透Python线程池ThreadPoolExecutor

2023-12-04

01 初识

Python 中已经有了 threading 模块,为什么还需要线程池呢,线程池又是什么东西呢?在介绍线程同步的信号量机制的时候,举得例子是爬虫的例子,需要控制同时爬取的线程数,例子中创建了20个线程,而同时只允许3个线程在运行,但是 20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的 ,有没有更好的方案呢?其实只需要三个线程就行了,每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行。

这就是线程池的思想(当然没这么简单),但是自己编写线程池很难写的比较完美,还需要考虑复杂情况下的线程同步,很容易发生死锁。从 Python3.2 开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor ProcessPoolExecutor 两个类,实现了对 threading multiprocessing 进一步抽象(这里主要关注线程池),不仅可以帮我们 自动调度线程 ,还可以做到:

  • 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。

  • 当一个线程完成的时候,主线程能够立即知道。

  • 让多线程和多进程的编码接口一致。

02 实例

简单使用

from concurrent.futures import ThreadPoolExecutor

import time

# 参数times用来模拟网络请求的时间

def get_html(times):

   time.sleep(times)
   print("get page {}s finished".format(times))
   return times

executor = ThreadPoolExecutor(max_workers=2)

# 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞

task1 = executor.submit(get_html, (3))

task2 = executor.submit(get_html, (2))

# done方法用于判定某个任务是否完成

print(task1.done())

# cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功

print(task2.cancel())time.sleep(4)print(task1.done())

# result方法可以获取task的执行结果

print(task1.result())

# 执行结果# False  

# 表明task1未执行完成# False  

# 表明task2取消失败,因为已经放入了线程池中

# get page 2s finished# get page 3s finished# True  

# 由于在get page 3s finished之后才打印,所以此时task1必然完成了

# 3     

# 得到task1的任务返回值
  • ThreadPoolExecutor 构造实例的时候,传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。

  • 使用 submit 函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。

  • 通过 submit 函数返回的任务句柄,能够使用 done() 方法判断该任务是否结束。上面的例子可以看出,由于任务有2s的延时,在 task1 提交后立刻判断, task1 还未完成,而在延时4s之后判断, task1 就完成了。

  • 使用 cancel() 方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是 task1 task2 还在排队等候,这是时候就可以成功取消。

  • 使用 result() 方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。

as_completed

上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用 as_completed 方法一次取出所有任务的结果。

from concurrent.futures import ThreadPoolExecutor, as_completed

import time

# 参数times用来模拟网络请求的时间

def get_html(times):

   time.sleep(times)
   print("get page {}s finished".format(times))
   return times

executor = ThreadPoolExecutor(max_workers=2)

urls = [3, 2, 4] # 并不是真的url

all_task = [executor.submit(get_html, (url)) for url in urls]



for future in as_completed(all_task):

   data = future.result()

   print("in main: get page {}s success".format(data))#

 执行结果

# get page 2s finished

# in main: get page 2s success

# get page 3s finished

# in main: get page 3s success

# get page 4s finished

# in main: get page 4s success

as_completed() 方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会 yield 这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出, 先完成的任务会先通知主线程

map

除了上面的 as_completed 方法,还可以使用 executor.map 方法,但是有一点不同。

from concurrent.futures import ThreadPoolExecutor

import time



# 参数times用来模拟网络请求的时间def get_html(times):

   time.sleep(times)
   print("get page {}s finished".format(times))
   return times

executor = ThreadPoolExecutor(max_workers=2)

urls = [3, 2, 4] # 并不是真的url



for data in executor.map(get_html, urls):

   print("in main: get page {}s success".format(data))

# 执行结果

# get page 2s finished

# get page 3s finished

# in main: get page 3s success

# in main: get page 2s success

# get page 4s finished

# in main: get page 4s success

使用 map 方法,无需提前使用 submit 方法, map 方法与 python 标准库中的 map 含义相同,都是将序列中的每个元素都执行同一个函数。上面的代码就是对 urls 的每个元素都执行 get_html 函数,并分配各线程池。可以看到执行结果与上面的 as_completed 方法的结果不同, 输出顺序和 urls 列表的顺序相同 ,就算2s的任务先执行完成,也会先打印出3s的任务先完成,再打印2s的任务完成。

wait

wait 方法可以让主线程阻塞,直到满足设定的要求。

​from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETEDimport time

# 参数times用来模拟网络请求的时间

def get_html(times):

    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)

urls = [3, 2, 4] # 并不是真的url

all_task = [executor.submit(get_html, (url)) for url in urls]

wait(all_task, return_when=ALL_COMPLETED)print("main")

# 执行结果 

# get page 2s finished

# get page 3s finished

# get page 4s finished

# main

​

wait 方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件 return_when 默认为 ALL_COMPLETED ,表明要等待所有的任务都结束。可以看到运行结果中,确实是所有任务都完成了,主线程才打印出 main 。等待条件还可以设置为 FIRST_COMPLETED ,表示第一个任务完成就停止等待。

03 源码分析

cocurrent.future 模块中的 future 的意思是 未来对象 ,可以把它理解为 一个在未来完成的操作 ,这是异步编程的基础 。在线程池 submit() 之后,返回的就是这个 future 对象,返回的时候任务并没有完成,但会在将来完成。也可以称之为task的返回容器,这个里面会存储task的结果和状态。那 ThreadPoolExecutor 内部是如何操作这个对象的呢?

下面简单介绍 ThreadPoolExecutor 的部分代码:

1.init方法

图片

init 方法中主要重要的就是任务队列和线程集合,在其他方法中需要使用到。

2.submit方法

图片

submit 中有两个重要的对象, _base.Future() _WorkItem() 对象, _WorkItem() 对象负责运行任务和对 future 象进行设置,最后会将 future 对象返回,可以看到整个过程是立即返回的,没有阻塞。

3.adjust_thread_count方法

图片

这个方法的含义很好理解,主要是创建指定的线程数。但是实现上有点难以理解,比如线程执行函数中的weakref.ref,涉及到了弱引用等概念,留待以后理解。

4._WorkItem对象

图片

_WorkItem 对象的职责就是执行任务和设置结果。这里面主要复杂的还是 self.future.set_result(result)

5.线程执行函数--_worker

图片

这是线程池创建线程时指定的函数入口,主要是从队列中依次取出task执行,但是函数的第一个参数还不是很明白。留待以后。

04 总结

  • future的设计理念很棒,在线程池/进程池和携程中都存在future对象,是异步编程的核心。

  • ThreadPoolExecutor 让线程的使用更加方便,减小了线程创建/销毁的资源损耗,无需考虑线程间的复杂同步,方便主线程与子线程的交互。

  • 线程池的抽象程度很高,多线程和多进程的编码接口一致。

未完成

  • 对future模块的理解。

  • weakref.ref是什么?

  • 线程执行函数入口_worker的第一个参数的意思。

行动吧,在路上总比一直观望的要好,未来的你肯定会感谢现在拼搏的自己!如果想学习提升找不到资料,没人答疑解惑时, 请及时加入群: 786229024 ,里面有各种测试开发资料和技术可以一起交流哦。

最后: 下方这份完整的软件测试视频教程已经整理上传完成,需要的朋友们可以自行领取 【保证100%免费】 在这里插入图片描述
软件测试面试文档
我们学习必然是为了找到高薪的工作,下面这些面试题是来自阿里、腾讯、字节等一线互联网大厂最新的面试资料,并且有字节大佬给出了权威的解答,刷完这一套面试资料相信大家都能找到满意的工作。 在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

一文讲透Python线程池ThreadPoolExecutor 的相关文章

  • 将 saxon 与 python 结合使用

    我需要使用 python 处理 XSLT 目前我正在使用仅支持 XSLT 1 的 lxml 现在我需要处理 XSLT 2 有没有办法将 saxon XSLT 处理器与 python 一起使用 有两种可能的方法 设置一个 HTTP 服务 接受
  • Python(Selenium):如何通过登录重定向/组织登录登录网站

    我不是专业程序员 所以请原谅任何愚蠢的错误 我正在做一些研究 我正在尝试使用 Selenium 登录数据库来搜索大约 1000 个术语 我有两个问题 1 重定向到组织登录页面后如何使用 Selenium 登录 2 如何检索数据库 在我解决
  • 使用 matplotlib 绘制时间序列数据并仅在年初显示年份

    rcParams date autoformatter month b n Y 我正在使用 matpltolib 来绘制时间序列 如果我按上述方式设置 rcParams 则生成的图会在每个刻度处标记月份名称和年份 我怎样才能将其设置为仅在每
  • python 相当于 R 中的 get() (= 使用字符串检索符号的值)

    在 R 中 get s 函数检索名称存储在字符变量 向量 中的符号的值s e g X lt 10 r lt XVI s lt substr r 1 1 X get s 10 取罗马数字的第一个符号r并将其转换为其等效整数 尽管花了一些时间翻
  • SQLALchemy .query:类“Car”的未解析属性引用“query”

    我有一个这里已经提到的问题https youtrack jetbrains com issue PY 44557 https youtrack jetbrains com issue PY 44557 但我还没有找到解决方案 我使用 Pyt
  • 使用 Tkinter 显示 numpy 数组中的图像

    我对 Python 缺乏经验 第一次使用 Tkinter 制作一个 UI 显示我的数字分类程序与 mnist 数据集的结果 当图像来自 numpy 数组而不是我的 PC 上的文件路径时 我有一个关于在 Tkinter 中显示图像的问题 我为
  • AWS EMR Spark Python 日志记录

    我正在 AWS EMR 上运行一个非常简单的 Spark 作业 但似乎无法从我的脚本中获取任何日志输出 我尝试过打印到 stderr from pyspark import SparkContext import sys if name m
  • 绘制方程

    我正在尝试创建一个函数 它将绘制我告诉它的任何公式 import numpy as np import matplotlib pyplot as plt def graph formula x range x np array x rang
  • 添加不同形状的 numpy 数组

    我想添加两个不同形状的 numpy 数组 但不进行广播 而是将 缺失 值视为零 可能最简单的例子是 1 2 3 2 gt 3 2 3 or 1 2 3 2 1 gt 3 2 3 1 0 0 我事先不知道形状 我正在弄乱每个 np shape
  • 如何在ipywidget按钮中显示全文?

    我正在创建一个ipywidget带有一些文本的按钮 但按钮中未显示全文 我使用的代码如下 import ipywidgets as widgets from IPython display import display button wid
  • python获取上传/下载速度

    我想在我的计算机上监控上传和下载速度 一个名为 conky 的程序已经在 conky conf 中执行了以下操作 Connection quality alignr wireless link qual perc wlan0 downspe
  • Pandas:merge_asof() 对多行求和/不重复

    我正在处理两个数据集 每个数据集具有不同的关联日期 我想合并它们 但因为日期不完全匹配 我相信merge asof 是最好的方法 然而 有两件事发生merge asof 不理想的 数字重复 数字丢失 以下代码是一个示例 df a pd Da
  • Fabric env.roledefs 未按预期运行

    On the 面料网站 http docs fabfile org en 1 10 usage execution html 给出这个例子 from fabric api import env env roledefs web hosts
  • 每个 X 具有多个 Y 值的 Python 散点图

    我正在尝试使用 Python 创建一个散点图 其中包含两个 X 类别 cat1 cat2 每个类别都有多个 Y 值 如果每个 X 值的 Y 值的数量相同 我可以使用以下代码使其工作 import numpy as np import mat
  • 为字典中的一个键附加多个值[重复]

    这个问题在这里已经有答案了 我是 python 新手 我有每年的年份和值列表 我想要做的是检查字典中是否已存在该年份 如果存在 则将该值附加到特定键的值列表中 例如 我有一个年份列表 并且每年都有一个值 2010 2 2009 4 1989
  • Conda SafetyError:文件大小不正确

    使用创建 Conda 环境时conda create n env name python 3 6 我收到以下警告 Preparing transaction done Verifying transaction SafetyError Th
  • 使用其构造函数初始化 OrderedDict 以便保留初始数据的顺序的正确方法?

    初始化有序字典 OD 以使其保留初始数据的顺序的正确方法是什么 from collections import OrderedDict Obviously wrong because regular dict loses order d O
  • 在 Qt 中自动调整标签文本大小 - 奇怪的行为

    在 Qt 中 我有一个复合小部件 它由排列在 QBoxLayouts 内的多个 QLabels 组成 当小部件调整大小时 我希望标签文本缩放以填充标签区域 并且我已经在 resizeEvent 中实现了文本大小的调整 这可行 但似乎发生了某
  • Rocket UniData/UniVerse:ODBC 无法分配足够的内存

    每当我尝试使用pyodbc连接到 Rocket UniData UniVerse 数据时我不断遇到错误 pyodbc Error 00000 00000 Rocket U2 U2ODBC 0302810 Unable to allocate
  • 导入错误:没有名为 site 的模块 - mac

    我已经有这个问题几个月了 每次我想获取一个新的 python 包并使用它时 我都会在终端中收到此错误 ImportError No module named site 我不知道为什么会出现这个错误 实际上 我无法使用任何新软件包 因为每次我

随机推荐

  • 波场TRON将致力于推动各方合作打击恐怖主义融资

    随着加密行业的蓬勃发展 新的挑战也接踵而至 近期 有外媒报道称 哈马斯等美国认定的国际恐怖组织涉通过波场TRON进行融资活动 在这场风波中 区块链项目波场TRON似乎成为了质疑的焦点 然而 当我们深入了解事实真相时 或许会发现事情并非传言中
  • 获取员工其当前的薪水比其manager当前薪水还高的相关信息

    11月29日 财报大部分数据落在预期内甚至小超预期 11月30日 股价埋头下跌 这是最近评论区争论最厉害的中概股之一 美团 给下跌找理由和给上涨编故事的本质都是 首先 冒泡排序是什么 冒泡排序 Bubble Sort 是一种简单的排序算法
  • 谈谈兼容性测试

    兼容性测试是一种测试软件或网站在不同的环境下是否能够正常运行和显示的测试方法 主要目的是保证软件的功能 性能和用户体验在各种条件下都达到预期的标准 兼容性测试的范围包括以下几个方面 浏览器兼容性 测试软件或网站在不同的浏览器 如Chrome
  • 界面组件DevExpress Reporting v23.1新版亮点 - UX功能增强

    DevExpress Reporting gt https www evget com product 3373 是 NET Framework下功能完善的报表平台 它附带了易于使用的Visual Studio报表设计器和丰富的报表控件集
  • 扬帆证券:趋势线是画最低点还是收盘价?

    趋势线是股票分析中最底子的技术指标之一 趋势线是一种可帮忙股票生意者辨认价格趋势的图形方法 趋势线是可以经过联接恣意两个价格点画出的一条直线 但是 在画出趋势线时 一个常见的问题是 运用最低点还是收盘价来画趋势线 在这篇文章中 我们将从多个
  • 【计算机毕业设计】垃圾分类回收系统

    垃圾分类回收系统 如今社会上各行各业 都喜欢用自己行业的专属软件工作 互联网发展到这个时候 人们已经发现离不开了互联网 新技术的产生 往往能解决一些老技术的弊端问题 因为传统垃圾分类回收系统信息管理难度大 容错率低 管理人员处理数据费工费时
  • 扬帆证券:A股股息率逼近历史新高 价值股迎配置良机

    A股公司酬谢股东积极性持续进步 本年前三季度多达243家公司现金分红 一切核算数据只包括各个报告期分红 不包括特别分红 派现公司数量及占比均创出10年来同期新高 估计分红近2300亿元 分红率靠近33 分红额及分红率均为近10年来同期次高
  • 【计算机毕业设计】民航网上订票系统

    民航网上订票系统 传统办法管理信息首先需要花费的时间比较多 其次数据出错率比较高 而且对错误的数据进行更改也比较困难 最后 检索数据费事费力 因此 在计算机上安装民航网上订票系统软件来发挥其高效地信息处理的作用 可以规范信息管理流程 让管理
  • SQL 数据操作技巧:SELECT INTO、INSERT INTO SELECT 和 CASE 语句详解

    SQL SELECT INTO 语句 SELECT INTO 语句将数据从一个表复制到一个新表中 SELECT INTO 语法 将所有列复制到新表中 SELECT INTO newtable IN externaldb FROM oldta
  • 双馈风机虚拟惯性控制+下垂控制参与系统一次调频的Simulink模型

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Simulink仿真实现
  • 邻接表表示图进行深度优先搜索,广度优先搜索,最小生成树

    图的邻接表定义 下面用邻接表实现图的深度优先搜索和广度优先搜索 用邻接矩阵来实现最小生成树 图的邻接表 首先定义一个图的邻接表的类 里面包括图的顶点数 图的边数 顶点表数组 由于顶点表数组里存放的都是图的一个个节点 因此需要用到顶点节点和边
  • 【计算机毕业设计】宠物猫认养系统

    宠物猫认养系统 传统办法管理信息首先需要花费的时间比较多 其次数据出错率比较高 而且对错误的数据进行更改也比较困难 最后 检索数据费事费力 因此 在计算机上安装宠物猫认养系统软件来发挥其高效地信息处理的作用 可以规范信息管理流程 让管理工作
  • 扬帆证券:什么是证券服务机构?

    股票市场上 除出资者之外有各式各样的生意主体 盘绕证券所打开的公司类型非常丰盛 在实践生意中 常与出资者有所联络的不止证券公司 还有证券服务组织 那什么是证券服务组织 它和证券公司之间有什么关系 关于这些 本文将借用相关常识作部分评论 来为
  • 基于GWO-BP灰狼算法优化BP神经网络时序回归预测研究(Matlab代码实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 目录 1 概述 2 运行结果 3 参考文献 4 Matlab代码 数据 讲解文档 1 概述 基于GWO
  • 【计算机毕业设计】红色革命文物征集管理系统

    红色革命文物征集管理系统 传统办法管理信息首先需要花费的时间比较多 其次数据出错率比较高 而且对错误的数据进行更改也比较困难 最后 检索数据费事费力 因此 在计算机上安装红色革命文物征集管理系统软件来发挥其高效地信息处理的作用 可以规范信息
  • 【计算机毕业设计】私房菜定制上门服务系统

    私房菜定制上门服务系统 如今社会上各行各业 都喜欢用自己行业的专属软件工作 互联网发展到这个时候 人们已经发现离不开了互联网 新技术的产生 往往能解决一些老技术的弊端问题 因为传统私房菜定制上门服务系统信息管理难度大 容错率低 管理人员处理
  • 光伏储能单相逆变器并网仿真模型(Simulink仿真实现)

    欢迎来到本博客 博主优势 博客内容尽量做到思维缜密 逻辑清晰 为了方便读者 座右铭 行百里者 半于九十 本文目录如下 目录 1 概述 2 运行结果 3 参考文献 4 Simulink仿真实现
  • 【计算机毕业设计】视频点播系统

    视频点播系统 传统办法管理信息首先需要花费的时间比较多 其次数据出错率比较高 而且对错误的数据进行更改也比较困难 最后 检索数据费事费力 因此 在计算机上安装视频点播系统软件来发挥其高效地信息处理的作用 可以规范信息管理流程 让管理工作可以
  • 【计算机毕业设计】可信捐赠系统

    可信捐赠系统 如今社会上各行各业 都喜欢用自己行业的专属软件工作 互联网发展到这个时候 人们已经发现离不开了互联网 新技术的产生 往往能解决一些老技术的弊端问题 因为传统可信捐赠系统信息管理难度大 容错率低 管理人员处理数据费工费时 所以专
  • 一文讲透Python线程池ThreadPoolExecutor

    01 初识 Python 中已经有了 threading 模块 为什么还需要线程池呢 线程池又是什么东西呢 在介绍线程同步的信号量机制的时候 举得例子是爬虫的例子 需要控制同时爬取的线程数 例子中创建了20个线程 而同时只允许3个线程在运行