Python之多线程爬虫实践

2023-05-16

多线程爬虫实践

    • 一、多线程的介绍及threading的基本使用
      • 1. 什么是多线程?
      • 2. 如何创建一个基本的多线程:
    • 二、使用Thread类创建多线程
      • 1. 查看当前线程
      • 2. 继承自threading.Thread类
    • 三、多线程共享全局变量的问题
      • 1. 问题
      • 2. 锁机制和threading.Lock类
    • 四、Lock版生产者和消费者模式
      • 1.生产者和消费者模式
      • 2.Lock版生产者和消费者模式
      • 3.Condition版的生产者和消费者模式
    • 五、Queue线程安全队列
    • 六、GIL理解和正确的利用GIL
      • 1.多线程的GIL锁
      • 2.有了GIL,为什么还需要Lock呢?
      • 3.小结
    • 实例:爬取王者荣耀高清壁纸并下载到文件夹中
  • 结尾

一、多线程的介绍及threading的基本使用

1. 什么是多线程?

  • 默认情况下,一个程序只有一个进程和一个线程,代码是依次线性执行的;而多线程则可以并发执行,相当于一次性多个人做多件事,效率就会比单线程快
  • 具体概念可查看百度百科:https://baike.baidu.com/item/多线程/1190404

2. 如何创建一个基本的多线程:

使用threading模块中的Thread类即可创建一个线程,Thread类中有一个target参数,需要指定一个函数,将此线程执行的时候,这个代码就会被执行,示例代码如下:

import threading
import time

def coding():
    for i in range(3):
        print(i)
        time.sleep(1)

def drawing():
    for i in range(4, 7):
        print(i)
        time.sleep(1)
        
def multi_thread():
    thr1 = threading.Thread(target=coding)
    thr2 = threading.Thread(target=drawing)
    thr1.start()
    thr2.start()

if __name__ == '__main__':
    multi_thread()

二、使用Thread类创建多线程

1. 查看当前线程

  • 使用threading.current_thread(),在线程中执行该函数,会返回当前线程的对象
  • 使用threading.enumerate()可以获取整个程序中所有的线程(每个程序都会默认有一个主线程MainThread)

2. 继承自threading.Thread类

  • 我们自己写的类必须继承自threading.Thread类
  • 线程代码需要放在run方法中执行
  • 以后创建线程的时候,直接使用我们自己创建的类来创建线程就可以了
  • 为什么要使用类的方式来创建线程,原因如下:
    (1)类可以让线程代码更好的封装
    (2)类可以更加方便地管理我们的代码
    (3)可以让我们使用面向对象来编程
    示例代码如下:
import threading
import time

class coding(threading.Thread):
    def run(self) -> None:
        for i in range(3):
            print(i)
            time.sleep(1)

class drawing(threading.Thread):
    def run(self) -> None:
        for i in range(4, 7):
            print(i)
            time.sleep(1)

def multi_thread():
    th1 = coding()
    th2 = drawing()
    th1.start()
    th2.start()

if __name__ == '__main__':
    multi_thread()

三、多线程共享全局变量的问题

1. 问题

多线程都是在同一个进程中运行的,因此在进程中的全局变量所有线程都是共享的,这就造成了一个问题,因为线程执行的顺序是无序的,有可能会造成数据错误,比如以下代码:

import threading

x = 0
def add():
    # 如果在函数中修改全局变量,需要使用关键字global进行申明
    global x
    for i in range(10000000):
        x += 1
    print(x)

def main():
    for i in range(2):
        th = threading.Thread(target=add)
        th.start()

if __name__ == '__main__':
    main()

按照正常情况,最后的结果应该是10000000和20000000,但是因为多线程运行的不确定性,导致最后的结果也会是随机的
在这里插入图片描述

2. 锁机制和threading.Lock类

为了解决共享全局变量的问题,threading提供了一个Lock类,这个类可以在某个线程访问某个变量的时候加锁,其他线程此时就不能进来,知道当前线程处理完后,把锁释放了,其他线程才能进来处理,示例代码如下:

import threading

x = 0
glock = threading.Lock()  # 创建一个锁

def add():
    global x
    glock.acquire()  # 上锁
    for i in range(10000000):
        x += 1
    print(x)
    glock.release()  # 释放锁

def main():
    for i in range(2):
        th = threading.Thread(target=add)
        th.start()

if __name__ == '__main__':
    main()

结果如下:
在这里插入图片描述
使用锁的原则:
(1)把尽量少的和不耗时的代码放到锁中执行
(2)代码执行完成后记得释放锁


四、Lock版生产者和消费者模式

1.生产者和消费者模式

生产者和消费者模式是多线程开发中经常见到的一种模式。生产者的线程专门用来生成一些数据,然后存放到一个中间变量中。消费者再从这个中间变量中取出数据进行消费。通过生产者和消费者模式,可以让代码达到高内聚低耦合的目标,程序分工更加明确,线程更加方便管理
在这里插入图片描述

2.Lock版生产者和消费者模式

生产者和消费者因为要使用中间变量,而中间变量经常是一些全局变量,因此需要使用锁在保证数据完整性
生产者和消费者模式模拟的示例代码如下:

import threading
import random
import time

# 创建中间变量
moneys = 0
glock = threading.Lock()  # 创建lock锁
# 全局变量,记录生产者的生产次数
times = 0

# 创建生产者
class producer(threading.Thread):
    def run(self) -> None:
        global moneys
        global times
        while 1:
            glock.acquire()
            # 加条件,不然生产者会一直生产下去,同时退出循环前一定要解锁,不然会一直卡在这里重复退出
            if times >= 10:
                glock.release()
                break
            money = random.randint(0, 100)
            moneys += money
            times += 1
            print("%s生产了%d元" % (threading.current_thread().name, money))
            time.sleep(1)
            glock.release()

# 创建消费者
class consumer(threading.Thread):
    def run(self) -> None:
        global moneys
        while 1:
            glock.acquire()
            money = random.randint(0, 100)
            # 当钱不够且生产者不再生产时退出程序,需要提前释放锁
            if moneys < money and times >= 10:
                glock.release()
                break
            if moneys >= money:
                moneys -= money
                print("%s消费了%d元" % (threading.current_thread().name, money))
            else:
                print("消费者想消费%d元,但是余额只有%d元" % (money, moneys))
            time.sleep(1)
            glock.release()


def main():
    # 创建5个生产者
    for i in range(5):
        th = producer(name="生产者%s号" % i)
        th.start()
    # 创建5个消费者
    for i in range(5):
        th = consumer(name="消费者%s号" % i)
        th.start()

if __name__ == '__main__':
    main()

3.Condition版的生产者和消费者模式

Lock版的生产者和消费者模式可以正常运行。但是存在一个不足之处,在消费者中,总是通过while 1死循环并且上锁的方式去判断钱够不够。上锁是一个很耗费CPU资源的行为,因此这种方式不是最好的。还有一种更好的方式便是使用threading.Condition来实现。threading.Condition可以在没有数据的时候处于阻塞等待状态,一旦有合适的数据了,还可以使用notify相关的函数来通知其他处于等待状态的线程。这样就可以不用做一些无用的上锁和解锁的动作。可以提高程序的性能
首先对threading.Condition相关的函数做个介绍,threading.Condition类似threading.Lock,可以在修改全局数据的时候进行上锁,也可以在修改完毕后进行解锁。以下将一些常用的函数做个简单的介绍:
(1)acquire:上锁
(2)release:解锁
(3)wait:将当前线程处于等待状态,并且会释放锁,可以被其他线程使用notify和notify_all函数唤醒。被唤醒后会继续等待上锁,上锁后继续执行下面的代码
(4)notify:通知某个正在等待的线程,默认是第一个等待的线程
(5)notify_all:通知所有正在等待的线程。notify和notify_all不会释放锁。并且需要在release之前调用

以下是对模拟lock版的生产者和消费者模式代码的改进:

import threading
import random
import time

# 全局变量,也是中间变量
moneys = 0
# 创建一个Condition对象
condition = threading.Condition()
# 全局变量,记录生产者生产的次数
times = 0

class producer(threading.Thread):
    def run(self) -> None:
        global moneys
        global times
        while 1:
            condition.acquire()  # 上锁
            # 判断语句,当生产者生产10次之后将不再生产
            if times >= 10:
                condition.release()
                break
            money = random.randint(0, 100)
            moneys += money
            times += 1
            print("%s生产了%d元" % (threading.current_thread().name, money))
            # 通知所有正在等待状态的线程,且condition.notify_all()不会释放锁,所以必须在释放之前使用
            condition.notify_all()
            time.sleep(0.5)
            condition.release()  #解锁

class consumer(threading.Thread):
    def run(self) -> None:
        global moneys
        while 1:
            condition.acquire()  #上锁
            money = random.randint(0, 100)
            while moneys < money:
                if times >= 10:
                    print("消费者想消费%d元,但是余额只有%d元,并且生产者不再生产金钱了" % (money, moneys))
                    condition.release()
                    return
                print("消费者想消费%d元,但是余额只有%d元" % (money, moneys))
                # 当金钱不够的时候,让该线程处于等待状态
                condition.wait()
            moneys -= money
            print("%s消费了%d元,剩下%d元" % (threading.current_thread().name, money, moneys))
            time.sleep(0.5)
            condition.release()  #解锁


def main():
    # 创建5个生产者
    for i in range(5):
        th = producer(name="生产者%s号" % i)
        th.start()
    # 创建5个消费者
    for i in range(5):
        th = consumer(name="消费者%s号" % i)
        th.start()

if __name__ == '__main__':
    main()

五、Queue线程安全队列

在线程中,访问一些全局变量,加锁是一个经常的过程。如果你想要把一些数据存储到某个队列中,那么Python内置了一个线程安全的模块叫queue模块。Python中的queue模块中提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列LifoQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么都做完),能够在多线程中直接使用,可以使用队列来实现线程间的同步。相关的函数如下:
初始化Queue(maxsize):创建一个先进先出的队列
(1)qsize():放回队列的大小
(2)empty():判断队列是否为空
(3)full():判断队列是否满了
(4)get():从队列中取最后一个数据(当队列已空的时候,队列会处于阻塞等待状态,直到可以有新的元素添加进队列,因此可以加上block=False使它处于非阻塞状态,如果关掉阻塞,当队列已空再想往里取出元素的话,程序会报异常)
(5)put():将一个数据放到队列中(当队列已满的时候,队列会处于阻塞等待状态,直到可以放进新的元素,因此可以加上block=False使它处于非阻塞状态,当队列已满再想往里添加元素的话,程序会报异常)
示例代码如下:

from queue import Queue
import random
import threading
import time

def add_value(q):
    while 1:
        q.put(random.randint(0, 10))
        time.sleep(1)

def get_value(q):
    while 1:
        print(q.get())

def main():
    q = Queue(10)
    th1 = threading.Thread(target=add_value, args=[q])
    th2 = threading.Thread(target=get_value, args=[q])
    th1.start()
    th2.start()

六、GIL理解和正确的利用GIL

1.多线程的GIL锁

Python自带的解释器为CPython,Cpython解释器的多线程实际上是一个假的多线程(在多核CPU中,只能利用一核,不能利用多核)。同一时刻只有一个线程在执行,为了保证同一时刻只有一个线程在执行,在CPython解释器中有一个东西叫GIL(全局解释器锁),因为CPython解释器的内存管理不是线程安全的,所以这个解释器锁是有必要的。(当然除了CPython外还有其他的解释器,而有些解释器是没有GIL的,比如Jython、IronPython等等)
GIL虽然是一个假的多线程,但是在处理一些IO操作(比如文件读写和网络请求)还是可以在很大程度上提高效率的,在IO操作上建议使用多线程提高效率。在一些CPU计算操作上不建议使用多线程,而建议使用多进程

2.有了GIL,为什么还需要Lock呢?

GIL只是保证全局同一时刻只有一个线程在执行,但是它并不能保证执行代码的原子性。也就是这一个操作可能会被分成几部分来完成,这样就会导致数据出现问题,所以需要Lock锁来保证操作的原子性

3.小结

在一般的程序中,GIL锁并不会带来什么影响,但是如果在进行一些例如矩阵乘法等数学计算的程序中,GIL锁还是会存在一定的影响


实例:爬取王者荣耀高清壁纸并下载到文件夹中

  1. 进入王者荣耀高清壁纸官网,在network里面找到workList文件,获取里面的url,此url是真正的高清壁纸地址
  2. 获得url后,通过parse.unquote可以进行解码,然后将获得的地址最后的200改成0,就可以获得真正的高清壁纸地址
  3. 获取到的url中有一个page参数,通过修改page的值,可以进行翻页操作,默认page是从0开始的
  4. 总共有28页,所以page的区间是[0,27]
  5. 使用之前学到的生产者和消费者模式、线程队列加速爬取下载
  • 这里提供几个函数的使用说明:
    (1)parse.unquote():对URL进行解码
    (2)request.urlretrieve():可以直接将远程地址下载到本地
    (3)os.path.join():连接两个或更多的路径名组件
import queue
import requests
from urllib import parse
import os
from urllib import request
import threading

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36",
    "referer": "https://pvp.qq.com/"
}

class producer(threading.Thread):
    def __init__(self, page_queue, image_queue, *args, **kwargs):
        super(producer, self).__init__(*args, **kwargs)
        self.page_queue = page_queue
        self.image_queue = image_queue

    def run(self) -> None:
        while not self.page_queue.empty():
            page_url = self.page_queue.get()
            resp = requests.get(page_url, headers=headers)
            result = resp.json()
            datas = result['List']
            for data in datas:
                sProdImgNo_8 = parse.unquote(data['sProdImgNo_8']).replace("200", "0")
                name = parse.unquote(data["sProdName"]).replace("1:1", "").strip()
                self.image_queue.put({"sProdImgNo_8": sProdImgNo_8, "name": name})

class consumer(threading.Thread):
    def __init__(self, image_queue, *args, **kwargs):
        super(consumer, self).__init__(*args, **kwargs)
        self.image_queue = image_queue

    def run(self) -> None:
        while 1:
            try:
                image_obj = self.image_queue.get()
                sProdImgNo_8 = image_obj.get("sProdImgNo_8")
                name = image_obj.get("name")
                request.urlretrieve(sProdImgNo_8, os.path.join("王者荣耀壁纸", name+".jpg"))
                print("%s下载完成" % name)
            except:
                break

def main():
    page_queue = queue.Queue(maxsize=28)
    image_queue = queue.Queue(maxsize=1000)
    for page in range(28):
        page_url = f"https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD&sDataType=JSON&iListNum=20&totalpage=0&page={page}&iOrder=0&iSortNumClose=1&iAMSActivityId=51991&_everyRead=true&iTypeId=2&iFlowId=267733&iActId=2735&iModuleId=2735&_=1647064214252"
        page_queue.put(page_url)
    # 创建3个生产者
    for i in range(3):
        th = producer(page_queue, image_queue)
        th.start()
    # 创建5个消费者
    for x in range(5):
        th = consumer(image_queue)
        th.start()

if __name__ == '__main__':
    main()

结尾

本篇的内容到这里差不多就结束啦!
希望能让你们对多线程以及一些相关的知识有一个基本的理解
大家如果对一些知识还是不太理解也可以通过多实践来学习,实践是检验真理的唯一标准!希望能和大家一起互勉

希望大家给个点赞关注收藏三连~

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Python之多线程爬虫实践 的相关文章

随机推荐

  • 本地与linux服务器文件互传(超简单)

    利用系统自带的命令行窗口powershell上传 xff08 win10以上系统自带的 xff0c 系统级应用 xff0c 十分推荐使用 xff09 在这Linux 用户名 xff1a hadoop ip 192 168 53 20 打开搜
  • 【剑指offer系列】剑指offer 03-06

    这次我们来讲解剑指offer的全部题目 xff0c 今天是第一天 xff0c 我们来讲解第三题到第六题 xff08 我也不清楚为什么力扣上查不到第一题和第二题 xff09 一 剑指offer 03 题目链接 xff1a 力扣 题目描述 xf
  • 什么是scrum中的3355

    scrum的3355是指 xff1a 3个工件 xff1a 产品Backlog Sprint Backlog 潜在可交付软件增量 3个角色 xff1a PO Master 团队 xff08 最适合人数为7 2到7 43 2之间 xff09
  • 搭建ant+jenkins+jmeter自动化接口测试框架(详细篇)

    引言 为什么要持续集成 1 减少风险 2 减少假定 3 减少重复过程 4 增强项目的可见性 5 持续集成可以带来两点积极效果 xff1a 1 有效决策 xff1a 持续集成系统为项目构建状态和品质指标提供了及时的信息 xff0c 有些持续集
  • Linux C生产者和消费者(线程)

    生产者和消费者 生产者消费者问题实现目标原理代码 生产者消费者问题 生产者消费者共享缓冲区 xff0c 生产者向缓冲区中放数据 xff0c 消费者从缓冲取中取数据 xff0c 当缓冲区中被放满时 xff0c 生产者进程就必须进入挂起状态 x
  • HPE DL388GEN9 /windows server 2012r2 重置管理员密码/忘记管理员密码

    有台HPE DL388GEN9 windows server 2012r2的主机 xff0c 不知道密码 从CSND上查了有人可以通过U盘PE进去用工具去改掉 实测 xff0c 难以进入PE xff08 也可能是我操作有问题 xff09 x
  • ArchLinux,ManjaroLlinux安装,运行Android软件。安装anbox(详细)

    安装anbox我也是用了一个下午的时间来进行安装 xff0c 因此我做了一下总结 xff0c 方便大家安装 这个安装教程arch和manjaro都是可以实现的 xff0c 因为manjaro是arch的分支 xff0c 同样也可以使用anb
  • ArchLinux的安装(BIOS引导方式安装)

    archlinux的安装对于很多新手朋友很不友好 xff0c 于是我对archlinux的安装做了一下整理 xff0c 方便大家安装 安装之前我们需要准备一下 xff1a archlinux的镜像iOS文件 U盘 xff0c 或者虚拟机 脑
  • 汇编语言,and、or指令

    and 和 or指令 and指令 xff0c 作用按位与运算 mov ax 1011 1100b mov bx 1100 1011b and ax bx ax的结果为 xff1a 1000 1000 or指令 xff0c 作用 xff1a
  • 解决wget错误:ERROR: The certificate of ‘xxx’ is not trusted.

    使用wget出现以下错误 wget https github com mozilla geckodriver releases download v0 31 0 geckodriver v0 31 0 linux64 tar gz ERRO
  • git 报错SSL certificate problem: unable to get local issuer certificate解决办法

    git中的SSL certificate problem unable to get local issuer certificate错误的解决办法 uManBoy 这是由于当你通过HTTPS访问Git远程仓库的时候 xff0c 如果服务器
  • Linux 对整个系统备份和还原

    对系统进行备份非常的重要 xff0c 如果有一天 xff0c 系统崩溃了 xff0c 可以重装系统 xff0c 但是重装系统后又需要进行相关的配置 xff0c 这会显得非常的麻烦 xff0c 又会浪费很多的时间 备份的方式 xff1a 分两
  • Win11安装python-3.11.1

    注意整个安装过程需要联网 xff01 xff01 xff01 python 3 11 1下载地址 xff1a https www python org ftp python 3 11 1 python 3 11 1 amd64 exe 1
  • 三分钟记住20道性能测试经典面试题

    1 什么是性能测试 xff1f 测试系统有没有性能问题 考虑时间 xff0c 空间 服务端资源是否足够 响应时间是否超时 系统是否足够稳定 2 性能测试的应用领域有哪些 xff1f 能力验证 xff1a 乙方向甲方交付项目时 xff0c 声
  • C++ 对数组的快速排序算法

    include 34 stdio h 34 交换两个数 void swap int amp a int amp b int t 61 a a 61 b b 61 t 根据第一个数 xff0c 把小于第一个数的数放在前面 xff0c 把打印第
  • C++ 广度优先搜索,搜索二叉树,并且打印

    广度优先搜索 xff08 BFS xff09 什么是广度优先搜索 广度优先搜索就是层序遍历 xff0c 一层一层的搜索整个图 BFS的代码实现 使用数组实现静态的二叉树 xff0c 树的结构如下显示 代码如下显示 include 34 st
  • C++ 得到下一天的年月日的函数

    include 34 iostream 34 include 34 string 34 include 34 stdio h 34 using namespace std 得到下一个日期 void next day int amp year
  • 树莓派使用Thonny学习笔记1

    来C站这么久了 xff0c 最近在初学树莓派Raspberry Pi PIco 所以想在C站记录一下自己的学习过程 xff0c 到时候能回头看看自己的学习之路 今天学习了如何新建全局变量与按键输入模式 xff0c 和IO的输出模式配置 xf
  • error while loading shared libraries: librosconsole.so: cannot open shared object file: No such file

    错误背景 xff1a linux下运行ros相关文件出错 错误意思 xff1a Linux下这个的错误的大概意思还是路径问题 xff0c 要么是你下载的库真的不存在 xff0c 要么是没有添加环境变量 xff0c 不过大多数都属于后者 一
  • Python之多线程爬虫实践

    多线程爬虫实践 一 多线程的介绍及threading的基本使用1 什么是多线程 xff1f 2 如何创建一个基本的多线程 xff1a 二 使用Thread类创建多线程1 查看当前线程2 继承自threading Thread类 三 多线程共