Concurrency With Async
In this section, we will apply async to some of the same concurrency challenges we tackled with threads in Chapter 16. Since we already covered a lot of the key ideas there, here we will focus on what is different between threads and futures.
In many cases, the APIs for working with concurrency using async are very similar to those for using threads. In other cases, they end up being shaped fairly differently. Even when the APIs look similar, they often have different behavior and they nearly always have different performance characteristics.
Counting
The first task we tackled in Chapter 16 was counting up on two separate threads.
Let’s do the same using async. The trpl crate supplies a spawn_task function which looks very similar to the thread::spawn API, and a sleep function which is an async version of the thread::sleep API. We can use these together to implement the same counting example as with threads. To start, we will set up our main function with trpl::block_on:
fn main() {
    trpl::block_on(async {
        // The rest of our code will go here.
    });
}
Note: From this point forward in the chapter, every example will include this exact same code, so we will often skip it just like we do with main. Don’t forget to include it in your own code!
Then we can write two loops within that block, each with a trpl::sleep call in it. Similar to the threading example, we put one loop in the body of a trpl::spawn_task, the same way we did with thread::spawn, and the other in a top-level for loop. Notice that we also need to add a .await after the sleep calls.
Putting that all together, we end up with the code in Listing 17-TODO:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(1)).await;
        }
    });
}
This does something very similar to what the thread-based implementation did, as we can see from the output when we run it. (As with the threading example, you may see a different order in your own terminal output when you run this.)
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
This stops as soon as the for loop in the body of the main async block finishes, because the task spawned by spawn_task is shut down when the main function ends, just as threads are. Thus, if you want to run all the way to the completion of the task, you will need to use a join handle to wait for the first task to complete. With threads, we used the join method to “block” until the thread was done running. Here, we can use await to do the same thing:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(1)).await;
        }

        handle.await;
    });
}
Now the output again looks like what we saw in the threading example.
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
So far, it looks like async and threads give us the same basic behavior. However, there are a few important differences already. One was using .await instead of calling join on the join handle. Another is that we needed to await both sleep calls. Most importantly, though, we did not need to spawn another operating system thread to do this. We were able to get concurrency for just the cost of a task, which has a much faster startup time and uses much less memory than an OS thread.
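To get a feel for how cheap tasks are, here is a rough sketch, not from the book and assuming the chapter’s trpl crate, that spawns a thousand tasks and waits for all of them. Creating the same number of OS threads would be far more expensive.

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        // Spawn a large number of tasks. Each one is just a small piece of
        // state managed by the runtime, not a separate OS thread.
        let mut handles = Vec::new();
        for i in 0..1_000 {
            handles.push(trpl::spawn_task(async move {
                trpl::sleep(Duration::from_millis(1)).await;
                println!("task {i} finished");
            }));
        }

        // Await every join handle so all the tasks run to completion before
        // the main async block ends. We ignore each handle's output here.
        for handle in handles {
            handle.await;
        }
    });
}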
What is more, we actually do not need the spawn_task call at all to get concurrency here. Remember that each async block compiles to an anonymous future. That means we can put each of these two loops in an async block and then ask the runtime to run them both to completion using trpl::join:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
When we run this, we see both futures run to completion:
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Here, you will see the exact same order every time, which is very different from what we saw with threads. That is because the trpl::join function is fair, meaning it checks both futures equally, rather than letting one race ahead. With threads, the operating system decides which thread to check, and that is ultimately out of our control. With an async runtime, the runtime itself decides which future to check, so it has the final say. In practice, the details get complicated because an async runtime might use operating system threads under the hood as part of how it manages concurrency, but a runtime can still choose to guarantee fairness even so. However, runtimes do not have to guarantee fairness for any given operation, and even within a given runtime, different APIs sometimes exist to let you choose whether fairness is something you care about as a caller.
Try some of these different variations on awaiting the futures and see what they do:
- Remove the async block from around either or both of the loops.
- Await each async block immediately after defining it (a sketch of this variation follows this list).
- Wrap only the first loop in an async block, and await the resulting future after the body of the second loop.
For an extra challenge, see if you can figure out what the output will be in each case before running the code!
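Here, for example, is a rough sketch (assuming the same trpl crate as above) of the second variation: awaiting each async block immediately after defining it. Because the first future is driven to completion before the second one even starts, the two loops run one after the other instead of being interleaved.

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        // Awaiting this block immediately means the whole first loop runs
        // before anything below it gets a chance to start.
        async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        }
        .await;

        // Only now does the second loop begin, so the output is sequential.
        async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        }
        .await;
    });
}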
Message Passing
Sharing data between futures will look familiar. We can again use async versions of Rust’s types for message passing. Instead of std::sync::mpsc::channel, we will use trpl::channel, for example.
The Receiver::recv() method in the std channel blocks until it receives a message. The trpl::Receiver::recv() method, by contrast, is an async function. Instead of blocking, it sleeps until a message is received or the send side of the channel closes. One other difference with this particular recv() implementation is that it returns an Option of the type sent over the channel instead of a Result.
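To make that contrast concrete, here is a minimal sketch of the blocking std channel pattern from Chapter 16 (not repeated in this section): the while let loop ends when recv() starts returning Err after the sender is dropped, whereas the async loop we are about to write ends when recv().await produces None.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Send a few messages from another thread, then drop tx when the
    // thread finishes.
    thread::spawn(move || {
        for val in ["hi", "from", "the", "thread"] {
            tx.send(String::from(val)).unwrap();
        }
    });

    // recv() blocks and returns Ok(...) while messages arrive; once tx is
    // dropped it returns Err, which ends the loop.
    while let Ok(value) = rx.recv() {
        println!("received '{value}'");
    }
}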
We can start by introducing an async version of the channel:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
If we run this, though, it never stops! You will need to shut it down using ctrl-c. We can see that tx sends all the messages and rx receives and prints them, but the program never stops running. That’s because of the combination of the while let loop and the trpl::join call.
Let’s consider the way this loop works:
- The trpl::join future only completes once both futures passed to it have completed.
- The tx future completes after sending the last message.
- The rx future will not complete until the while let loop ends, though.
- The while let loop will not end until rx.recv().await produces None.
- The rx.recv().await will only return None once the other end of the channel is closed.
- The channel will only close if we call rx.close() or when the sender side, tx, is dropped.
- We do not call rx.close() anywhere, and tx will not be dropped until the function exits.
- The function cannot exit because it is blocked on trpl::join completing, which takes us back to the top of the list!
To solve this, then, we need to make sure the channel gets closed so that trpl::join will complete. We could manually close rx somewhere by calling rx.close(), but that does not make much sense in this case. The idea is that rx should keep listening until tx is done sending. Stopping after handling some arbitrary number of messages would make the program shut down, but it would mean we could miss messages if the sending side changed. Given that we cannot use rx.close(), we need to make sure that tx gets dropped before the end of the function.
Right now, the async block only borrows tx. We can confirm this by adding another async block which uses tx, and using trpl::join3 to wait for all three futures to complete:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(received) = rx.recv().await {
                println!("Got: {received}");
            }
        };

        let tx_fut2 = async {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join3(tx_fut, tx_fut2, rx_fut).await;
    });
}
Now both blocks borrow tx, so they are both able to use it to send messages, which rx can then receive. When we run that code, we see the extra output from the new async block, and the messages it sends being received by rx.recv():
Got: hi
Got: more
Got: from
Got: messages
Got: the
Got: for
Got: future
Got: you
As before, we also see that the program does not shut down on its own and requires a ctrl-c. Now that we have seen how async blocks borrow the items they reference from their outer scope, we can go ahead and remove the extra block we just added, and switch back to using trpl::join instead of trpl::join3.
This little exploration makes the original issue much clearer: it is ultimately about ownership. We need to move tx into the async block so that once that block ends, tx will be dropped.
In Chapter 13, we learned how to use the move keyword with closures, and in Chapter 16, we saw that we often need to use closures marked with move when working with threads. As we have discovered, the same dynamics apply to async blocks, so the move keyword also works with async blocks, allowing them to take ownership of the data they reference.
Remember, any time you write a future, a runtime is ultimately responsible for executing it. That means that an async block might outlive the function where you write it, the same way a closure can.
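As an illustration of that note, here is a small sketch (not from the book) in which an async move block takes ownership of a String, and the resulting future is returned from, and outlives, the function that created it:

use std::future::Future;

// Because the async move block takes ownership of `name`, the future this
// function returns stays valid even after `greeting` itself has returned.
fn greeting(name: String) -> impl Future<Output = ()> {
    async move {
        println!("hello, {name}!");
    }
}

fn main() {
    trpl::block_on(async {
        let fut = greeting(String::from("async world"));
        // The `name` the block captured is still alive inside `fut` here.
        fut.await;
    });
}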
We can do that by making the first async block an async move block.
The result is Listing 17-TODO, and when we run this version of the code, it shuts down gracefully after the last message is sent.
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                eprintln!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Multiple Producers with Async
This async channel is also a multiple-producer channel, so we can call clone on tx if we want to send messages from multiple futures. For example, we can make the code from Listing 17-TODO work by cloning tx before moving it into the first async block, moving the original tx into the second async block, and switching back to join3.
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}
Both of these blocks need to be async move blocks, or else we will end up back in the same infinite loop we started out in.
The async keyword does not yet work with closures directly. That is, there is no direct equivalent to async fn for anonymous functions. As a result, you cannot write code like these function calls:
example_1(async || { ... });
example_2(async move || { ... });
However, since async blocks themselves can be marked with move, this ends up not being a problem. Remember that async blocks compile to anonymous futures. That means you can write calls like this instead:
example_1(|| async { ... });
example_2(|| async move { ... });
These closures now return anonymous futures, meaning they work basically the same way that an async function does.
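To make that concrete, here is a sketch of what such a function might look like. Here example_1 is a hypothetical helper, not an API from the trpl crate: it accepts any closure that returns a future and awaits the result.

use std::future::Future;

// A hypothetical helper: it takes a closure, calls it to get a future, and
// then awaits that future.
async fn example_1<F, Fut>(callback: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = ()>,
{
    callback().await;
}

fn main() {
    trpl::block_on(async {
        // The closure returns an anonymous future, which example_1 awaits,
        // much as if we had passed it an async function.
        example_1(|| async {
            println!("running the async block from the closure");
        })
        .await;
    });
}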