【并发编程】Python多线程、多进程、多协程入门篇

2023-11-14


B站学习视频:
Python并发编程
并发编程手册,更多细节可以看看这本书,虽然翻译可能不太行:
Python并行编程(中文版)手册

1. 基础概念

背景:按顺序执行程序,对于数量级较大、循环次数较多的任务,程序运行耗时会过长;
引入并发编程的目的:大幅提升程序运行速度;
程序提速的方法
在这里插入图片描述

CPU是指CPU运算过程,IO指文件读取过程,在电脑中CPU的运算过程和IO流操作是可以并行的。基础的程序运行流程是单线程串行的,对于单核CPU机器,则可以更改为CPU运算和IO流操作并行,即多线程并发,对于具有多核CPU的机器而言,则可以实现基于多个CPU内核的并行操作,即多CPU并行,在此基础上,有条件的话,还可以基于hadoop、hive、spark等采用多个机器并行操作,实现更高程度的并发操作,即多机器并行,如上图所示。

Python对并发编程的支持

  1. 并发编程实现方式的支持:
    ·多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成;
    ·多进程:multiprocessing,利用多核CPU的能力,真正的并行执行任务;
    异步IO:asyncio,在单线程利用CPU和IO同时执行的原理,实现函数异步执行;
  2. 并发编程方法上的支持(Tips):
    ·使用Lock对资源加锁,防止对同一个文件进行修改时的冲突访问;
    ·使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式;
    ·使用线程池Pool/进程池Pool,简化线程/进程的任务提交、等待结束、获取结果;
    ·使用subprocess启动外部程序的进程,并进行输入输出的交互;

2. 并发编程如何选择

2.1 CPU/IO密集型计算

在这里插入图片描述

2.2 多线程、多进程、多协程对比

在这里插入图片描述
在这里插入图片描述

3. Python运行速度根因分析

3.1 速度慢两大原因

  1. python是动态类型语言,边解释边执行;
  2. GIL,导致python无法利用多核cpu并发执行;

3.1 GIL

GIL(Global Interpreter Lock),用于同步线程的一种机制,使得任何时刻仅有一个线程在执行,即使在多核处理器上,使用GIL的解释器也只允许同一时间执行一个线程;
引入GIL是为了规避并发,解决多线程之间数据完整性和状态同步问题:
在这里插入图片描述
如何规避GIL带来的限制。
在这里插入图片描述

4. 多线程

4.1 Python多线程基础爬虫

  1. 创建多线程的方法
# 1. 准备一个函数
def my_func(a, b):
	do_craw(a, b)
# 2. 怎样创建一个线程
import threading
t = threading.Thread(target=my_func, args=(100, 200))
# 2. 启动线程
t.start()
# 4. 等待结束
t.join()
  1. 爬虫案例
# blog_spider.py
import requests
import time
urls = [
	"https://www.cnblogs.com/#p{page}" for page in range(1, 50+1)
]

def craw(url):
	r = requests.get(url)
	print(url, len(r.text))

craw(urls[0])

# 01. multi_thread_craw.py
import blog_spider
import threading

def single_thread():
	print("single_thread begin")
	for url in blog_spider.urls:
		blog_spider.craw(url)
	print("single_thread end")

def multi_thread():
	print("multi_thread begin")
	threads = []
	for url in blog_spider.urls:
		threads.append(
			threading.Thread(target=blog_spider.craw, args=(url,))
		)
	
	for thread in threads:
		thread.start()

	for thread in threads:
		thread.join()
	
	print("multi_thread end")

if __name__ == "__main__":
	start = time.time()
	single_thread()
	end = time.time()
	print("single thread cost:", end - start, "seconds") # 7.98S
	start = time.time()
	multi_thread()
	end = time.time()
	print("multi thread cost:", end - start, "seconds") # 0.49s

4.2 Python实现生产者消费者模式

  1. 多组件的Pipeline技术架构
    在这里插入图片描述
  2. 生产者消费者爬虫的架构
    在这里插入图片描述
  3. 多线程数据通信的queue.Queue
# queue.Queue可以用于多线程之间的、线程安全的数据通信
# 1. 导入类库
import queue
# 2. 创建Queue
q = queue.Queue()
# 3. 添加元素
q.put(item)
# 4. 获取元素
item = q.get()
# 5. 查询状态
# 查看元素的多少
q.size()
# 判断是否为空
q.empty()
# 判断是否已满
q.full()
  1. 实现生产者消费者爬虫
# blog_spider.py
import requests
import time
from bs4 import BeautifulSoup
urls = [
	"https://www.cnblogs.com/#p{page}" for page in range(1, 50+1)
]

def craw(url):
	r = requests.get(url)
	return r.text

def parse(html):
	# class="post-item-title"
	soup = BeautifulSoup(html, "html.parser")
	links = soup.find_all("a", class_="post-item-title")
	return [(link["href"], link.get_text()) for link in links]

if __name__ == "__main__":
	for result in parse(craw(url[2])):
		print(result)

# 02.producer_consumer_spider.py
import queue
import blog_spider
import time
import random
import threading

def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
	while True:
		url = url_queue.get()
		html = blog_spider.craw(url)
		html_queue.put(html)
		print(threading.current_thread().name, f"craw {url}", "url_queue.size=", url_queue.qsize())
		time.sleep(random.randint(1, 2))

def do_parse(html_queue: queue.Queue, fout):
	while True:
		html = html_queue.get()
		results = blog_spider.parse(html)
		for result in results:
			fout.write(str(result) + "\n")
		print(threading.current_thread().name, f"results.size", len(results), "html_queue.size=", html_queue.qsize())
		time.sleep(random.randint(1, 2))

if __name__ == "__main__":
	url_queue = queue.Queue()
	html_queue = queue.Queue()
	for url in blog_spider.urls:
		url_queue.put(url)
	
	for idx in range(3):
		t = threading.Thread(target=do_craw, args=(url_queue, html_queue), name=f"craq{idx}")
		t.start()
	
	fout = open("02.data.txt", "w")
	for idx in range(2):
		t = thread.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")	
		t.start()

4.3 线程安全(用Lock解决安全问题)

  1. 线程安全
    在这里插入图片描述
# 用法1:try-finally模式
import threading
lock = threading.Lock()

lock.acquire()
try:
	# do somthing
finally:
	lock.release()
# 用法2: with模式
import threading
lock = threading.Lock()
with lock:
	# do somthing

# 03.lock_concurrent.py
import threading
import time

class Account:
	def __init__(self, balance):
		self.balance = balance

def draw(account, amount):
	if account.balance >= amount:
		# 如果没有用Lock,sleep会造成线程阻塞,从而进行线程切换,从而最终导致bug
		time.sleep(0.1)
		print(threading.current_thread().name, "取钱成功")
		account.balance -= amount
		print(threading.current_thread().name, "余额", account.balance)
	else:
		print(threading.current_thread().name, "取钱失败,余额不足")

if __name__ == "__main__":
	account = Account(1000)
	ta = threading.Thread(name="ta", target=draw, args=(account, 800))
	ta = threading.Thread(name="tb", target=draw, args=(account, 800))
	ta.start()
	tb.start()
"""
代码改进
"""
lock = threading.Lock()
def draw(account, amount):
	with lock:
		if account.balance >= amount:
			# 有Lock,必须lock释放后才会切换线程成功,因此避免了bug发生
			time.sleep(0.1)
			print(threading.current_thread().name, "取钱成功")
			account.balance -= amount
			print(threading.current_thread().name, "余额", account.balance)
		else:
			print(threading.current_thread().name, "取钱失败,余额不足")

4. 4 线程池

线程池
采用线程来完成任务时,每次新建线程都需要分配资源,终止线程系统需要回收资源,因此对于任务量较大的情况而言,会造成新建/终止的开销很大,因此就引入了线程池。任务也以队列的形式存储,而线程则放入线程池中执行任务,完成一个任务后等待下一个任务的到来,直到所有任务执行完毕。
在这里插入图片描述
线程池语法

from concurrent.futures import ThreadPoolExecutor, as_completed
# 用法1:map函数,注意map的那个和入参是顺序对应的
with ThreadPoolExecutor() as pool:
	results = pool.map(craw, urls)
	for result in results:
		print(result)

# 用法2: future模式,更强大,如果使用as_completed顺序是不定的
with ThreadPoolExecutor() as pool:
	futures = [pool.submit(craw, url) for url in urls]
	for future in futures:
		print(future.result())
	for future in as_completed(futures):
		print(future.result())

线程池实例

import concurrent.futures
import blog_spider

# craw
with concurrent.futures.ThreadPoolExecutor() as pool:
	htmls = pool.map(blog_spider.craw, blog_spider.urls)
	htmls = list(zip(blog_spider.urls, htmls))
	for url, html in htmls:
		print(url, len(html))

print("craw over")
# parse
with concurrent.futures.ThreadPoolExecutor() as pool:
	futures = {}
	for url, html in htmls:
		future = pool.submit(blog_spider.parse, html;)
		futures[future] = url
	
	# for future, url in futures.items():
		# print(url, future.result())
	
	for future in concurrent.futures.as_completed(futures):
		url = futures[future]
		print(url, future.result())

4.5 线程池&Web服务加速

  1. Wed服务的架构及特点
    在这里插入图片描述
    在这里插入图片描述
# 05.flask_thread_pool.py
import flask
import time
import json

app = flask.Flask(__name__)

def read_file():
	time.sleep(0.1)
	return "file result"

def read_db():
	time.sleep(0.1)
	return "db result"

def read_api():
	time.sleep(0.1)
	return "api result"

@app.route("/")
def index():
	result_file = read_file()
	result_db = read_db()
	result_api = read_api()

	return json.dumps({
		"result_file": result_file,
		"result_db": result_db,
		"result_api": result_api
	})

if __name__ == "__main__":
	app.run()

终端看url的响应时间

time curl http://127.0.0.1:5000/ # 0.634s

优化Web服务

# 05.flask_thread_pool.py
import flask
import time
import json
from concurrent.futures import ThreadPoolExcitor

app = flask.Flask(__name__)
pool = ThreadPoolExcitor()

def read_file():
	time.sleep(0.1)
	return "file result"

def read_db():
	time.sleep(0.1)
	return "db result"

def read_api():
	time.sleep(0.1)
	return "api result"

@app.route("/")
def index():
	result_file = pool.submit(read_file)
	result_db = pool.submit(read_db)
	result_api = pool.submit(read_api)

	return json.dumps({
		"result_file": result_file.result(),
		"result_db": result_db.result(),
		"result_api": result_api.result()
	})

if __name__ == "__main__":
	app.run() # 0.318s耗时减半

5. 多进程

5.1 多进程基础

使用多进程的原因:
对于IO密集型的任务,采用多线程可以大大提升程序运行速度,但是对于CPU密集型的任务,进程的切换反而会导致执行速度降低。
在这里插入图片描述
多线程与多进程对比:
在这里插入图片描述
多进程实例

# 06.thread_process_cpu_bound.py
import math

PRIMES = [11227263717219] * 100

def is_prime(n):
	if n < 2:
		return False
	if n == 2:
		return True
	if n % 2 == 0:
		return False
	sqrt_n = int(math.floor(math.sqrt(n)))
	for i in range(3, sqrt_n + 1, 2):
		if n % i == 0:
			return False
	return True

def single_thread():
	for number in PRIMES:
		is_prime(number)

def multi_thread():
	with ThreadPoolExector() as pool:
		pool.map(is_prime, PRIMES)

def multi_process():
	with ProcessPoolExecutor() as pool:
		pool.map(is_prime, PRIMES)

if __name__ == "__main__":
	start = time.time()
	single_thread()
	end = time.time()
	print("single thread cost:", end - start, "seconds") # 46.61S
	start = time.time()
	multi_thread()
	end = time.time()
	print("multi thread cost:", end - start, "seconds") # 46.65s
	start = time.time()
	multi_process()
	end = time.time()
	print("multi process cost:", end - start, "seconds") # 16.07s

5.2 多进程&Flask服务

# 07.flask_process_pool.py

from concurrent.futures import ProcessPoolExecutor
import math

app = flask.Flask()

def is_prime(n):
	if n < 2:
		return False
	if n == 2:
		return True
	if n % 2 == 0:
		return False
	sqrt_n = int(math.floor(math.sqrt(n)))
	for i in range(3, sqrt_n + 1, 2):
		if n % i == 0:
			return False
	return True

@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
	number_list = [int(x) for x in numbers.split(",")]
	results = process_pool.map(is_prime, number_list)
	return json.dumps(dict(zip(number_list, results)))


if __name__ == "__main__":
	process_pool = ProcessPoolExecutor()
	app.run()

6. 异步IO

在这里插入图片描述

6.1 异步IO爬虫

# 基本用法
import asyncio
# 获取事件循环
loop = asyncio.get_event_loop()
# 定义协程
async def myfunc(url):
	await get_url(url)
# 创建task列表
tasks = [loop.create_task(myfunc(url)) for url in urls]
# 执行爬虫时间列表
loop.run_until_complete(asyncio.wait(tasks))
"""
注意:
要用在异步IO编程中
依赖的库必须支持异步IO特性

爬虫引用中:
requests不支持异步
需要用aiohttp
"""
# 08.async_spider.py
import asyncio
import aiohttp
from blog_spider

async def async_craw(url):
	async with aiohttp.ClientSession() as session:
		async with session.get(url) as resp:
			result = await resp.text()
			print(f"craw url: {url}, {len(result)}")

loop = asyncio.get_event_loop()

tasks = [
	loop.create_task(async_craw(url))
	for url in blog_spider.urls
]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds:", end - start, "seconds") # 1.94s

6.2 信号量控制

在这里插入图片描述

# 08.async_spider_semaphore.py
import asyncio
import aiohttp
from blog_spider

semaphore = asyncio.Semaphore(10)

async def async_craw(url):
	async with semaphore:
	print("craw url:", url)
		async with aiohttp.ClientSession() as session:
			async with session.get(url) as resp:
				result = await resp.text()
				print(f"craw url: {url}, {len(result)}")

loop = asyncio.get_event_loop()

tasks = [
	loop.create_task(async_craw(url))
	for url in blog_spider.urls
]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds:", end - start, "seconds") # 1.94s
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【并发编程】Python多线程、多进程、多协程入门篇 的相关文章

随机推荐

  • 简单xml的使用以及xml的解析dom4j和jaxp

    1 xml的简介 w3c组织发布 extensible markup Language 可扩展标记型语言 也是使用标签操作 可扩展 html里面的标签是固定 每个标签都有特定的含义 xml标签可以自己定义 可以写中文的标签 用途 显示数据
  • shared_ptr智能指针的使用

    一 shared ptr简单说明 如果一个实例指针被多个对象使用 而调用者不知道该什么时候释放该实例 那么可以使用shared ptr来 托管 这个实例指针 当使用它的对象都被释放掉了 这个实例可以自动被释放 可能有点绕 简单地说 就是类A
  • ubuntu重启后分辨率为1024,nvidia-smi不能用-20200819

    无法连接NVIDIA驱动 NVIDIA SMI has failed because it couldn t communicate with the NVIDIA driver CASE SOLVED NVIDIA SMI has fai
  • Postgresql 常用命令合集-建议收藏

    ps 注意空格和指令正确 以下仅用于自己记录使用 1 基本命令 createdb 创建一个新的PostgreSQL的数据库 命令可以使用大写 createuser 创建一个新的PostgreSQL的用户 dropdb 删除数据库 dropu
  • mssql数据库,数据库同步,分布式数据库,数据库集群,如何实现

    由于项目需要 产品的部署必须考虑到安全和灾难的解决办法 由于之前一直做的的小项目 基本都是单服务器 单数据库结构 但是由于一次灾难 把这个问题提上了日程 本人资历浅薄 很多东西还不是很熟悉 最近在网上百度了一大堆相关的东西 基本有了大概的思
  • 玩转Jetson nano系列(1):Jetson nano编译安装ncnn

    整个安装流程基本按照官方的步骤 build ncnn for jetson 但是在jetson nano上安装时 遇到了glslang和vulkan引发的问题 Found glslangValidator GLSLANGVALIDATOR
  • strptime和strftime

    strptime 将时间字符串转为 struct tm 格式 头文件 include
  • TortoiseGit日常使用指南

    本文在介绍了软件安装和设置后 写了TortoiseGit 常用的一些功能 包括 创建新库 添加文件及文件夹 创建分支 看分支情况及修改log 比较版本差异 合并分支 其他操作 Stash 忽略文件本文不包括 Git 服务器设置 Push 版
  • 预见未来:超强元AI诞生,抓住这个机会,利用AI变现也变得更加容易

    目录 一 引言 二 介绍 三 技术展现 四 元AI架构图展现 五 元AI变现技巧 商业版说明 六 后期规划 一 引言 如何利用AI变现已经成为了当今各个行业亟需解决的问题 随着人工智能技术的快速发展和普及 越来越多的企业开始将其应用于产品研
  • 微信小程序地理位置接口wx.getLocation接口申请方法技巧

    我们在开发微信小程序的时候 提交审核微信官方就会检测咱们的小程序有没有用到位置功能 涉及用到哪个位置接口 然后就会要求我们先申请相应的位置接口 审核通过后才可以发布小程序 这个接口审核一直是让大家头痛的事情 有的小伙伴申请几十次都不给过 有
  • vue防抖 自定义ref实现输入框防抖

    防抖 debounce 当持续触发事件时 一定时间段内没有再触发事件 事件处理函数才会执行一次 如果设定的时间到来之前 又一次触发了事件 就重新开始计时 接下来我将带大家一步步分析如何把input防抖做到极致 首先需要把input 的双向绑
  • Web前端开发实训案例教程(初级)素材

    素材下载地址 Web前端开发实训案例教程 初级 素材 内容简介 本书是按照 Web前端开发职业技能等级标准 编写的配套实践教程 其中涉及的应用技术专题和项目代码均在主流浏览器中运行通过 本书结合大学计算机相关专业Web前端开发方向课程体系
  • windows下用cygwin编译android版ijkplayer

    1 环境搭建 1 1 安装cygwin 并安装git make ysam 具体安装过程网上有很多资料 不再详述 1 2 android sdk和android ndk下载和安装 参见网上资料 1 3 下载ijkplayer源码 git cl
  • 【StyleGAN补充材料】 A Style-Based Generator Architecture for Generative Adversarial Networks

    Supplemental Material A Style Based Generator Architecture for Generative Adversarial Networks 一 前言 1 Hyperparameters an
  • latex解决×叉符号(乘符号)如何打

    简单实用 直接取走 times times
  • 抽样技术--不等概率抽样

    文章目录 不等概抽样 放回不等概抽样 只抽取一个样本单元的不等概抽样 估计量 有放回不等概整群抽样 两阶段有放回不等概抽样 多阶段有放回不等概抽样 不放回不等概抽样 两阶段不放回不等概抽样 不等概抽样 提高估计精度 放回的PPS抽样简化方差
  • 无重复字符的最长字串

    给定一个字符串 s 请你找出其中不含有重复字符的 最长子串 的长度 示例 1 输入 s abcabcbb 输出 3 解释 因为无重复字符的最长子串是 abc 所以其长度为 3 示例 2 输入 s bbbbb 输出 1 解释 因为无重复字符的
  • Zabbix监控服务详解+实战

    目录 一 监控体系概述 1 为什么需要监控 2 监控目标与流程 1 监控的目标 2 监控的流程 3 监控的对象 1 CPU监控 2 磁盘监控 3 内存监控 4 网络监控 5 系统重要进程监控 6 应用服务监控 7 硬件设备监控 8 安全监控
  • 【elasticsearch】memory locking requested for elasticsearch process but memory is not locked

    解决方法 开启bootstrap memory lock 修改文件 etc elasticsearch elasticsearch yml 上面那个报错就是开启后产生的 如果开启还要修改其它系统配置文件 bootstrap memory l
  • 【并发编程】Python多线程、多进程、多协程入门篇

    文章目录 1 基础概念 2 并发编程如何选择 2 1 CPU IO密集型计算 2 2 多线程 多进程 多协程对比 3 Python运行速度根因分析 3 1 速度慢两大原因 3 1 GIL 4 多线程 4 1 Python多线程基础爬虫 4