如何将多处理与请求模块一起使用?

2023-11-22

我是 python 的新开发者。我的代码是下面的代码:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def run(self):
        with open('ips.txt', 'r') as urls:
            for url in urls.readlines():
                req = url.strip()
                try:
                    page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                        timeout=10)
                    soup = BS(page.text)
                    # string = string.encode('ascii', 'ignore')
                    print('\033[32m' + req + ' - Title: ', soup.title)
                except requests.RequestException as e:
                    print('\033[32m' + req + ' - TimeOut!')
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

我正在尝试让程序读取IPs.txt并打印出每个网站的标题。

它在单线程中完美运行。现在我想通过使用使它更快multiprocessing.

但由于某种原因它只输出同一行 5 次。我是多处理新手,并尽力尝试失败。

显示问题的屏幕截图:

screen shot showing problem

我只想运行 5 个工人来检查IPs.txt在多线程或并行中......我只是想让它更快。

有什么提示、线索、帮助吗?


Issue

您的代码中的主要问题是每个Worker opens ips.txt从头开始并适用于中找到的每个 URLips.txt。于是五个工人一起打开ips.txt五次,每个 URL 工作五次。

Solution

解决这个问题的正确方法是将代码拆分为master and worker。您已经实现了大部分工作代码。让我们来看看主要部分(在if __name__ == '__main__':)现在作为主人。

现在master应该启动五个worker并通过队列向他们发送工作(multiprocessing.Queue).

The multiprocessing.Queue类提供了一种方法,让多个生产者可以将数据放入其中,多个消费者可以从中读取数据,而不会遇到竞争条件。此类实现了所有必要的锁定语义,以便在多处理上下文中安全地交换数据并防止竞争条件。

固定码

以下是如何按照我上面描述的方式重写您的代码:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def __init__(self, job_queue):
        super().__init__()
        self._job_queue = job_queue

    def run(self):
        while True:
            url = self._job_queue.get()
            if url is None:
                break

            req = url.strip()

            try:
                page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                    timeout=10)
                soup = BS(page.text)
                # string = string.encode('ascii', 'ignore')
                print('\033[32m' + req + ' - Title: ', soup.title)
            except requests.RequestException as e:
                print('\033[32m' + req + ' - TimeOut!')


if __name__ == '__main__':
    jobs = []
    job_queue = multiprocessing.Queue()

    for i in range(5):
        p = Worker(job_queue)
        jobs.append(p)
        p.start()

    # This is the master code that feeds URLs into queue.
    with open('ips.txt', 'r') as urls:
        for url in urls.readlines():
            job_queue.put(url)

    # Send None for each worker to check and quit.
    for j in jobs:
        job_queue.put(None)

    for j in jobs:
        j.join()

我们在上面的代码中可以看到master打开ips.txt一旦,从其中一一读取 URL 并将它们放入队列中。每个工作人员都会等待 URL 到达此队列。一旦 URL 到达队列,其中一个工作人员就会拿起它并开始忙碌。如果队列中有更多 URL,则下一个空闲工作人员会选择下一个,依此类推。

最后,我们需要某种方式让工人在所有工作完成后退出。有多种方法可以实现这一目标。在此示例中,我选择了发送五个标记值(五个None值(在这种情况下)放入队列中,每个工作人员一个,以便每个工作人员都可以拿起它并退出。

还有另一种策略,工人和主人共享一个multiprocessing.Event对象就像他们共享一个multiprocessing.Queue现在就反对。主机调用set()每当它希望工作人员退出时,都会调用该对象的方法。工作人员检查该对象是否is_set()然后退出。但是,这会给代码带来一些额外的复杂性。我在下面讨论过这个问题。

为了完整起见,也为了演示最小、完整和可验证的示例,我在下面提供了两个代码示例,它们显示了两种停止策略。

使用哨兵值阻止工人

到目前为止,这与我上面描述的内容差不多,只是代码示例已被大大简化,以消除对 Python 标准库之外的任何库的依赖。

在下面的示例中值得注意的另一件事是,我们没有创建工作类,而是使用工作函数并创建一个Process出来了。这种类型的代码经常在 Python 文档中找到,而且非常惯用。

import multiprocessing
import time
import random


def worker(input_queue):
    while True:
        url = input_queue.get()

        if url is None:
            break

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.Queue()
    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(input_queue, ))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Ask the workers to quit.
    for w in workers:
        input_queue.put(None)

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()

使用事件来停止 Worker

使用multiprocessing.Event对象在工人应该退出时发出信号会在代码中引入一些复杂性。主要需要进行三项更改:

  • 在master中,我们调用set()方法上的Event反对发出工人应尽快辞职的信号。
  • 在worker中,我们调用is_set()的方法Event定期反对以检查是否应该退出。
  • 在master中,我们需要使用multiprocessing.JoinableQueue代替multiprocessing.Queue这样它就可以在要求工作人员退出之前测试队列是否已被工作人员完全消耗掉。
  • 在worker中,我们需要调用task_done()消耗队列中的每个项目后队列的方法。这对于主机调用是必要的join()方法来测试队列是否已被清空。

所有这些更改都可以在下面的代码中找到:

import multiprocessing
import time
import random
import queue


def worker(input_queue, stop_event):
    while not stop_event.is_set():
        try:
            # Check if any URL has arrived in the input queue. If not,
            # loop back and try again.
            url = input_queue.get(True, 1)
            input_queue.task_done()
        except queue.Empty:
            continue

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.JoinableQueue()
    stop_event = multiprocessing.Event()

    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker,
                                    args=(input_queue, stop_event))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Wait for the queue to be consumed.
    input_queue.join()

    # Ask the workers to quit.
    stop_event.set()

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将多处理与请求模块一起使用? 的相关文章

  • 使用Python开发Web应用程序

    我一直在用 python 做一些工作 但这都是针对独立应用程序的 我很想知道 python 的任何分支是否支持 Web 开发 有人还会建议一个好的教程或网站吗 我可以从中学习一些使用 python 进行 Web 开发的基础知识 既然大家都说
  • 如何在刻度标签和轴之间添加空间

    我已成功增加刻度标签的字体 但现在它们距离轴太近了 我想在刻度标签和轴之间添加一点呼吸空间 如果您不想全局更改间距 通过编辑 rcParams 并且想要更简洁的方法 请尝试以下操作 ax tick params axis both whic
  • 使用 openCV 对图像中的子图像进行通用检测

    免责声明 我是计算机视觉菜鸟 我看过很多关于如何在较大图像中查找特定子图像的堆栈溢出帖子 我的用例有点不同 因为我不希望它是具体的 而且我不确定如何做到这一点 如果可能的话 但我感觉应该如此 我有大量图像数据集 有时 其中一些图像是数据集的
  • 如何使用固定的 pandas 数据框进行动态 matplotlib 绘图?

    我有一个名为的数据框benchmark returns and strategy returns 两者具有相同的时间跨度 我想找到一种方法以漂亮的动画风格绘制数据点 以便它显示逐渐加载的所有点 我知道有一个matplotlib animat
  • 如何收集列表、字典等中重复计算的结果(或制作修改每个元素的列表的副本)?

    There are a great many existing Q A on Stack Overflow on this general theme but they are all either poor quality typical
  • DreamPie 不适用于 Python 3.2

    我最喜欢的 Python shell 是DreamPie http dreampie sourceforge net 我想将它与 Python 3 2 一起使用 我使用了 添加解释器 DreamPie 应用程序并添加了 Python 3 2
  • pandas 替换多个值

    以下是示例数据框 gt gt gt df pd DataFrame a 1 1 1 2 2 b 11 22 33 44 55 gt gt gt df a b 0 1 11 1 1 22 2 1 33 3 2 44 4 3 55 现在我想根据
  • Python tcl 未正确安装

    我刚刚为 python 安装了graphics py 但是当我尝试运行以下代码时 from graphics import def main win GraphWin My Circle 100 100 c Circle Point 50
  • 使用 Pycharm 在 Windows 下启动应用程序时出现 UnicodeDecodeError

    问题是当我尝试启动应用程序 app py 时 我收到以下错误 UnicodeDecodeError utf 8 编解码器无法解码位置 5 中的字节 0xb3 起始字节无效 整个文件app py coding utf 8 from flask
  • 在pyyaml中表示具有相同基类的不同类的实例

    我有一些单元测试集 希望将每个测试运行的结果存储为 YAML 文件以供进一步分析 YAML 格式的转储数据在几个方面满足我的需求 但测试属于不同的套装 结果有不同的父类 这是我所拥有的示例 gt gt gt rz shorthand for
  • Abaqus 将曲面转化为集合

    我一直试图在模型中找到两个表面的中心 参见照片 但未能成功 它们是元素表面 面 查询中没有选项可以查找元素表面的中心 只能查找元素集的中心 找到节点集的中心也很好 但是我的节点集没有出现在工具 gt 查询 gt 质量属性选项中 而且我找不到
  • Python:字符串不会转换为浮点数[重复]

    这个问题在这里已经有答案了 我几个小时前写了这个程序 while True print What would you like me to double line raw input gt if line done break else f
  • 检查所有值是否作为字典中的键存在

    我有一个值列表和一本字典 我想确保列表中的每个值都作为字典中的键存在 目前我正在使用两组来确定字典中是否存在任何值 unmapped set foo set bar keys 有没有更Pythonic的方法来测试这个 感觉有点像黑客 您的方
  • 在 Pandas DataFrame Python 中添加新列[重复]

    这个问题在这里已经有答案了 例如 我在 Pandas 中有数据框 Col1 Col2 A 1 B 2 C 3 现在 如果我想再添加一个名为 Col3 的列 并且该值基于 Col2 式中 如果Col2 gt 1 则Col3为0 否则为1 所以
  • 在python中,如何仅搜索所选子字符串之前的一个单词

    给定文本文件中的长行列表 我只想返回紧邻其前面的子字符串 例如单词狗 描述狗的单词 例如 假设有这些行包含狗 hotdog big dog is dogged dog spy with my dog brown dogs 在这种情况下 期望
  • 如何使用google colab在jupyter笔记本中显示GIF?

    我正在使用 google colab 想嵌入一个 gif 有谁知道如何做到这一点 我正在使用下面的代码 它并没有在笔记本中为 gif 制作动画 我希望笔记本是交互式的 这样人们就可以看到代码的动画效果 而无需运行它 我发现很多方法在 Goo
  • 在 Python 类中动态定义实例字段

    我是 Python 新手 主要从事 Java 编程 我目前正在思考Python中的类是如何实例化的 我明白那个 init 就像Java中的构造函数 然而 有时 python 类没有 init 方法 在这种情况下我假设有一个默认构造函数 就像
  • 协方差矩阵的对角元素不是 1 pandas/numpy

    我有以下数据框 A B 0 1 5 1 2 6 2 3 7 3 4 8 我想计算协方差 a df iloc 0 values b df iloc 1 values 使用 numpy 作为 cov numpy cov a b I get ar
  • Spark.read 在 Databricks 中给出 KrbException

    我正在尝试从 databricks 笔记本连接到 SQL 数据库 以下是我的代码 jdbcDF spark read format com microsoft sqlserver jdbc spark option url jdbc sql
  • Python - 字典和列表相交

    给定以下数据结构 找出这两种数据结构共有的交集键的最有效方法是什么 dict1 2A 3A 4B list1 2A 4B Expected output 2A 4B 如果这也能产生更快的输出 我可以将列表 不是 dict1 组织到任何其他数

随机推荐

  • CXF 客户端代理线程安全吗?

    我正在使用 CXF 生成 SOAP 客户端类 在里面CXF 文档 他们写 JAX WS 客户端代理线程安全吗 JAX WS 官方答案 不 根据 JAX WS 规范 客户端代理不是线程安全的 要编写可移植代码 您应该将它们视为非线程安全并同步
  • 嵌套对象的远程 ViewModel 验证不起作用

    我有一个类用户 如下所示 public class User public int UserId get set Required ErrorMessage A username is required StringLength 20 Er
  • python中按多个条件排序

    我是编程新手 现在我正在用 python 编写排行榜 我想按第一积分对我的联赛进行排序 如果有两支球队积分相同 我想按净胜球对它们进行排序 如果它们有相同的净胜球 我想按名称排序 第一个条件非常简单 并且按以下方式工作 table sort
  • 使用 jQuery 查找下载链接后面的文件大小

    我想知道是否有一种方法可以使用 jQuery 来找出我链接到网页的 PDF 的文件大小 我想让鼠标悬停在下载链接上时 会弹出一个模式框 显示 PDF 的文件大小 我可以做第二位 我唯一想知道的是如何找出文件大小 我不知道 jQuery 是否
  • AWS beanstalk中的worker-tier和web-tier有什么区别

    在阅读文档时 我开始了解 AWS 中的这两层环境 但找不到它们之间的任何比较 文档中的建议是 应该为长时间运行的任务选择工作环境 以提高 Web 层的响应能力 我有几个问题来澄清我的疑惑 两层有何不同 关于执行不同的操作 每个操作中可用的服
  • 如何在 Spring 中自动装配泛型类型 的 Bean?

    我有一颗豆子Item
  • 我可以使用Spring5的WebClient返回的Flux的block()方法吗?

    我创建了 Spring Boot 2 0 演示应用程序 其中包含两个使用 WebClient 进行通信的应用程序 当我从 WebClient 的响应中使用 Flux 的 block 方法时 他们经常停止通信 这让我很痛苦 由于某些原因 我想
  • C++ 引用与返回值

    我理解引用的原则是避免复制大型结构 但是如果您正在编写的函数本身创建了大型结构怎么办 与将目标对象作为引用传递并从函数内部填充相比 在本地创建变量然后返回它是否效率较低 或者更有可能耗尽内存 我似乎无法很好地表达 所以一个具体的例子 假设一
  • JS 对象键带引号还是不带引号? [复制]

    这个问题在这里已经有答案了 可能的重复 带引号和不带引号的对象键有什么区别 感兴趣吗 什么是正确的方法 是否将对象键写在引号中 那是 var obj name Jhon or var obj name Jhon 例如 从 php 代码ech
  • Git 中的 HEAD 和 ORIG_HEAD

    这些符号代表什么以及它们的含义是什么 我在官方文档中找不到任何解释 HEAD是 直接或间接 即符号 对当前提交的引用 这是您已在工作目录中签入的提交 除非您进行了一些更改或同等更改 并且是在 git commit 之上创建新提交的提交 通常
  • 无法在具有 @objc 属性的协议中使用自定义类?

    我正在尝试创建一个用于 JSON 加载委托的协议 JSONLoaderDelegate 我的另一堂课叫做JSONLoader 应该将事件分派给它的委托 实现JSONLoaderDelegate协议 如 self delegate jsonL
  • Laravel 8 - 找不到驱动程序:Illuminate\Database\QueryException 无法找到驱动程序(SQL:从 `list` 中选择 *)

    我已经在我的 Linux Mint 20 上安装了 Laravel 8 作为我的个人实验 所以我对 Laravel 的新版本很陌生 我搜索了许多来源如何使用 CRUD 方法显示表 以便该表显示在网络中 其中包含来自 MySQL 数据库的数据
  • Spring data JPA:在结果元组中找不到别名!执行自定义查询时出错

    我正在尝试使用 mysql 数据库执行自定义查询 Queryspring data jpa 的注解 该表是 Field Type Null Key Default Extra id decimal 10 0 NO PRI NULL firs
  • iOS中如何实现弹出对话框?

    计算后 我想显示一个弹出窗口或警报框 向用户传达消息 有谁知道我在哪里可以找到有关此的更多信息 Yup a UIAlertView可能就是您正在寻找的 这是一个例子 UIAlertView alert UIAlertView alloc i
  • std::bind 创建的函子住在哪里?

    函数指针可以指向自由函数 函数对象 成员函数调用的包装器等任何内容 但是 std bind 创建的函子可以有状态 也可以有自定义创建的函子 该状态分配在哪里 谁在删除它 考虑下面的例子 当向量被删除时 状态 数字10 会被删除吗 谁知道在函
  • 计算GPS坐标以形成给定大小的半径

    我想出了一种方法 它接受坐标和范围 以英里为单位 并返回围绕原点形成圆圈的坐标列表 我似乎已经取得了一些进展 但我在降低范围部分方面遇到了问题 private const Double LAT MILE 0 0144839 private
  • 仅使用 XAML 在左键单击时显示上下文菜单

    WPF 的默认行为ContextMenu是当用户右键单击时显示它 我想要ContextMenu当用户单击鼠标左键时显示 看起来这应该是一个简单的属性ContextMenu 但事实并非如此 我操纵了它 这样我就可以处理LeftMouseBut
  • 为什么匿名函数表达式和命名函数表达式的初始化如此不同?

    我正在看第13条或 ECMAScript 规范 v 5 匿名函数表达式的初始化如下 返回按照 13 2 中的规定创建新 Function 对象的结果 其参数由 FormalParameterListopt 指定 主体由 FunctionBo
  • 作曲家自动加载

    我目前正在尝试将 PSR 0 自动加载与 Composer 结合使用 但出现以下错误 Fatal error Class Twitter Twitter not found 我的目录结构如下 Project src Twitter Twit
  • 如何将多处理与请求模块一起使用?

    我是 python 的新开发者 我的代码是下面的代码 import warnings import requests import multiprocessing from colorama import init init autores