Concurrency With Async
In this section, we will apply async to some of the same concurrency challenges we tackled with threads in Chapter 16. Since we already covered a lot of the key ideas there, in this section we will focus on what is different between threads and futures.
In many cases, the APIs for working with concurrency using async are very similar to those for using threads. In other cases, they end up being shaped fairly differently. Even when the APIs look similar, they often have different behavior and they nearly always have different performance characteristics.
Counting
The first task we tackled in Chapter 16 was counting up on two separate threads.
Let’s do the same using async. The trpl crate supplies a spawn_task function
which looks very similar to the thread::spawn API, and a sleep function
which is an async version of the thread::sleep API. We can use these together
to implement the same counting example as with threads.
To start, we will set up our main function with trpl::block_on:
fn main() {
    trpl::block_on(async {
        // ...
    });
}
Note: From this point forward in the chapter, every example will include this exact same code, so we will often skip it just like we do with
main. Don’t forget to include it in your own code!
Then we can write two loops within that block, each containing a trpl::sleep call.
Similar to the threading example, we put one loop in the body of a
trpl::spawn_task, the same way we did with thread::spawn, and the other in a
top-level for loop. Notice that we also need to add a .await after the
sleep calls.
Putting that all together, we end up with the code in Listing 17-TODO:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(1)).await;
        }
    });
}
This does something very similar to what the thread-based implementation did, as we can see from the output when we run it. (As with the threading example, you may see a different order in your own terminal output when you run this.)
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
This stops as soon as the for loop in the body of the main async block finishes,
because the task spawned by spawn_task is shut down when the main function
ends—just like threads are. Thus, if you want to run all the way to the
completion of the task, you will need to use a join handle to wait for the first
task to complete. With threads, we used the join method to “block” until the
thread was done running. Here, we can use await to do the same thing:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(1)).await;
        }

        handle.await;
    });
}
Now the output again looks like what we saw in the threading example.
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
So far, it looks like async and threads give us the same basic behavior.
However, there are a few important differences already. One is that we used
.await instead of calling join on the join handle. Another is that we needed
to await both sleep calls. Most importantly, though, we did not need to spawn
another operating system thread to do this. We were able to get concurrency for
just the cost of a task, which has much faster startup time and uses much less
memory than an OS thread.
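As a rough illustration of how lightweight tasks are, here is a sketch (using the same trpl APIs as the listings above) that spawns a thousand tasks and waits for all of them; creating the same number of OS threads would typically cost far more in startup time and memory:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let mut handles = Vec::new();

        // Each spawned task is just a future managed by the runtime, not an
        // operating system thread, so spawning many of them is cheap.
        for i in 0..1000 {
            handles.push(trpl::spawn_task(async move {
                trpl::sleep(Duration::from_millis(1)).await;
                println!("task {i} finished");
            }));
        }

        // Await every join handle so all tasks run to completion before main ends.
        for handle in handles {
            handle.await;
        }
    });
}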
What is more, we actually do not need the spawn_task call at all to get
concurrency here. Remember that each async block compiles to an anonymous
future. That means we can put each of these two loops in an async block and then
ask the runtime to run them both to completion using trpl::join:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
When we run this, we see both futures run to completion:
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Here, you will see the exact same order every time, which is very different from
what we saw with threads. That is because the trpl::join function is fair,
meaning it checks both futures equally, rather than letting one race ahead. With
threads, the operating system decides which thread to check, and that is
ultimately out of our control. With an async runtime, the runtime itself decides
which future to check, so it has the final say. In practice, the details get
complicated because an async runtime might use operating system threads under
the hood as part of how it manages concurrency, but a runtime can still choose
to guarantee fairness even so. However, runtimes do not have to guarantee
fairness for any given operation, and even within a given runtime, different
APIs sometimes exist to let you choose whether fairness is something you care
about as a caller.
Try some of these different variations on awaiting the futures and see what they do:
- Remove the async block from around either or both of the loops.
- Await each async block immediately after defining it.
- Wrap only the first loop in an async block, and await the resulting future after the body of the second loop.
For an extra challenge, see if you can figure out what the output will be in each case before running the code!
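To make these variations concrete, here is a sketch of the second one, built on the same trpl APIs as the listings above; try to predict its output before you run it:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        // Variation: await each async block immediately after defining it,
        // instead of handing both futures to trpl::join.
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        };
        fut1.await;

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(1)).await;
            }
        };
        fut2.await;
    });
}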
Message Passing
Sharing data between futures will look familiar. We can again use async versions
of Rust’s types for message passing. Instead of std::sync::mpsc::channel, we
will use trpl::channel, for example.
The Receiver::recv() method in the std channel blocks until it receives a
message. The trpl::Receiver::recv() method, by contrast, is an async
function. Instead of blocking, it sleeps until a message is received or the send
side of the channel closes. One other difference with this particular recv()
implementation is that it returns an Option of the type sent over the channel
instead of a Result.
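As a small illustration of that Option return value, here is a sketch, assuming trpl::channel delivers any already-sent messages before reporting that the channel is closed:

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        tx.send(String::from("only message")).unwrap();
        // Dropping the sender closes the channel once its buffered messages
        // have been drained (assumed behavior of this async channel).
        drop(tx);

        // The buffered message still arrives as Some(...)
        assert_eq!(rx.recv().await, Some(String::from("only message")));
        // ...and once the channel is closed and empty, recv() produces None.
        assert_eq!(rx.recv().await, None);
    });
}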
Now let’s put the async channel to use:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
If we run this, though, it never stops! You will need to shut it down using
ctrl-c. We can see that tx sends all the messages, and rx receives and prints
them, but the program never stops running. That is because of the combination
of the while let loop and the trpl::join call.
Let’s consider the way this loop works:
- The trpl::join future only completes once both futures passed to it have completed.
- The tx future completes once it finishes sleeping after sending the last message.
- The rx future will not complete until the while let loop ends.
- The while let loop will not end until rx.recv().await produces None.
- rx.recv().await will only return None once the other end of the channel is closed.
- The channel will only close if we call rx.close() or when the sender side, tx, is dropped.
- We do not call rx.close() anywhere, and tx will not be dropped until the function exits.
- The function cannot exit because it is blocked on trpl::join completing, which takes us back to the top of the list!
To solve this, then, we need to make sure the channel gets closed so that
trpl::join will complete. We could manually close rx somewhere by calling
rx.close(), but that does not make much sense in this case. The idea is that
rx should keep listening until tx is done sending. Stopping after handling
some arbitrary number of messages would make the program shut down, but it would
mean we could miss messages if the sending side changed. Given that we cannot
use rx.close(), we need to make sure that tx gets dropped before the end
of the function.
Right now, the async block only borrows tx. We can confirm this by adding
another async block which uses tx, and using trpl::join3 to wait for all
three futures to complete:
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(received) = rx.recv().await {
                println!("Got: {received}");
            }
        };

        let tx_fut2 = async {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join3(tx_fut, tx_fut2, rx_fut).await;
    });
}
Now both blocks borrow tx, so they are both able to use it to send messages,
which rx can then receive. When we run that code, we see the extra output from
the new async block, and the messages it sends being received by
rx.recv():
Got: hi
Got: more
Got: from
Got: messages
Got: the
Got: for
Got: future
Got: you
As before, we also see that the program does not shut down on its own and
requires a ctrl-c. Now that we have seen how
async blocks borrow the items they reference from their outer scope, we can go
ahead and remove the extra block we just added, and switch back to using
trpl::join instead of trpl::join3.
This little exploration makes the original issue much clearer: it is ultimately
about ownership. We need to move tx into the async block so that once that
block ends, tx will be dropped.
In Chapter 13, we learned how to use the move keyword with closures, and in
Chapter 16, we saw that we often need to use closures marked with move when
working with threads. As we have discovered, the same dynamics apply to async
blocks—so the move keyword also works with async blocks, allowing them to take
ownership of the data they reference.
Remember, any time you write a future, a runtime is ultimately responsible for executing it. That means that an async block might outlive the function where you write it, the same way a closure can.
We can do that by making the first async block an async move block, as shown in Listing 17-TODO:

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

When we run this version of the code, it shuts down gracefully after the last message is sent.
Multiple Producers with Async
This async channel is also a multiple-producer channel, so we can call clone
on tx if we want to send messages from multiple futures. For example, we can
make the code from Listing 17-TODO work by cloning the tx before moving it
into the first async block, moving the original tx into the second async
block, and switching back to join3.
use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}
Both of these blocks need to be async move blocks, or else we will end up back
in the same infinite loop we started out in.
The async keyword does not yet work with closures directly. That is, there is
no direct equivalent to async fn for anonymous functions. As a result, you
cannot write code like these function calls:
example_1(async || { ... });
example_2(async move || { ... });
However, since async blocks themselves can be marked with move, this ends up
not being a problem. Remember that async blocks compile to anonymous futures.
That means you can write calls like this instead:
example_1(|| async { ... });
example_2(|| async move { ... });
These closures now return anonymous futures, meaning they work basically the same way that an async function does.
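For instance, here is a minimal sketch of that pattern. The call_twice helper is hypothetical, not from any particular crate; it just demonstrates a function that accepts a closure returning a future:

use std::future::Future;

// Hypothetical helper: accepts a closure that returns a future, and awaits
// the futures it produces.
async fn call_twice<F, Fut>(f: F)
where
    F: Fn() -> Fut,
    Fut: Future<Output = ()>,
{
    f().await;
    f().await;
}

fn main() {
    trpl::block_on(async {
        // The closure itself is not async; it returns the anonymous future
        // created by the async block, and each call produces a fresh future.
        call_twice(|| async {
            println!("hello from a future returned by a closure");
        })
        .await;
    });
}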