并发编程之io模型(代码片段)

暮光微凉 暮光微凉     2022-11-12     698

关键词:

主要内容:

  一、IO模型介绍

  二、阻塞IO

  三、非阻塞IO

  四、多路复用

   五、异步IO

 

1️⃣ IO模型介绍

  1 何为同步、异步、阻塞和非阻塞

     同步:     

#所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。按照这个定义,其实绝大多数函数都是同步调用。但是一般而言,我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。
#举例:
#1. multiprocessing.Pool下的apply #发起同步调用后,就在原地等着任务结束,根本不考虑任务是在计算还是在io阻塞,总之就是一股脑地等任务结束

    异步:

#异步的概念和同步相对。当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、通知或回调来通知调用者。如果异步功能用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一 种很严重的错误)。如果是使用通知的方式,效率则很高,因为异步功能几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。
#举例:
#1. multiprocessing.Pool().apply_async() #发起异步调用后,并不会等待任务结束才返回,相反,会立即获取一个临时结果(并不是最终的结果,可能是封装好的一个对象)。

    阻塞:

#阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
#举例:
#1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);
#2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。

    非阻塞:

#非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。

  小结:

#1. 同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步情况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成。

#2. 阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程

  2、IO模型分类

  一般分为五类:

* blocking IO  # 阻塞IO
* nonblocking IO  # 非阻塞IO
* IO multiplexing  # 多路复用
* signal driven IO  # 信号驱动IO
* asynchronous IO  # 异步IO

# signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。

  IO发生时涉及的对象和步骤:

   以read为例,它主要涉及两个系统对象,一个调用这个IO的process \\(or thread\\),另一个就是系统内核\\(kernel\\)。

  当一个read操作发生时,该操作会经历两个阶段:

1)等待数据准备 (Waiting for the data to be ready)
2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

 

2️⃣ 阻塞IO(blocking IO ) 

  在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

      

        当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。

  对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),

  这个时候kernel就要等待足够的数据到来。   

    而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,
  然后kernel返回结果,用户进程才解除block的状态,重新运行起来。所以,blocking IO的特点就是在IO执行的两个阶段
  (等待数据和拷贝数据两个阶段)都被block了。
几乎所有的程序员第一次接触到的网络编程都是从listen\\(\\)、send\\(\\)、recv\\(\\) 等接口开始的,
使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。如下图

ps:
所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞
只有当该系统调用获得结果或者超时出错时才返回。

      

    实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,

  如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。

    一个简单的解决方案是:  

在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),
这样任何一个连接的阻塞都不会影响其他的连接。

  实例如下:

    分两部分,客户端(client.py)和服务端(server.py)

    client.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
from socket import *

def talk():
    client = socket(AF_INET,SOCK_STREAM)
    client.connect((\'127.0.0.1\',8806))

    while True:
        mes = input(\'>>>:\').strip()
        if not mes:continue
        client.send(mes.encode(\'utf-8\'))
        data = client.recv(1024)
        print(data.decode(\'utf-8\'))
    client.close()

if __name__ == \'__main__\':
    talk()
View Code

    server.py

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong

from socket import *
from threading import Thread,currentThread

def talk(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:break
            print(data.decode(\'utf-8\'))
            conn.send((\'%s hello\'%currentThread().getName()).encode(\'utf-8\'))
        except ConnectionResetError:
            break
    conn.close()

def server(ip,port):
    server = socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)
    while True:
        print(\'staring...\')
        conn,addres = server.accept()
        print(addres)
        t = Thread(target=talk,args=(conn,))
        t.start()

    server.close()

if __name__ == \'__main__\':
    server(\'127.0.0.1\',8806)
View Code

  但存在问题下述问题: 

   虽然实现了并发,即实际则回避了阻塞的问题(并未解决),思路是:让主线程接收客户端的链接,而当每收到了一个链接,就新建一个线程,负责收发消息,互不影响,并没有监测IO,每个线程遇到阻塞IO时,仍然阻塞,但并不影响其他线程,从而实现并发。但会随着客户端链接的增多,服务端开的线程则越来越多,浪费资源,而为了避免机器崩溃。而设置线程池(适应问题规模较小的情况),限制并发数目,效率降低,但保证了机器健康运行。
  所以,至此,单线程下的IO阻塞问题仍未解决。

 

3️⃣  非阻塞IO(nonblocking IO)

  Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

  

  对流程图理解如下:

  当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。
从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,
它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。
一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。
  
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,
循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

   所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

  实例:

    客户端程序

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong

from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect((\'127.0.0.1\',6666))
while True:
    msg = input(\'>>>:\').strip()
    if not msg:continue
    client.send(msg.encode(\'utf-8\'))
    data = client.recv(1024)
    print(data.decode(\'utf-8\'))
client.close()
View Code

    服务端程序

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong

# 非阻塞IO --> 单进程下多线程速度极快,当前程序效率很高(一直占用CPU),但影响其它程序的执行
\'\'\'
  非阻塞的recv系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,
此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recv系统调用。重复上面的过程,
循环往复的进行recv系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,
进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

\'\'\'
from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.bind((\'127.0.0.1\',6666))
server.listen(5)
server.setblocking(False) # 不阻塞,默认是阻塞
rlist = [] # 存放链接
slist = [] # 存放消息
print(\'staring...\')
while True:
    try:
        conn,addres = server.accept() # IO堵塞
        rlist.append(conn)
        print(rlist)
    except BlockingIOError: # 未收到链接抛异常
        #print(\'数据未准备好!\')
        # 收消息
        del_rlist = []
        for conn in rlist: # 遍历链接列表
            try:
                data  = conn.recv(1024)
                if not data:
                    continue
                #conn.send(data.upper())
                slist.append((conn,data))
            except BlockingIOError:  # 碰到IO阻塞
                continue
            except Exception:
                conn.close()
                del_rlist.append(conn)
        # 发消息
        del_slist = []
        for item in slist:
            try:
                conn = item[0]
                data = item[1]
                conn.send(data.upper()) # 可能会抛异常,即IO阻塞
                del_slist.append(item) # 没抛异常,就将发成功的信息加到将要被删除的队列中
            except BlockingIOError: # 未发送成功
                continue
        for item in del_slist: # 遍历列表,将其中存放的已发送成功的信息删除
            slist.remove(item)
        for conn in del_rlist: # 遍历列表,将未收到数据的链接删除
            rlist.remove(conn)

server.close()
View Code

  优点:

能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

  缺点:

1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。
这会导致整体数据吞吐量的降低。

注意:优点难掩它的缺点,非阻塞IO模型绝不被推荐。

 

4️⃣ 多路复用(IO multiplexing)

  多路复用即select/epoll,有些地方也称这种IO方式为事件驱动IO。

  select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll,

这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

   

     
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,
当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。  
这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用\\(select和recvfrom\\),
而blocking IO只调用了一个系统调用\\(recvfrom\\)。但是,用select的优势在于它可以同时处理多个connection。

  在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,

整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

  注意:select的优势在于可以处理多个连接,不适用于单个连接。

实例:

  客户端程序

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong
from socket import *


client = socket(AF_INET,SOCK_STREAM)
client.connect((\'127.0.0.1\',6868))

while True:
    mes = input(\'>>>:\').strip()
    if not mes:continue
    client.send(mes.encode(\'utf-8\'))
    data = client.recv(1024)
    print(data.decode(\'utf-8\'))
client.close()
View Code

  服务端程序

#!/usr/bin/env python3
#-*- coding:utf-8 -*-
# write by congcong

# 多路复用IO,可以同时检测多个IO,而非阻塞IO只能检测一个IO,单个链接时非阻塞IO效率高,多个链接时多路复用更佳
\'\'\'
    select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。
它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,
当某个socket有数据到达了,就通知用户进程。
\'\'\'
from socket import *
import select

server = socket(AF_INET,SOCK_STREAM)
server.bind((\'127.0.0.1\',6868))
server.listen(5)
server.setblocking(False) # 不阻塞
rlist = [server,] # 存放链接和套接字(server,conn)
wlist = [] # 存放发送消息的套接字()
wdata =  
while True:
    rl,wl,xl = select.select(rlist,wlist, [], 0.5)  # 后两个参数分别表示异常列表和超时时间
    print(wl)
    # 收消息
    for sock in rl:
        if sock == server: # 收到server
            conn,addres = sock.accept()
            rlist.append(conn) # 加入列表
        else: # 即收到的是 conn
            try:
                data = sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock) # 针对linux系统报错,一直接收的特点
                wlist.append(sock)
                wdata[sock] = data.upper()
            except Exception: # 关闭未收到数据的无用链接和删除套接字
                sock.close()
                rlist.remove(sock)
    # 发消息
    for sock in wl:
        data = wdata[sock] # 获取字典套接字对应的数据
        sock.send(data) # 发送数据
        wlist.remove(sock) # 删除已经接收的套接字
        wdata.pop(sock) # 删除已经发送成功的数据
server.close()
View Code

select监听fd变化的过程分析:

用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,
就会发送信号给用户进程数据已到;
用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,
这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。

该模型的优点:

相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。
如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

该模型的缺点:

首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。
很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。
如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,
所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

 

5️⃣ 异步IO

  Linux下的asynchronous IO其实用得不多,从内核2.6版本才开始引入。它的流程如下:

 原理分析:

  用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,

当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,

kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程

发送一个signal,告诉它read操作完成了。

网络编程进阶:并发编程之协程io模型(代码片段)

协程:基于单线程实现并发,即只用一个主线程(此时可利用的CPU只有一个)情况下实现并发;并发的本质:切换+保存状态CPU正在运行一个任务,会在两种情况下切走去执行其他任务(切换有操作系统强制控制),一种情况是... 查看详情

并发编程(学习笔记-共享模型之内存)-part4(代码片段)

文章目录并发编程-4-共享模型之内存1.Java内存模型2.可见性2-1退不出的循环2-2可见性vs原子性3.有序性3-1诡异的结果3-2解决方法3-3有序性理解3-4happens-before4.volatile原理4-1如何保证可见性4-2如何保证有序性4-3double-checkedlocking问题并发... 查看详情

并发编程之内存模型(代码片段)

本章要点详细介绍以下几个问题:1、为什么需要内存模型2、什么是内存模型3、内存模型解决什么问题 1、为什么需要内存模型  说道内存模型不得不首先说下目前的计算机组成原理,目前主流的计算机都是冯诺依曼机。... 查看详情

并发编程(学习笔记-共享模型之无锁)-part5(代码片段)

文章目录并发编程-5-共享模型之无锁1.无锁解决线程安全问题2.CAS与volatile2-1CAS2-2volatile2-3为什么无锁效率高2-4CAS特点3.原子整数4.原子引用4-1原子引用的使用4-2ABA问题及解决5.原子数组6.字段更新器7.原子累加器8.LongAdder详解8-1cas锁8... 查看详情

并发编程之i/o模型(代码片段)

1.I/O模型介绍同步(synchronous)IO和异步(asynchronous)IO,阻塞(blocking)IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronousIO和non-blockingIO是一个东... 查看详情

并发编程(学习笔记-共享模型之管程)-part3(代码片段)

文章目录并发编程-共享模型之管程-31.共享带来的问题1-1临界区CriticalSection1-2竞态条件RaceCondition2.synchronized解决方案2-1解决方案2-2synchronized的使用2-3方法上的synchronized3.变量的线程安全分析3-1成员变量和静态变量是否线程安全ÿ... 查看详情

golang并发编程之生产者消费者(代码片段)

golang最吸引人的地方可能就是并发了,无论代码的编写上,还是性能上面,golang都有绝对的优势学习一个语言的并发特性,我喜欢实现一个生产者消费者模型,这个模型非常经典,适用于很多的并发场景,下面我通过这个模型,... 查看详情

scala学习并发编程模型akka(代码片段)

...Actor的聊天模型    正文一,Akka简介  写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。Akka是JVM平台上构建高并发、分布式和容... 查看详情

并发编程之进程之进程的模型:进程同步的工具继创建进程(守护进程)(代码片段)

multiProcessing包中Process模块:   join()堵塞问题,可以理解为: 相当于每个子进程结束时都会给父进程发一条消息,join()则是接收,内部有socket的实现   1,进程之间的数据隔离问题     进程和进程之间的数据是隔离的,内... 查看详情

goroutine并发调度模型深度解析之手撸一个协程池(代码片段)

golanggoroutine协程池GroutinePool高并发并发(并行),一直以来都是一个编程语言里的核心主题之一,也是被开发者关注最多的话题;Go语言作为一个出道以来就自带『高并发』光环的富二代编程语言,它的并发(并行)编程肯定是... 查看详情

音视频开发之旅(52)-java并发编程之内存模型与volatile(代码片段)

目录 JVM内存结构和内存模型并发编程中的三个概念与重排序happens-before原则volatile原理volatile使用场景资料收获一、JVM内存结构和内存模型1.1JVM内存结构图片来自图书《深入理解Java虚拟机》Java虚拟机在运行程序时会把其自动管... 查看详情

音视频开发之旅(52)-java并发编程之内存模型与volatile(代码片段)

目录 JVM内存结构和内存模型并发编程中的三个概念与重排序happens-before原则volatile原理volatile使用场景资料收获一、JVM内存结构和内存模型1.1JVM内存结构图片来自图书《深入理解Java虚拟机》Java虚拟机在运行程序时会把其自动管... 查看详情

15并发编程-(io模型)(代码片段)

一、IO模型介绍1、阻塞与非阻塞指的是程序的两种运行状态阻塞:遇到IO就发生阻塞,程序一旦遇到阻塞操作就会停在原地,并且立刻释放CPU资源非阻塞(就绪态或运行态):没有遇到IO操作,或者通过某种手段让程序即便是遇... 查看详情

13.并发编程之协程(代码片段)

目录一、协程基础二、gevent模块三、asyncio模块一、协程基础cpython下多个线程不能利用多核:规避了所有的io操作的单线程。协程操作系统不可见协程本质就是一条线程,多个任务在一条线程上来回切换,来规避io操作,降低了线... 查看详情

并发编程之内存模型(代码片段)

...的问题。3、内存模型解决什么问题  内存模型解决了并发编程场景中的原子性、可见性和有序性问题。  1)如何解决原子性问题:    在Java中,为了保证原子性,提供了两个高级的字节码指令monitorenter和monitorexit。在s... 查看详情

并发编程:io多路复用。(代码片段)

一 IO模型: Stevens在文章中一共比较了五种IOModel:*blockingIO#阻塞模型*nonblockingIO#非阻塞*IOmultiplexing#多路复用*signaldrivenIO#信号驱动*asynchronousIO#异步由signaldrivenIO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IOMo... 查看详情

并发编程目录

并发编程目录并发编程之进程1-1进程理论1-2开启进程的两种方式1-3join方法1-4守护进程1-5互斥锁1-6队列1-7生产者消费者模型并发编程之线程2-1线程理论2-2开启线程的两种方式2-3多线程和多进程的区别2-4Thread对象的其他属性或方法2-... 查看详情

提升--03---并发编程之---可见性(代码片段)

并发编程三大特性可见性先看一个小案例:importjava.io.IOException;publicclassT01_HelloVolatileprivatestaticbooleanrunning=true;privatestaticvoidm()System.out.println("mstart");while(running)System.out.prin 查看详情