线程可以处理很长的 I/O 进程吗

2024-04-02

我在这里开始一个新主题,该主题将与这个问题 https://stackoverflow.com/questions/47250025/qthreadpool-how-to-interrupt-how-to-use-wisely-the-waitfordone-method.

我邀请您阅读背景知识,以获得全球性的想法。

所以我有一个依赖于 python 3.2 API(由私人公司开发)的下载功能。每个文件的处理过程最多可能需要 400 秒。

显然,我不仅要下载一个文件,因此几天来我一直在尝试将每个下载进程放入线程池中。池中的每个线程都应该完全独立于 GUI 主线程。当其中一个完成后,它应该只向 GUI 发送一个信号。

我做了几次测试,但无论使用什么技术,但是

  1. GUI 冻结;
  2. 结果仅在所有线程处理结束时给出,而不是根据需要一一给出。

我认为API给出的下载方法是一个阻塞函数,不能线程化。

所以我的问题很简单:如何知道一个 I/O 方法是否可以通过线程处理。


2017年11月24日更新

您将在下面找到部分满足我的期望的初稿(使用串联 multiprocessing.pool / map_async)。正如您将看到的,不幸的是,我不得不插入一个“忙等待循环”,以便在 QPlainTextEdit 上获取有关正在发生的情况的一些信息。

任务的结果仅在全局处理(行为map_async)结束时给出。这不正是我要找的。我想插入更多的实时信息,并立即在控制台上查看每个已完成的任务的消息。

import time
import multiprocessing
import private.library as bathy
from PyQt4 import QtCore, QtGui
import os
import sys

user = 'user'
password = 'password'
server = 'server'
basename = 'basename'

workers = multiprocessing.cpu_count()

node = bathy.NodeManager(user, password, server)
database = node.get_database(basename)

ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954,
       961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631,
       4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300,
       1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324,
       1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329,
       9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025,
       5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045,
       5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069,
       5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083,
       5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093)


# ---------------------------------------------------------------------------------
def download(surface_id, index):
    global node
    global database

    t = time.time()
    message = 'Surface #%d - Process started\n' % index

    surface = database.get_surface(surface_id)
    metadata = surface.get_metadata()
    file_path = os.path.join("C:\\Users\\philippe\\Test_Download",
                             metadata["OBJNAM"] + ".surf")

    try:
        surface.download_bathymetry(file_path)
    except RuntimeError as error:
        message += "Error : " + str(error).split('\n')[0] + '\n'
    finally:
        message += ('Process ended : %.2f s\n' % (time.time() - t))

    return message


# ---------------------------------------------------------------------------------
 def pass_args(args):
    # Method to pass multiple arguments to download (multiprocessing.Pool)
    return download(*args)


# ---------------------------------------------------------------------------------
class Console(QtGui.QDialog):
    def __init__(self):
        super(self.__class__, self).__init__()

        self.resize(600, 300)
        self.setMinimumSize(QtCore.QSize(600, 300))
        self.setWindowTitle("Console")
        self.setModal(True)

        self.verticalLayout = QtGui.QVBoxLayout(self)

        # Text edit
        # -------------------------------------------------------------------------

        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)

        # Ok / Close
        # -------------------------------------------------------------------------
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(QtGui.QDialogButtonBox.Close | 
                                           QtGui.QDialogButtonBox.Ok)
        self.button_box.setObjectName("button_box")
        self.verticalLayout.addWidget(self.button_box)

        # Connect definition
        # -------------------------------------------------------------------------

        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Close),
                     QtCore.SIGNAL('clicked()'),
                     self.button_cancel_clicked)
        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Ok),
                     QtCore.SIGNAL('clicked()'),
                     self.button_ok_clicked)

        # Post initialization
        # -------------------------------------------------------------------------
        self.pool = multiprocessing.Pool(processes=workers)

    # Connect functions
    # -----------------------------------------------------------------------------
    def button_cancel_clicked(self):
        self.close()

    def button_ok_clicked(self):
        jobs_args = [(surface_id, index) for index, surface_id in enumerate(ids)]
        async = pool.map_async(pass_args, jobs_args)
        pool.close()

        # Busy waiting loop
        while True:
            # pool.map_async has a _number_left attribute, and a ready() method
            if async.ready():
                self.write_stream("All tasks completed\n")
                pool.join()
                for line in async.get():
                    self.write_stream(line)
                break

            remaining = async._number_left
            self.write_stream("Waiting for %d task(s) to complete...\n" % remaining)
            time.sleep(0.5)


    # Other functions
    # -----------------------------------------------------------------------------
    def write_stream(self, text):
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)
        app.processEvents()


# ---------------------------------------------------------------------------------
if __name__ == '__main__':
    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()

问题

  1. 乍一看,上面的代码是否存在概念错误?
  2. 在这种特定情况下,我是否必须使用 apply_async 方法才能获得更具交互性的内容?
  3. 您能否指导我如何使用回调函数发布自定义事件来更新控制台(@ekhumoro 建议的方法)?

2017年11月25日更新

我尝试了 apply_async:

def button_ok_clicked(self):
    # Pool.apply_async - the call returns immediately instead of 
    # waiting for the result
    for index, surface_id in enumerate(ids):
        async = pool.apply_async(download, 
                                 args=(surface_id, index),
                                 callback=self.write_stream)
    pool.close()

带回调:

def write_stream(self, text):
    # This is called whenever pool.apply_async(i) returns a result
    self.text_edit.insertPlainText(text)
    cursor = self.text_edit.textCursor()
    self.text_edit.setTextCursor(cursor)
    # Update the text edit
    app.processEvents()

不幸的是,这样做会导致应用程序崩溃。我想我必须设置一个锁定机制来防止所有任务同时写入文本编辑。


下面是示例脚本的简化版本,展示了如何使用回调发布自定义事件。每个作业都通过单独处理apply_async,因此更新一个简单的计数器来指示所有作业何时完成。

import sys, time, random, multiprocessing
from PyQt4 import QtCore, QtGui

ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954,
       961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631,
       4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300,
       1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324,
       1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329,
       9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025,
       5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045,
       5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069,
       5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083,
       5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093)

def download(surface_id, index):
    t = time.time()
    message = 'Surface #%s (%s) - Process started\n' % (index, surface_id)
    time.sleep(random.random())
    message += 'Process ended : %.2f s\n' % (time.time() - t)
    return message

def pass_args(args):
    return download(*args)

class CustomEvent(QtCore.QEvent):
    DownloadComplete = QtCore.QEvent.registerEventType()

    def __init__(self, typeid, *args):
        super().__init__(typeid)
        self.data = args

class Console(QtGui.QDialog):
    def __init__(self):
        super().__init__()
        self.resize(600, 300)
        self.setMinimumSize(QtCore.QSize(600, 300))
        self.setWindowTitle("Console")
        self.verticalLayout = QtGui.QVBoxLayout(self)
        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(
            QtGui.QDialogButtonBox.Close | QtGui.QDialogButtonBox.Ok)
        self.button_box.setObjectName("button_box")
        self.verticalLayout.addWidget(self.button_box)
        self.button_box.button(QtGui.QDialogButtonBox.Close
            ).clicked.connect(self.button_cancel_clicked)
        self.button_box.button(QtGui.QDialogButtonBox.Ok
            ).clicked.connect(self.button_ok_clicked)
        self.pool = multiprocessing.Pool(None)

    def event(self, event):
        if event.type() == CustomEvent.DownloadComplete:
            message, complete = event.data
            self.write_stream(message)
            if complete:
                self.write_stream('Downloads complete!')
        return super().event(event)

    def button_cancel_clicked(self):
        self.close()

    def button_ok_clicked(self):
        total = len(ids)
        def callback(message):
            nonlocal total
            total -= 1
            QtGui.qApp.postEvent(self, CustomEvent(
                CustomEvent.DownloadComplete, message, not total))
        for index, surface_id in enumerate(ids):
            self.pool.apply_async(
                pass_args, [(surface_id, index)], callback=callback)

    def write_stream(self, text):
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)

if __name__ == '__main__':

    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

线程可以处理很长的 I/O 进程吗 的相关文章

  • 将 Matplotlib 误差线放置在不位于条形中心的位置

    我正在 Matplotlib 中生成带有错误栏的堆积条形图 不幸的是 某些层相对较小且数据多样 因此多个层的错误条可能重叠 从而使它们难以或无法读取 Example 有没有办法设置每个误差条的位置 即沿 x 轴移动它 以便重叠的线显示在彼此
  • 如何在flask中使用g.user全局

    据我了解 Flask 中的 g 变量 它应该为我提供一个全局位置来存储数据 例如登录后保存当前用户 它是否正确 我希望我的导航在登录后在整个网站上显示我的用户名 我的观点包含 from Flask import g among other
  • 使用带有关键字参数的 map() 函数

    这是我尝试使用的循环map功能于 volume ids 1 2 3 4 5 ip 172 12 13 122 for volume id in volume ids my function volume id ip ip 我有办法做到这一点
  • 使用 matplotlib 绘制时间序列数据并仅在年初显示年份

    rcParams date autoformatter month b n Y 我正在使用 matpltolib 来绘制时间序列 如果我按上述方式设置 rcParams 则生成的图会在每个刻度处标记月份名称和年份 我怎样才能将其设置为仅在每
  • Final字段的线程安全

    假设我有一个 JavaBeanUser这是从另一个线程更新的 如下所示 public class A private final User user public A User user this user user public void
  • 如何加速Python中的N维区间树?

    考虑以下问题 给定一组n间隔和一组m浮点数 对于每个浮点数 确定包含该浮点数的区间子集 这个问题已经通过构建一个解决区间树 https en wikipedia org wiki Interval tree 或称为范围树或线段树 已经针对一
  • AWS EMR Spark Python 日志记录

    我正在 AWS EMR 上运行一个非常简单的 Spark 作业 但似乎无法从我的脚本中获取任何日志输出 我尝试过打印到 stderr from pyspark import SparkContext import sys if name m
  • 绘制方程

    我正在尝试创建一个函数 它将绘制我告诉它的任何公式 import numpy as np import matplotlib pyplot as plt def graph formula x range x np array x rang
  • 在Python中获取文件描述符的位置

    比如说 我有一个原始数字文件描述符 我需要根据它获取文件中的当前位置 import os psutil some code that works with file lp lib open path to file p psutil Pro
  • Pygame:有没有简单的方法可以找到按下的任何字母数字的字母/数字?

    我目前正在开发的游戏需要让人们以自己的名义在高分板上计时 我对如何处理按键有点熟悉 但我只处理过寻找特定的按键 有没有一种简单的方法可以按下任意键的字母 而不必执行以下操作 for event in pygame event get if
  • 在f字符串中转义字符[重复]

    这个问题在这里已经有答案了 我遇到了以下问题f string gt gt gt a hello how to print hello gt gt gt f a a gt gt gt f a File
  • 如何在seaborn displot中使用hist_kws

    我想在同一图中用不同的颜色绘制直方图和 kde 线 我想为直方图设置绿色 为 kde 线设置蓝色 我设法弄清楚使用 line kws 来更改 kde 线条颜色 但 hist kws 不适用于显示 我尝试过使用 histplot 但我无法为
  • 每个 X 具有多个 Y 值的 Python 散点图

    我正在尝试使用 Python 创建一个散点图 其中包含两个 X 类别 cat1 cat2 每个类别都有多个 Y 值 如果每个 X 值的 Y 值的数量相同 我可以使用以下代码使其工作 import numpy as np import mat
  • 如何计算 pandas 数据帧上的连续有序值

    我试图从给定的数据帧中获取连续 0 值的最大计数 其中包含来自 pandas 数据帧的 id date value 列 如下所示 id date value 354 2019 03 01 0 354 2019 03 02 0 354 201
  • 使用其构造函数初始化 OrderedDict 以便保留初始数据的顺序的正确方法?

    初始化有序字典 OD 以使其保留初始数据的顺序的正确方法是什么 from collections import OrderedDict Obviously wrong because regular dict loses order d O
  • Scrapy:如何使用元在方法之间传递项目

    我是 scrapy 和 python 的新手 我试图将 parse quotes 中的项目 item author 传递给下一个解析方法 parse bio 我尝试了 request meta 和 response meta 方法 如 sc
  • Python 类继承 - 诡异的动作

    我观察到类继承有一个奇怪的效果 对于我正在处理的项目 我正在创建一个类来充当另一个模块的类的包装器 我正在使用第 3 方 aeidon 模块 用于操作字幕文件 但问题可能不太具体 以下是您通常如何使用该模块 project aeidon P
  • Windows 和 Linux 上的线程

    我在互联网上看到过在 Windows 上使用 C 制作多线程应用程序的教程 以及在 Linux 上执行相同操作的其他教程 但不能同时用于两者 是否存在即使在 Linux 或 Windows 上编译也能工作的函数 您需要使用一个包含两者的实现
  • Python Selenium:如何在文本文件中打印网站上的值?

    我正在尝试编写一个脚本 该脚本将从 tulsaspca org 网站获取以下 6 个值并将其打印在 txt 文件中 最终输出应该是 905 4896 7105 23194 1004 42000 放置的动物 的 HTML span class
  • Statsmodels.formula.api OLS不显示截距的统计值

    我正在运行以下源代码 import statsmodels formula api as sm Add one column of ones for the intercept term X np append arr np ones 50

随机推荐

  • Tomcat 7 支持 Java 8 吗?

    In Tomcat官方页面 http tomcat apache org whichversion html它说 Tomcat 7 支持 Java 8 如果我下载这个并使用 Java 8 运行它就可以工作 但是 在 Openshift 上是
  • 如何使 gif 在黑莓 java 上完成后消失?

    我知道如何加载 gif 并让它运行 但是如何让它消失呢 IE 它位于另一个位图背景之上 看起来像与背景交互 我希望它在完成一次后消失 您可以将 gif 设置为不重复并使最终帧 100 透明
  • TextView的设置阻止其他TextView的跑马灯滚动

    这是在其他地方问过的 但该解决方案对我不起作用 因此 在更多背景下再次提出它 问题是活动包含滚动音乐标题文本视图 该视图被更新的经过时间计数器文本视图中断 我的活动布局中有这两个 TextView 小部件 尽管它们被其他布局容器包含
  • 尝试创建 100MB 缓冲区时出现分段错误

    我正在尝试将一个大的二进制文件写入 C 程序的缓冲区中 在尝试创建与文件读取大小相同的缓冲区后 GDB 总是会出现段错误 它要么在 fclose pf 倒带或 f open 上失败 这让我相信当我尝试创建缓冲区时出现了问题 我的代码段如下
  • rmagick安装[关闭]

    Closed 这个问题是无关 help closed questions 目前不接受答案 我在安装 Rmagick 时遇到一些问题 有两种安装方法 1 使用Ruby Gem 2 bld来源 我在这两方面都面临问题 但我希望能够获得 gem
  • 有什么方法可以检查是否强制执行严格模式?

    无论如何 是否要检查是否强制执行严格模式 use strict 并且我们希望为严格模式执行不同的代码 为非严格模式执行其他代码 寻找类似的功能isStrictMode boolean 事实是this在全局上下文中调用的函数内部不会指向全局对
  • HAProxy 随机空响应

    我安装了 HAPROXY 以实现两台服务器之间的平衡 不幸的是 HAPROXY 返回随机 ERR EMPTY RESPONSE 我也安装了统计信息 但统计信息没有出现频繁地因为有时会显示统计数据 我和一些朋友仔细检查了我的配置 没有发现问题
  • 从提升的子进程获取错误和标准输出

    我创建了一个进程处理程序 它启动两种类型的进程 使用管理员用户名和密码提升的权限 另一种无需输入任何用户名和密码即可正常运行 我正在努力弄清楚如何从提升的进程中获取输出 启动进程的应用程序不需要管理员凭据即可运行 管理员凭据输入到单独的加密
  • Angular2 i18n 用于占位符文本

    有没有办法使用 Angular 2 的 i18n 翻译输入文本字段的占位符文本
  • 从派生类访问基类公共成员

    是否可以从程序中其他位置的派生类实例访问基类公共成员 class base public int x base int xx x xx class derived base public derived int xx base xx cla
  • 从python句子中删除非英语单词

    我编写了一个代码 用于向 Google 发送查询并返回结果 我从这些结果中提取片段 摘要 以进行进一步处理 然而 有时这些片段中会出现我不想要的非英语单词 例如 u02b0w u025bn w u025bn unstressed u02b0
  • 多次按下按钮时声音重叠

    当我按下一个按钮 然后按下另一个按钮时 声音会重叠 我该如何解决这个问题 以便在按下另一个声音时第一个声音停止 void playOnce NSString aSound NSString path NSBundle mainBundle
  • ORA-01840: 输入值对于 Oracle Insert 使用 Select 中的日期格式来说不够长

    我有以下查询 其中出现错误ORA 01840 input value not long enough for date format The C DATE列是日期数据类型 INSERT INTO CS LOG NAME ID C DATE
  • React.PropTypes.func.isRequired 的问题

    我是 React 新手 正在尝试定义 PropTypes 但似乎它不再起作用 以下是我如何使用它 React PropTypes func isRequired Below is the error am getting 那么这就是我所缺少
  • 是否可以检查 CSS 变量是否已定义?

    我想知道是否可以仅在定义了 css 变量的情况下应用 CSS 规则 我已经看到可以定义默认值 例如 background color var bgColor red 但我认为这不会在我的项目中起作用 因为我想要的是 当未定义变量来获取该行在
  • Conda - 从防火墙后面的 .whl 文件安装tensorflow

    我有一个Anaconda3 与 Python 3 6 Spyder 环境 正在尝试安装tensorflow但是 由于公司防火墙的原因 无法使用标准的 pip 安装 此外 出于同样的原因 我无法创建 anaconda 环境 我想做的是直接从安
  • Levene 检验的多重比较事后检验

    我想在 R 中对 Levene 的测试进行成对比较事后测试 我知道如何使用 PROC GLM 在 SAS 中执行此操作 但我似乎不知道如何在 R 中执行此操作 有人有吗主意 在下面的示例中 我希望能够测试所有 猫 级别 即 A B A C
  • Scala 中的无符号变量

    我正在将一些 C 代码转换为 Scala 因为我们正在 据称 进入企业大厦的现代世界 至少我是被告知的 某些 C 代码使用无符号变量 这些变量对其执行了大量位级 移位 操作 我对如何将它们转换为 Scala 完全处于停滞状态 因为我相信 S
  • PrimeFaces。渲染后更新数据表

    我有一个数据表并想要保留过滤器 我可以保存过滤器值并通过调用数据表将它们放回 我将过滤器值放回到渲染中 现在我想要过滤表 是的 我想调用服务并从中获取所有数据 然后我想使用保留在过滤字段中的值来过滤表 我找不到在渲染表格后启动过滤的解决方案
  • 线程可以处理很长的 I/O 进程吗

    我在这里开始一个新主题 该主题将与这个问题 https stackoverflow com questions 47250025 qthreadpool how to interrupt how to use wisely the wait