python核心并发编程之futures(代码片段)

sysu_lluozh sysu_lluozh     2022-12-12     655

关键词:

无论对于哪门语言,并发编程都是一项很常用很重要的技巧

正确合理地使用并发编程,无疑会让程序带来极大的性能提升。接下来学习理解、运用Python中的并发编程——Futures

一、区分并发和并行

在学习并发编程时,常常同时听到并发(Concurrency)和并行(Parallelism)这两个术语,这两者经常一起使用导致很多人以为它们是一个意思,其实不然

1.1 理解误区

首先辨别一个误区,在Python中并发并不是指同一时刻有多个操作(thread、task)同时进行。相反,某个特定的时刻它只允许有一个操作发生,只不过线程/任务之间会互相切换,直到完成。看下面这张图:

图中出现了threadtask两种切换顺序的不同方式,分别对应Python中并发的两种形式—threadingasyncio

1.2 threading和asyncio

  • threading

对于threading,操作系统知道每个线程的所有信息,因此它会做主在适当的时候做线程切换
优点:
代码容易书写,因为程序员不需要做任何切换操作的处理
不足:
切换线程的操作,有可能出现在一个语句执行的过程中(比如 x += 1),这样就容易出现race condition 的情况

  • asyncio

对于asyncio,主程序想要切换任务时,必须得到此任务可以被切换的通知,这样一来也就可以避免刚刚提到的race condition的情况

1.3 并行的理解

至于所谓的并行,指的才是同一时刻、同时发生
Python中的multi-processing便是这个意思,对于multi-processing可以简单地这么理解:比如电脑是6核处理器,那么在运行程序时就可以强制Python开6个进程同时执行以加快运行速度,原理示意图如下:

1.4 并行和并发对比

  • 并发通常应用于I/O操作频繁的场景
    比如要从网站上下载多个文件,I/O操作的时间可能会比CPU运行处理的时间长得多

  • 并行更多应用于CPU heavy的场景
    比如MapReduce中的并行计算,为了加快运行速度一般会用多台机器、多个处理器来完成

二、并发编程之Futures

2.1 单线程与多线程性能比较

接下来通过具体的实例,从代码的角度来理解并发编程中的Futures,并进一步来比较其与单线程的性能区别

假设有一个任务是下载一些网站的内容并打印,如果用单线程的方式代码实现如下所示(为了简化代码和突出主题,代码中忽略了异常处理):

import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read  from '.format(len(resp.content), url))
    
def download_all(sites):
    for site in sites:
        download_one(site)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download  sites in  seconds'.format(len(sites), end_time - start_time))
    
if __name__ == '__main__':
    main()

# 输出
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 107637 from https://en.wikipedia.org/wiki/Portal:Biography
Read 151021 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 157811 from https://en.wikipedia.org/wiki/Portal:Technology
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 93347 from https://en.wikipedia.org/wiki/Portal:Science
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 468461 from https://en.wikipedia.org/wiki/PHP
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 2.464231112999869 seconds

这种方式应该是最直接也最简单的:

  1. 遍历存储网站的列表
  2. 对当前网站执行下载操作
  3. 等到当前操作完成后,再对下一个网站进行同样的操作,一直到结束

可以看到总共耗时约 2.4s
单线程的优点是简单明了,但是明显效率低下,因为上述程序的绝大多数时间都浪费在了I/O等待上。程序每次对一个网站执行下载操作,都必须等到前一个网站下载完成后才能开始。在实际生产环境中,需要下载的网站数量至少是以万为单位的,不难想象这种方案根本行不通

接着再来看多线程版本的代码实现:

import concurrent.futures
import requests
import threading
import time

def download_one(url):
    resp = requests.get(url)
    print('Read  from '.format(len(resp.content), url))


def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)

def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download  sites in  seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

## 输出
Read 151021 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 107637 from https://en.wikipedia.org/wiki/Portal:Biography
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 157811 from https://en.wikipedia.org/wiki/Portal:Technology
Read 91533 from https://en.wikipedia.org/wiki/Portal:Science
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 468461 from https://en.wikipedia.org/wiki/PHP
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 0.19936635800002023 seconds

非常明显,总耗时是0.2s左右,效率一下子提升了10倍+

具体来看这段代码,多线程版本和单线程版的主要区别所在:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
     executor.map(download_one, sites)

这里创建了一个线程池,总共有5个线程可以分配使用
executer.map()与Python内置的map()函数类似,表示对sites中的每一个元素并发地调用函数download_one()

顺便提一下,在download_one()函数中使用的requests.get()方法是线程安全的(thread-safe),因此在多线程的环境下也可以安全使用,并不会出现race condition的情况

另外,虽然线程的数量可以自己定义,但是线程数并不是越多越好
因为线程的创建、维护和删除也会有一定的开销,所以如果设置的很大反而可能会导致速度变慢。往往需要根据实际的需求做一些测试来寻找最优的线程数量

当然,也可以用并行的方式去提高程序运行效率,只需要在download_all()函数中做出下面的变化即可:

with futures.ThreadPoolExecutor(workers) as executor
=>
with futures.ProcessPoolExecutor() as executor: 

在需要修改的这部分代码中,函数ProcessPoolExecutor()表示创建进程池,使用多个进程并行的执行程序
不过通常省略参数workers,因为系统会自动返回CPU的数量作为可以调用的进程数

刚刚提到过,并行的方式一般用在CPU heavy的场景中,因为对于I/O heavy的操作多数时间都会用于等待,相比于多线程,使用多进程并不会提升效率。反而很多时候,因为CPU数量的限制,会导致其执行效率不如多线程版本

三、什么是Futures

3.1 Futures的作用

Python中的Futures模块,位于concurrent.futuresasyncio中,它们都表示带有延迟的操作
Futures会将处于等待状态的操作包裹起来放到队列中,这些操作的状态随时可以查询。当然,它们的结果如果是异常也能够在操作完成后被获取

通常来说,作为用户不用考虑如何去创建Futures,这些Futures底层都会处理好,要做的是实际上是去schedule这些Futures的执行

比如,Futures中的Executor类,当执行executor.submit(func)时便会安排里面的func()函数执行,并返回创建好的future实例,以便你之后查询调用

3.2 一些常用的函数

  • done()

Futures中的方法done(),表示相对应的操作是否完成——True表示完成,False表示没有完成

注意:done()是non-blocking,立即返回结果

  • add_done_callback(fn)

相对应的add_done_callback(fn),表示Futures完成后相对应的参数函数fn,会被通知并执行调用

  • result()

Futures中还有一个重要的函数result(),它表示当future完成后返回其对应的结果或异常

  • as_completed(fs)

as_completed(fs),则是针对给定的future迭代器fs,在其完成后返回完成后的迭代器

上述例子也可以写成下面的形式:

import concurrent.futures
import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read  from '.format(len(resp.content), url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        to_do = []
        for site in sites:
            future = executor.submit(download_one, site)
            to_do.append(future)
            
        for future in concurrent.futures.as_completed(to_do):
            future.result()
def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download  sites in  seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()

# 输出
Read 129886 from https://en.wikipedia.org/wiki/Portal:Arts
Read 107634 from https://en.wikipedia.org/wiki/Portal:Biography
Read 224118 from https://en.wikipedia.org/wiki/Portal:Society
Read 158984 from https://en.wikipedia.org/wiki/Portal:Mathematics
Read 184343 from https://en.wikipedia.org/wiki/Portal:History
Read 157949 from https://en.wikipedia.org/wiki/Portal:Technology
Read 167923 from https://en.wikipedia.org/wiki/Portal:Geography
Read 94228 from https://en.wikipedia.org/wiki/Portal:Science
Read 391905 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read 321352 from https://en.wikipedia.org/wiki/Computer_science
Read 180298 from https://en.wikipedia.org/wiki/Node.js
Read 321417 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 468421 from https://en.wikipedia.org/wiki/PHP
Read 56765 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 324039 from https://en.wikipedia.org/wiki/Go_(programming_language)
Download 15 sites in 0.21698231499976828 seconds
  • 首先调用executor.submit(),将下载每一个网站的内容都放进future队列to_do,等待执行
  • 然后是as_completed()函数,在future完成后便输出结果

这里要注意,future列表中每个future完成的顺序,和它在列表中的顺序并不一定完全一致。到底哪个先完成、哪个后完成,取决于系统的调度和每个future的执行时间

四、多线程的执行

那为什么多线程每次只能有一个线程执行呢?

同一时刻,Python主程序只允许有一个线程执行,所以Python的并发是通过多线程的切换完成的。这到底是为什么呢?

这里就涉及到全局解释器锁的概念

事实上,Python的解释器并不是线程安全的,为了解决由此带来的race condition等问题,Python便引入了全局解释器锁,也就是同一时刻只允许一个线程执行
当然,在执行I/O操作时,如果一个线程被block 了,全局解释器锁便会被释放,从而让另一个线程能够继续执行

五、总结

首先学习了Python中并发和并行的概念与区别

  • 并发

通过线程和任务之间互相切换的方式实现,但同一时刻只允许有一个线程或任务执行

  • 并行

指多个进程同时执行

并发通常用于I/O操作频繁的场景,而并行则适用于CPU heavy的场景

随后,通过下载网站内容的例子,比较了单线程和运用Futures的多线程版本的性能差异。显而易见,合理地运用多线程能够极大地提高程序运行效率

还学习了Futures的具体原理,介绍了一些常用函数比如done()、result()、as_completed()等的用法,并辅以实例加以理解

要注意,Python中之所以同一时刻只允许一个线程运行,其实是由于全局解释器锁的存在。但是对I/O操作而言,当其被block的时候全局解释器锁便会被释放,使其他线程继续执行

并发编程之线程进阶(代码片段)

理论知识全局解释器锁GILPython代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然Python解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解... 查看详情

并发编程之多进程(实践)(代码片段)

Python多进程模块multiprocessing模块介绍python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。multiprocessing模块用来开启子进程,... 查看详情

并发编程之io模型(代码片段)

Python并发编程之IO模型 目录:  一、IO模型介绍  二、阻塞IO (blockingIO)  三、非阻塞IO (nonblockingIO)  四、多路复用IO (IOmultiplexing)  五、异步IO(asynchronousIO)  六、IO模型比较分析  七、selsectors模块 ... 查看详情

python核心揭秘python协程(代码片段)

首先要明白什么是协程?协程是实现并发编程的一种方式。一说并发肯定想到了多线程/多进程模型,多线程/多进程正是解决并发问题的经典模型之一先从一个爬虫实例出发,用清晰的思路并且结合实战来搞懂这个不... 查看详情

并发编程之多进程操作(代码片段)

一multiprocessing模块介绍??Python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。??multiprocessing模块用来开启子进程并在子进程中执行我们定制的任务(如... 查看详情

python并发编程之线程的玩法(代码片段)

一、线程基础以及守护进程线程是CPU调度的最小单位全局解释器锁全局解释器锁GIL(globalinterpreterlock)全局解释器锁的出现主要是为了完成垃圾回收机制的回收机制,对不同线程的引用计数的变化记录的更加精准。全... 查看详情

python并发编程之进程的玩法(代码片段)

一、操作系统基础1.I/O操作IO操作是相对内存来说的。输入指往内存中输入,输出指从内存中往外输出。文件操作:read(输入),write(输出)网络操作:send(输出),recv(输入࿰... 查看详情

网络编程之并发网络编程(代码片段)

...另一个客户端通信完成后才能和服务端进行连接和通信。python3中提供有一个高级内置模块socketserver来帮助我们进行并发的网络编程。 socketserver模块介绍SocketServer模块处理网络请求的功能,可以通过两个主要的类来实现:一... 查看详情

java多线程之并发编程三大核心问题

概述并发编程是Java语言的重要特性之一,它能使复杂的代码变得更简单,从而极大的简化复杂系统的开发。并发编程可以充分发挥多处理器系统的强大计算能力,随着处理器数量的持续增长,如何高效的并发变得越来越重要。... 查看详情

googleguava并发编程-listenablefuture(代码片段)

...相关的类非常多,我们对嘴常用的三个类MoreExecutors、Futures、ListenableFuture的使用做一个简单的介绍。我相信通过这三 查看详情

goroutine并发调度模型深度解析之手撸一个协程池(代码片段)

...ool高并发并发(并行),一直以来都是一个编程语言里的核心主题之一,也是被开发者关注最多的话题;Go语言作为一个出道以来就自带『高并发』光环的富二代编程语言,它的并发(并行)编程肯定是值得开发者去探究的,而G... 查看详情

tornado高并发源码分析之六---异步编程的几种实现方式

 方式一:通过线程池或者进程池导入库futures是python3自带的库,如果是python2,需要pip安装future这个库备注:进程池和线程池写法相同1fromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutor2fromtornado.concurrentimportrun_on_executor34def 查看详情

解决并发编程之痛的良药--结构化并发编程(代码片段)

解决并发编程之痛的良药--结构化并发编程作者简介:曹家锋,Westar实验室技术专家。Westar实验室(westar.io),成立于2018年,关注于区块链及分布式前沿技术,包括区块链分层架构、二层路由,网络性能、智能合约、PoW优化等。... 查看详情

提升--04---并发编程之---有序性(代码片段)

并发编程之有序性经典案例1:importjava.util.concurrent.CountDownLatch;publicclassT01_Disorderprivatestaticintx=0,y=0;privatestaticinta=0,b=0;publicstaticvoidmain(String[]args)throwsInterruptedExcep 查看详情

学并发编程,透彻理解这三个核心是关键(代码片段)

...换思维或视角来学习远看并发,并发编程可以抽象成三个核心问题:分工、同步/协作、互斥如果你已经工作了,那么你一定听说过或者正在应用敏捷开发模式来交付日常的工作任务,我们就用你熟悉的流程来解释这三个核心问题 查看详情

并发编程之协程(代码片段)

...、gevent模块 1??协程介绍  1、前言+回顾    1.1并发的本质       基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态... 查看详情

提升--03---并发编程之---可见性(代码片段)

并发编程三大特性可见性先看一个小案例:importjava.io.IOException;publicclassT01_HelloVolatileprivatestaticbooleanrunning=true;privatestaticvoidm()System.out.println("mstart");while(running)System.out.prin 查看详情

并发编程系列之synchronized实现原理(代码片段)

并发编程系列之Synchronized实现原理1、了解synchronized字节码下面给出一个简单例子,synchronized关键字加在两个方法上,另外一个加在方法里publicclassSynchroinzedDemostaticinta;publicstaticsynchronizedvoidadd1(intb)a+=b;< 查看详情