How Rust makes the Rayon data parallelism library magical

As Rust's asynchronous feature has evolved, many open source Rust projects decided to consume or provide async crates. This article will present:

  • An overview of asynchronous design in Rust.
  • How to consume Rust async crates.
  • How to expose a Rust async crate with the help of an async executor.
  • How to expose a Rust async crate without help from an async executor.

Overview of Rust asynchronous design

The Rust asynchronous design mainly consists of the following elements:

  • Future trait
  • async keyword
  • await keyword
  • Executor

Let's explore how these elements are used in the following example code:

use std::future::Future;

fn foo() -> impl Future<Output = u8> {
    async {
        println!("foo() 1");
        1
    }
}

async fn bar() -> u8 {
    println!("bar() 2");
    2
}

#[tokio::main]
async fn main() {
    foo().await;
    bar().await;
}

The async keyword before fn is just syntactic sugar; it converts async fn bar() -> u8 into, roughly, fn bar() -> impl Future<Output = u8>. Both forms return an anonymous type that implements the Future trait.

The executor in this example is Tokio, which is responsible for resolving the Future to its Output when .await is invoked. Neither std nor core provides an executor; Rust leaves that choice to the open source community. Besides Tokio, there are other executors, such as smol, which appears later in this article.

With the help of the async keyword, both the foo() and bar() functions in our example return an anonymous type that implements the Future trait with Output set to u8. Because neither body waits on anything, the generated poll() function simply runs the block of code and returns its value wrapped in Poll::Ready(). The poll() function is invoked by the async executor when the future is awaited.
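To make this concrete, here is a minimal std-only sketch of a hand-written future that behaves like `async { 1 }`. The names `ReadyOne`, `NoopWake`, and `poll_once` are hypothetical helpers introduced for this illustration; the real compiler-generated type is anonymous and more involved.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-written counterpart of `async { 1 }`: nothing to wait for,
/// so the very first poll already returns `Poll::Ready`.
struct ReadyOne;

impl Future for ReadyOne {
    type Output = u8;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready(1)
    }
}

/// A waker that does nothing; enough for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once, the way an executor does on the first `.await`.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

fn main() {
    // The manual future and the async block behave the same way here.
    assert_eq!(poll_once(ReadyOne), Poll::Ready(1));
    assert_eq!(poll_once(async { 1u8 }), Poll::Ready(1));
    println!("both futures resolved on the first poll");
}
```

A real executor such as Tokio does much more than this single poll, but for futures that never wait the observable result is the same.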

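Rust futures are lazy: calling an async function only creates the future, and its body does not run until the future is polled. If you remove any .await from the above example code, you will not get a print message for that function. For example, if you replace bar().await with bar() in the main() function, the execution output will not contain the message bar() 2.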
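The laziness can be observed without any executor crate. The sketch below is an illustration under assumptions: `poll_once`, `NoopWake`, and `laziness_demo` are hypothetical helpers, and `bar()` is adapted to set a flag instead of printing so the effect can be checked.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; sufficient because `bar()` never waits.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once (stand-in for what `.await` triggers).
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    fut.as_mut().poll(&mut cx)
}

/// Same shape as `bar()` in the article, but records that its body ran.
async fn bar(ran: &Cell<bool>) -> u8 {
    ran.set(true);
    2
}

/// Returns whether the body had run (before polling, after polling).
fn laziness_demo() -> (bool, bool) {
    let ran = Cell::new(false);
    let fut = bar(&ran); // only creates the future; the body has not run yet
    let before = ran.get();
    let _ = poll_once(fut); // polling is what actually executes the body
    let after = ran.get();
    (before, after)
}

fn main() {
    let (before, after) = laziness_demo();
    println!("ran before poll: {before}, ran after poll: {after}");
}
```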

How to consume Rust async crates

The futures-rs crate provides join! and select!, which take multiple Future objects and resolve them concurrently.

The example code below demonstrates how to consume the async sleep() function provided by the Tokio crate:

use std::time::Duration;

async fn foo() {
    tokio::time::sleep(Duration::from_secs(3)).await;
    println!("foo() slept 3");
}

async fn bar() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("bar() slept 1");
}

#[tokio::main]
async fn main() {
    futures::future::join(foo(), bar()).await;
}

The #[tokio::main] attribute is a macro that sets up the Tokio runtime and runs the async main() function on it. If you want more control of the Tokio runtime, you can use:

tokio::runtime::Builder::new_multi_thread()
    .enable_all()
    .build()
    .unwrap()
    .block_on(your_async_func()) // block_on() takes a future, so call the async function

Both tokio::time::sleep() and futures::future::join() return futures; hence, .await is required to run them to completion.

The total runtime of the above example code is about 3 seconds rather than 4, indicating that the foo() and bar() functions run concurrently.
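Conceptually, joining two futures means polling each of them in turn until both are ready. The following std-only sketch illustrates that idea; it is not how the futures crate implements join(). The helpers `join2`, `yield_once`, `task`, `run_demo`, and the busy-polling `block_on` are hypothetical names made up for this example.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Demo-only executor: polls in a tight loop until the future is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Gives up control once: `Pending` on the first poll, `Ready` on the second.
async fn yield_once() {
    let mut yielded = false;
    poll_fn(move |cx| {
        if yielded {
            Poll::Ready(())
        } else {
            yielded = true;
            // A single wake request so the executor knows to poll again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
    .await
}

/// Simplified join: polls both futures on every pass until both are done.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = Box::pin(a);
    let mut b = Box::pin(b);
    let mut a_out = None;
    let mut b_out = None;
    poll_fn(move |cx| {
        if a_out.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                a_out = Some(v);
            }
        }
        if b_out.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                b_out = Some(v);
            }
        }
        if a_out.is_some() && b_out.is_some() {
            Poll::Ready((a_out.take().unwrap(), b_out.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

/// Logs a line, yields once, then logs another line.
async fn task(name: &'static str, log: Rc<RefCell<Vec<String>>>) {
    log.borrow_mut().push(format!("{name} start"));
    yield_once().await;
    log.borrow_mut().push(format!("{name} end"));
}

/// Runs two tasks through `join2` and returns the order of their log lines.
fn run_demo() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    block_on(join2(task("foo", log.clone()), task("bar", log.clone())));
    let lines = log.borrow().clone();
    lines
}

fn main() {
    for line in run_demo() {
        println!("{line}");
    }
}
```

The log lines of the two tasks interleave, which is the same effect that makes the Tokio example finish in the time of the longer sleep instead of the sum of both.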

Expose Rust async crate with the async executor

To create your own async Rust crate, you need to implement the Future trait for your own future struct or enum objects. But handling the wake-up and locking between threads is too complex for small projects.

The smol::Async and tokio::io::unix::AsyncFd types provide a way to wrap a Unix file descriptor (file or socket) into an async object whose readiness can be awaited, so you do not have to implement Future yourself.

This example code uses the Linux timerfd_create() system call, through the TimerFd wrapper of the nix crate, to mimic tokio::time::sleep(). You can find the complete code in this GitHub repo.

// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use nix::sys::time::TimeSpec;
use nix::sys::timerfd::{
    ClockId::CLOCK_BOOTTIME, Expiration, TimerFd, TimerFlags, TimerSetTimeFlags,
};

#[cfg(feature = "smol")]
use smol::Async as DefaultAsync;
#[cfg(not(feature = "smol"))]
use tokio::io::unix::AsyncFd as DefaultAsync;

struct AsyncSleep;

impl AsyncSleep {
    async fn sleep(dur: Duration) -> Result<(), Box<dyn std::error::Error>> {
        let timer = TimerFd::new(CLOCK_BOOTTIME, TimerFlags::empty())?;
        timer.set(
            Expiration::OneShot(TimeSpec::from_duration(dur)),
            TimerSetTimeFlags::empty(),
        )?;
        // The timer file descriptor becomes readable once the timer expires.
        let _ = DefaultAsync::new(timer)?.readable().await?;
        Ok(())
    }
}

By using #[cfg(feature = "smol")] and #[cfg(not(feature = "smol"))], we are allowing the user to choose between smol and tokio as the async executor.

The smol::Async and tokio::io::unix::AsyncFd types have the same new() function definition; hence, we conditionally import one of them as DefaultAsync and then call DefaultAsync::new() to create an async object out of the TimerFd, hiding the difference from the rest of the code.

With zero awareness of the Future trait, our AsyncSleep::sleep() works out of the box with the help of the Rust async executor. But as you might have already noticed, we are forcing the users of our crate to use one of the Rust async executors we support.

Expose Rust async crate without async executor

If your async crate should be working on all async executors, you need to implement the Future trait manually.

The definition of std::future::Future trait is:

pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

The Output defines the type of the value produced once the Future has been resolved. The poll() function returns Poll::Ready(T) when the data has been resolved, or Poll::Pending otherwise.

The poll() function is invoked when the future is awaited. If it returns Poll::Pending, the Rust executor will not invoke poll() again until wake() has been called on the std::task::Waker obtained from the Context. It is your responsibility to invoke std::task::Waker::wake() when the Future might be ready. Once wake() has been invoked, the executor calls poll() again. It is also OK to return Poll::Pending again to indicate that the future needs to wait for another wake() call.

Thanks to Jiri Stransky, we noticed that if you invoke wake() inside the poll() function before returning Poll::Pending, the Future is eventually resolved without any extra thread, but this consumes 100% of a CPU core because poll() is called in a constant loop until it returns Poll::Ready. Hence, repeatedly calling wake() inside poll() is strongly discouraged in production projects.
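The busy loop is easy to reproduce with a small std-only sketch. Everything here is an assumption made for illustration: `SpinSleep` is a hypothetical future that wakes itself on every poll, and `block_on` with `ThreadWaker` is a minimal executor that parks the thread until a wake arrives.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Waker that unparks the thread running the executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll, then park until some waker unparks this thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Anti-pattern: requests a wake-up on every poll, so the executor never rests.
struct SpinSleep {
    deadline: Instant,
    polls: usize,
}

impl Future for SpinSleep {
    /// Number of times `poll()` was called before the deadline passed.
    type Output = usize;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.polls += 1;
        if Instant::now() >= self.deadline {
            Poll::Ready(self.polls)
        } else {
            // Waking before returning Pending makes park() return immediately.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Sleeps for `dur` by spinning and reports how many polls that took.
fn spin_sleep(dur: Duration) -> usize {
    block_on(SpinSleep {
        deadline: Instant::now() + dur,
        polls: 0,
    })
}

fn main() {
    let polls = spin_sleep(Duration::from_millis(50));
    println!("poll() was called {polls} times for a 50 ms sleep");
}
```

The exact poll count depends on the machine, but it is typically very large, which is the CPU cost the paragraph above warns about.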

Let's take a look at this demo code that mimics tokio::time::sleep() using a self-implemented Future.

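use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
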
struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

struct AsyncSleep {
    // The `Arc<Mutex<>>` protects the shared data, which is accessed by both
    // the timer thread and the thread that polls the future
    shared_state: Arc<Mutex<SharedState>>,
}

impl AsyncSleep {
    fn sleep(dur: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
        let thread_state = shared_state.clone();
        std::thread::spawn(move || {
            std::thread::sleep(dur);
            let mut state = thread_state.lock().unwrap();
            state.completed = true;
            if let Some(waker) = state.waker.take() {
                waker.wake()
            }
        });
        Self { shared_state }
    }
}

impl Future for AsyncSleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self
            .shared_state
            .lock()
            .expect("Failed to lock the shared state");
        if shared_state.completed {
            Poll::Ready(())
        } else {
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

The Rust async executor runs the poll() function once when the future is first awaited. After getting Poll::Pending, the executor re-runs the poll() function only when std::task::Waker::wake() has been invoked.

In the example above, we spawn a thread that invokes std::task::Waker::wake() once the desired sleep time has expired.
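Because this AsyncSleep relies only on the Future trait, it should run on any executor. As a hedged illustration, the sketch below drives it with a tiny hand-rolled executor built from std alone. AsyncSleep is repeated from the demo above so that the block compiles on its own; `ThreadWaker`, `block_on`, and `timed_sleep` are hypothetical helpers, not part of any crate.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

struct AsyncSleep {
    shared_state: Arc<Mutex<SharedState>>,
}

impl AsyncSleep {
    fn sleep(dur: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
        let thread_state = shared_state.clone();
        // Timer thread: sleeps, marks completion, then wakes the executor.
        thread::spawn(move || {
            thread::sleep(dur);
            let mut state = thread_state.lock().unwrap();
            state.completed = true;
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        Self { shared_state }
    }
}

impl Future for AsyncSleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared_state.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            // Store the latest waker so the timer thread can request a re-poll.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running the executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: after `Pending`, the thread parks until `wake()` is called.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Runs `AsyncSleep` on the minimal executor and returns the elapsed time.
fn timed_sleep(dur: Duration) -> Duration {
    let start = Instant::now();
    block_on(AsyncSleep::sleep(dur));
    start.elapsed()
}

fn main() {
    let elapsed = timed_sleep(Duration::from_millis(100));
    println!("slept for {elapsed:?}");
}
```

Unlike a future that wakes itself inside poll(), this one lets the executor thread stay parked for the whole sleep, so it uses almost no CPU while waiting.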

Summary

To consume Rust async crates, you need to choose a Rust async executor, decorate the function with the async keyword, and use .await to resolve the future objects.

To provide a Rust async crate, if you can pin to a specific Rust executor, the helpers that executor provides (such as smol::Async or tokio::io::unix::AsyncFd) can greatly simplify your work. Otherwise, be sure to implement the Future trait carefully.