Every hour, GitLab transfers terabytes of Git data between a server and a client. It is hard or even impossible to handle this amount of traffic unless it is done efficiently in a streaming fashion. Git data is served by Gitaly (Git server), GitLab Shell (Git via SSH), and Workhorse (Git via HTTP(S)). These services are implemented in Go, a language that conveniently provides abstractions for dealing with I/O operations efficiently.
Golang's io package provides the Reader and Writer interfaces to abstract the functionality of I/O implementations into public interfaces.
Reader is the interface that wraps the basic Read method:
type Reader interface {
Read(p []byte) (n int, err error)
}
Writer is the interface that wraps the basic Write method:
type Writer interface {
Write(p []byte) (n int, err error)
}
For example, the os package provides an implementation for reading files: the os.File type implements the Reader and Writer interfaces by defining Read and Write methods.
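The sketch below makes that relationship explicit. The blank-identifier assignments compile only if *os.File satisfies both interfaces, and the hypothetical roundTrip helper writes into a temporary file through an io.Writer variable and reads it back through an io.Reader variable:

```go
package main

import (
	"fmt"
	"io"
	"log"
	"os"
)

// These assignments compile only if *os.File satisfies both interfaces.
var (
	_ io.Reader = (*os.File)(nil)
	_ io.Writer = (*os.File)(nil)
)

// roundTrip is a hypothetical helper: it writes s into a temporary file
// through io.Writer and reads it back through io.Reader.
func roundTrip(s string) (string, error) {
	f, err := os.CreateTemp("", "roundtrip-*.txt")
	if err != nil {
		return "", err
	}
	defer os.Remove(f.Name()) // runs second: delete the file
	defer f.Close()           // runs first: close the descriptor

	var w io.Writer = f
	if _, err := io.WriteString(w, s); err != nil {
		return "", err
	}
	// Rewind so that the next Read starts at the beginning of the file.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return "", err
	}
	var r io.Reader = f
	data, err := io.ReadAll(r)
	return string(data), err
}

func main() {
	s, err := roundTrip("data stored in a file")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(s)
}
```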
In this blog post, you'll learn how to compose Readers and Writers in Golang applications.
First, let's read from a file and write its content to os.Stdout:
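package main
import (
	"errors"
	"io"
	"log"
	"os"
)
func main() {
	file, err := os.Open("data.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	// Read the file in chunks of up to 32 KB.
	p := make([]byte, 32*1024)
	for {
		n, err := file.Read(p)
		// Write the n bytes that were read before checking the read error,
		// because Read may return data together with an error.
		if _, errW := os.Stdout.Write(p[:n]); errW != nil {
			log.Fatal(errW)
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			log.Fatal(err)
		}
	}
}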
Each call of the Read method fills the buffer p with up to 32 KB of content from the file, so the file is consumed in chunks instead of being loaded into memory all at once.
To simplify this widely used pattern, the io package provides the io.Copy function, which passes content from any Reader to any Writer and also handles additional edge cases, such as treating io.EOF as a successful end of input and reporting short writes.
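package main
import (
	"io"
	"log"
	"os"
)
func main() {
	file, err := os.Open("data.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	// io.Copy reads from file in chunks and writes each chunk to os.Stdout.
	if _, err := io.Copy(os.Stdout, file); err != nil {
		log.Fatal(err)
	}
}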
Reader and Writer interfaces are used across the whole Golang ecosystem because they facilitate reading and writing content in a streaming fashion. Therefore, gluing together Readers and Writers with functions that expect these interfaces as arguments is a frequent problem to solve. Sometimes it's as straightforward as passing content from a Reader into a Writer, but sometimes the content written into a Writer must be represented as a Reader, or the content from a Reader must be sent into multiple Writers. Let's have a closer look at different use cases and examples of solving these types of problems in the GitLab codebase.
Reader -> Writer
Problem
We need to pass content from a Reader into a Writer.
Solution
The problem can be solved by using io.Copy:
func Copy(dst Writer, src Reader) (written int64, err error)
Example
InfoRefs* Gitaly RPCs return a Reader, and we want to stream its content to a user via an HTTP response:
func handleGetInfoRefsWithGitaly(ctx context.Context, responseWriter *HttpResponseWriter, a *api.Response, rpc, gitProtocol, encoding string) error {
...
infoRefsResponseReader, err := smarthttp.InfoRefsResponseReader(ctx, &a.Repository, rpc, gitConfigOptions(a), gitProtocol)
...
if _, err = io.Copy(w, infoRefsResponseReader); err != nil {
return err
}
...
}
Reader -> Multiple Writers
Problem
We need to pass content from a Reader into multiple Writers.
Solution
The io package provides the io.MultiWriter function that converts multiple Writers into a single one. When its Write method is called, the content is copied to all the Writers.
func MultiWriter(writers ...Writer) Writer
Example
Suppose we want to build md5, sha1, sha256, and sha512 hashes from the same content. The hash.Hash type is a Writer. Using io.MultiWriter, we define a multiHash Writer. After the content is written to multiHash, the hashes of all these functions have been calculated in a single run.
The simplified version of the example is:
package main
import (
"crypto/sha1"
"crypto/sha256"
"fmt"
"io"
"log"
)
func main() {
s1 := sha1.New()
s256 := sha256.New()
w := io.MultiWriter(s1, s256)
if _, err := w.Write([]byte("content")); err != nil {
log.Fatal(err)
}
fmt.Printf("%x\n", s1.Sum(nil))   // print the digests as hex strings
fmt.Printf("%x\n", s256.Sum(nil))
}
For simplicity, we just call the Write method on a Writer, but when the content comes from a Reader, io.Copy can be used as well:
_, err := io.Copy(io.MultiWriter(s1, s256), reader)
Multiple Readers -> Reader
Problem
We have multiple Readers and need to sequentially read from them.
Solution
The io package provides the io.MultiReader function that converts multiple Readers into a single one. The Readers are read sequentially, in the order they are passed.
func MultiReader(readers ...Reader) Reader
The resulting Reader can then be passed to any function that accepts a Reader as an argument.
Example
Workhorse reads the first N bytes of an image to detect whether it's a PNG file and puts them back by building a Reader from multiple Readers:
func NewReader(r io.Reader) (io.Reader, error) {
magicBytes, err := readMagic(r)
if err != nil {
return nil, err
}
if string(magicBytes) != pngMagic {
debug("Not a PNG - read file unchanged")
return io.MultiReader(bytes.NewReader(magicBytes), r), nil
}
return io.MultiReader(bytes.NewReader(magicBytes), &Reader{underlying: r}), nil
}
Multiple Readers -> Multiple Writers
Problem
We need to pass content from multiple Readers into multiple Writers.
Solution
The solutions above can be generalized to the many-to-many use case:
_, err := io.Copy(io.MultiWriter(w1, w2, w3), io.MultiReader(r1, r2, r3))
Reader -> Reader + Writer
Problem
We need to read content from a Reader or pass the Reader to a function and simultaneously write the content into a Writer.
Solution
The io package provides the io.TeeReader function that accepts a Reader to read from and a Writer to write to, and returns a Reader that can be processed further.
func TeeReader(r Reader, w Writer) Reader
The implementation of the functionality is straightforward. The passed Reader and Writer are stored in a structure that is a Reader itself:
func TeeReader(r Reader, w Writer) Reader {
return &teeReader{r, w}
}
type teeReader struct {
r Reader
w Writer
}
The Read method implemented for the structure delegates the Read to the passed Reader and also performs a Write to the passed Writer:
func (t *teeReader) Read(p []byte) (n int, err error) {
n, err = t.r.Read(p)
if n > 0 {
if n, err := t.w.Write(p[:n]); err != nil {
return n, err
}
}
return
}
Example 1
We already touched on hashing in the Reader -> Multiple Writers section. io.TeeReader can be used to feed content into a Writer that creates a hash, while the returned Reader is further used to upload the content to object storage.
Example 2
Workhorse uses io.TeeReader to implement the Dependency Proxy functionality. Dependency Proxy caches requested upstream images in object storage. The not-yet-cached use case has the following behavior:
- A user performs an HTTP request.
- The upstream image is fetched using net/http, and http.Response provides its content via the Body field, which is an io.ReadCloser (basically an io.Reader).
- We need to send this content back to the user by writing it into http.ResponseWriter (basically an io.Writer).
- We need to simultaneously upload the content to object storage by performing an http.Request (created by a function that accepts an io.Reader as the request body).
As a result, io.TeeReader can be used to glue these primitives together:
func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
// Fetch upstream data via HTTP
dependencyResponse, err := p.fetchUrl(r.Context(), sendData)
...
// Create a tee reader. Each Read will read from dependencyResponse.Body and simultaneously
// perform a Write to w writer
teeReader := io.TeeReader(dependencyResponse.Body, w)
// Pass the tee reader as the body of an HTTP request to upload it to object storage
saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader)
...
nrw := &nullResponseWriter{header: make(http.Header)}
p.uploadHandler.ServeHTTP(nrw, saveFileRequest)
...
}
Writer -> Reader
Problem
We have a function that accepts a Writer, and we are interested in the content that the function would write into the Writer. We want to intercept the content and represent it as a Reader to further process it in a streaming fashion.
Solution
The io package provides the io.Pipe function that returns a Reader and a Writer:
func Pipe() (*PipeReader, *PipeWriter)
The Writer can be passed to any function that accepts a Writer, and all the content written into it becomes accessible via the Reader. In other words, io.Pipe creates a synchronous in-memory pipe that connects code expecting an io.Reader with code expecting an io.Writer.
Example 1
For LSIF file transformation for code navigation, we need to:
- Read the content of a zip file.
- Transform the content and serialize it into zip.Writer.
- Represent the new compressed content as a Reader to be further processed in a streaming fashion.
The zip.NewWriter function accepts a Writer to which it writes the compressed content. This is handy when we want to pass an open file descriptor to the function to save the content to a file. However, when we need to pass the compressed content via an HTTP request, we need to represent the data as a Reader.
// The `io.Pipe()` creates a reader and a writer.
pr, pw := io.Pipe()
// The parser keeps pr: everything written into pw becomes accessible via pr.
parser := &Parser{
	Docs: docs,
	pr:   pr,
}
// The writer is passed to the `parser.transform` function, which writes
// the transformed compressed content into it.
// The writing must happen asynchronously in a goroutine because each `Write` to
// the `PipeWriter` blocks until it has satisfied one or more `Read`s from the `PipeReader`.
go parser.transform(pw)
// pr is a reader that can be used to read all the data written to the pw writer
return parser, nil
Example 2
In Geo setups, GitLab Shell proxies all git push operations sent to a secondary and redirects them to the primary.
- GitLab Shell establishes an SSH connection and defines a ReadWriter struct that has an In field of type io.Reader to read data from a user and an Out field of type io.Writer to send responses to the user.
- GitLab Shell performs an HTTP request to /info/refs and sends response.Body of type io.Reader to the user using io.Copy.
- The user reacts to this response by sending data to In, and GitLab Shell needs to read this data, convert it to a request expected by Git HTTP, and send it as an HTTP request to /git-receive-pack. This is where io.Pipe becomes useful.
func (c *PushCommand) requestReceivePack(ctx context.Context, client *git.Client) error {
// Define pipeReader and pipeWriter and use pipeWriter to collect all the data
// sent by the user, converted to the format expected by Git HTTP.
pipeReader, pipeWriter := io.Pipe()
// The writing happens asynchronously because it's a blocking operation
go c.readFromStdin(pipeWriter)
// pipeReader can be passed as io.Reader and used to read all the data written to pipeWriter
response, err := client.ReceivePack(ctx, pipeReader)
...
_, err = io.Copy(c.ReadWriter.Out, response.Body)
...
}
func (c *PushCommand) readFromStdin(pw *io.PipeWriter) {
var needsPackData bool
// Scanner reads the user input line by line
scanner := pktline.NewScanner(c.ReadWriter.In)
for scanner.Scan() {
line := scanner.Bytes()
// And writes it to the pipe writer
pw.Write(line)
...
}
// The data that hasn't been processed by a scanner is copied if necessary
if needsPackData {
io.Copy(pw, c.ReadWriter.In)
}
// Close the pipe writer to signify EOF for the pipe reader
pw.Close()
}
Try Golang
Golang provides elegant patterns designed to process data efficiently in a streaming fashion. These patterns can be used to address new challenges or to refactor existing code that suffers from performance issues associated with high memory consumption.
Learn more about GitLab and Golang.