What's The Time In Tokio?

Measuring the actual future execution time in Tokio


TL;DR - Use std::slice::Chunks when you'd like to measure a slightly more accurate execution time of each future in a big batch of futures.

Problem

Lately I've been working on a small project that I needed to measure how long it takes for a Future to be executed on a Tokio runtime. So I had the code below:

fn main() {
let now = std::time::Instant::now();
let socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None).unwrap();
timeout(
Duration::from_secs(10),
TcpSocket::from_std_stream(socket.into())
.connect(("1.1.1.1".parse::<IpAddr>().unwrap(), 443).into()),
)
.await
.unwrap()
.unwrap();
println!("fut {} took {:?}", i, now.elapsed().as_millis());
}

Look about right? And it works too.

However when I needed to measure a batch of these kind of futures, I realised the time it takes grows as the batch size grows. So I posted something like this here and I had a feeling that the Runtime is adding the overhead to the result.

So when the batch size is small, say 10, it's all good. But when you do 100 futures in batch, the result grows almost linearly.

And this led me to thinking the earlier executed futures get the faster result as the Runtime needs to poll each future.

#[tokio::test]
async fn test_connect_tcp() {
let mut futs = vec![];
for i in 0..100 {
// number of here matters
futs.push(tokio::spawn(async move {
let now = std::time::Instant::now();
tokio::time::sleep(Duration::from_millis(100)).await; // manually adding a sleep to simulate but it doesn't seem to matter
let socket =
socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None).unwrap();
timeout(
Duration::from_secs(10),
TcpSocket::from_std_stream(socket.into())
.connect(("1.1.1.1".parse::<IpAddr>().unwrap(), 443).into()),
)
.await
.unwrap()
.unwrap();
println!("fut {} took {:?}", i, now.elapsed().as_millis());
}));
}
futures::stream::iter(futs)
.buffer_unordered(10)
.collect::<Vec<_>>()
.await;
}

In this post we are not going to talk about the details of how Tokio manages the tasks with mio or the underlying event loops. The problem we want to solve are:

  1. Is the Runtime adding overhead to the elapsed time?
  2. Can we fix that?

Solution

To solve 1, we need to make sure the we measure the time that a future gets polled rather than created. And for that, there are already so many great posts talking about wrapping a future and measure the execution time.

So I came up with something like this

use std::{pin::Pin, time::Duration};
use futures::Future;
use tokio::time::Instant;
pub struct TimedFuture<Fut: Future + Unpin> {
fut: Fut,
started_at: Option<Instant>,
}
impl<Fut: Future + Unpin> TimedFuture<Fut> {
pub fn new(fut: Fut, started_at: Option<Instant>) -> Self {
Self { fut, started_at }
}
}
impl<Fut: Future + Unpin> Future for TimedFuture<Fut> {
type Output = (Fut::Output, Duration);
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let Self { fut, started_at } = self.get_mut();
let started_at = started_at.get_or_insert_with(Instant::now);
let output = futures::ready!(Pin::new(fut).poll(cx));
let elapsed = started_at.elapsed();
std::task::Poll::Ready((output, elapsed))
}
}

And I thought that was it, though it didn't fix the problem - and we are not that lucky.

Only measuring the correct duration still doesnt gets us good result - the size of batch still matters as mentioned above.

So the time was measured fine and accurate when testing with only 1 or 2 futures, and not the question becomes how do we cut the batch into small peices, and here it comes - std::slice::Chunks

So this is the final working sample

#[tokio::test]
async fn test_connect_tcp() {
let mut futs = vec![];
for i in 0..100 {
// number of here matters
futs.push(tokio::spawn(async move {
let now = std::time::Instant::now();
tokio::time::sleep(Duration::from_millis(100)).await; // manually adding a sleep to simulate but it doesn't seem to matter
let socket =
socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None).unwrap();
timeout(
Duration::from_secs(10),
TcpSocket::from_std_stream(socket.into())
.connect(("1.1.1.1".parse::<IpAddr>().unwrap(), 443).into()),
)
.await
.unwrap()
.unwrap();
println!("fut {} took {:?}", i, now.elapsed().as_millis());
}));
}
for chunk in futs.chunks(10) {
futures::future::join_all(chunks).await;
}
}
All rights reserved
Except where otherwise noted, content on this page is copyrighted.