Cancellable Async I/O in golang with muesli/cancelreader

April 12, 2023

In this crazy world of GUIs, the humble terminal may seem rather primitive. It may be natural to assume it's quite simple in function, being just a matter of "command in, execution out".

I used to think this way - that is, until I had to allocate my first pseudoterminal!

At my current job, I work a lot with terminals. In particular, with our flagship product runme, we often have to "wrap" command executions done in terminals, piping the stdin and stdout through a wrapper command.

A very basic version of our command runner could be given as follows (in golang, the language in which our command runner is written):

func runCommand() {
	cmd := exec.Command("bash", "-c", "echo -n 'Enter name: '; read name; echo Hello, $name!")

	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout

	cmd.Run()
}

This runs a basic script which prompts for the user's name and reports it back. We have piped the stdin and stdout from the "outer" shell into the inner one. This runs exactly as expected.

However, our command runner is designed to take input remotely, and so in practice we do not directly pipe from stdin. Rather, we pipe from an input stream, which has been serialized from some remote method.

To demonstrate this, let's suppose we want to do a transformation on stdin, changing all lower-case letters to upper-case. There are a few ways to do this, but for simplicity, I'll use io.Pipe as follows:

func runCommand() {
	cmd := exec.Command("bash", "-c", "echo -n 'Enter name: '; read name; echo Hello, $name!")

	stdin, stdinWriter := io.Pipe()
	cmd.Stdin = stdin

	cmd.Stdout = os.Stdout

	go func() {
		buf := make([]byte, 32*1024)

		for {
			n, _ := os.Stdin.Read(buf)

			if n > 0 {
				input := string(buf[:n])
				inputAllCaps := strings.ToUpper(input)

				_, err := stdinWriter.Write([]byte(inputAllCaps))
				if err != nil {
					return
				}
			}
		}
	}()

	cmd.Run()
}

The important part here is the loop inside the goroutine, which takes the data from stdin, makes it all caps, and then writes it through an io.Pipe. This is done in a separate goroutine which, importantly, is not cancellable.

Running this, we almost get what we want.

However, after the command has finished, the program hangs until we enter one more line of input. What is going on here?

We can find the answer in the documentation for cmd.Stdin:

// If Stdin is an *os.File, the process's standard input is connected
// directly to that file.
//
// Otherwise, during the execution of the command a separate
// goroutine reads from Stdin and delivers that data to the command
// over a pipe. In this case, Wait does not complete until the goroutine
// stops copying, either because it has reached the end of Stdin
// (EOF or a read error) or because writing to the pipe returned an error.

The important part here is:

a separate goroutine reads from Stdin and delivers that data to the command over a pipe

In other words, there is a parallel goroutine that blocks while reading from our pipe, and cmd.Run() (which waits for the command via Wait) does not return until that goroutine stops. Since our pipe never closes, the read only stops blocking once we send one final, extra input. At that point the goroutine tries to forward the data to the command, finds that the command has already finished, and exits.
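To make the blocking behavior concrete, here is a minimal sketch using only the standard library. It does not involve exec at all; the helper name readUnblocksOnClose is made up for illustration. It only shows that a Read on an io.Pipe stays blocked until the write side either writes or closes.

```go
package main

import (
	"fmt"
	"io"
	"time"
)

// readUnblocksOnClose (a hypothetical helper) reports whether a Read that is
// blocked on an io.Pipe returns io.EOF once the write side is closed.
func readUnblocksOnClose() bool {
	r, w := io.Pipe()
	done := make(chan error, 1)

	go func() {
		buf := make([]byte, 16)
		_, err := r.Read(buf) // blocks: nothing has been written yet
		done <- err
	}()

	// The reader is still blocked after a short wait...
	select {
	case <-done:
		return false
	case <-time.After(50 * time.Millisecond):
	}

	// ...and only returns once the writer is closed.
	w.Close()
	return <-done == io.EOF
}

func main() {
	fmt.Println(readUnblocksOnClose())
}
```

In our command runner nothing ever closes the write side, which is why the copy goroutine started by exec stays parked in Read.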

Fortunately, there is a simple solution here in the form of cmd.StdinPipe! All we need to do is replace this code:

stdin, stdinWriter := io.Pipe()
cmd.Stdin = stdin

With this code:

stdinWriter, _ := cmd.StdinPipe()

And everything runs beautifully, since the command's pipe is now closed on command exit.

There's one major hitch with this approach, though: reading from os.Stdin is non-cancellable. Why is this an issue? Well, let's try running our command twice in a row:

As you can see, the first run is fine. The second, however, has its first stdin read "eaten".

This strange behavior makes sense once you realize that two goroutines are reading from stdin simultaneously - one for each function call. The first goroutine does not exit until it "realizes" that its command has finished, which only happens when it tries to write to a closed pipe. That write is triggered by the first stdin read of the second function call - and this is how it gets "eaten".

The way this typically gets solved is by hoisting the os.Stdin read into a single master goroutine, which sends everything it reads to a channel. Unlike an io.Reader, a Go channel can be read asynchronously using select, so this looks like an attractive option.

However, this really only kicks the can down the road, since our module would take exclusive ownership of os.Stdin for the entire application lifetime. What if we want to interface with a library that also needs access to os.Stdin, such as bubbletea or cobra?

Enter muesli/cancelreader. At a low level, this library implements async reading from stdin with platform-specific syscalls. At a high level, we can simply define a new "cancelreader" wrapping around os.Stdin as follows:

stdinReader, _ := cancelreader.NewReader(os.Stdin)

And we can cancel it like so:

stdinReader.Cancel()

All together (notice that the goroutine now checks the error returned by Read and exits on it, which is how it learns that the reader was cancelled):

func runCommand() {
	cmd := exec.Command("bash", "-c", "echo -n 'Enter name: '; read name; echo Hello, $name!")

	stdinWriter, _ := cmd.StdinPipe()
	stdinReader, _ := cancelreader.NewReader(os.Stdin)

	cmd.Stdout = os.Stdout

	go func() {
		buf := make([]byte, 32*1024)

		for {
			n, err := stdinReader.Read(buf)
			if err != nil {
				return
			}

			if n > 0 {
				input := string(buf[:n])
				inputAllCaps := strings.ToUpper(input)

				_, err := stdinWriter.Write([]byte(inputAllCaps))
				if err != nil {
					return
				}
			}
		}
	}()

	cmd.Run()

	stdinReader.Cancel()
}

And, like magic:

I felt really lucky to stumble upon this library, as it saved me a ton of headache. Hopefully I have saved you from the same!