What is pv?

pv, short for pipe viewer, is a neat little Linux command that lets us monitor the amount of data that goes through a pipe.

❯ pv ~/Pictures/some_picture.jpg > some_picture.jpg
2.90MiB 0:00:00 [ 157MiB/s] [===========================================>] 100%

❯ pv ~/Pictures/some_picture.jpg | ssh remote.machine "cat > some_picture.jpg"
2.90MiB 0:00:03 [ 788KiB/s] [===========================================>] 100%

pv moves data from its standard input to its standard output (or in this case from a file) while measuring how much data is moved and how fast.
It can also be really fast. A quick test to see how fast it can go is to use the yes command to generate input and send the output to /dev/null. On my laptop I get 5-6GiB/s, but this throughput will vary wildly from machine to machine, and the bottleneck could be yes, pv, or the way Linux is configured on my machine.

❯ yes | pv > /dev/null
35.4GiB 0:00:06 [5.60GiB/s] [      <=>                                   ]

How does pv work?

Copying data from the input to the output while keeping track of what to report seems somewhat straightforward at first, so we will figure out the details as we go. 🤞
But what about the report? How can pv output anything to my terminal if its output is being piped to the next command/a file? According to the man page it uses standard error!
The report that pv outputs purposefully spans only one line. This way, the report can be updated in place: a carriage return character '\r' (with no linefeed '\n') moves the cursor back to the beginning of the line so that the next report overwrites the previous one. Smart! The only downside is that each update needs to write at least as many characters as the previous one so that it fully overwrites it.
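
To get a feel for this trick, here is a minimal, self-contained Rust sketch (separate from what we will build below) that updates a fake progress line in place on standard error using '\r':

// progress_demo.rs - a tiny illustration of the '\r' trick, not part of rpv
use std::thread::sleep;
use std::time::Duration;

fn main() {
    for percent in (0..=100).step_by(10) {
        // '\r' moves the cursor back to the start of the line so this write
        // overwrites the previous one; stderr is unbuffered in Rust, so the
        // text shows up without an explicit flush
        eprint!("\rprogress: {:>3}%", percent);
        sleep(Duration::from_millis(200));
    }
    // Finish with a newline so the shell prompt doesn't clobber the last report
    eprintln!();
}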

A naive implementation

Let’s implement pv in Rust for a bit of fun and to learn along the way!

In this first iteration we will set up a buffer. We continuously read from standard input into that buffer, then write from that buffer to standard output.

// main.rs
use std::io;
use std::io::prelude::*;

// This buffer size seems optimal on my machine
const BUFFER_SIZE: usize = 8 * 1024;

fn main() -> io::Result<()> {
    // Get handles for standard input and standard output
    let mut stdin = io::stdin();
    let mut stdout = io::stdout();

    // Setup a buffer to transfer data from stdin to stdout
    let mut buffer = [0; BUFFER_SIZE];

    loop {
        // Read data from the standard input into the buffer
        let bytes = stdin.read(&mut buffer)?;
        if bytes == 0 {
            // No more data to read, return successfully
            return Ok(());
        }

        // Write the data we've just read from the buffer to standard output
        //
        // Note: we use `write_all` instead of `write` as it could take several
        // writes to finish depending on how busy the recipient is
        stdout.write_all(&buffer[..bytes])?;
    }
}

Let’s try it:

❯ echo "Hello" | cargo run --release --quiet --bin rpv
Hello

Great! 🎉
We have data passing through. Though, it’s a bit useless at the moment 😅

Let’s add some reporting. For similar functionality to pv we need to keep track of how many bytes have been transferred, as well as the time elapsed since the beginning. From that we can get the average throughput since the start of the program.

How often should we report progress? If we do it on every transfer of one buffer we are likely to slow down the transfer as well as make the report unreadable. We will therefore only report once every second.

// main.rs
/* ... */
use std::time::{Duration, Instant};

/* ... */
const REPORT_PERIOD: f64 = 1.0;

/* fn main() -> ... */
    // Keep track of how many bytes are being transferred as we go
    let mut bytes_so_far = 0;

    // Start timer to figure out the elapsed time
    let start_time = Instant::now();
    let mut next_report_time = start_time;

    /* loop ... */
        // Update what we have transferred so far
        bytes_so_far += bytes;

        // Report if it is time to do so
        let now = Instant::now();
        if now >= next_report_time {
            next_report_time = now + Duration::from_secs_f64(REPORT_PERIOD);
            report(bytes_so_far, start_time.elapsed());
        }

To print the report we will use standard error and the '\r' trick discussed above. To make reporting nicer we will use the great byte-unit crate to format the byte count and throughput in a human-readable way.

fn report(byte_count: usize, elapsed: Duration) {
    // Use the byte_unit crate to do all the unit conversions and display logic
    use byte_unit::{Byte, ByteUnit};

    let adjusted_byte_count = Byte::from_unit(byte_count as f64, ByteUnit::B)
        .unwrap()
        .get_appropriate_unit(true);

    // Get the average throughput since the start
    let throughput = byte_count as f64 / elapsed.as_secs_f64();
    let adjusted_throughput = Byte::from_unit(throughput, ByteUnit::B)
        .unwrap()
        .get_appropriate_unit(true);

    // Print report to standard error
    // We use some padding to make the number of characters outputted stable so
    // that the carriage return trick properly overwrites all previous output
    eprint!(
        "{:>10} | {:>10}/s | {:>10}\r",
        adjusted_byte_count.to_string(),
        adjusted_throughput.to_string(),
        // Debug for Duration doesn't pad properly, so format beforehand
        format!("{:.1?}", elapsed)
    );
}

So how well does this run?

❯ yes | cargo run --release --quiet --bin rpv > /dev/null
  7.54 GiB |   4.32 GiB/s |      11.0s

Not bad for a first iteration. I did have to tweak the buffer size in order to get the best throughput possible on my machine, but we’re getting close to the throughput we saw with pv.
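
As an aside, one way to experiment with buffer sizes without recompiling is to read the size from an environment variable and fall back to the compile-time default. This is only a sketch: the RPV_BUFFER_SIZE variable is something I made up for the experiment, and the buffer then has to become a Vec since its size is no longer known at compile time.

// main.rs (experimental sketch, not kept in the final version)
/* ... */

// Read the buffer size from a made-up RPV_BUFFER_SIZE environment variable,
// falling back to the BUFFER_SIZE constant when it is unset or unparsable
fn buffer_size() -> usize {
    std::env::var("RPV_BUFFER_SIZE")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(BUFFER_SIZE)
}

/* fn main() -> ... */
    // A heap-allocated buffer, since the size is now decided at runtime
    let mut buffer = vec![0u8; buffer_size()];

Running something like RPV_BUFFER_SIZE=65536 yes | cargo run --release --quiet --bin rpv > /dev/null then makes it quick to compare a few sizes.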

How is pv so fast?

In order to find out how pv is so fast, we should have a look at what kind of IO it does in its main loop. Then we can compare that to what we are doing.
strace is an amazing tool to get exactly this type of information. You run strace followed by a command and it prints all of the system calls that command makes, which is what IO boils down to: a bunch of system calls asking the Linux kernel to do some work for you.

In our case it is slightly trickier to get this info, since pv already makes heavy use of standard output (and standard error), so it's not as straightforward to wrap pv in strace directly. Fortunately we can point strace at the PID of the program we are interested in instead.

So we start our command in one terminal:

❯ yes | pv > /dev/null

And with a bit of bash magic we call strace on that pv (assuming there’s only one instance of pv running):

❯ strace -p $(ps aux | grep "pv$" | tr -s ' ' | cut -d' ' -f2)
strace: Process 176755 attached
select(1, [0], [], NULL, {tv_sec=0, tv_usec=90000}) = 1 (in [0], left {tv_sec=0, tv_usec=89999})
splice(0, NULL, 1, NULL, 131072, SPLICE_F_MORE) = 65536
select(1, [0], [], NULL, {tv_sec=0, tv_usec=90000}) = 1 (in [0], left {tv_sec=0, tv_usec=89999})
splice(0, NULL, 1, NULL, 131072, SPLICE_F_MORE) = 65536
select(1, [0], [], NULL, {tv_sec=0, tv_usec=90000}) = 1 (in [0], left {tv_sec=0, tv_usec=89999})
...
<detached ...>

If we compare it to our version (rpv):

❯ strace -p $(ps aux | grep "rpv$" | tr -s ' ' | cut -d' ' -f2)
strace: Process 183150 attached
read(0, "y\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\n"..., 8192) = 8192
write(1, "y\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\n"..., 8192) = 8192
read(0, "y\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\n"..., 8192) = 8192
write(1, "y\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\n"..., 8192) = 8192
read(0, "y\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\ny\n"..., 8192) = 8192
...
<detached ...>

It looks like pv uses a different strategy for moving data around. The relevant system call it uses is splice, for which the man page tells us:

splice() moves data between two file descriptors without copying between kernel address space and user address space.

So that’s the trick! We can make the data go from one file to another without copying the data via user space (our program).
pv seems to be moving data in chunks of 65536 bytes (even though it requests 131072). That is eight times bigger than our 8192-byte buffer, which means pv uses far fewer system calls than we do to move the same amount of data, on top of not needing to copy it.

The select system call appears to be waiting for standard input (file descriptor 0) to have data available. This is likely there to avoid spamming splice system calls when no data is available and no progress can be made anyway. But as far as I can tell, when splice is used with a pipe as its input, it already patiently waits for data to become available.
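
If we wanted to mimic that select call anyway, perhaps to wake up periodically and refresh the report even when no data is flowing, we could wait for standard input to become readable before each splice. Here is a rough sketch using the poll wrapper from the nix crate; the exact signatures vary between nix releases, so treat it as an illustration of the idea rather than drop-in code:

// main.rs (sketch, not used in what follows)
use nix::poll::{poll, PollFd, PollFlags};
use std::os::unix::io::RawFd;

// Wait up to 90ms (the timeout we saw in pv's select calls) for the given
// file descriptor to have data available to read
fn wait_for_input(fd: RawFd) -> nix::Result<bool> {
    let mut fds = [PollFd::new(fd, PollFlags::POLLIN)];
    let ready = poll(&mut fds, 90)?;
    Ok(ready > 0)
}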

Using splice ourselves

The simplest way for us to call the splice system call is to use the nix crate.
Changing our code to use nix::fcntl::splice instead of read/write is fairly straightforward:

// main.rs
use nix::fcntl::{splice, SpliceFFlags};
use std::os::unix::io::AsRawFd;
/* ... */

const CHUNK_SIZE: usize = 64 * 1024;
/* ... */

/* fn main() -> ... */

    /* loop ... */
        // Move data from stdin to stdout in kernel space
        let bytes = splice(
            stdin.as_raw_fd(),
            None,
            stdout.as_raw_fd(),
            None,
            CHUNK_SIZE,
            SpliceFFlags::SPLICE_F_MOVE | SpliceFFlags::SPLICE_F_MORE,
        )?;

        /* ... */

Let’s run it:

❯ yes | cargo run --release --quiet --bin rpv > /dev/null
 96.28 GiB |   7.41 GiB/s |      13.0s

Success! 🎉
It even looks a bit faster than pv, maybe because we've omitted those select system calls?

❯ strace -p $(ps aux | grep "rpv$" | tr -s ' ' | cut -d' ' -f2)
strace: Process 221492 attached
splice(0, NULL, 1, NULL, 65536, SPLICE_F_MOVE|SPLICE_F_MORE) = 65536
splice(0, NULL, 1, NULL, 65536, SPLICE_F_MOVE|SPLICE_F_MORE) = 65536
splice(0, NULL, 1, NULL, 65536, SPLICE_F_MOVE|SPLICE_F_MORE) = 65536
splice(0, NULL, 1, NULL, 65536, SPLICE_F_MOVE|SPLICE_F_MORE) = 65536
...
<detached ...>

Under 75 lines

We did it: we have a basic version of pv in Rust in under 75 lines.

use nix::fcntl::{splice, SpliceFFlags};
use std::io;
use std::os::unix::io::AsRawFd;
use std::time::{Duration, Instant};

const CHUNK_SIZE: usize = 64 * 1024;
const REPORT_PERIOD: f64 = 1.0;

fn main() -> io::Result<()> {
    // Get handles for standard input and standard output
    let stdin = io::stdin();
    let stdout = io::stdout();

    // Keep track of how many bytes are being transferred as we go
    let mut bytes_so_far = 0;

    // Start timer to figure out the elapsed time
    let start_time = Instant::now();
    let mut next_report_time = start_time;

    loop {
        // Move data from stdin to stdout in kernel space
        let bytes = splice(
            stdin.as_raw_fd(),
            None,
            stdout.as_raw_fd(),
            None,
            CHUNK_SIZE,
            SpliceFFlags::SPLICE_F_MOVE | SpliceFFlags::SPLICE_F_MORE,
        )?;
        if bytes == 0 {
            // No more data to read, return successfully after reporting one
            // last time
            report(bytes_so_far, start_time.elapsed());
            return Ok(());
        }

        // Update what we have transferred so far
        bytes_so_far += bytes;

        // Report if it is time to do so
        let now = Instant::now();
        if now >= next_report_time {
            next_report_time = now + Duration::from_secs_f64(REPORT_PERIOD);
            report(bytes_so_far, start_time.elapsed());
        }
    }
}

fn report(byte_count: usize, elapsed: Duration) {
    // Use the byte_unit crate to do all the unit conversions and display logic
    use byte_unit::{Byte, ByteUnit};

    let adjusted_byte_count = Byte::from_unit(byte_count as f64, ByteUnit::B)
        .unwrap()
        .get_appropriate_unit(true);

    // Get the average throughput since the start
    let throughput = byte_count as f64 / elapsed.as_secs_f64();
    let adjusted_throughput = Byte::from_unit(throughput, ByteUnit::B)
        .unwrap()
        .get_appropriate_unit(true);

    // Print report to standard error
    // We use some padding to make the number of characters outputted stable so
    // that the carriage return trick properly overwrites all previous output
    eprint!(
        "{:>10} | {:>10}/s | {:>10}\r",
        adjusted_byte_count.to_string(),
        adjusted_throughput.to_string(),
        // Debug for Duration doesn't pad properly, so format beforehand
        format!("{:.1?}", elapsed)
    );
}

A more full-fledged clone of pv in Rust would have nicer error messages, for example when standard input is a terminal rather than a pipe. It could also support file inputs, hide the cursor in the terminal while updating the report, and have colourful output! 🤩
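
As an example of one of those niceties, here is a hedged sketch of how we might detect that standard input is a terminal and bail out with a friendly message, using nix::unistd::isatty (the exact API may differ between nix versions):

// main.rs (sketch of a possible addition)
use nix::unistd::isatty;
use std::os::unix::io::AsRawFd;

// Refuse to run when standard input is a terminal rather than a pipe or file
fn ensure_stdin_is_piped() {
    let stdin = std::io::stdin();
    // isatty reports whether the file descriptor refers to a terminal
    if isatty(stdin.as_raw_fd()).unwrap_or(false) {
        eprintln!("rpv: standard input is a terminal; pipe some data in instead");
        std::process::exit(1);
    }
}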

I hope you learned something, just like I did while writing this. See you around!