Beer and Concurrent HTTP Pipelines

This post is going to be a variation of the famous pipelines and cancellation Go blog post, modified for crawling a website, and updated using net/http’s native support for request contexts introduced in Go 1.7.

Downloading Beer Recipes

My motivation for building a concurrent website crawler was to be able to download a catalog of all public recipes on Brewtoad. Each recipe can be individually exported as XML, but from what I could tell there was no API to be able to download them all. So, I got to crawlin'.

(Apologies in advance to the Brewtoad team for the harm I may have done to your server by writing this post. Internet, please be gentle. <3)

One at a Time

Recipes can be viewed by visiting the URL https://www.brewtoad.com/recipes?page=X&sort=rank, where X is a page number beginning at 1. Each page contains a series of links to specific recipes, and the XML for that recipe can be downloaded by simply adding .xml to the end of the URL. A simple first-pass, non-concurrent algorithm would then look like this:

for page := 1; ; page++ {
    resp, err := http.Get(fmt.Sprintf(
        "https://www.brewtoad.com/recipes?page=%d&sort=rank",
        page,
    ))
    if err != nil {
        panic(err)
    }

    // This line uses the `golang.org/x/net/html` package.
    doc, err := html.Parse(resp.Body)
    resp.Body.Close()
    if err != nil {
        panic(err)
    }
    
    // getRecipeLinks() implementation elided, but it walks the page,
    // extracts beer recipe links, and returns them as a slice.
    for _, recipeLink := range getRecipeLinks(doc) {
        beerXml, err := http.Get(
            "https://www.brewtoad.com" + recipeLink + ".xml",
        )
        if err != nil {
            panic(err)
        }

        // Beer recipe XML acquired!
    }
}

Since the majority of time spent running this code is waiting for the network request to finish, it makes sense that concurrently executing multiple requests at once would speed the whole process up.

Concurrency: Take One

The natural way to concurrify this code is to kick off a new goroutine within each iteration of the loop:

for i := 1; ; i++ {
    go func(page int) {
        resp, err := http.Get(fmt.Sprintf(
            "https://www.brewtoad.com/recipes?page=%d&sort=rank",
            page,
        ))
        if err != nil {
            panic(err)
        }

        doc, err := html.Parse(resp.Body)
        resp.Body.Close()
        if err != nil {
            panic(err)
        }
        
        for _, recipeLink := range getRecipeLinks(doc) {
            go func(recipeLink string) {
                beerXml, err := http.Get(
                    "https://www.brewtoad.com" + recipeLink + ".xml",
                )
                if err != nil {
                    panic(err)
                }

                // Beer recipe XML acquired!
            }(recipeLink)
        }
    }(i)
}

You could then use a channel to collect all of your downloaded recipes into a central location.

There’s one problem with this approach, though:

Get https://www.brewtoad.com/recipes?page=7136&sort=rank: dial tcp 192.237.224.29:443: socket: too many open files

or

Get https://www.brewtoad.com/recipes?page=8120&sort=rank: dial tcp: lookup www.brewtoad.com: no such host

Exactly what error you get when you run that code may vary, but no computer likes having thousands of HTTP requests sent off together in the blink of an eye. While writing this post, I let that program run for a little bit too long, and my computer’s fan kicked into high gear. I wasn’t able to get it back to reasonable levels without kill -9.

This kind of “kick off a new goroutine for each computation you need, then wait on the results” approach is fine for some workloads, but sending that many concurrent HTTP requests is not one of them. So, a more nuanced approach is needed.

Concurrency: Take Two

The natural way around this is to limit the number of goroutines that are active at a time. This type of pattern is very common, and thanks to sync.WaitGroup, very easy to implement:

const numGoroutines = 50 // can adjust based on your hardware

var wg sync.WaitGroup
wg.Add(numGoroutines)

for i := 0; i < numGoroutines; i++ {
    go doSomething()
}

wg.Wait()

The next question then becomes the implementation of that doSomething() function. Your goroutines will each need to download more than one page, and they’ll need some way of figuring out who should download which page.

One way to distribute the work is to “fan-out” by deciding which pages you want to download, and then using a shared channel to send that input to whichever goroutine gets it first:

pc := make(chan int)

// Fill the input channel with the numbers 1-100 inclusive.
go func() {
    for i := 1; i <= 100; i++ {
        pc <- i
    }
    close(pc)
}()

for i := 0; i < numGoroutines; i++ {
    go func() {
        for {
            // Grab the next page off the channel. The `ok` value
            // will be false only if the channel has been closed,
            // which means there is no more input to process.
            page, ok := <-pc
            if !ok {
                break
            }

            // Download the page and collect its recipes.

            // Each recipe download on this page should be done
            // synchronously within this goroutine.
        }

        wg.Done()
    }()
}

wg.Wait()

With this change, you can now safely queue up as many pages of recipes as you’d like, but your computer will no longer hate you since there will be at most numGoroutines HTTP requests in flight at any given point in time.

Adding Cancellation

One nice improvement is the ability to cancel all in-flight network requests. This can be because you caught a SIGINT from the user, a certain amount of time has passed, or any other reason. With the release of Go 1.7, this is super easy to do since http.Request now supports native request-scoped contexts, which include the ability to cancel at a moment’s notice.

A simple cancellation context can be initialized like this:

ctx, cancel := context.WithCancel(context.Background())

The ctx context then needs to be included with each request you kick off, so the calls to http.Get() then become:

req, err := http.NewRequest("GET", url, nil)
if err != nil {
    // handle err
}

http.DefaultClient.Do(req.WithContext(ctx))

The Full Example

For those who want it, here’s the full working code:

package main

import (
    "context"
    "errors"
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "strconv"
    "strings"
    "sync"
    "time"

    "golang.org/x/net/html"
)

func main() {
    // Do an initial request to determine how many pages of recipes there are
    // available to download.
    pageCount, err := getRecipePageCount()
    if err != nil {
        panic(err)
    }

    // Initialize the input (page count) channel and a cancelable request context.
    var (
        pc          = make(chan int)
        ctx, cancel = context.WithCancel(context.Background())
    )

    // Fill the input channel in a separate goroutine.
    go func() {
        for i := 1; i <= pageCount; i++ {
            pc <- i
        }
        close(pc)
    }()

    // Kick off the job. This returns two channels; one for receiving downloaded
    // recipes, and one for receiving errors.
    const numGoroutines = 50
    rc, errc := downloadRecipes(numGoroutines, ctx, pc)

    // Demonstration of pipeline cancellation. After 30 seconds, every goroutine
    // will be told to drop what it's doing and exit.
    go func() {
        <-time.After(30 * time.Second)
        fmt.Println("Telling everyone to clean up.")
        cancel()
    }()

    // Wait on input values from the spawned goroutines.
    for {
        // This block uses some clever channel tricks. If `ok` is false, it
        // means that the channel has been closed. A receive operation on a
        // closed channel immediately returns its type's zero value, but a
        // receive operation on a nil channel will never return. Once the
        // channel has been closed, we want to set it to nil to ensure that
        // that arm of the switch statement is never executed again.
        select {
        case recipe, ok := <-rc:
            if !ok {
                rc = nil
                break
            }

            // Do something with recipe. This will probably involve
            // saving it somewhere, unmarshaling it into a struct,
            // or both. Could be a good opportunity for another
            // pipeline!
            _ = recipe
            fmt.Println("Got a recipe.")

        case err, ok := <-errc:
            if !ok {
                errc = nil
                break
            }

            fmt.Println("Error: " + err.Error())
        }

        // Once both channels have been closed and set to nil, we need to
        // break out of the loop to avoid hanging indefinitely on no input.
        if rc == nil && errc == nil {
            break
        }
    }
}

// downloadRecipes spawns numGoroutines goroutines to download recipes from Brewtoad.
// The provided context can be used to cancel in-flight requests. The input channel
// provides the page numbers that should be downloaded. This method returns two channels:
// one for downloaded recipes in XML format, and one for any errors encountered.
func downloadRecipes(numGoroutines int, ctx context.Context, pc <-chan int) (<-chan string, <-chan error) {
    var (
        wg   sync.WaitGroup
        rc   = make(chan string)
        errc = make(chan error)
    )

    wg.Add(numGoroutines)

    for g := 0; g < numGoroutines; g++ {
        go func() {
            running := true
            for running {
                select {
                case page, ok := <-pc:
                    if !ok {
                        running = false
                        break
                    }

                    err := getRecipesForPage(ctx, rc, page)
                    // Don't send anything on the error channel if it was nil,
                    // or if cancellation was requested, since we're trying to
                    // abort everything anyway.
                    if err != nil && ctx.Err() != context.Canceled {
                        errc <- err
                    }

                // A receive event on this channel means that we're cancelled,
                // so we should stop what we're doing and exit the loop.
                case <-ctx.Done():
                    running = false
                }
            }
            wg.Done()
        }()
    }

    // Once all goroutines have finished, close the returned channels.
    go func() {
        wg.Wait()
        close(rc)
        close(errc)
    }()

    return rc, errc
}

// getRecipesForPage downloads a recipe page and sends each recipe found
// there along the provided channel.
func getRecipesForPage(ctx context.Context, rc chan<- string, page int) error {
    doc, err := downloadPage(ctx, page)
    if err != nil {
        return err
    }

    for _, link := range findRecipeLinks(doc) {
        r, err := downloadBeerXml(ctx, link)
        if err != nil {
            return err
        }

        beerXml, err := ioutil.ReadAll(r)
        r.Close()
        if err != nil {
            return err
        }

        rc <- string(beerXml)
    }

    return nil
}

// downloadPage downloads a recipe page from Brewtoad and parses it into
// an HTML document.
func downloadPage(ctx context.Context, page int) (*html.Node, error) {
    req, err := http.NewRequest("GET", fmt.Sprintf(
        "https://www.brewtoad.com/recipes?page=%d&sort=rank",
        page,
    ), nil)
    if err != nil {
        return nil, err
    }

    resp, err := http.DefaultClient.Do(req.WithContext(ctx))
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return html.Parse(resp.Body)
}

// findRecipeLinks traverses an HTML document looking for beer recipe links.
func findRecipeLinks(doc *html.Node) (links []string) {
    var f func(*html.Node)
    f = func(n *html.Node) {
        defer func() {
            for c := n.FirstChild; c != nil; c = c.NextSibling {
                f(c)
            }
        }()

        if n.Type == html.ElementNode && n.Data == "a" {
            if classes, ok := getAttr(n.Attr, "class"); ok {
                for _, class := range strings.Fields(classes) {
                    if class == "recipe-link" {
                        href, _ := getAttr(n.Attr, "href")
                        links = append(links, href)
                        return
                    }
                }
            }
        }
    }
    f(doc)
    return
}

// downloadBeerXml downloads the XML for the provided recipe link. If no error
// is returned, then the io.ReadCloser must be closed by the caller in order to
// prevent a resource leak.
func downloadBeerXml(ctx context.Context, link string) (io.ReadCloser, error) {
    req, err := http.NewRequest("GET", "https://www.brewtoad.com"+link+".xml", nil)
    if err != nil {
        return nil, errors.New("failed to build beer xml request: " + err.Error())
    }

    resp, err := http.DefaultClient.Do(req.WithContext(ctx))
    if err != nil {
        return nil, errors.New("failed to get beer xml: " + err.Error())
    }
    return resp.Body, nil
}

// getRecipePageCount downloads the first page and checks the pagination to determine
// how many pages of recipes there are.
func getRecipePageCount() (int, error) {
    resp, err := http.Get("https://www.brewtoad.com/recipes?page=1&sort=rank")
    if err != nil {
        return 0, err
    }
    defer resp.Body.Close()

    doc, err := html.Parse(resp.Body)
    if err != nil {
        return 0, err
    }

    var pageCount int

    var f func(*html.Node)
    f = func(n *html.Node) {
        defer func() {
            if pageCount == 0 {
                for c := n.FirstChild; c != nil; c = c.NextSibling {
                    f(c)
                }
            }
        }()

        if n.Type == html.ElementNode && n.Data == "a" {
            if classes, ok := getAttr(n.Attr, "class"); ok {
                for _, class := range strings.Fields(classes) {
                    if class == "next_page" {
                        pageCount, err = strconv.Atoi(n.PrevSibling.PrevSibling.FirstChild.Data)
                        return
                    }
                }
            }
        }
    }
    f(doc)

    return pageCount, nil
}

// getAttr is a utility method for looking up an HTML element attribute.
func getAttr(attrs []html.Attribute, name string) (string, bool) {
    for _, attr := range attrs {
        if attr.Key == name {
            return attr.Val, true
        }
    }
    return "", false
}

And as a bonus, a couple type definitions for unmarshaling Brewtoad recipe results into a more usable form:

type RecipeList struct {
    Recipes []Recipe `xml:"RECIPE"`
}

type Recipe struct {
    XMLName    xml.Name `xml:"RECIPE"`
    Name       string   `xml:"NAME"`
    Type       string   `xml:"TYPE"`
    Brewer     string   `xml:"BREWER"`
    BatchSize  string   `xml:"BATCH_SIZE"`
    BoilSize   string   `xml:"BOIL_SIZE"`
    BoilTime   string   `xml:"BOIL_TIME"`
    Efficiency string   `xml:"EFFICIENCY"`
    Style      struct {
        StyleGuide     string `xml:"STYLE_GUIDE"`
        Version        string `xml:"VERSION"`
        Name           string `xml:"NAME"`
        StyleLetter    string `xml:"STYLE_LETTER"`
        CategoryNumber string `xml:"CATEGORY_NUMBER"`
        Type           string `xml:"TYPE"`
        OGMin          string `xml:"OG_MIN"`
        OGMax          string `xml:"OG_MAX"`
        FGMin          string `xml:"FG_MIN"`
        FGMax          string `xml:"FG_MAX"`
        ABVMin         string `xml:"ABV_MIN"`
        ABVMax         string `xml:"ABV_MAX"`
    } `xml:"STYLE"`
    Fermentables []struct {
        Name           string `xml:"NAME"`
        Origin         string `xml:"ORIGIN"`
        Type           string `xml:"TYPE"`
        Yield          string `xml:"YIELD"`
        Amount         string `xml:"AMOUNT"`
        DisplayAmount  string `xml:"DISPLAY_AMOUNT"`
        Potential      string `xml:"POTENTIAL"`
        Color          string `xml:"COLOR"`
        DisplayColor   string `xml:"DISPLAY_COLOR"`
        AddAfterBoil   string `xml:"ADD_AFTER_BOIL"`
        CoarseFineDiff string `xml:"COARSE_FINE_DIFF"`
        Moisture       string `xml:"MOISTURE"`
        DiastaticPower string `xml:"DIASTATIC_POWER"`
        Protein        string `xml:"PROTEIN"`
        MaxInBatch     string `xml:"MAX_IN_BATCH"`
        RecommendMash  string `xml:"RECOMMEND_MASH"`
        IBUGalPerLB    string `xml:"IBU_GAL_PER_LB"`
        Notes          string `xml:"NOTES"`
    } `xml:"FERMENTABLES>FERMENTABLE"`
    Hops []struct {
        Name          string `xml:"NAME"`
        Origin        string `xml:"ORIGIN"`
        Alpha         string `xml:"ALPH"`
        Beta          string `xml:"BETA"`
        Amount        string `xml:"AMOUNT"`
        DisplayAmount string `xml:"DISPLAY_AMOUNT"`
        Use           string `xml:"USE"`
        Form          string `xml:"FORM"`
        Time          string `xml:"TIME"`
        DisplayTime   string `xml:"DISPLAY_TIME"`
        Notes         string `xml:"NOTES"`
    } `xml:"HOPS>HOP"`
    Yeasts []struct {
        Laboratory  string `xml:"LABORATORY"`
        Name        string `xml:"NAME"`
        Type        string `xml:"TYPE"`
        Form        string `xml:"FORM"`
        Attenuation string `xml:"ATTENUATION"`
    } `xml:"YEASTS>YEAST"`

    // TODO: The <MISCS> tag.
}