Coroutine Pipelines in C++

C++ Boost

This page describes a simple abstraction that allows you to break your program logic into simple functions that are wrapped into coroutines and glued together as a Unix shell pipeline. This brings the benefits of flexibility, reusability and composablilty of the Unix shell model inside your program. The implementation is a single header that requires Boost.Coroutine and assumes that you have a compiler that can handle C++11.

Let's start with a sample use case for our abstraction. Imagine we have a web access log file from which we wish to extract a list of unique client addresses that made unauthorized accesses to our webserver. Most Unix shell users will know of multiple ways to do this, here's one:

cat /tmp/access.log | grep " 403 " | cut -f1 -d' ' | sort | uniq

This same solution can be expressed in C++, using the abstraction I'm presenting here, as:

#include <coro/shell.hpp>

int main()
{
  shell({
    cat("/tmp/access.log"),
    grep(" 403 "),
    cut(" ", 1),
    uniq(),
    echo()
  });
}

There are no processes being spawned here and cat, grep, uniq, echo, etc. are chosen as names only to correspond with the familiar Unix tools. Each of these is actually a simple C++ function that I refer to as the link function. A link function returns a functor that can be wrapped inside a coroutine and be linked into a pipeline.

For example, the functor returned by cat can be wrapped into a coroutine that'll yield lines, from the bound file, into the pipeline. These lines are fed to the coroutine wrapped around grep which selectively yields lines forward only if they match the bound regular expression. The actual wiring of these links is done by the shell constructor which takes a vector of link functors as input, ties them together into the pipeline and executes the pipeline.

If that's not completely clear, its mostly because we haven't yet seen any code that does all this. Here it is:

#include <vector>
#include <memory>
#include <vector>

#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/coroutine/coroutine.hpp>

template<typename T>
class basic_pipeline {
private:
  typedef boost::coroutines::coroutine<const T&()> coro;
  typedef std::unique_ptr<coro> coroptr;

public:
  typedef coro in;
  typedef typename coro::caller_type out;
  typedef boost::function<void(in&, out&)> link;

  basic_pipeline(const std::vector<link>& links) {
    std::vector<coroptr> coros;
    coros.push_back(coroptr(new coro([](out&) {})));
    for(auto & l : links) {
      coros.push_back(coroptr(new coro(boost::bind(l, boost::ref(*coros.back().get()), _1))));
    }
  }
};

The class basic_pipeline is the lynchpin of the abstraction and yet all it contains are a few typedefs to hide the details and a constructor that takes the link functors we spoke about. The types in and out are both coroutine types that generate and/or consume values of the template type T. The main extension element of the design is the link function that takes two coroutine references and links them. Here's the definition of the grep link function:

typedef basic_pipeline<std::string> shell;

shell::link grep(const std::string& pattern)
{
  return [pattern](shell::in & source, shell::out & yield) {
    const std::regex regex(pattern);
    for(; source; source()) {
      const std::string& line(source.get());
      if(std::regex_search(line, regex)) {
        yield(line);
      }
    }
  };
}

The function returns a lambda that implements the logic of grep. It drains the source coroutine, one line at a time, checks for a regex match against the captured pattern, and if it matches, yields it forward to the coroutine that follows it. In this way, this particular link function is a filter.

Here's a link that's a source because it ignores the source coroutine and effectively generates strings of its own:

shell::link cat(const std::string& filename)
{
  return [filename](shell::in&, shell::out & yield) {
    std::string line;
    std::ifstream input(filename);
    while(std::getline(input, line)) {
      yield(line);
    }
  };
}

Conversely, here's a link thats a sink because it ignores the out coroutine and drains all the values generated by its source.

shell::link echo()
{
  return [](shell::in & source, shell::out&) {
    for(; source; source()) {
      std::cout << source.get() << std::endl;
    }
  };
}

There's a bunch of other link functions defined in shell.hpp that our program can use.

The only thing left to describe now is how these links are connected. That's done by the basic_pipeline constructor which takes the link functions and wraps them inside coroutine instances. The wiring is done by simply binding the in& parameter of each new coroutine to the last coroutine constructed.

That's about it, let's step back and look at what we have at hand here.

Flexibilty and Composability

The basic_pipeline abstraction gives us the ability to define links as simple functions that do one simple thing and the ability to chain them together flexibly to get interesting things done. Consider, for example, what it takes to extend the program we wrote above to look for the events we're interested in, in not just one file, but for all files in a directory that match a certain filename pattern. Furthermore, we want to hold onto the results of the computation (and not just print them on the console). Here's the modified version, in its entirety:

int main()
{
  std::set<std::string> lines;
  shell({
    ls("/tmp"),
    grep("*.log"),
    cat(),
    substr(" 403 "),
    cut(" ", 1),
    uniq(lines)
  });
  std::cout<<lines.size()<<std::endl;
}

All we've done is modify the pipeline above to use a different source (the directory listing link ls,) add a filtering link that picks the filenames we're interested in, replaced the cat link with an overload that acts as a filter (instead of a source) and finally used a version of uniq that populates a collection we hand it.

The Cost

This flexibility and composability comes at the cost of numerous switches between coroutine contexts that make up the pipeline. For a pipeline with l links through which w units of work flow, we go through w x l context switches. The performance numbers given by Boost.Coroutine suggest that each of these switches would cost roughly 100 CPU cycles. As long as the computation performed inside the link functions dominates this plumbing overhead, we're good, and for non trivial applications that is usually the case. Having said that, I'd like to advise you against taking performance assurances from strangers. The only thing you can trust is a decent profiler.

One more thing

Finally, I'm sure you have noticed, but the pipeline is not restricted to strings. It can be used in any application where we need a work-unit to flow through a chain of workers that sequentially resolve it to its completion. It can also be used in places where it is not a good fit, for example, consider this monstrosity that solves the FizzBuzz problem

#include <iostream>
#include <coro/pipeline.hpp>

int main()
{
  typedef basic_pipeline<int> fizzbuzz;

  auto one2hundred = [](fizzbuzz::in&, fizzbuzz::out & yield) {
    for(int i = 1; i <= 100; i++) {
      yield(i);
    }
  };

  auto fizz = [](fizzbuzz::in & source, fizzbuzz::out & yield) {
    for(; source; source()) {
      int i = source.get();
      if(i % 3 == 0) {
        std::cout << "fizz";
        if(i % 5 != 0) {
          std::cout << std::endl;
          continue;
        }
      }
      yield(i);
    }
  };

  auto buzz = [](fizzbuzz::in & source, fizzbuzz::out & yield) {
    for(; source; source()) {
      int i = source.get();
      if(i % 5 == 0) {
        std::cout << "buzz" << std::endl;
      } else {
        yield(i);
      }
    }
  };

  auto rest = [](fizzbuzz::in & source, fizzbuzz::out&) {
    for(; source; source()) {
      std::cout << source.get() << std::endl;
    }
  };

  fizzbuzz({one2hundred, fizz, buzz, rest});
}

This page was, in part, inspired by an interesting presentation by a certain David Beazley on the practical uses of coroutines in Python. If you haven't seen that already, go through it, it is great.