As Rust's asynchronous features have evolved, many open source Rust projects have decided to consume or provide async crates. This article presents:
- An overview of asynchronous design in Rust.
- How to consume Rust async crates.
- How to expose a Rust async crate with the help of an async executor.
- How to expose a Rust async crate without help from an async executor.
Overview of Rust asynchronous design
The Rust asynchronous design mainly consists of the following:
- The Future trait
- The async keyword
- The await keyword
- An executor
Let's explore how these elements are used in the following example code:
use std::future::Future;
fn foo() -> impl Future<Output = u8> {
    async {
        println!("foo() 1");
        1
    }
}
async fn bar() -> u8 {
    println!("bar() 2");
    2
}
#[tokio::main]
async fn main() {
    foo().await;
    bar().await;
}
The async keyword before fn is just syntactic sugar; it converts async fn foo() -> u8 to fn foo() -> impl Future<Output = u8>. Both forms return an anonymous type that implements the Future trait.
The executor in this example is Tokio, which is responsible for driving each Future to its Output when it is awaited. Rust's std and core libraries do not provide an executor, trusting the open source community to provide good options. Tokio is not the only choice; smol, for example, is another executor and is used later in this article.
With the help of the async keyword, both the foo() and bar() functions in our example return an anonymous type that implements the Future trait with Output set to u8. Because neither body contains an .await, the compiler-generated poll() function simply runs the block of code and returns its value wrapped in Poll::Ready(). The async executor invokes poll() when the future is awaited.
If you remove any .await from the above example code, you will not get a print message for that function, because a future does nothing until it is polled. For example, if you replace bar().await with bar() in the main() function, the output will not contain the message bar() 2.
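This laziness can be observed without any executor crate. The following is a minimal sketch that uses only the standard library and polls the future by hand, roughly the way an executor would. The names NoopWaker and lazy_demo are hypothetical, and bar() is changed to record its message in a log instead of printing it so the order of events can be checked.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that does nothing; that is enough for a future
/// that finishes on its first poll.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Same shape as `bar()` above, but it records its message in `log`
/// instead of printing it.
async fn bar(log: &RefCell<Vec<&'static str>>) -> u8 {
    log.borrow_mut().push("bar() 2");
    2
}

/// Returns the recorded events and the result of a single manual poll.
fn lazy_demo() -> (Vec<&'static str>, Poll<u8>) {
    let log = RefCell::new(Vec::new());

    // Calling an async fn only builds the future; its body has not run yet.
    let mut fut = Box::pin(bar(&log));
    log.borrow_mut().push("future created");

    // Poll once by hand, as an executor does when the future is awaited.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let result = fut.as_mut().poll(&mut cx);

    drop(fut); // release the borrow of `log`
    (log.into_inner(), result)
}
```

Calling lazy_demo() yields the events "future created" followed by "bar() 2", together with Poll::Ready(2): the body runs only on the first poll, and because it contains no .await it completes immediately.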
How to consume Rust async crates
The futures-rs crate provides join! and select!, which take multiple Future objects and poll them concurrently (join! waits for all of them, select! for the first one to finish).
The example code below demonstrates how to consume the async sleep() function provided by the Tokio crate:
use std::time::Duration;
async fn foo() {
    tokio::time::sleep(Duration::from_secs(3)).await;
    println!("foo() slept 3");
}
async fn bar() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("bar() slept 1");
}
#[tokio::main]
async fn main() {
    futures::future::join(foo(), bar()).await;
}
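The #[tokio::main] attribute is a macro that sets up the Tokio async executor and runs main() on it. If you want more control of the Tokio runtime, you can build it yourself:
tokio::runtime::Builder::new_multi_thread()
    .enable_all()
    .build()
    .unwrap()
    // block_on() takes a future, so the async function must be called here
    .block_on(your_async_func());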
Both tokio::time::sleep() and futures::future::join() return a Future; hence, .await is required to run them.
The total runtime of the above example code is about 3 seconds rather than 4, indicating that the foo() and bar() functions run concurrently.
Expose Rust async crate with the async executor
To create your own async Rust crate, you need to implement the Future trait for your own future struct or enum objects. But handling the wake-up and locking between threads is too complex for small projects.
The smol::Async and tokio::io::unix::AsyncFd types provide a way to wrap a Unix file descriptor (file or socket) into an async struct whose methods return ready-made Future implementations.
This example code uses the Linux timerfd API (timerfd_create(), wrapped by the TimerFd type of the nix crate) to mimic tokio::time::sleep(). You can find the complete code in this GitHub repo.
// SPDX-License-Identifier: Apache-2.0
use std::time::Duration;
use nix::sys::time::TimeSpec;
use nix::sys::timerfd::{
    ClockId::CLOCK_BOOTTIME, Expiration, TimerFd, TimerFlags, TimerSetTimeFlags,
};
// Pick the executor-specific file descriptor wrapper at compile time.
#[cfg(feature = "smol")]
use smol::Async as DefaultAsync;
#[cfg(not(feature = "smol"))]
use tokio::io::unix::AsyncFd as DefaultAsync;
struct AsyncSleep;
impl AsyncSleep {
    async fn sleep(dur: Duration) -> Result<(), Box<dyn std::error::Error>> {
        let timer = TimerFd::new(CLOCK_BOOTTIME, TimerFlags::empty())?;
        // Arm a one-shot timer that expires after `dur`.
        timer.set(
            Expiration::OneShot(TimeSpec::from_duration(dur)),
            TimerSetTimeFlags::empty(),
        )?;
        // The timer file descriptor becomes readable once the timer expires.
        let _ = DefaultAsync::new(timer)?.readable().await?;
        Ok(())
    }
}
By using #[cfg(feature = "smol")] and #[cfg(not(feature = "smol"))], we allow the user to choose between smol and tokio as the async executor.
The smol::Async and tokio::io::unix::AsyncFd types both offer a new() function and a readable() method that can be called the same way; hence, we conditionally import one of them as DefaultAsync and hide the difference behind DefaultAsync::new(), which creates an async object out of the TimerFd.
With zero awareness of the Future trait, our AsyncSleep::sleep() works out of the box with the help of the Rust async executor. But as you might have already noticed, we are forcing the users of our crate onto one of the async executors we support.
Expose Rust async crate without async executor
If your async crate should work on all async executors, you need to implement the Future trait manually.
The definition of the std::future::Future trait is:
pub trait Future {
    type Output;
    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Output defines the type of the value produced once the Future has been resolved. The poll() function returns Poll::Ready(T) when the data has been resolved, or Poll::Pending otherwise.
The poll() function is invoked when the future is awaited. If Poll::Pending has been returned, the Rust executor will not invoke poll() again until std::task::Waker::wake() has been invoked on the Waker obtained from the Context. It is your responsibility to invoke std::task::Waker::wake() when the Future might be ready. Once std::task::Waker::wake() has been invoked, poll() will be invoked again. It is also OK to return Poll::Pending again to indicate that we need to wait for another wake() call.
Thanks to Jiri Stransky, we noticed that if you invoke wake() inside the poll() function before returning Poll::Pending, the Future can eventually be resolved without threading, but this consumes 100% of a CPU because of the constant looping until Poll::Ready is returned. Hence, calling wake() inside poll() is strongly discouraged in production projects.
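To make that cost visible, here is a minimal sketch, using only the standard library, of a future that wakes itself inside poll(). The names BusySleep, ThreadWaker, block_on and count_polls are hypothetical and not part of the demo repository, and block_on is a toy single-future executor rather than a stand-in for Tokio or smol.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Hypothetical future that wakes itself on every poll (the anti-pattern).
struct BusySleep {
    deadline: Instant,
    polls: u64,
}

impl Future for BusySleep {
    /// Resolves to the number of times `poll()` was called.
    type Output = u64;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u64> {
        self.polls += 1;
        if Instant::now() >= self.deadline {
            Poll::Ready(self.polls)
        } else {
            // Ask to be polled again right away: no helper thread is needed,
            // but the executor never gets a chance to rest.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running the toy executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: polls one future and parks the thread while it is pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Returns immediately if wake() was already called during poll().
            Poll::Pending => thread::park(),
        }
    }
}

/// Runs `BusySleep` for `dur` and reports how often it was polled.
fn count_polls(dur: Duration) -> u64 {
    block_on(BusySleep { deadline: Instant::now() + dur, polls: 0 })
}
```

On a typical machine, count_polls(Duration::from_millis(20)) returns a very large number, because each wake() makes the executor poll again at once; a future that is woken only when its work is done would need just a handful of polls.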
Let's take a look at this demo code that mimics tokio::time::sleep() using a self-implemented Future.
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}
struct AsyncSleep {
    // The `Arc<Mutex<>>` is required to protect the shared data accessed
    // by both the timer thread and the executor thread
    shared_state: Arc<Mutex<SharedState>>,
}
impl AsyncSleep {
    fn sleep(dur: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
        let thread_state = shared_state.clone();
        std::thread::spawn(move || {
            std::thread::sleep(dur);
            let mut state = thread_state.lock().unwrap();
            state.completed = true;
            // Wake the executor only if poll() has already stored a waker
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        Self { shared_state }
    }
}
impl Future for AsyncSleep {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut shared_state = self
            .shared_state
            .lock()
            .expect("Failed to lock the shared state");
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Store the latest waker so the timer thread can wake this task
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
The Rust async executor only runs the poll() function once on the first .await call. After getting Poll::Pending, the Rust async executor will only re-run the poll() function when std::task::Waker::wake() has been invoked.
In the example above, we spawned a thread to invoke std::task::Waker::wake() when the desired sleep time has expired.
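Because AsyncSleep relies only on the Waker it is handed, any executor can drive it. The following is a minimal sketch of that idea with a toy single-future executor built from the standard library. The names ThreadWaker, block_on and timed_sleep are hypothetical, and AsyncSleep is repeated in condensed form only so that the block compiles on its own.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

// Condensed copy of the `AsyncSleep` future from the example above.
struct SharedState {
    completed: bool,
    waker: Option<Waker>,
}

struct AsyncSleep {
    shared_state: Arc<Mutex<SharedState>>,
}

impl AsyncSleep {
    fn sleep(dur: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));
        let thread_state = Arc::clone(&shared_state);
        thread::spawn(move || {
            thread::sleep(dur);
            let mut state = thread_state.lock().unwrap();
            state.completed = true;
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        Self { shared_state }
    }
}

impl Future for AsyncSleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared_state.lock().unwrap();
        if state.completed {
            Poll::Ready(())
        } else {
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running the toy executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: returns the future's output and how many times it polled.
fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return (out, polls),
            // Sleep until the timer thread calls wake().
            Poll::Pending => thread::park(),
        }
    }
}

/// Sleeps for `dur` and reports the elapsed time and the poll count.
fn timed_sleep(dur: Duration) -> (Duration, u32) {
    let start = Instant::now();
    let ((), polls) = block_on(AsyncSleep::sleep(dur));
    (start.elapsed(), polls)
}
```

Calling timed_sleep(Duration::from_millis(50)) returns an elapsed time of at least 50 ms, and the poll count is typically 2: one poll that stores the waker and one after the timer thread calls wake(). Spurious wake-ups of a parked thread are possible, so the count is not guaranteed.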
Summary
To consume Rust async crates, you need to choose a Rust async executor, decorate the function with the async keyword, and use .await to resolve the future objects.
To provide a Rust async crate, if you can pin to a specific Rust executor, that executor's helper types can greatly simplify your work. Otherwise, be sure to implement the Future trait carefully.