使用 python 的多处理池和映射函数测量进度

2024-01-19

我用于并行 csv 处理的以下代码:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp

def init_worker(x):
  sleep(.5)
  print "(%s,%s)" % (x[0],x[1])
  x.append(int(x[0])**2)
  return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rt")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    results = p.map(init_worker, csvReader, chunksize = 10)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()
    p.terminate()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # WRITE RESULTS TO OUTPUT FILE
  [csvWriter.writerow(row) for row in results]

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(results)
  # print len(results)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
  main()

我想以某种方式测量脚本的进度(只是纯文本,而不是任何花哨的 ASCII 艺术)。我想到的一个选择是比较已成功处理的行init_worker到 input.csv 中的所有行,并打印实际状态,例如每一秒,你能指出我正确的解决方案吗?我发现了几篇有类似问题的文章,但我无法使其适应我的需要,因为都没有使用Pool类和map方法。我还想问一下p.close(), p.join(), p.terminate()方法,我主要看到过它们Process not Pool类,他们有必要吗Pool类以及我是否正确使用它们?使用p.terminate()本来是想用 ctrl+c 来终止进程,但这是不同的 https://stackoverflow.com/questions/32160054/keyboard-interrupts-with-pythons-multiprocessing-pool-and-map-function故事还没有美好的结局。谢谢。

PS:如果重要的话,我的 input.csv 看起来像这样:

0,0
1,3
2,6
3,9
...
...
48,144
49,147

PPS:正如我所说,我是新手multiprocessing我编写的代码可以正常工作。我看到的一个缺点是整个 csv 都存储在内存中,所以如果你们有更好的想法,请随时分享。

Edit

回复 @J.F.Sebastian

这是根据您的建议我的实际代码:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp
from tqdm import tqdm

def do_job(x):
  sleep(.5)
  # print "(%s,%s)" % (x[0],x[1])
  x.append(int(x[0])**2)
  return x

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):

  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,  "rb")
  csvReader = csv.reader(inputFH, delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile, "wt")
  csvWriter = csv.writer(outputFH, lineterminator='\n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    # results = p.map(do_job, csvReader, chunksize = 10)
    for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)):
      csvWriter.writerow(result)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(result)
  # print len(result)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

if __name__ == '__main__':
  main()

这是输出tqdm:

1 [elapsed: 00:05,  0.20 iters/sec]

这个输出是什么意思?在您引用的页面上tqdm在循环中使用以下方式:

>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(100)):
...     time.sleep(1)
... 
|###-------| 35/100  35% [elapsed: 00:35 left: 01:05,  1.00 iters/sec]

这个输出是有道理的,但是我的输出是什么意思呢?而且 ctrl+c 问题似乎也没有得到解决:点击 ctrl+c 脚本后会抛出一些回溯,如果我再次点击 ctrl+c 则会得到新的回溯,依此类推。杀死它的唯一方法是将其发送到后台(ctr+z),然后杀死它(kill %1)


要显示进度,请替换pool.map with pool.imap_unordered:

from tqdm import tqdm # $ pip install tqdm

for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10)):
    csvWriter.writerow(result)

tqdm https://github.com/noamraph/tqdm部分是可选的,请参阅控制台中的文本进度栏 https://stackoverflow.com/q/3173320/4279

一不小心,它修复了你的“整个 csv 存储在内存中”和“键盘中断未引发”问题。

这是一个完整的代码示例:

#!/usr/bin/env python
import itertools
import logging
import multiprocessing
import time

def compute(i):
    time.sleep(.5)
    return i**2

if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s",
                        datefmt="%F %T", level=logging.DEBUG)
    pool = multiprocessing.Pool()
    try:
        for square in pool.imap_unordered(compute, itertools.count(), chunksize=10):
            logging.debug(square) # report progress by printing the result
    except KeyboardInterrupt:
        logging.warning("got Ctrl+C")
    finally:
        pool.terminate()
        pool.join()

您应该每隔一段时间就会看到批量输出.5 * chunksize秒。如果你按Ctrl+C;你应该看到KeyboardInterrupt在子进程和主进程中引发。在Python 3中,主进程立即退出。在 Python 2 中,KeyboardInterrupt延迟到应该打印下一批(Python 中的错误)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 python 的多处理池和映射函数测量进度 的相关文章

随机推荐

  • ASP .NET Core 2.0 将“localhost”更改为“主机名”

    我有一个基于MVC框架编写的Web应用程序 它在本地主机和默认端口 51290 上运行得非常好 现在我需要使用我的域名运行它 例如我的主机名 我尝试的是在 applicationhost config 部分添加一行
  • 从后台返回时 AVCaptureSession 失败

    我有一个相机预览窗口 90 的时间都运行良好 然而 有时 当返回我的应用程序时 如果它位于后台 预览将不会显示 这是我在视图加载时调用的代码 void startCamera session AVCaptureSession alloc i
  • SearchView getActionView 返回 null

    前几天还可以用 但是突然就停止了 我只想在某些片段可见时使用操作栏搜索小部件 现在我无法获得SearchView now getActionView总是返回 null 可搜索 xml
  • 使用 php 和 mysql 发送提醒电子邮件而不使用 cron-job?

    我刚刚制作了一个 php 脚本 它将在约会开始前 2 天向网站管理员发送电子邮件提醒 我本来打算自动化脚本来运行 cron 作业 却发现我托管的人 疯狂的域 似乎没有 Cron 作业 有没有办法在没有 cron jobs 的情况下做到这一点
  • 转置没有聚合的行和列

    我有以下数据集 Account Contact 1 324324324 1 674323234 2 833343432 2 433243443 3 787655455 4 754327545 4 455435435 5 543544355
  • 如何在此 Builder 实现中摆脱 instanceof

    The idea 我需要创建命令 命令可以配置参数 并非每个命令都可以接收相同的参数 所以有些必须被忽略 我有一个抽象类 Command 其中定义了一个 Builder 默认情况下 每个附加参数都会抛出 UnsupportedOperati
  • 在 pandas 中使用带有元组列的查询

    我有一个 pandas df 其中一列作为元组 我想用query使用元组的第一个条目对 df 进行子集化 最好的方法是什么 我在 pandas 23 3 Python 3 6 6 MWE import pandas as pd df pd
  • Gitlab-ci.yml 创建合并请求

    我在 DEV 分支中运行以下 gitlab ci yml 文件 目标也为 DEV 由于我无法将 TARGET 指向 MASTER 因此不会自动创建 MR 我想知道是否可以在 gitlab ci 脚本本身中创建合并请求 dev stage d
  • 拉力赛中的速度图[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我正在开展一个项目 从拉力赛中提取数据并创建速度图表 我了解要使用的 REST Web 服务 API 是缺陷 迭代 分层需求和迭代累积
  • 具有 IDisposable 的无限状态机

    假设我有一个无限状态机来生成随机 md5 哈希值 public static IEnumerable
  • 使用 MongoDB 生成的 _ids 作为“秘密数据”(例如,OAuth 令牌)

    是 MongoDB id字段足够随机 不可猜测来充当秘密数据 例如 如果我正在构建服务器端 OAuth 我可以使用 id 作为用户的 OAuth 令牌吗 我想这样做是因为它为数据库提供了清洁性和可索引性 例如 tokens id gt oa
  • IntentService 中未调用 OnHandleIntent()

    我知道这个问题以前曾被问过 但我已经浏览了所有我能找到的答案 但仍然无法解决问题 问题是当 BroadcastReceiver 启动时 不会调用 IntentService onHandleIntent 奇怪的是 构造函数确实运行了 正如我
  • 将计算出的键添加到集合中

    请考虑这个由男人和女人组成的数据集 我根据几个变量在第二个时刻进行过滤 type ls JsonProvider lt gt let dt ls GetSamples let dt2 dt gt Seq filter fun c gt c
  • 处理 django 查询中的外来字符

    我正在构建从 GeoNames com 导入的城市名称搜索 有些城市的名称中带有国际字符 例如 伊斯坦布尔 实际上是数据库中的 伊斯坦布尔 当人们搜索 伊斯坦布尔 时 伊斯坦布尔不会出现 有没有一种方法可以在搜索中添加过滤器或解码器来知道
  • javascript从对象数组中获取键名称

    from data ja 大阪市 en Osaka 我想要得到 ja and en 我尝试了几种方法 data map function i return i 它返回 数字数组 console log Object keys Object
  • Espresso - 如何将 typeText 切换为英语或其他语言输入模式

    我正在使用 Espresso 来实现我的应用程序的自动测试框架 但在我设计的一些测试用例中 我发现我的测试总是失败 根本原因并不在于我对功能实现代码的测试代码 根本原因是在android输入法模式下 有时候 在中文输入模式下 我输入的文字是
  • jQuery .load 回调函数中 textStatus 参数的所有可能值是什么?

    我正在利用 jQuery 的回调函数 load http api jquery com load 方法来运行某些代码 如果textStatus的参数 loadmethod 等于某个字符串 例如我有 jQuery myContainer lo
  • VSCode 中具有语义突出显示的语言服务器

    我想写一个语言服务器VSCode具有语义突出显示支持 我使用的语言有非常复杂的规则 所以我不想依赖标记器来区分标识符和关键字 我已经在以下地区提供语言服务VS Community 我在那里编写了自己的分类器 可以编写自己的分类器VSCode
  • 在 OCR(光学字符识别)之前,您推荐使用什么软件进行图像增强? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我们目前正在研究在提交 OCR 之前提高图像质量的方法 我们当前使用的 OCR 引擎是 Nuance v15 的 Scansoft API
  • 使用 python 的多处理池和映射函数测量进度

    我用于并行 csv 处理的以下代码 usr bin env python import csv from time import sleep from multiprocessing import Pool from multiproces