Published on: February 15, 2024
9 min read

Compose Readers and Writers in Golang applications

GitLab streams terabytes of Git data every hour using Golang abstractions of I/O implementations. Learn how to compose Readers and Writers in Golang apps.


Every hour, GitLab transfers terabytes of Git data between a server and a client. It is hard or even impossible to handle this amount of traffic unless it is done efficiently in a streaming fashion. Git data is served by Gitaly (Git server), GitLab Shell (Git via SSH), and Workhorse (Git via HTTP(S)). These services are implemented using Go - the language that conveniently provides abstractions to efficiently deal with I/O operations.

Golang's io package provides the Reader and Writer interfaces to abstract the functionality of concrete I/O implementations behind a common, public API.

Reader is the interface that wraps the basic Read method:

type Reader interface {
	Read(p []byte) (n int, err error)
}

Writer is the interface that wraps the basic Write method:

type Writer interface {
	Write(p []byte) (n int, err error)
}

For example, the os package provides an implementation for working with files: the File type implements both the Reader and Writer interfaces by defining the basic Read and Write methods.
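As a quick, minimal sketch (not from the GitLab codebase), the compile-time assertions below show that *os.File indeed satisfies both interfaces:

package main

import (
	"io"
	"os"
)

// Compile-time checks: *os.File can be used anywhere an io.Reader
// or an io.Writer is expected.
var (
	_ io.Reader = (*os.File)(nil)
	_ io.Writer = (*os.File)(nil)
)

func main() {}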

In this blog post, you'll learn how to compose Readers and Writers in Golang applications.

First, let's read from a file and write its content to os.Stdout.

package main

import (
	"errors"
	"io"
	"log"
	"os"
)

func main() {
	file, err := os.Open("data.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	p := make([]byte, 32*1024)
	for {
		n, err := file.Read(p)

		_, errW := os.Stdout.Write(p[:n])
		if errW != nil {
			log.Fatal(errW)
		}

		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}

			log.Fatal(err)
		}
	}
}

Each call to the Read function fills the buffer p with content from the file, i.e. the file is consumed in chunks (up to 32KB) instead of being fully loaded into memory.

To simplify this widely used pattern, the io package conveniently provides the Copy function, which streams content from any Reader to any Writer and also handles additional edge cases.

package main

import (
	"io"
	"log"
	"os"
)

func main() {
	file, err := os.Open("data.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	if _, err := io.Copy(os.Stdout, file); err != nil {
		log.Fatal(err)
	}
}
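As a side note, if you want to keep control over the chunk size (like the 32KB buffer in the manual loop above), the io package also provides io.CopyBuffer, which behaves like io.Copy but uses a caller-supplied buffer. A minimal variation of the example above:

package main

import (
	"io"
	"log"
	"os"
)

func main() {
	file, err := os.Open("data.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// Same behavior as io.Copy, but with an explicit, reusable buffer.
	buf := make([]byte, 32*1024)
	if _, err := io.CopyBuffer(os.Stdout, file, buf); err != nil {
		log.Fatal(err)
	}
}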

The Reader and Writer interfaces are used across the whole Golang ecosystem because they facilitate reading and writing content in a streaming fashion. Therefore, gluing together Readers and Writers with functions that expect these interfaces as arguments is a frequent problem to solve. Sometimes it's as straightforward as passing content from a Reader into a Writer, but sometimes the content written into a Writer must be represented as a Reader, or the content from a Reader must be sent to multiple Writers. Let's take a closer look at the different use cases and at examples of how these problems are solved in the GitLab codebase.

Reader -> Writer

Problem

We need to pass content from a Reader into a Writer.


Solution

The problem can be solved by using io.Copy.

func Copy(dst Writer, src Reader) (written int64, err error)

Example

InfoRefs* Gitaly RPCs return a Reader, and we want to stream its content to the user via the HTTP response:

func handleGetInfoRefsWithGitaly(ctx context.Context, w *HttpResponseWriter, a *api.Response, rpc, gitProtocol, encoding string) error {
	...
	infoRefsResponseReader, err := smarthttp.InfoRefsResponseReader(ctx, &a.Repository, rpc, gitConfigOptions(a), gitProtocol)
	...
	if _, err = io.Copy(w, infoRefsResponseReader); err != nil {
		return err
	}
	...
}
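The GitLab excerpt above omits its surrounding details. As a self-contained sketch of the same pattern (the route and file name below are placeholders), streaming a file to an HTTP response looks like this:

package main

import (
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	http.HandleFunc("/data", func(w http.ResponseWriter, r *http.Request) {
		// Open the file to stream; "data.txt" is just a placeholder.
		file, err := os.Open("data.txt")
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer file.Close()

		// http.ResponseWriter is an io.Writer and *os.File is an io.Reader,
		// so io.Copy streams the file without loading it into memory.
		if _, err := io.Copy(w, file); err != nil {
			log.Println(err)
		}
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}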

Reader -> Multiple Writers

Problem

We need to pass content from a Reader into multiple Writers.


Solution

The io package provides the io.MultiWriter function, which combines multiple Writers into a single one. When its Write function is called, the content is written to all of the underlying Writers.

func MultiWriter(writers ...Writer) Writer

Example

Say we want to build md5, sha1, sha256, and sha512 hashes from the same content. The hash.Hash type is a Writer. Using io.MultiWriter, we define a multiHash Writer. After the content is written to multiHash, all of the hashes are calculated in a single pass over the content.

A simplified version of the example:

package main

import (
	"crypto/sha1"
	"crypto/sha256"
	"fmt"
	"io"
	"log"
)

func main() {
	s1 := sha1.New()
	s256 := sha256.New()

	w := io.MultiWriter(s1, s256)
	if _, err := w.Write([]byte("content")); err != nil {
		log.Fatal(err)
	}

	fmt.Println(s1.Sum(nil))
	fmt.Println(s256.Sum(nil))
}

For simplicity, we call the Write function on the Writer directly, but when the content comes from a Reader, io.Copy can be used as well:

_, err := io.Copy(io.MultiWriter(s1, s256), reader)

Multiple Readers -> Reader

Problem

We have multiple Readers and need to sequentially read from them.


Solution

The io package provides the io.MultiReader function, which combines multiple Readers into a single one. The Readers are read sequentially, in the order they are passed.

func MultiReader(readers ...Reader) Reader

This combined Reader can then be used in any function that accepts a Reader as an argument.
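A minimal sketch of the idea: concatenating a few in-memory Readers and streaming the combined content to os.Stdout.

package main

import (
	"io"
	"log"
	"os"
	"strings"
)

func main() {
	r1 := strings.NewReader("first ")
	r2 := strings.NewReader("second ")
	r3 := strings.NewReader("third\n")

	// The combined Reader is consumed in the order the Readers are passed.
	combined := io.MultiReader(r1, r2, r3)

	if _, err := io.Copy(os.Stdout, combined); err != nil {
		log.Fatal(err)
	}
}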

Example

Workhorse reads the first N bytes of an image to detect whether it's a PNG file and puts them back by building a Reader from multiple Readers:

func NewReader(r io.Reader) (io.Reader, error) {
	magicBytes, err := readMagic(r)
	if err != nil {
		return nil, err
	}

	if string(magicBytes) != pngMagic {
		debug("Not a PNG - read file unchanged")
		return io.MultiReader(bytes.NewReader(magicBytes), r), nil
	}

	return io.MultiReader(bytes.NewReader(magicBytes), &Reader{underlying: r}), nil
}
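The excerpt relies on helpers from the Workhorse codebase (readMagic, the Reader type). A self-contained sketch of the same "peek and put back" idea could look like this (the PNG signature is the standard eight magic bytes; everything else is illustrative):

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
)

const pngMagic = "\x89PNG\r\n\x1a\n"

// NewPeekedReader reads the first len(pngMagic) bytes from r, inspects them,
// and returns a Reader that still yields the full, unmodified stream.
func NewPeekedReader(r io.Reader) (io.Reader, bool, error) {
	magic := make([]byte, len(pngMagic))
	n, err := io.ReadFull(r, magic)
	if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
		return nil, false, err
	}
	isPNG := string(magic[:n]) == pngMagic

	// Put the consumed bytes back in front of the remaining stream.
	return io.MultiReader(bytes.NewReader(magic[:n]), r), isPNG, nil
}

func main() {
	reader, isPNG, err := NewPeekedReader(strings.NewReader("not a png"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("is PNG:", isPNG)

	// The full content, including the peeked bytes, is still available.
	if _, err := io.Copy(os.Stdout, reader); err != nil {
		log.Fatal(err)
	}
}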

Multiple Readers -> Multiple Writers

Problem

We need to pass content from multiple Readers into multiple Writers.


Solution

The solutions above can be generalized to the many-to-many use case.

_, err := io.Copy(io.MultiWriter(w1, w2, w3), io.MultiReader(r1, r2, r3))
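For completeness, here is a small runnable sketch of the many-to-many case (all inputs below are just stand-ins):

package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
)

func main() {
	r1 := strings.NewReader("hello ")
	r2 := strings.NewReader("world\n")

	s256 := sha256.New()

	// Both Readers are consumed in order; every chunk is written to both
	// os.Stdout and the sha256 hash.
	if _, err := io.Copy(io.MultiWriter(os.Stdout, s256), io.MultiReader(r1, r2)); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("%x\n", s256.Sum(nil))
}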

Reader -> Reader + Writer

Problem

We need to read content from a Reader or pass the Reader to a function and simultaneously write the content into a Writer.


Solution

The io package provides the io.TeeReader function, which accepts a Reader to read from and a Writer to write to, and returns a Reader that can be processed further.

func TeeReader(r Reader, w Writer) Reader

The implementation of the functionality is straightforward. The passed Reader and Writer are stored in a structure that is a Reader itself:

func TeeReader(r Reader, w Writer) Reader {
	return &teeReader{r, w}
}

type teeReader struct {
	r Reader
	w Writer
}

The Read function implemented for the structure delegates the Read to the passed Reader and also performs a Write to the passed Writer:

func (t *teeReader) Read(p []byte) (n int, err error) {
	n, err = t.r.Read(p)
	if n > 0 {
		if n, err := t.w.Write(p[:n]); err != nil {
			return n, err
		}
	}
	return
}

Example 1

We already touched on hashing in the Reader -> Multiple Writers section. io.TeeReader can be used to provide a Writer that builds a hash from the content, while the returned Reader can be further used to upload the content to object storage.
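A minimal sketch of that idea, with os.Stdout standing in for the object storage upload:

package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"log"
	"os"
	"strings"
)

func main() {
	// The source could be an uploaded file; strings.Reader is a stand-in.
	source := strings.NewReader("file content")

	s256 := sha256.New()

	// Every Read from teeReader also writes the same bytes into the hash.
	teeReader := io.TeeReader(source, s256)

	// The returned Reader is what gets "uploaded"; here os.Stdout
	// stands in for the object storage destination.
	if _, err := io.Copy(os.Stdout, teeReader); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("\nsha256: %x\n", s256.Sum(nil))
}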

Example 2

Workhorse uses io.TeeReader to implement the Dependency Proxy functionality. The Dependency Proxy caches requested upstream images in object storage. The not-yet-cached case behaves as follows:

  • A user performs an HTTP request.
  • The upstream image is fetched using net/http, and the http.Response provides its content via the Body field, which is an io.ReadCloser (basically an io.Reader).
  • We need to send this content back to the user by writing it into the http.ResponseWriter (basically an io.Writer).
  • We need to simultaneously upload the content to object storage by performing an HTTP request (http.NewRequestWithContext accepts an io.Reader as the request body).

As a result, io.TeeReader can be used to glue these primitives together:

func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
	// Fetch upstream data via HTTP
	dependencyResponse, err := p.fetchUrl(r.Context(), sendData)
	...
	// Create a tee reader. Each Read reads from dependencyResponse.Body and simultaneously
	// performs a Write to the w writer.
	teeReader := io.TeeReader(dependencyResponse.Body, w)
	// Pass the tee reader as the body of an HTTP request to upload it to object storage
	saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader)
	...
	nrw := &nullResponseWriter{header: make(http.Header)}
	p.uploadHandler.ServeHTTP(nrw, saveFileRequest)
	...
}

Writer -> Reader

Problem

We have a function that accepts a Writer, and we are interested in the content that the function would write into the Writer. We want to intercept the content and represent it as a Reader to further process it in a streaming fashion.


Solution

The io package provides the io.Pipe function, which returns a connected Reader and Writer:

func Pipe() (*PipeReader, *PipeWriter)

The Writer can be passed to the function that expects a Writer, and all the content written into it becomes accessible via the Reader. In other words, io.Pipe creates a synchronous in-memory pipe that connects code expecting an io.Reader with code expecting an io.Writer.
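Before looking at the GitLab examples, here is a minimal sketch of the pattern. The writeReport function below is hypothetical; it stands in for any function that only knows how to write into an io.Writer:

package main

import (
	"fmt"
	"io"
	"log"
	"os"
)

// writeReport is a stand-in for any function that only accepts an io.Writer.
func writeReport(w io.Writer) error {
	if _, err := fmt.Fprintln(w, "report line 1"); err != nil {
		return err
	}
	_, err := fmt.Fprintln(w, "report line 2")
	return err
}

func main() {
	pr, pw := io.Pipe()

	// The writing side runs in a goroutine because every Write to the
	// PipeWriter blocks until the reading side consumes the data.
	go func() {
		// CloseWithError propagates a failure to the reading side;
		// with a nil error it simply signals EOF.
		pw.CloseWithError(writeReport(pw))
	}()

	// pr is an io.Reader: it can be passed to io.Copy, http.NewRequest, etc.
	if _, err := io.Copy(os.Stdout, pr); err != nil {
		log.Fatal(err)
	}
}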

Example 1

For the LSIF file transformation used by code navigation, we need to:

  • Read the content of a zip file.
  • Transform the content and serialize it into a zip.Writer.
  • Represent the new compressed content as a Reader to be further processed in a streaming fashion.

The zip.NewWriter function accepts a Writer to which it writes the compressed content. That is handy when we can pass an open file descriptor and save the content to a file. However, when we need to send the compressed content via an HTTP request, we have to represent the data as a Reader.

// io.Pipe() creates a connected reader and writer.
pr, pw := io.Pipe()

parser := &Parser{
	Docs: docs,
	pr:   pr,
}

// The writer is passed to the `parser.transform` function, which writes
// the transformed, compressed content into it.
// The writing must happen asynchronously in a goroutine because each `Write` to
// the `PipeWriter` blocks until it has satisfied one or more `Read`s from the `PipeReader`.
go parser.transform(pw)

// Everything written to pw is accessible via pr, the reader stored in the parser.
return parser, nil
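A self-contained sketch of the same approach (the archive contents and file names are illustrative): a function writes a zip archive into any Writer, and io.Pipe exposes the compressed stream as a Reader.

package main

import (
	"archive/zip"
	"io"
	"log"
	"os"
)

// writeZip is illustrative: it writes a single file into the zip archive.
func writeZip(w io.Writer) error {
	zw := zip.NewWriter(w)

	f, err := zw.Create("doc.txt")
	if err != nil {
		return err
	}
	if _, err := f.Write([]byte("transformed content")); err != nil {
		return err
	}

	// Closing the zip.Writer flushes the central directory.
	return zw.Close()
}

func main() {
	pr, pw := io.Pipe()

	// Compress asynchronously; each Write to the pipe writer blocks until
	// the reader side consumes the data.
	go func() {
		pw.CloseWithError(writeZip(pw))
	}()

	// pr now represents the compressed archive as a Reader and could be
	// passed as an HTTP request body; here we just save it to a file.
	out, err := os.Create("out.zip")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	if _, err := io.Copy(out, pr); err != nil {
		log.Fatal(err)
	}
}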

Example 2

For Geo setups, GitLab Shell proxies git push operations made to a secondary node and redirects them to the primary.

  • GitLab Shell establishes an SSH connection and defines a ReadWriter struct that has an In field of type io.Reader to read data from the user and an Out field of type io.Writer to send responses back to the user.
  • GitLab Shell performs an HTTP request to /info/refs and sends response.Body, of type io.Reader, to the user using io.Copy.
  • The user reacts to this response by sending data to In. GitLab Shell needs to read this data, convert it to the format expected by Git HTTP, and send it as an HTTP request to /git-receive-pack. This is where io.Pipe becomes useful.

func (c *PushCommand) requestReceivePack(ctx context.Context, client *git.Client) error {
	// Define pipeReader and pipeWriter and use pipeWriter to collect all the data
	// sent by the user, converted to the format expected by Git HTTP.
	pipeReader, pipeWriter := io.Pipe()
	// The writing happens asynchronously because it's a blocking operation
	go c.readFromStdin(pipeWriter)

	// pipeReader can be passed as io.Reader and used to read all the data written to pipeWriter
	response, err := client.ReceivePack(ctx, pipeReader)
	...
	_, err = io.Copy(c.ReadWriter.Out, response.Body)
	...
}

func (c *PushCommand) readFromStdin(pw *io.PipeWriter) {
	var needsPackData bool

	// Scanner reads the user input line by line
	scanner := pktline.NewScanner(c.ReadWriter.In)
	for scanner.Scan() {
		line := scanner.Bytes()
		// And writes it to the pipe writer
		pw.Write(line)
		...
	}

	// The data that hasn't been processed by a scanner is copied if necessary
	if needsPackData {
		io.Copy(pw, c.ReadWriter.In)
	}

	// Close the pipe writer to signify EOF for the pipe reader
	pw.Close()
}

Try Golang

Golang provides elegant patterns designed to process data efficiently in a streaming fashion. These patterns can be used to address new challenges or to refactor existing code that suffers from high memory consumption.

Learn more about GitLab and Golang.

We want to hear from you

Enjoyed reading this blog post or have questions or feedback? Share your thoughts by creating a new topic in the GitLab community forum.
