Reading List (2021 - 03)¶
Async: What is blocking?¶
这篇文章先通过一个示例讲述了 blocking
和 non-blocking
的区别,例子是通过最简单的 sleep()
结合 tokio
完成的。还有一个点需要注意的是: tokio::join!()
将多个 task 放在一个线程 runtime 里执行,但是 tokio::spawn()
就会创建一个多线程的 runtime。
文章下半部分的重点讲述在异步环境下,该如何使用 blocking
方式执行任务,作者提 供了三种方式:
使用
takio::task::spawn_block
使用
rayon
使用
std::thread::spawn
Common Rust Lifetime Misconceptions¶
Rust 生命周期学习中容易理解偏差的地方。
T
是&T
和&mut T
的超集。&T
和&mut T
是不相交的集合。T: 'static
应当视为T
满足'static
生命周期约束。若
T: 'static
则T
可以是一个有'static
生命周期的引用类型,或者是一个所有权类型。T: 'static
在包括了全部的&'static T
的同时,包括了全部的所有权类型,如String
,Vec
等。因为
T: 'static
包含了所有权类型,所以 T:可以在运行时动态分配
不需要在整个程序运行期间都有效
可以安全,自由的修改
可以在运行时被动态 drop
可以有不同的生命周期长度
T: 'a
接受所有权类型,内部包含引用的所有权类型,引用,而&'a T
只能接受引用。几乎所有的 Rust 代码都是泛型代码,并且到处都带有被省略掉的泛型生命周期注解。
Rust 生命周期省略规则不保证在任何情况下都正确。
所有的 trait 对象都含有自动推导的生命周期。
生命周期在程序编译时被静态确定,在运行时不能改变。
两条必须要记住的生命周期规则:
如果只有一个输入生命参数,那么它被赋予所有输出生命周期参数。
如果是有多个输入生命周期的方法,而其中一个参数是
&self
或&mut self
,那么所有输出生命周期被赋予self
的生命周期。
示例程序,帮助理解:
struct Context<'context>(&'context str);
struct Parser<'parser, 'context> {
context: &'parser Context<'context>,
}
impl<'parser, 'context> Parser<'parser, 'context> {
fn parse(&'parser self) -> Result<(), &'context str> {
Err(&self.context.0[1..])
}
}
fn parse_context<'context>(context: Context<'context>) -> Result<(), &'context str> {
Parser { context: &context }.parse()
}
Tokio Tutorial¶
=> Channel 章节
使用场景: 当客户端需要起多个线程请求服务端的时候,多个 client 可能会产生非常复杂的锁问题,这种情况下,使用 channel 就可以完美的解决这个问题。
tokio 提供了很多异步 channel 原语:
mpsc,多生产者,单一消费者通道,可以发送很多个值。
oneshot,单一生产者,单一消费者通道,只可以发送一个值。
broadcast,多生产者,多消费者通道,可以发送很多个值,每个消费者都可以看到任何一个值。
watch,单一生产者,多消费者通道,可以发送很多个值,但是不保存发送信息的历史,消费者只能看到最新的值。
示例代码: https://github.com/tokio-rs/website/blob/master/tutorial-code/channels/src/main.rs 值得多次阅读。
=> I/O 章节
tokio 的异步 IO 在设计上尽可能和 std
保持一致。有两个关键的 Trait 设计,分别是 AsyncRead
, AsyncWrite
。但是在日常使用过程中,我们不会直接使用这两个 trait,我们使用 AsyncReadExt
和 AsyncWriteExt
。
几个常用的方法:
async fn read()
async fn read_to_end()
write()
write_all()
需要注意的一个地方是:
async fn read()
当它返回 Ok(0)
,这意味着远程数据流已经关闭,这一点需要格外注意。
=> Framing 章节
这一章节的内容有些难以理解,主要介绍的是如果从字符序列转换为 Frame 格式,这其中会涉及到一个 BytesMut
的使用,以及 BufWriter
的使用。
=> Async in depth 章节
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
}
知识点:
Output
代表的是当 future 结束时,返回值的类型。Pin
是 Rust 在异步中支持借用的方式。
一个自定义 Future 实现示例:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}
tokio::main
展开后的形式:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
enum MainFuture {
// Initialized, never polled
State0,
// Waiting on `Delay`, i.e. the `future.await` line.
State1(Delay),
// The future has completed.
Terminated,
}
impl Future for MainFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
use MainFuture::*;
loop {
match *self {
State0 => {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
},
Terminated => {
panic!("future polled after completion")
}
}
}
}
}
知识点:
Futures 是由其他 Futures 组合而成的,MainFuture
就包含 DelayFuture
。 那么最外层的 Future
由谁来 poll
呢。答案是, 或者传递给 tokio::spawn或者
#[tokio::main]`
一个简易版本的 executor
:
use futures::task;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
fn main() {
let mut mini_tokio = MiniTokio::new();
mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
});
mini_tokio.run();
}
struct MiniTokio {
tasks: VecDeque<Task>,
}
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}
/// Spawn a future onto the mini-tokio instance.
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}
fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);
while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}
知识点:
上述实现的一个问题是,executor 会一直循环,这会严重的消耗资源。我们希望有一个消息通知组件,这就是
waker
的设计初衷。poll
函数签名中的Context
包含有一个waker()
方法,当调用该方法时,会通知 executor 推进当前 future 执行。有一个至关重要的点,需要特别注意。当在 Future 中返回
Poll::Pending
的时候,一定要确保waker
有加上,不然就永远拿不到结果。
=> Select 章节
知识点:
tokio::select!
允许同时等待多个异步任务执行,并返回先执行结束的任务值,还未执行结束的任务会被丢弃掉。<pattern> = <async expression> => <handler>
是 select 匹配 branch 的语法,最多可以匹配 64 个分支。select!
必须要增加一个else
分支,这个地方很关键!!!不同于
spawn
时,异步方法中涉及到的数据必须是拥有所有权的,select!
没有这种限制的,可以借用的,见以下示例。
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let mut out = String::new();
tokio::spawn(async move {
// Send values on `tx1` and `tx2`.
});
tokio::select! {
_ = rx1 => {
out.push_str("rx1 completed");
}
_ = rx2 => {
out.push_str("rx2 completed");
}
}
println!("{}", out);
}
select!
选择分支的时候,基于随机的方式选择,原因是,如果顺序选择的话,如果某一个 branch 一直都有数据,那其他分支就无法取得进展了。select!
总是在loop
里使用。
=> Streams 章节
Stream
Trait 的定义:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
Functional Programming in Rust¶
这篇文章主要将 Rust 中函数式编程部分的通俗理解。
第一个知识点是关于 Iterator
类型,以及它的两个常用方法,iter()
和 into_iter()
。
iter()
: 将数据中的每一个元素的引用传递,并没有 Copy 数据。into_iter()
: 将数据中的每一个元素都进行拷贝后,传递。
第二个知识点是关于 Iterator
类型调用 map()
, fold()
, filter()
等函数后,返回的结果本身也是一个 Iterator
类型。
第三个知识点是关于 Lazy Programming
fn main() {
let vector = (1..) // Infinite range of integers
.filter(|x| x % 2 != 0) // Collect odd numbers
.take(5) // Only take five numbers
.map(|x| x * x) // Square each number
.collect::<Vec<usize>>(); // Return as a new Vec<usize>
println!("{:?}", vector); // Print result
}
透过 Rust 探索系统的本原,并发篇¶
多线程共享状态设计时,往往会使用大量的锁,就有可能涉及到互斥锁和读写锁。这两种常见的锁的应用场景是不一样的,并不见得读写锁就优于互斥锁,在生产环境下还是需要进行严格的测试再做决定。读写锁仅仅在读比写多的场景下,具有很大的优势。
在复杂锁的情况下,提高共享状态访问效率的唯一办法就是优化锁,尽可能减少锁的粒度。
A look back at asynchronous Rust¶
该文章太深奥了,对于我这样的异步新手来说,能理解的内容不过百分之十,留着吧,以后返回来再看。
Rust iterators tips and tricks¶
主要讲了两个内容:
给自定义数据结构体封装
iter()
方法。
struct MyCollection {
data: Vec<i32>, // or any other type that itself has an `iter()` method
// ...
}
impl MyCollection {
fn iter(&self) -> impl Iterator {
self.data.iter()
}
// ...
}
针对同一份数据,返回多种
Iterators
。
enum Either<Left, Right> {
Left(Left),
Right(Right),
}
impl<Left, Right, Item> Iterator for Either<Left, Right>
where
Left: Iterator<Item = Item>,
Right: Iterator<Item = Item>,
{
type Item = Item;
fn next(&mut self) -> Option<Self::Item> {
match self {
Self::Left(it) => it.next(),
Self::Right(it) => it.next(),
}
}
}
fn forward_or_backward<T>(v: &Vec<T>, forward: bool) -> impl Iterator<Item = &T> + '_ {
if forward {
Either::Left(v.iter())
} else {
Either::Right(v.iter().rev())
}
}
fn main() {
let v = vec![0, 1, 3, 7];
let forward: Vec<_> = forward_or_backward(&v, true).collect();
let backward: Vec<_> = forward_or_backward(&v, false).collect();
println!("forward: {:?}", forward);
println!("backward: {:?}", backward);
}