深入浅出rust异步编程之tokio(代码片段)

author author     2022-12-12     649

关键词:

深入浅出Rust异步编程之Tokio

本文以tokio为例简单介绍Rust异步编程相关的一些知识。

首先让我们看看为什么使用rust来进行异步编程。这里tokio官方给出了一个性能测试的对比,可以看到tokio是性能最好,实际上运行这个基准测试的时候,tokio性能更好的2.0版本尚未发布,否则估计性能还有很大提升。因此,我们可以认为需要非常极致性能的时候,我们可以选择rust+tokio来实现。
技术图片

Rust网络编程

Rust实际上并不跟一定的网络编程模型强绑定,实际rust可以实现阻塞IO+多线程,非阻塞IO+回调,用户态线程等多种模型。这里着重介绍Rust实现的用户态线程。

  • 首先,Rust的用户态线程是一种基于Future的用户态线程,关于Future本身,本文后续部分有详细论述。
  • 其次,由于是Rust实现,因此可以做到零成本抽象,并且更容易做到安全。
  • 最后,由于没有运行时大量内存分配,没有动态逻辑分派,也没有GC开销,所以该实现的效率非常高。

Rust异步编程是构建在操作系统相关API上,MIO库类似Java的Nio库,针对多种操作系统的不同API做了统一封装。Future库类似Java的Future库,提供了相关接口和常用的组合能力。Tokio构建于两者之上,在MIO和future的基础上实现了用户态线程。使用Tokio进行异步编程的技术栈如下,需要注意的是,应用程序会同时接触到Tokio和future的API。
技术图片
Futures

future是rust异步编程的核心。首先我们介绍什么是future。future是一段异步计算程序,可以在将来获取产生的数据。举例来说,获取数据库查询结果,RPC调用这些实际上都可以使用future来实现。通常实现future有两种模式,一种基于推模式,也被称为基于完成的模式,一种基于拉模式,也被称为基于就绪的模式。Rust的future库实现了基于拉模式的future。

rust的future选择拉模式来实现。接口定义如下:

pub trait Future 

    type Item;

    type Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

假设一个future要做这样的功能,从TCP数据流读取数据并计算自己读了多少个字节并进行回调。那用代码表示:

struct MyTcpStream 
    socket: TcpStream,
    nread: u64,


impl Future for MyTcpStream 
    type Item =u64;
    type Error = io::Error;

   fn poll(&mut self) -> Poll<Item, io::Error> 
        let mut buf = [0;10];
        loop 
            match self.socket.read(&mut buf) 
                Async::Ready(0) => return Async::Ready(self.nread),
                Async::Ready(n) => self.nread += n,
                Async::NotReady => return Async::NotReady,
            
        
    

每次调用poll方法,MyTcpStream都会调用socket的read方法(这里的TcpStream本身也是一个future,read内部也是调用poll方法),当read返回为Async::NotReady的时候,调度器会将当前的Task休眠,如果返回Async::Read(n)表示读到了数据,则给计数器加对应的数,如果返回Async::Ready(0),则表示TcpStream里有的数据已经读完,就将计数器返回。

为了方便大家使用,future库包提供了很多组合子,以AndThen组合子为例:

enum AndThen<A,F> 
    First(A, F),


fn poll(&mut self) -> Async<Item> 
   match fut_a.poll() 
        Async::Ready(v) => Async::Ready(f(v)),
        Async::NotReady => Async::NotReady,
    

这里AndThen枚举,First有两个值,其中A是一个future,F是一个闭包,AndThen实现的poll方法,就是假如调用future_a的poll方法有返回值,那么就调用闭包,并将其返回值包装为Async::Ready返回,如果poll的返回值是Async::NotReady则同样返回Async::NotReady。有了这个AndThen方法,通过组合子函数(比如and_then实际上是将上一个future和闭包传入生成一个AndThen future),我们就可以实现一些复杂逻辑:

let f=MyTcpStream::connect(&remote_addr)
  .and_then(|num| println!("already read %d",num);
  return num;).and_then(|num| 
    process(num)
  );

tokio::spawn(f);

上面的代码就是建立Tcp连接,然后每次读数据,都通过第一个and_then打印日志,然后再通过第二个and_then做其他处理,tokio::spawn用于执行最终的future,用图形来表示:
技术图片
如果没有数据:
技术图片
如果有数据:
技术图片
如果将MyTcpStream的poll实现改为:

fn poll(&mut self) -> Poll<Item, io::Error> 
        let mut buf = [0;1024];
        let mut bytes = bytesMut::new();
        loop 
            match self.socket.read(&mut buf) 
                Async::Ready(0) => return Async::Ready(bytes.to_vec()),
                Async::Ready(n) => bytes.put(buf[0..n]),
                Async::NotReady => return Async::NotReady,
            
        
    

这段代码主要是将socket中数据读出,然后包装为Async::Ready或者Async::NotReady供下一个future使用,我们就可以实现更复杂的逻辑,比如:

MyTcpStream::connect(&remote_addr)
  .and_then(|sock| io::write(sock, handshake)) //这里发送handshake
  .and_then(|sock| io::read_exact(sock, 10)) // 这里读handshake的响应,假设handeshake很短
  .and_then(|(sock, handshake)|   // 这个future做验证并发送请求
    validate(handshake);
    io::write(sock, request)
  )
  .and_then(|sock| io::read_exact(sock, 10))// 这里读取响应
  .and_then(|(sock, response)|  // 这里处理响应
    process(response)
  )

我们上面解释了future和组合子,漏掉一个重要的API,就是:

tokio::spawn(future)

当我们使用spawn方法的时候,tokio会将传入的future生成一个task,由于future内部包含了另外的future,所以就组成了如下所示结构,其中task就是轻量级线程。
技术图片
Tokio

上面我们介绍了future相关的内容,接下来我们先看看tokio如何使用,我们这里先用taokio启动一个服务器,代码如下:

let listener = TcpListener::bind(&addr).unwrap();

let server = listener.incoming().for_each(move |socket| 
    tokio::spawn(process(socket));
    Ok(())
).map_err(|err| 
        println!("accept error = :?", err);
);

tokio::run(server);

上面的代码首先生成一个TcpListener,listener的incomming和foreach会将连进来的tcp连接生成TcpStream(即代码中的socket),针对每一个连接启动一个用户态线程处理。

Tokio本身是基于Mio和future库来实现的,其主要包含两个主要的大功能部分(本文不是对源码进行分析,Tokio不同版本之间的差异也较大,只是进行原理说明),reactor和scheduler。

scheduler负责对task进行调度,上文所展示的task调度部分功能就是由scheduler负责,reactor部分主要是负责事件触发,比如网络事件,文件系统事件,定时器等等。用图展示如下:
技术图片
当有事件触发的时候,reactor会通过task的api通知scheduler运行该任务。
技术图片
对于Reactor来说,其中最重要的结构是Poll和io_dispatch,在linux上Poll是对Epoll实例的封装(在其他操作系统上也类似),io_dispatch其中记录了调度相关的信息,具体来说主要是记录了task的id和fd的对应关系。当通过Poll获取到FD事件的时候,通过io_dispatch找到task,然后再通知调度器。
技术图片
TcpListner实际并非rust std库中的TcpListner,tokio对其进行了包装,每次有新连接到来的时候都会生成一个新的TcpStream。

TcpStream也是tokio包装后的TcpStream,可以看到其中包含一个PollEvented,而PollEvented内部包含实际的TcpSteam。PollEvented构造之后,会调用io_dispatch中的注册接口,然后在第一次调用poll的时候,将fd和task关联。

Async/await

通过上面的文章可以看到,直接使用tokio相关API还是有些难度的,然而在rust 1.39.0之后的版本,我们可以使用async/awai特性来简化代码,使得代码更容易理解。使用async/await后,上面的代码可以简化为:

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> 
    let mut stream = TcpStream::connect("127.0.0.1:6142").await?;
    println!("created stream");
    let result = stream.write(b"hello world
").await;
    println!("wrote to stream; success=:?", result.is_ok());
    Ok(())

要点在于对于需要异步的函数使用async修饰,在调用async函数的时候使用await获取返回结果。实际上async函数是由编辑器生成的future,await也是由编译器生成代码调用future的poll方法。因此真正用好async/await也需要对上面的内容了解清楚。

Tips

最后,使用tokio有一些需要注意地方:

  1. 生命周期的问题。声明周期的问题是一直贯穿rust的,具体到tokio使用上来说,最主要的是self的生命周期问题,主要是因为runtime要求借用是静态的,这个跟对象本身的声明周期是有矛盾的。我们推荐的主要做法是使用actor模型,这样可以消除掉对于静态生命周期的要求。
  2. 注意兼容问题,最主要是需要注意future01和future03的兼容性问题,future官方提供了兼容包,来做版本之间的兼容,如果要使用async/await,推荐尽量使用future03库。
  3. runtime,tokio的一个runtime对应一个线程池,因此推荐对不同业务使用不同线程池,减少业务之间相互影响。
  4. 使用TaskExecutor/Handle来spawm一个task。上面代码里经常使用的tokio::spawn是针对默认runtime的,如果使用了不同的runtime,那么就不能使用tokio::spawn。另外,TaskExecutor/Handle支持clone,可以解决一些生命周期带来的问题。
  5. 可以在代码中通过api通知task运行。

近期文章推荐

  • 为什么不应该使用数据库外键(重温旧文)
  • 为了戒网,我给每个网站自动添加3-25秒的访问延迟
  • 如何成为一名在家办公的高效工程师(一个国外团队的远程经验)
  • 详解中型系统如何一步步扩展:从1开始到支撑10万用户
  • 高性能底层怎么运作?一文帮你吃透Netty架构原理
  • 聊聊用 UUID/GUID 作为主键那些坑

技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿。

高可用架构

改变互联网的构建方式

技术图片

rust网络编程框架-深入理解tokio中的管道(代码片段)

我们在上文《Rust网络编程框架-Tokio进阶》介绍了async/await和锁的基本用法,并完成了一个Server端的DEMO代码。本文继续来探讨这个话题。客户端代码DEMO上文中依靠telnet来触发服务端代码的执行,本文我们将自己实现一个客... 查看详情

rust网络编程框架-深入理解tokio中的管道(代码片段)

我们在上文《Rust网络编程框架-Tokio进阶》介绍了async/await和锁的基本用法,并完成了一个Server端的DEMO代码。本文继续来探讨这个话题。客户端代码DEMO上文中依靠telnet来触发服务端代码的执行,本文我们将自己实现一个客... 查看详情

tokio教程之stream(代码片段)

...教程之streamhttps://tokio.rs/tokio/tutorial/streams流是一个数值的异步系列。它是Rust的 std::iter::Iterator 的异步等价物,由Stream特性表示。流可以在async函数中被迭代。它们也可以使用适配器进行转换。Tokio在 StreamExt trait上提供了许... 查看详情

rust网络编程框架-tokio进阶(代码片段)

我们在上文《小朋友也能听懂的Rust网络编程框架知识-Tokio基础篇》对于Tokio的基础知识进行了一下初步的介绍,本文就对于Tokio的用法及原理进行进一步的介绍与说明。目前市面上绝大多数编程语言所编写的程序,执行程... 查看详情

tokio教程介绍(代码片段)

...步地完成构建Redis客户端和服务器的过程。我们将从Rust的异步编程的基础知识开始,并在此基础上建立起来。我们将实现Redis命令的一个子集,但会对Tokio进行全面考察。Mini-Redis你将在本教程中构建的项目在GitHub上以Mini-R... 查看详情

tokio教程之select(代码片段)

...。现在我们将介绍一些额外的方法,用Tokio并发执行异步代码。tokio::select!tokio::select! 宏允许在多个异步计算中等待,并在单个计算完成后 查看详情

rust网络编程框架-tokio进阶(代码片段)

...效率更高,但也会导致程序更复杂。开发者需要跟踪异步操作完成后恢复工作所需的所有状态,从我的经验来看,这是一 查看详情

#rust异步网络编程

# Rust异步网络编程Rust的高性能异步网络编程模式目前是基于mio和futures这两个库构建的生态。Tokio则连接这2个库构建了一个异步非阻塞事件驱动编程平台。# 什么是 mio,futures,tokio## 1- MioMio是Rust的轻量级快速低级... 查看详情

tokio教程之桥接同步代码(代码片段)

...们用 #[tokio::main] 标记了主函数,并使整个项目成为异步的。然而,这对所有项目来说都是不可取的。例如,一个GUI应用程序可能希望在主线程上运行GUI代码,并在另一个 查看详情

小朋友也能听懂的rust网络编程框架知识-tokio基础篇(代码片段)

今天我们继续高并发的话题,传统的云计算技术,本质上都是基于虚拟机的,云平台可以将一些性能强劲的物理服务器,拆分成若干个虚拟机,提供给用户使用,但在互联网发展到今天,虚拟机还是太... 查看详情

tokio教程之i/o(代码片段)

...tutorial/ioTokio中的I/O操作方式与std中大致相同,但是是异步的。有一个特质用于读取(AsyncRead)和一个特质用于写入(AsyncWrite)。特定的类型根据情况实现这些特性(TcpStream、File、Stdout 查看详情

select(代码片段)

...。现在我们将介绍一些额外的方法,用Tokio并发执行异步代码。tokio::select!tokio::select! 宏允许在多个异步计算中等待,并在单个计算完成后 查看详情

小朋友也能听懂的rust网络编程框架知识-tokio基础篇(代码片段)

...进,根据官方的测试结果,在性能方面Rust的网络编程框架比JAVA和GO要好得多但是我意外的看到像Rust中Tokio这样优秀的高并发网络编程框架在中文技术社区却没有个完整的教程,但是在周末鸡娃的时候,我意外发现... 查看详情

rust学习教程27-深入了解特征(代码片段)

本文节选自<<Rust语言圣经>>一书欢迎大家加入Rust编程学院,一起学习交流:QQ群:1009730433深入了解特征特征之于Rust更甚于接口之于其他语言,因此特征在Rust中很重要也相对较为复杂,我们决定把特征分为... 查看详情

rust学习教程27-深入了解特征(代码片段)

本文节选自<<Rust语言圣经>>一书欢迎大家加入Rust编程学院,一起学习交流:QQ群:1009730433深入了解特征特征之于Rust更甚于接口之于其他语言,因此特征在Rust中很重要也相对较为复杂,我们决定把特征分为... 查看详情

spawning(代码片段)

Tokio教程之spawninghttps://tokio.rs/tokio/tutorial/spawning我们将换个角度,开始在Redis服务器上工作。接受套接字我们的Redis服务器需要做的第一件事是接受入站的TCP套接字。这是用 tokio::net::TcpListener 完成的。Tokio的许多类型与Rust标... 查看详情

rust实践:使用tokio实现actor系统(代码片段)

简介:原文:ActorswithTokio原文主要介绍了如何使用Tikio而不是已有的Actor库(Actix)来实现Actor系统,在我之前的文章里也讲过Actor系统是什么C++Actor并发模型框架ActorFramework(CAF),介绍的是C... 查看详情

tokio教程之通道(代码片段)

通道Tokio教程之通道https://tokio.rs/tokio/tutorial/channels假设我们想运行两个并发的Redis命令。我们可以为每个命令生成一个任务。那么这两条命令就会同时发生。起初,我们可能会尝试类似的做法。usemini_redis::client;#[tokio::main]async... 查看详情