Rust并发入坑记录

## 开始之前的碎碎念

就像之前说过的,终于得用 rust 玩点并发了。rust 的并发还是很有意思的,和完全依赖系统函数调用的 c 不太一样, rust 提供了一定的上层抽象,同时也尽可能地通过内置的静态检查去规避可能出现的线程安全问题。

由于 rust 是相当偏底层的语言之一,和自带 runtime 的种种语言相比,实现并发需要考虑的细节就更多了。

当然我们不会去写传统的多线程应用,在应用程序里手动的创建多个线程可太蠢了,都已经2023年了,就当然要用协程来写并发!

cpp 的情况类似,rust 的协程也是通过用户态线程(虚拟线程/协程)+ 协程调度器实现的。

听起来很抽象,但是根据我的理解,这玩意儿有点像是 v-dom

内核线程的创建和切换等操作都相当的花时间,但是内核态的线程是由操作系统调度且能直接控制硬件资源的。而用户态线程则恰恰相反,它的管理工作交由用户程序等上层的 runtime 实现,所以用户态线程可以同时创建几千上万个而不会阻塞当前线程的工作。

为了使得协程实际执行各项任务,在创建协程之外还需要一个协程调度器将其绑定到真实的内核线程之上,就像 golanggoroutine 一样!

比较蛋疼的是 rust 的标准库里压根就没有协程调度器的存在,所以为了愉快的开始写并发程序,还需要一个第三方运行时的支持。

本次的记录中使用的算是 rust 社区生态中相当主流的异步运行时之一——Tokio。由于大多数异步库或者应用都直接或间接的依赖于它,所以某种意义上来说,Tokio已经成为了当前 rust 异步生态的事实标准。

协程版本的你好世界

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await?;

// Set the key "hello" with value "world"
client.set("hello", "world".into()).await?;

// Get key "hello"
let result = client.get("hello").await?;

println!("got value from the server; result={:?}", result);

Ok(())
}

这段代码挺简单的,唯一特别的地方可能就是 await 关键字的位置,main 函数上的宏,以及语句末尾的 ?

主要是我主力还是写前端居多,所以 await 后置感觉就怪怪的,rust 里面这样写就有的更接近于 Promisethen 调用。

? 则是 rust 里针对函数返回 Option<T> 或者 Result<T> 的一个语法糖,具体可以参考文档

而这个奇特的宏 #[tokio::main] 就比较有门道了。下面是 cargo expand 展开的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#![feature(prelude_import)]
#[prelude_import]
use std::prelude::rust_2021::*;
#[macro_use]
extern crate std;
use mini_redis::{client, Result};
fn main() -> Result<()> {
let body = async {
let mut client = client::connect("127.0.0.1:6379").await?;
client.set("hello", "world".into()).await?;
let result = client.get("hello").await?;
{
::std::io::_print(
format_args!("got value from the server; result={0:?}\n", result),
);
};
Ok(())
};
#[allow(clippy::expect_used, clippy::diverging_sub_expression)]
{
return tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed building the Runtime")
.block_on(body);
}
}

原先的 main 函数的内容被转移到了 body 这个闭包中,并且通过一个奇特 block_on 方法调用。

尽管这其中有很多从没见过的方法调用,但是实际上我们只需要关心new_multi_thread 这个静态函数以及上文提到的block_on

下面是其函数签名

1
pub fn new_multi_thread() -> Builder

额,又冒出一个新类型。。。

根据文档所言,这个函数会返回一个使用多线程调度的builder

builder 则是一个用于自定义 tokio 运行时的构造函数,enable_all 是一个链式调用的配置方法,其将会同时启用 I/O 和时间相关的驱动。

然后是 build 方法,这个方法会返回一个运行时的实例。

1
pub fn build(&mut self) -> Result<Runtime>

这个 Runtime 类型就是异步环境的核心了,它完成了诸如事件循环、I/O 交互还有计时器等等操作。

简易的协程解释

尽管从现在开始,我们就已经可以用 Tokio 提供的 API 来写协程了。。但是在这之前我还是想要插入一个关于协程原理的简要介绍。

在本篇博文的开头我们已经介绍了用户态线程等概念,但这很可能会让我们先入为主的认为,“用协程去写并发就一定是多线程的”,但是事实并不完全是这样。

例如使用 js 提供的 Promiseasync/await 语法,你也可以写异步的程序,但是众所周知,js 的运行时实现几乎都是默认单线程执行的,这是怎么做到的?

类似的情况还发生在 python 的协程上,除非你手动编写多线程任务,否则py 几乎总是单线程运行的,但是通过 generatoryield 以及对应的语法糖实现,py 也可以实现协程。

如果你先前有了解过的,你可能听说过这样一个说法"协程是可以暂停的函数",而很多协程的实现就是这样的!例如Rust或者Kotlinsuspend函数,其本质上都是基于的状态机可被"暂停"执行的函数。

协程一般可以被分为有栈协程和无栈协程,使用编译期转换成状态机的一般为无栈协程,如果没有特殊说明,本文接下来提到的“协程”都为无栈协程。

哈?函数的执行可以在中途暂停吗?当然对于一个完整的函数是不行的,但是如果我把一个函数拆分成多个部分呢?

例如我们刚才的示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
async fn main() -> Result<()> {
// Open a connection to the mini-redis address.
let mut client = client::connect("127.0.0.1:6379").await?;

// Set the key "hello" with value "world"
client.set("hello", "world".into()).await?;

// Get key "hello"
let result = client.get("hello").await?;

println!("got value from the server; result={:?}", result);

Ok(())
}

其中的 .await 关键字就将整个程序切分成了以下几个部分

  1. 建立与 mini-redis 的连接
  2. 在数据库中设置键值对
  3. 通过键取出值

假设我们将这几个操作放在一个 match 中(类似于其它类C语言的switch),然后生成一个变量用于记录当前的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async fn fun() {
let mut state = 0;

match state {
-1 => println!("done"),
0 => {
let mut client = client::connect("127.0.0.1:6379");
state += 1;
}
1 => {
client.set("hello", "world".into());
state += 1;
}
2 => {
let result = client.get("hello");
state += 1;
}
3 => {
println!("got value from the server; result={:?}", result);
state += 1;
}
_ => {}
}
}

这只是一段伪代码,但是基本思路差不多就是这样,将状态外提并,调度器就会轮流的执行任务队列中的各个协程,直到每个任务最后都完成。

注意,这是一个极度简化的模型,Rust 中的实现是每个异步函数的返回值都是一个Future,然后通过轮询和 Waker 的结合不断地 poll 直到任务完成。(关于这些词的含义我们会在后续的章节介绍)


Rust并发入坑记录
https://ooj2003.github.io/2023/09/17/技术/Rust并发入坑记录/
作者
OOJ2003
发布于
2023年9月17日
更新于
2024年7月12日
许可协议