Reading List (2021 - 03)

Async: What is blocking?

这篇文章先通过一个示例讲述了 blockingnon-blocking 的区别,例子是通过最简单的 sleep() 结合 tokio 完成的。还有一个点需要注意的是: tokio::join!() 将多个 task 放在一个线程 runtime 里执行,但是 tokio::spawn() 就会创建一个多线程的 runtime。

文章下半部分的重点讲述在异步环境下,该如何使用 blocking 方式执行任务,作者提 供了三种方式:

  • 使用 takio::task::spawn_block

  • 使用 rayon

  • 使用 std::thread::spawn

Common Rust Lifetime Misconceptions

Rust 生命周期学习中容易理解偏差的地方。

  • T&T&mut T 的超集。

  • &T&mut T 是不相交的集合。

  • T: 'static 应当视为 T 满足 'static 生命周期约束。

  • T: 'staticT 可以是一个有 'static 生命周期的引用类型,或者是一个所有权类型T: 'static 在包括了全部的 &'static T 的同时,包括了全部的所有权类型,如 String, Vec 等。

  • 因为 T: 'static 包含了所有权类型,所以 T:

    • 可以在运行时动态分配

    • 不需要在整个程序运行期间都有效

    • 可以安全,自由的修改

    • 可以在运行时被动态 drop

    • 可以有不同的生命周期长度

  • T: 'a 接受所有权类型,内部包含引用的所有权类型,引用,而 &'a T 只能接受引用。

  • 几乎所有的 Rust 代码都是泛型代码,并且到处都带有被省略掉的泛型生命周期注解。

  • Rust 生命周期省略规则不保证在任何情况下都正确。

  • 所有的 trait 对象都含有自动推导的生命周期。

  • 生命周期在程序编译时被静态确定,在运行时不能改变。

  • 两条必须要记住的生命周期规则:

    • 如果只有一个输入生命参数,那么它被赋予所有输出生命周期参数。

    • 如果是有多个输入生命周期的方法,而其中一个参数是 &self&mut self,那么所有输出生命周期被赋予 self 的生命周期。

示例程序,帮助理解:

struct Context<'context>(&'context str);

struct Parser<'parser, 'context> {
    context: &'parser Context<'context>,
}

impl<'parser, 'context> Parser<'parser, 'context> {
    fn parse(&'parser self) -> Result<(), &'context str> {
        Err(&self.context.0[1..])
    }
}

fn parse_context<'context>(context: Context<'context>) -> Result<(), &'context str> {
    Parser { context: &context }.parse()
}

Tokio Tutorial

=> Channel 章节

使用场景: 当客户端需要起多个线程请求服务端的时候,多个 client 可能会产生非常复杂的锁问题,这种情况下,使用 channel 就可以完美的解决这个问题。

tokio 提供了很多异步 channel 原语:

  • mpsc,多生产者,单一消费者通道,可以发送很多个值。

  • oneshot,单一生产者,单一消费者通道,只可以发送一个值。

  • broadcast,多生产者,多消费者通道,可以发送很多个值,每个消费者都可以看到任何一个值。

  • watch,单一生产者,多消费者通道,可以发送很多个值,但是不保存发送信息的历史,消费者只能看到最新的值。

示例代码: https://github.com/tokio-rs/website/blob/master/tutorial-code/channels/src/main.rs 值得多次阅读。

=> I/O 章节

tokio 的异步 IO 在设计上尽可能和 std 保持一致。有两个关键的 Trait 设计,分别是 AsyncRead, AsyncWrite。但是在日常使用过程中,我们不会直接使用这两个 trait,我们使用 AsyncReadExtAsyncWriteExt

几个常用的方法:

  • async fn read()

  • async fn read_to_end()

  • write()

  • write_all()

需要注意的一个地方是:

async fn read() 当它返回 Ok(0),这意味着远程数据流已经关闭,这一点需要格外注意。

=> Framing 章节

这一章节的内容有些难以理解,主要介绍的是如果从字符序列转换为 Frame 格式,这其中会涉及到一个 BytesMut 的使用,以及 BufWriter 的使用。

=> Async in depth 章节

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context)
        -> Poll<Self::Output>;
}

知识点:

  • Output 代表的是当 future 结束时,返回值的类型。

  • Pin 是 Rust 在异步中支持借用的方式。

一个自定义 Future 实现示例:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
    {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ignore this line for now.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

tokio::main 展开后的形式:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
    // Initialized, never polled
    State0,
    // Waiting on `Delay`, i.e. the `future.await` line.
    State1(Delay),
    // The future has completed.
    Terminated,
}

impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        use MainFuture::*;

        loop {
            match *self {
                State0 => {
                    let when = Instant::now() + Duration::from_millis(10);
                    let future = Delay { when };
                    *self = State1(future);
                }
                State1(ref mut my_future) => match Pin::new(my_future).poll(cx) {
                    Poll::Ready(out) => {
                        assert_eq!(out, "done");
                        *self = Terminated;
                        return Poll::Ready(());
                    }
                    Poll::Pending => {
                        return Poll::Pending;
                    }
                },
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

知识点:

Futures 是由其他 Futures 组合而成的,MainFuture 就包含 DelayFuture。 那么最外层的 Future 由谁来 poll 呢。答案是, 或者传递给 tokio::spawn或者#[tokio::main]`

一个简易版本的 executor:

use futures::task;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

fn main() {
   let mut mini_tokio = MiniTokio::new();

   mini_tokio.spawn(async {
       let when = Instant::now() + Duration::from_millis(10);
       let future = Delay { when };

       let out = future.await;
       assert_eq!(out, "done");
   });

   mini_tokio.run();
}

struct MiniTokio {
   tasks: VecDeque<Task>,
}

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

impl MiniTokio {
   fn new() -> MiniTokio {
       MiniTokio {
           tasks: VecDeque::new(),
       }
   }

   /// Spawn a future onto the mini-tokio instance.
   fn spawn<F>(&mut self, future: F)
   where
       F: Future<Output = ()> + Send + 'static,
   {
       self.tasks.push_back(Box::pin(future));
   }

   fn run(&mut self) {
       let waker = task::noop_waker();
       let mut cx = Context::from_waker(&waker);

       while let Some(mut task) = self.tasks.pop_front() {
           if task.as_mut().poll(&mut cx).is_pending() {
               self.tasks.push_back(task);
           }
       }
   }
}

知识点:

  • 上述实现的一个问题是,executor 会一直循环,这会严重的消耗资源。我们希望有一个消息通知组件,这就是 waker 的设计初衷。

  • poll 函数签名中的 Context 包含有一个 waker() 方法,当调用该方法时,会通知 executor 推进当前 future 执行。

  • 有一个至关重要的点,需要特别注意。当在 Future 中返回 Poll::Pending 的时候,一定要确保 waker 有加上,不然就永远拿不到结果。

=> Select 章节

知识点:

  • tokio::select! 允许同时等待多个异步任务执行,并返回先执行结束的任务值,还未执行结束的任务会被丢弃掉。

  • <pattern> = <async expression> => <handler> 是 select 匹配 branch 的语法,最多可以匹配 64 个分支。

  • select! 必须要增加一个 else 分支,这个地方很关键!!!

  • 不同于 spawn 时,异步方法中涉及到的数据必须是拥有所有权的,select! 没有这种限制的,可以借用的,见以下示例。

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let mut out = String::new();

    tokio::spawn(async move {
        // Send values on `tx1` and `tx2`.
    });

    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}
  • select! 选择分支的时候,基于随机的方式选择,原因是,如果顺序选择的话,如果某一个 branch 一直都有数据,那其他分支就无法取得进展了。

  • select! 总是在 loop 里使用。

=> Streams 章节

Stream Trait 的定义:

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

Functional Programming in Rust

这篇文章主要将 Rust 中函数式编程部分的通俗理解。

第一个知识点是关于 Iterator 类型,以及它的两个常用方法,iter()into_iter()

  • iter(): 将数据中的每一个元素的引用传递,并没有 Copy 数据。

  • into_iter(): 将数据中的每一个元素都进行拷贝后,传递。

第二个知识点是关于 Iterator 类型调用 map(), fold(), filter() 等函数后,返回的结果本身也是一个 Iterator 类型。

第三个知识点是关于 Lazy Programming

fn main() {
    let vector = (1..)            // Infinite range of integers
        .filter(|x| x % 2 != 0)   // Collect odd numbers
        .take(5)                  // Only take five numbers
        .map(|x| x * x)           // Square each number
        .collect::<Vec<usize>>(); // Return as a new Vec<usize>
    println!("{:?}", vector);     // Print result
}

透过 Rust 探索系统的本原,并发篇

  1. 多线程共享状态设计时,往往会使用大量的锁,就有可能涉及到互斥锁和读写锁。这两种常见的锁的应用场景是不一样的,并不见得读写锁就优于互斥锁,在生产环境下还是需要进行严格的测试再做决定。读写锁仅仅在读比写多的场景下,具有很大的优势。

  2. 在复杂锁的情况下,提高共享状态访问效率的唯一办法就是优化锁,尽可能减少锁的粒度。

A look back at asynchronous Rust

该文章太深奥了,对于我这样的异步新手来说,能理解的内容不过百分之十,留着吧,以后返回来再看。

Rust iterators tips and tricks

主要讲了两个内容:

  • 给自定义数据结构体封装 iter() 方法。

struct MyCollection {
    data: Vec<i32>, // or any other type that itself has an `iter()` method
    // ...
}

impl MyCollection {
    fn iter(&self) -> impl Iterator {
        self.data.iter()
    }
    // ...
}
  • 针对同一份数据,返回多种 Iterators

enum Either<Left, Right> {
    Left(Left),
    Right(Right),
}

impl<Left, Right, Item> Iterator for Either<Left, Right>
where
    Left: Iterator<Item = Item>,
    Right: Iterator<Item = Item>,
{
    type Item = Item;
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::Left(it) => it.next(),
            Self::Right(it) => it.next(),
        }
    }
}

fn forward_or_backward<T>(v: &Vec<T>, forward: bool) -> impl Iterator<Item = &T> + '_ {
    if forward {
        Either::Left(v.iter())
    } else {
        Either::Right(v.iter().rev())
    }
}

fn main() {
    let v = vec![0, 1, 3, 7];
    let forward: Vec<_> = forward_or_backward(&v, true).collect();
    let backward: Vec<_> = forward_or_backward(&v, false).collect();
    println!("forward: {:?}", forward);
    println!("backward: {:?}", backward);
}