This page describes a simple abstraction that lets you break your program logic into small functions that are wrapped into coroutines and glued together like a Unix shell pipeline. This brings the flexibility, reusability and composability of the Unix shell model inside your program. The implementation is a single header that requires Boost.Coroutine and a compiler that can handle C++11.
Let’s start with a sample use case for our abstraction. Imagine we have a web access log file from which we wish to extract a list of unique client addresses that made unauthorized accesses to our webserver. Most Unix shell users will know several ways to do this; here’s one:
cat /tmp/access.log | grep " 403 " | cut -f1 -d' ' | sort | uniq
This same solution can be expressed in C++, using the abstraction I’m presenting here, as:
#include <coro/shell.hpp>

int main()
{
    shell({
        cat("/tmp/access.log"),
        grep(" 403 "),
        cut(" ", 1),
        uniq(),
        echo()
    });
}
There are no processes being spawned here; cat, grep, uniq, echo, etc. are chosen as names only to correspond with the familiar Unix tools. Each of these is actually a simple C++ function that I refer to as a link function. A link function returns a functor that can be wrapped inside a coroutine and linked into a pipeline.
For example, the functor returned by cat can be wrapped into a coroutine that’ll yield lines from the bound file into the pipeline. These lines are fed to the coroutine wrapped around grep, which selectively yields lines forward only if they match the bound regular expression. The actual wiring of these links is done by the shell constructor, which takes a vector of link functors as input, ties them together into the pipeline and executes the pipeline.
If that’s not completely clear, it’s mostly because we haven’t yet seen any code that does all this. Here it is:
#include <vector>
#include <memory>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/coroutine/coroutine.hpp>
template<typename T>
class basic_pipeline {
private:
    typedef boost::coroutines::coroutine<const T&()> coro;
    typedef std::unique_ptr<coro> coroptr;

public:
    typedef coro in;
    typedef typename coro::caller_type out;
    typedef boost::function<void(in&, out&)> link;

    basic_pipeline(const std::vector<link>& links) {
        std::vector<coroptr> coros;
        // an empty coroutine acts as the head of the chain
        coros.push_back(coroptr(new coro([](out&) {})));
        // bind each link's in& parameter to the coroutine built before it
        for(auto& l : links) {
            coros.push_back(coroptr(new coro(
                boost::bind(l, boost::ref(*coros.back().get()), _1))));
        }
    }
};
The class basic_pipeline is the lynchpin of the abstraction, and yet all it contains are a few typedefs to hide the details and a constructor that takes the link functors we spoke about. The types in and out are both coroutine types that generate and/or consume values of the template type T. The main extension element of the design is the link function, which takes two coroutine references and links them. Here’s the definition of the grep link function:
#include <regex>

typedef basic_pipeline<std::string> shell;

shell::link grep(const std::string& pattern)
{
    return [pattern](shell::in& source, shell::out& yield) {
        const std::regex regex(pattern);
        for(; source; source()) {
            const std::string& line(source.get());
            if(std::regex_search(line, regex)) {
                yield(line);
            }
        }
    };
}
The function returns a lambda that implements the logic of grep. It drains the source coroutine, one line at a time, checks for a regex match against the captured pattern, and if it matches, yields it forward to the coroutine that follows it. In this way, this particular link function is a filter.
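As an aside, inverting the filter (in the spirit of grep -v) takes nothing more than flipping the test. The name grepv is my own invention, not something shell.hpp provides:

// A sketch of an inverted grep, like grep -v: it yields only the
// lines that do NOT match the bound pattern.
shell::link grepv(const std::string& pattern)
{
    return [pattern](shell::in& source, shell::out& yield) {
        const std::regex regex(pattern);
        for(; source; source()) {
            const std::string& line(source.get());
            if(!std::regex_search(line, regex)) {
                yield(line);
            }
        }
    };
}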
Here’s a link that’s a source because it ignores the source coroutine and effectively generates strings of its own:
#include <fstream>

shell::link cat(const std::string& filename)
{
    return [filename](shell::in&, shell::out& yield) {
        std::string line;
        std::ifstream input(filename);
        while(std::getline(input, line)) {
            yield(line);
        }
    };
}
Conversely, here’s a link that’s a sink because it ignores the out coroutine and drains all the values generated by its source:
#include <iostream>

shell::link echo()
{
    return [](shell::in& source, shell::out&) {
        for(; source; source()) {
            std::cout << source.get() << std::endl;
        }
    };
}
There are a bunch of other link functions defined in shell.hpp that our program can use.
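For instance, here is how two of those links, cut and uniq, might plausibly be written. These are my sketches; the actual definitions in shell.hpp may differ:

#include <set>

// A plausible cut: yield the (1-based) field'th piece of each line,
// split on delim, in the spirit of cut -f1 -d' '.
shell::link cut(const std::string& delim, std::size_t field)
{
    return [delim, field](shell::in& source, shell::out& yield) {
        for(; source; source()) {
            const std::string& line(source.get());
            std::size_t begin = 0;
            bool found = true;
            // skip past the first (field - 1) delimiters
            for(std::size_t f = 1; f < field && found; ++f) {
                const std::size_t pos = line.find(delim, begin);
                found = (pos != std::string::npos);
                if(found) begin = pos + delim.size();
            }
            if(!found) continue;
            const std::size_t end = line.find(delim, begin);
            yield(line.substr(begin, end == std::string::npos
                                         ? std::string::npos
                                         : end - begin));
        }
    };
}

// A plausible uniq: forward each distinct line only the first time
// it is seen, remembering past lines in a set.
shell::link uniq()
{
    return [](shell::in& source, shell::out& yield) {
        std::set<std::string> seen;
        for(; source; source()) {
            const std::string& line(source.get());
            if(seen.insert(line).second) {
                yield(line);
            }
        }
    };
}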
The only thing left to describe now is how these links are connected. That’s done by the basic_pipeline constructor, which takes the link functions and wraps them inside coroutine instances. The wiring is done by simply binding the in& parameter of each new coroutine to the last coroutine constructed.
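To see what that amounts to, here is roughly the chain the constructor builds for a two-link pipeline such as shell({cat("/tmp/access.log"), echo()}). This is an illustration only, with names of my choosing:

// An unrolled sketch of what the constructor does for
// shell({cat("/tmp/access.log"), echo()}). Constructing a coroutine
// in this version of Boost.Coroutine runs its body until the first
// yield (or until it returns), so building the chain is also what
// pumps the data through it.
shell::in c0([](shell::out&) {});  // empty head of the chain
shell::in c1(boost::bind(cat("/tmp/access.log"), boost::ref(c0), _1)); // runs cat until its first yield
shell::in c2(boost::bind(echo(), boost::ref(c1), _1));                 // runs echo, which drains c1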
That’s about it; let’s step back and look at what we have at hand here.
The basic_pipeline abstraction gives us the ability to define links as simple functions that do one simple thing, and the ability to chain them together flexibly to get interesting things done. Consider, for example, what it takes to extend the program we wrote above to look for the events we’re interested in not just in one file, but in all files in a directory that match a certain filename pattern. Furthermore, we want to hold onto the results of the computation (and not just print them on the console). Here’s the modified version, in its entirety:
#include <iostream>
#include <set>
#include <coro/shell.hpp>

int main()
{
    std::set<std::string> lines;
    shell({
        ls("/tmp"),
        grep("\\.log$"),
        cat(),
        substr(" 403 "),
        cut(" ", 1),
        uniq(lines)
    });
    std::cout << lines.size() << std::endl;
}
All we’ve done is modify the pipeline above to use a different source (the directory-listing link ls), add a filtering link that picks out the filenames we’re interested in, replace the cat link with an overload that acts as a filter (instead of a source), and finally use a version of uniq that populates a collection we hand it.
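Neither ls nor the filter overload of cat was shown above, so here’s a sketch of how they might look; again, shell.hpp’s real definitions may differ. The ls sketch leans on Boost.Filesystem, since C++11 has no standard directory iteration:

#include <fstream>
#include <boost/filesystem.hpp>

// Sketch of ls as a source: yield the path of every entry in a directory.
shell::link ls(const std::string& dir)
{
    return [dir](shell::in&, shell::out& yield) {
        for(boost::filesystem::directory_iterator it(dir), end; it != end; ++it) {
            yield(it->path().string());
        }
    };
}

// Sketch of cat as a filter: treat every incoming line as a filename
// and yield that file's lines into the pipeline.
shell::link cat()
{
    return [](shell::in& source, shell::out& yield) {
        for(; source; source()) {
            std::ifstream input(source.get());
            std::string line;
            while(std::getline(input, line)) {
                yield(line);
            }
        }
    };
}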
This flexibility and composability comes at the cost of the numerous switches between the coroutine contexts that make up the pipeline. For a pipeline with l links through which w units of work flow, we go through w × l context switches. The performance numbers given by Boost.Coroutine suggest that each of these switches costs roughly 100 CPU cycles, so a five-link pipeline pushing a million lines through would spend on the order of half a billion cycles on plumbing alone, a fraction of a second on current hardware. As long as the computation performed inside the link functions dominates this overhead, we’re good, and for non-trivial applications that is usually the case. Having said that, I’d like to advise you against taking performance assurances from strangers. The only thing you can trust is a decent profiler.
Finally, as I’m sure you have noticed, the pipeline is not restricted to strings. It can be used in any application where we need a work-unit to flow through a chain of workers that sequentially resolve it to its completion. It can also be used in places where it is not a good fit; for example, consider this monstrosity that solves the FizzBuzz problem:
#include <iostream>
#include <coro/pipeline.hpp>

int main()
{
    typedef basic_pipeline<int> fizzbuzz;

    auto one2hundred = [](fizzbuzz::in&, fizzbuzz::out& yield) {
        for(int i = 1; i <= 100; i++) {
            yield(i);
        }
    };
    auto fizz = [](fizzbuzz::in& source, fizzbuzz::out& yield) {
        for(; source; source()) {
            int i = source.get();
            if(i % 3 == 0) {
                std::cout << "fizz";
                if(i % 5 != 0) {
                    std::cout << std::endl;
                    continue;
                }
            }
            yield(i);
        }
    };
    auto buzz = [](fizzbuzz::in& source, fizzbuzz::out& yield) {
        for(; source; source()) {
            int i = source.get();
            if(i % 5 == 0) {
                std::cout << "buzz" << std::endl;
            } else {
                yield(i);
            }
        }
    };
    auto rest = [](fizzbuzz::in& source, fizzbuzz::out&) {
        for(; source; source()) {
            std::cout << source.get() << std::endl;
        }
    };
    fizzbuzz({one2hundred, fizz, buzz, rest});
}
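Note, by the way, how the fizz link handles multiples of fifteen: it prints "fizz" without a line break and still yields the number forward, so that the buzz link can complete the line with "buzz".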
This page was, in part, inspired by an interesting presentation by a certain David Beazley on the practical uses of coroutines in Python. If you haven’t seen it already, go through it; it is great.