Just add water

  • 11.14.2015
  • etc
No Man's Sky (Rock Paper Shotgun)

The go-to rebuttal to increasing automation tends to be something about how creativity cannot be emulated by computers - at least, not for a while. There's some truth to that, I suppose, depending on how you define "creativity". The least charitable definition might be synonymous with "content generation", a domain typically exclusive to humans - artists, musicians, writers, and so on - but computers have made some claim to this territory.

The poster child of the automated (i.e. procedural) content generation beachhead is No Man's Sky, which generates a literal universe, complete with stars and beaches and creatures, using interweaving mathematical equations. The universe it generates will reportedly take 5 billion years to explore, and that's just one universe. In theory, an infinite number of universes can be generated by setting different seed values (the value used to determine all other values in the game). This interview with Sean Murray goes into a bit more depth on their approach.

No Man's Sky procedural creatures (via)

These procedurally-generated universes aren't random, though they are designed to give that appearance. They are completely deterministic given the seed value - a critical property, since there needs to be consistency. If you leave a planet, you should not only be able to come back to that planet, but it should be in a similar state to when you left it.
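The principle is the same as any seeded random number generator: the same seed always reproduces the same values, so "random-looking" content can be regenerated on demand rather than stored. A toy illustration in Python (the planet properties here are made up for the example):

import random

def generate_planet(seed):
    # every property follows deterministically from the seed
    rng = random.Random(seed)
    return {
        'radius_km': rng.randint(2000, 70000),
        'n_moons': rng.randint(0, 12),
        'has_water': rng.random() < 0.3,
    }

# leaving and coming back to seed 42 yields the same planet every time
assert generate_planet(42) == generate_planet(42)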

A big challenge in procedural content generation is figuring out a way of creating things which feel organic and natural - for No Man's Sky, the solution is the impressive-sounding Superformula, developed by Johan Gielis in 2003. In polar coordinates, with radius $r$ and angle $\varphi$, parameterized by $a, b, m, n_1, n_2, n_3$, the Superformula is:

$$ r\left(\varphi\right) = \left( \left| \frac{\cos\left(\frac{m\varphi}{4}\right)}{a} \right| ^{n_2} + \left| \frac{\sin\left(\frac{m\varphi}{4}\right)}{b} \right| ^{n_3} \right) ^{-\frac{1}{n_{1}}}. $$

The above is the formula for two dimensions, but it is easily generalized to higher dimensions as well by using spherical products of superformulas (cf. Wikipedia).
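To get a feel for how it's used, here's a small Python sketch that evaluates the 2D formula over a range of angles to trace out a shape's outline (the parameter values are arbitrary examples):

import numpy as np

def superformula(phi, a, b, m, n1, n2, n3):
    # Gielis superformula: radius as a function of angle
    term1 = np.abs(np.cos(m * phi / 4) / a) ** n2
    term2 = np.abs(np.sin(m * phi / 4) / b) ** n3
    return (term1 + term2) ** (-1 / n1)

# sample one shape's outline
phi = np.linspace(0, 2 * np.pi, 500)
r = superformula(phi, a=1, b=1, m=6, n1=1, n2=7, n3=8)

# convert to Cartesian coordinates for plotting
x, y = r * np.cos(phi), r * np.sin(phi)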

Some forms resulting from the 2D formula:

2D Superformula-generated shapes (Wikipedia)

Procedural generation is a really beautiful approach to game design. It's not so much about creating a specific experience as about defining the conditions for limitless experiences. No Man's Sky is far from the first to do this, but it is the first (as far as I know) to have all of its content procedurally generated. The game Elite (from 1984!) also had a procedurally-generated universe. Elite's universe was much simpler of course, but it used a clever approach based on the Fibonacci sequence to simulate randomness:

$$ \begin{aligned} x_0 &= x_1 = \text{seed} \\ x_n &= x_{n-1} + x_{n-2} \end{aligned} $$

The pseudo-random values are then just the last few digits of the numbers this sequence generates.

For example, take the Fibonacci sequence:

$$ 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, \dots $$

Let's jump ahead in the sequence:

$$ 1597, 2584, 4181, 6765, 10946, 17711, 28657, \dots $$

If we just look at the last three digits, the resulting sequence looks quite random:

$$ 597, 584, 181, 765, 946, 711, 657, \dots $$
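Here's a small Python sketch of the trick (not Elite's actual code): seed a Fibonacci-style sequence and keep only the last three digits of each term. With a seed of 1 and skipping ahead as above, it reproduces exactly the values shown:

def elite_rng(seed, n, skip=14):
    # Fibonacci-style sequence started from a seed; keep only the last three digits
    a, b = seed, seed
    values = []
    for _ in range(skip + n):
        a, b = b, a + b
        values.append(b % 1000)  # last three digits
    return values[skip:]  # jump ahead so the values are large enough to look random

print(elite_rng(seed=1, n=7))
# [597, 584, 181, 765, 946, 711, 657]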

The great thing about this is that these universes can be shared by sharing seed values.

Elite's procedurally-generated universe (Gamasutra)

Procedural content generation is interesting, but other games focus on "procedural" stories. These games have (some) manually generated content - creatures, buildings, etc - but focus on designing complex systems which form the bedrock of the game's ability to spawn wild and fantastic stories. Dwarf Fortress and RimWorld are two great examples; they are essentially fantasy and sci-fi world-simulators (respectively) which model things like each individual's mental health, weather patterns, crop growth, and so on. No one writes the stories ahead of time, and no one has put a premeditated experience in place for you - it's all off-the-cuff, emerging from the dynamics of the game's rules.

RimWorld (via)

The stories that come out of these games are amazing. Dwarf Fortress has an especially vibrant community based around the often absurd things that happen in game (for example, see r/dwarffortress, or for a more academic treatment, see Josh Diaz's thesis).

With the independent game industry continually expanding, I think we'll see more of these kinds of games. They can be developed by relatively small teams, yet they offer richness and depth that rivals (and often exceeds) that of a big-studio game.


coral metrics sketch

As part of the Coral Project, we're trying to come up with some interesting and useful metrics about community members and discussion on news sites.

It's an interesting exercise to develop metrics which embody an organization's principles. For instance - perhaps we see our content as the catalyst for conversations, so we'd measure an article's success by how much discussion it generates.

Generally, there are two groups of metrics that I have been focusing on:

  • Asset-level metrics, computed for individual articles or whatever else may be commented on
  • User-level metrics, computed for individual users

For the past couple of weeks I've been sketching out a few ideas for these metrics:

  • For assets, the principles that these metrics aspire to capture are around quantity and diversity of discussion.
  • For users, I look at organizational approval, community approval, how much discussion this user tends to generate, and how likely they are to be moderated.

Here I'll walk through my thought process for these initial ideas.

Asset-level metrics

For assets, I wanted to value not only the amount of discussion generated but also the diversity of those discussions. A good discussion is one in which there's a lot of high-quality exchange (something else to be measured, but not captured in this first iteration) from many different people.

There are two scores to start:

  • a discussion score, which quantifies how much discussion an asset generated. This looks at how much people are talking to each other as opposed to just counting up the number of comments. For instance, a comments section in which all comments are top-level should not have a high discussion score. A comments section in which there are some really deep back-and-forths should have a higher discussion score.
  • a diversity score, which quantifies how many different people are involved in the discussions. Again, we don't want to look at diversity in the comments section as a whole because we are looking for diversity within discussions, i.e. within threads.

The current sketch for computing the discussion score is via two values:

  • maximum thread depth: how long is the longest thread?
  • maximum thread width: what is the highest number of replies for a comment?

These are pretty rough approximations of "how much discussion" there is. The idea is that for sites which only allow one level of replies, a lot of replies to a comment can signal a discussion, and that a very deep thread signals the same for sites which allow more nesting.
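To make this concrete, here's a rough sketch of the two helpers used in the scoring code further below (assuming a toy thread representation where each comment is a dict with a list of 'replies'; the actual data model may differ):

def max_thread_depth(thread):
    # length of the longest reply chain in the thread
    if not thread['replies']:
        return 1
    return 1 + max(max_thread_depth(reply) for reply in thread['replies'])

def max_thread_width(thread):
    # largest number of direct replies any single comment received
    widths = [len(thread['replies'])]
    widths += [max_thread_width(reply) for reply in thread['replies']]
    return max(widths)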

The discussion score of a top-level thread is the product of these two intermediary metrics:

$$ \text{discussion score}_{\text{thread}} = \max(\text{thread}_{\text{depth}}) \max(\text{thread}_{\text{width}}) $$

The discussion score for the entire asset is the value that answers this question: if a new thread were to start in this asset, what discussion score would it have?

The idea is that if a section is generating a lot of discussion, a new thread would likely also involve a lot of discussion.

The nice thing about this approach (which is similar to the one used throughout all these sketches) is that we can capture uncertainty. When a new article is posted, we have no idea how good of a discussion a new thread might be. When we have one or two threads - maybe one is long and one is short - we're still not too sure, so we still have a fairly conservative score. But as more and more people comment, we begin to narrow down on the "true" score for the article.

More concretely (skip ahead if you'd like to be spared the gory details), we assume that this discussion score is drawn from a Poisson distribution. This makes things a bit easier to model because we can use the gamma distribution as a conjugate prior.

By default, the gamma prior is parameterized with $k=1, \theta=2$ since it is a fairly conservative estimate to start. That is, we begin with the assumption that any new thread is unlikely to generate a lot of discussion, so it will take a lot of discussion to really convince us otherwise.

Since this gamma-Poisson model will be reused elsewhere, it is defined as its own function:

import numpy as np
from scipy import stats

def gamma_poisson_model(X, n, k, theta, quantile):
    # update the conjugate gamma prior with the observations X
    k = np.sum(X) + k
    t = theta/(theta*n + 1)
    # return the requested quantile of the posterior as a point estimate
    return stats.gamma.ppf(quantile, k, scale=t)

Since the gamma is a conjugate prior here, the posterior is also a gamma distribution with easily-computed parameters based on the observed data (i.e. the "actual" top-level threads in the discussion).

We need an actual value to work with, however, so we need some point estimate of the expected discussion score. However, we don't want to go with the mean since that may be too optimistic a value, especially if we only have a couple top-level threads to look at. So instead, we look at the lower-bound of the 90% credible interval (the 0.05 quantile) to make a more conservative estimate.
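For instance, with some hypothetical observed per-thread scores, the estimate starts out conservative and tightens as evidence accumulates:

# one thread observed: a very conservative estimate
print(gamma_poisson_model([12], 1, k=1, theta=2, quantile=0.05))
# a few threads: less conservative
print(gamma_poisson_model([12, 9, 15, 11], 4, k=1, theta=2, quantile=0.05))
# many threads: close to the observed mean
print(gamma_poisson_model([12, 9, 15, 11] * 10, 40, k=1, theta=2, quantile=0.05))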

So the final function for computing an asset's discussion score is:

def asset_discussion_score(threads, k=1, theta=2):
    # per-thread discussion scores: max width * max depth
    X = np.array([max_thread_width(t) * max_thread_depth(t) for t in threads])
    n = len(X)

    # conservative estimate from the gamma-Poisson posterior
    return {'discussion_score': gamma_poisson_model(X, n, k, theta, 0.05)}

A similar approach is used for an asset's diversity score. Here we ask the question: if a new comment is posted, how likely is it to be posted by someone new to the discussion?

We can model this with a beta-binomial model; again, the beta distribution is a conjugate prior for the binomial distribution, so we can compute the posterior's parameters very easily:

def beta_binomial_model(y, n, alpha, beta, quantile):
    # update the conjugate beta prior with y "successes" out of n trials
    alpha_ = y + alpha
    beta_ = n - y + beta
    # return the requested quantile of the posterior as a point estimate
    return stats.beta.ppf(quantile, alpha_, beta_)

Again we start with conservative parameters for the prior, $\alpha=2, \beta=2$, and then compute using threads as evidence:

def asset_diversity_score(threads, alpha=2, beta=2):
    X = set()
    n = 0
    for t in threads:
        users, n_comments = unique_participants(t)
        X = X | users
        n += n_comments
    y = len(X)

    return {'diversity_score': beta_binomial_model(y, n, alpha, beta, 0.05)}
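The unique_participants helper isn't shown above; under the same toy thread representation as before, a sketch might look like:

def unique_participants(thread):
    # return (set of user ids, total comment count) for a thread
    users = {thread['user_id']}
    n_comments = 1
    for reply in thread['replies']:
        reply_users, reply_count = unique_participants(reply)
        users |= reply_users
        n_comments += reply_count
    return users, n_comments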

Then averages for these scores are computed across the entire sample of assets in order to give some context as to what good and bad scores are.

User-level metrics

User-level metrics are computed in a similar fashion. For each user, four metrics are computed:

  • a community score, which quantifies how much the community approves of them. This is computed by trying to predict the number of likes a new post by this user will get.
  • an organization score, which quantifies how much the organization approves of them. This is the probability that a post by this user will get "editor's pick" or some equivalent (in the case of Reddit, "gilded", which isn't "organizational" but holds a similar revered status).
  • a discussion score, which quantifies how much discussion this user tends to generate. This answers the question: if this user starts a new thread, how many replies do we expect it to have?
  • a moderation probability, which is the probability that a post by this user will be moderated.

The community score and discussion score are both modeled as gamma-Poisson models using the same function as above. The organization score and moderation probability are both modeled as beta-binomial models using the same function as above.
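As a sketch of two of these (the comment fields n_likes and was_moderated are assumptions for illustration, not an actual schema):

def user_community_score(comments, k=1, theta=2):
    # likes per comment, modeled the same way as the discussion score
    X = np.array([c['n_likes'] for c in comments])
    return gamma_poisson_model(X, len(X), k, theta, 0.05)

def user_moderation_prob(comments, alpha=2, beta=2):
    # fraction of comments that were moderated, using the same conservative quantile
    y = sum(1 for c in comments if c['was_moderated'])
    return beta_binomial_model(y, len(comments), alpha, beta, 0.05)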

Time for more refinement

These metrics are just a few starting points to shape into more sophisticated and nuanced scoring systems. There are some desirable properties missing, and of course, every organization has different principles and values, and so the ideas presented here are not one-size-fits-all, by any means. The challenge is to create some more general framework that allows people to easily define these metrics according to what they value.


Machine Learning 101

  • 11.10.2015
  • etc

This post is an adaptation from a talk given at artsec and a workshop given at a few other places (Buenos Aires Media Party 2015, MozFest 2015). The goal is to provide an intuition on machine learning for people without advanced math backgrounds - given that machine learning increasingly rules everything around us, everyone should probably have some mental model of how it works.

The materials for the workshop are available on GitHub.

What is "machine learning" anyways?

There are plenty of introductory machine learning tutorials and courses available, but they tend to focus on cursory tours of algorithms - here's linear regression, here's SVMs, here's neural networks, etc - without building an intuition of what machine learning is actually doing under the hood. This intuition is obvious to practitioners but not for people new to the field. To get the full value of the other machine learning resources out there, that intuition really helps.

But first, to clarify a bit what machine learning is: a basic working definition could be "algorithms for computers to learn patterns". Pattern recognition is something that people are very good at but that computers find difficult.

Here we'll walk through a very simple machine learning task which is prototypical of many real-world machine learning problems. First we'll go through it by hand, noting where our human superpower of pattern recognition comes into play, and then think about how we can translate what we did into something a computer is capable of executing.

Learning functions by hand

A common machine learning goal is to take a dataset and learn the pattern (i.e. relationship) between different variables of the data. Then that pattern can be used to predict values of those variables for new data.

Consider the following data:

Some data

If I were to ask you to describe the data by a pattern you see in that data, you'd likely draw a line. It is quite obvious to us that, even though the data points don't fall exactly in a line, a line seems to satisfactorily represent the data's pattern.

But a drawn line is no good - what are we supposed to do with it? It's hard to make use of that in a program.

A better way of describing a pattern is as a mathematical equation (i.e. a function). In this form, we can easily plug in new inputs to get predicted outputs. So we can restate our goal of learning a pattern as learning a function.

You may remember that lines are typically expressed in the form:

$$ y = mx + b $$

As a refresher: $y$ is the output, $x$ is the input, $m$ is the slope, and $b$ is the intercept.

Lines are uniquely identified by values of $m$ and $b$:

Some lines

Variables like $m$ and $b$ which uniquely identify a function are called parameters (we also say that $m$ and $b$ parameterize the line). So finding a particular line means finding particular values of $m$ and $b$. So when we say we want to learn a function, we are really saying that we want to learn the parameters of a function, since they effectively define the function.

So how can we find the right values of $m, b$ for the line that fits our data?

Trial-and-error seems like a reasonable approach. Let's start with $m=12, b=2$ (these values were picked arbitrarily; you could start with different values too).

Trying $m=12, b=2$

The line is still quite far from the data, so let's try lowering the slope to $m=8$.

Trying $m=8, b=2$

The line's closer, but still far from the data, so we can try lowering the slope again. Then we can check the resulting line, and continue adjusting until we feel satisfied with the result. Let's say we carry out this trial-and-error and end up with $m=4.6, b=40$.

Trying $m=4.6, b=40$

So now we have a function, $y = 4.6x + 40$, and if we got new inputs we could predict outputs that would follow the pattern of this dataset.

But this was quite a laborious process. Can we get a computer to do this for us?

Learning functions by computer

The basic approach we took by hand was just to:

  1. Pick some random values for $m$ and $b$
  2. Compare the resulting line to the data to see if it's a good fit
  3. Go back to 1 if the resulting line is not a good fit

This is a fairly repetitive process and so it seems like a good candidate for a computer program. There's one snag though - we could easily eyeball whether or not a line was a "good" fit. A computer can't eyeball a line like that.

So we have to get a bit more explicit about what a "good" fit is, in terms that a computer can make use of. To put it another way, we want the computer to be able to calculate how "wrong" its current guesses for $m$ and $b$ are. We can define a function to do so.

Let's think for a moment about what we would consider a bad-fitting line. The further the line was from the dataset, the worse we consider it. So we want lines that are close to our datapoints. We can restate this in more concrete terms.

First, let's notate our line function guess as $f(x)$. Each datapoint we have is represented as $(x, y)$, where $x$ is the input and $y$ is the output. For a datapoint $(x, y)$, we predict an output $\hat y$ based on our line function, i.e. $\hat y = f(x)$.

We can look at each datapoint and calculate how far it is from our line guess with $y - \hat y$. Then we can combine all these distances (which are called errors) in some way to get a final "wrongness" value.

There are many ways we can do this, but a common way is to square all of these errors (only the magnitude of the error is important) and then take their mean:

$$ \frac{\sum (y - \hat y)^2 }{n} $$

Where $n$ is the number of datapoints we have.

It also helps to think about this as:

$$ \frac{\sum (y - f(x))^2 }{n} $$

To make it a bit clearer that the important part here is our guess at the function $f(x)$.
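In code, this loss is only a few lines (a minimal sketch, assuming the datapoints are numpy arrays):

import numpy as np

def f(x, m, b):
    # our current guess at the function: a line
    return m * x + b

def mean_squared_error(x, y, m, b):
    # average squared distance between predictions and the actual outputs
    y_hat = f(x, m, b)
    return np.mean((y - y_hat) ** 2)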

A function like this which calculates the "wrongness" of our current line guess is called a loss function (or a cost function). Clearly, we want to find a line (that is, values of $m$ and $b$) which are the "least wrong".

Another way of saying this is that we want to find parameters which minimize this loss function. This is basically what we're doing when we eyeball a "good" line.

When we guessed the line by hand, we just iteratively tried different $m$ and $b$ values until it looked good enough. We could use a similar approach with a computer, but when we did it, we again could eyeball how we should change $m$ and $b$ to get closer (e.g. if the line is going above the datapoints, we know we need to lower $b$ or lower $m$ or both). How then can a computer know in which direction to change $m$ and/or $b$?

Changing $m$ and $b$ to get a line that is closer to the data is the same as changing $m$ and $b$ to lower the loss. It's just a matter of having the computer figure out in what direction changes to $m$ and/or $b$ will lower the loss.

Remember that a derivative of a function tells us the rate of change at a specific point in that function. We could compute the derivative of the loss function with respect to our current guesses for $m$ and $b$. This will inform us as to which direction(s) we need to move in order to lower the loss, and gives us new guesses for $m$ and $b$.

Then it's just a matter of repeating this procedure - with our new guesses for $m$ and $b$, use the derivative of the loss function to figure out what direction(s) to move in and get new guesses, and so on.

To summarize, we took our trial-and-error-by-hand approach and turned it into the following computer-suited approach:

  1. Pick some random values for $m$ and $b$
  2. Use a loss function to compare our guess $f(x)$ to the data
  3. Determine how to change $m$ and $b$ by computing the derivative of the loss function with respect to $f(x)$
  4. Update $m$ and $b$ accordingly, then go back to 2 and repeat until the loss function can't get any lower (or until it's low enough for our purposes)
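Here's a minimal sketch of that loop for our line, using the mean squared error loss from above and its derivatives with respect to $m$ and $b$ (this procedure is usually called gradient descent):

import numpy as np

def fit_line(x, y, learning_rate=0.001, n_steps=10000):
    m, b = np.random.randn(), np.random.randn()  # step 1: random starting guesses
    for _ in range(n_steps):
        y_hat = m * x + b                        # current predictions
        # derivatives of the mean squared error with respect to m and b
        dm = -2 * np.mean(x * (y - y_hat))
        db = -2 * np.mean(y - y_hat)
        # nudge m and b in the direction that lowers the loss
        m -= learning_rate * dm
        b -= learning_rate * db
    return m, b

Each pass through the loop lowers the loss a little; fancier optimization methods mostly vary how that step is taken.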

A lot of machine learning is just different takes on this basic procedure. If it had to be summarized in one sentence: an algorithm learns some function by figuring out parameters which minimize some loss function.

Different problems and methods involve variations on these pieces - different loss functions, different ways of changing the parameters, etc - or include additional flourishes to help get around the problems that can crop up.

Beyond lines

Here we worked with lines, i.e. functions of the form $y = mx + b$, but not all data fits neatly onto lines. The good news is that this general approach applies to sundry functions, both linear and nonlinear. For example, neural networks can approximate any function, from lines to really crazy-looking and complicated functions.

To be honest, there is quite a bit more to machine learning than just this - figuring out how to best represent data is another major concern, for example. But hopefully this intuition will help provide some direction in a field which can feel like a disconnected parade of algorithms.


Simulating a multi-node (Py)Spark cluster using Docker


I'm working on a set of tools for the Coral Project to make building data analysis pipelines easy and, perhaps one day, accessible even to non-technical folks. Part of what will be offered is a way of easily toggling between running pipelines in parallel on a local machine and running them on a distributed computing cluster. That way, the pipelines a small organization uses for their data can be adapted to a larger organization just by spinning up the setup described below and changing a configuration option.

I wanted to simulate a multi-node cluster for developing these tools, and couldn't find any guides for doing so. So after some research, here is one.

The setup that follows runs all on one machine (remember, it just simulates a multi-node cluster), but it should be easily adaptable to a real multi-node cluster by appropriately changing the IPs that the containers use to communicate.

I have made available a repo with the Dockerfiles and scripts described below.

The Stack

A lot goes into the cluster stack:

  • Spark - used to define tasks
  • Mesos - used for cluster management
  • Zookeeper - used for electing Mesos leaders
  • Hadoop - used for HDFS (Hadoop Distributed File System)
  • Docker - for containerizing the above

The Setup

There will be a client machine (or "control node"), which is the machine we're working from. In this walkthrough, the client machine also functions as the Docker host (where the Docker containers are run).

Docker containers are spun up for each of the other parts of the stack, and they all communicate via their "external" Docker IPs.

Setting up the client

I'm assuming a Linux environment because that's what Docker works best with (on OSX you are probably running it in a Linux VM anyways). The following instructions are for Ubuntu but should be replicable on other distros.

The client needs to have Spark and Mesos installed to properly interact with the cluster.

Spark has precompiled binaries available on their downloads page which are easily installed:

# go to <https://spark.apache.org/downloads.html>
# select and download the version you want
tar -xzvf spark-*.tgz
rm spark-*.tgz
sudo mv spark* /usr/local/share/spark

Add the following to your ~/.bash_profile as well:

export SPARK_HOME=/usr/local/share/spark

# so pyspark can be imported in python
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH

PySpark has one final requirement, the py4j library:

pip install py4j

Mesos does not have any precompiled binaries, so you must compile it yourself:


# sources available at <https://mesos.apache.org/gettingstarted/>
# set MESOS_V to the version you want, e.g. the one used in the images below
MESOS_V=0.24.0
wget http://www.apache.org/dist/mesos/${MESOS_V}/mesos-${MESOS_V}.tar.gz
tar -zxf mesos-*.tar.gz
rm mesos-*.tar.gz

# dependencies
sudo apt-get install -y openjdk-7-jdk build-essential python-dev python-boto libcurl4-nss-dev libsasl2-dev maven libapr1-dev libsvn-dev

# by default, this installs to /usr/local
cd mesos*
mkdir build
cd build
../configure
make
sudo make install

Finally, we need to configure Spark to use a Mesos cluster:

cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
echo 'export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.so' >> $SPARK_HOME/conf/spark-env.sh

That's all for the client.

Setting up the Docker images

Our "cluster" will consist of several Docker containers, with one (or more) for each part of the stack, so we create images for each.


The Zookeeper image is straightforward:

FROM ubuntu:14.04

# set to the Zookeeper version you want (3.4.6 is just an example)
ENV ZOOKEEPER_V 3.4.6
ENV ZOOKEEPER_PATH /usr/local/share/zookeeper

# update
RUN apt-get update
RUN apt-get upgrade -y

# dependencies
RUN apt-get install -y wget openjdk-7-jre-headless

# zookeeper
RUN wget http://apache.arvixe.com/zookeeper/zookeeper-${ZOOKEEPER_V}/zookeeper-${ZOOKEEPER_V}.tar.gz
RUN tar -zxf zookeeper-*.tar.gz
RUN rm zookeeper-*.tar.gz
RUN mv zookeeper-* $ZOOKEEPER_PATH
RUN mv $ZOOKEEPER_PATH/conf/zoo_sample.cfg $ZOOKEEPER_PATH/conf/zoo.cfg

# put the Zookeeper scripts on the PATH so the entrypoint can find zkServer.sh
ENV PATH $PATH:$ZOOKEEPER_PATH/bin

ENTRYPOINT ["zkServer.sh"]
CMD ["start-foreground"]

A Zookeeper binary is downloaded and installed, then the default config is copied over. We start the Zookeeper service in the foreground so the Docker container does not immediately exit.


The Hadoop image is more involved:

FROM ubuntu:14.04

# set to the Hadoop version you want (2.7.1 is just an example)
ENV HADOOP_V 2.7.1
ENV HADOOP_HOME /usr/local/hadoop
ENV PATH $PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64
ENV HADOOP_TMP /var/hadoop/tmp

# update
RUN apt-get update
RUN apt-get upgrade -y

# dependencies
RUN apt-get install -y openssh-server openjdk-7-jdk wget

# disable ipv6 since hadoop does not support it
RUN echo 'net.ipv6.conf.all.disable_ipv6 = 1' >> /etc/sysctl.conf
RUN echo 'net.ipv6.conf.default.disable_ipv6 = 1' >> /etc/sysctl.conf
RUN echo 'net.ipv6.conf.lo.disable_ipv6 = 1' >> /etc/sysctl.conf

# hadoop
RUN wget http://apache.arvixe.com/hadoop/core/hadoop-${HADOOP_V}/hadoop-${HADOOP_V}.tar.gz
RUN tar -zxf hadoop-*.tar.gz
RUN rm hadoop-*.tar.gz
RUN mv hadoop-* $HADOOP_HOME

# hadoop tmp directory
RUN mkdir -p $HADOOP_TMP
RUN chmod 750 $HADOOP_TMP

# configs
RUN echo "export JAVA_HOME=$JAVA_HOME" >> $HADOOP_HOME/etc/hadoop/hadoop-env.sh
ADD docker/assets/core-site.xml $HADOOP_HOME/etc/hadoop/core-site.xml

# auth
# the provided config saves us from having
# to accept each new host key on connect
RUN ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa
RUN cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ADD docker/assets/ssh_config /root/.ssh/config

# format the hdfs
RUN hdfs namenode -format

ADD docker/assets/start_hadoop start_hadoop

EXPOSE 8020 50010 50020 50070 50075 50090

CMD ["-d"]
ENTRYPOINT ["./start_hadoop"]

It does the following:

  • A Hadoop binary is downloaded and installed
  • IPV6 is disabled because Hadoop does not support it
  • SSH auth is setup because Hadoop uses it for connections
  • Hadoop is configured with the proper Java install

For SSH, a config which frees us from having to manually accept new hosts is copied over:

Host *
    UserKnownHostsFile /dev/null
    StrictHostKeyChecking no

A core-site.xml config file is also added, which includes:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/var/hadoop/tmp</value>
    <description>A base for other temporary directories.</description>
  </property>

  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:8020</value>
    <description>The name of the default file system.  A URI whose
    scheme and authority determine the FileSystem implementation.  The
    uri's scheme determines the config property (fs.SCHEME.impl) naming
    the FileSystem implementation class.  The uri's authority is used to
    determine the host, port, etc. for a filesystem.</description>
  </property>
</configuration>

The important part here is the fs.defaultFS property which describes how others can access the HDFS. Here, the value is localhost, but that is replaced by the start_hadoop script (see below) with the container's "external" IP.

And finally, a start_hadoop script is copied over, which includes:


# get "external" docker ip
HDFS_IP=$(ifconfig eth0 | grep 'inet addr:' | cut -d: -f2 | awk '{print $1}')

# set the proper ip in the HDFS config
sed -i 's/localhost/'${HDFS_IP}'/g' $HADOOP_HOME/etc/hadoop/core-site.xml

/etc/init.d/ssh restart

if [[ $1 == "-d" ]]; then
    while true; do sleep 1000; done

if [[ $1 == "-bash" ]]; then

As mentioned, it replaces the localhost value in the core-site.xml config with the "external" IP so that others can connect to the HDFS.

It also starts the SSH service, then starts the HDFS, and, with the -d flag (which is passed in the above Dockerfile), emulates a foreground service so that the Docker container does not exit.


For the Mesos leader and followers, we first create a base Mesos image and then use that to create the leader and follower images.

The base Mesos image Dockerfile:

FROM ubuntu:14.04

ENV MESOS_V 0.24.0

# update
RUN apt-get update
RUN apt-get upgrade -y

# dependencies
RUN apt-get install -y wget openjdk-7-jdk build-essential python-dev python-boto libcurl4-nss-dev libsasl2-dev maven libapr1-dev libsvn-dev

# mesos
RUN wget http://www.apache.org/dist/mesos/${MESOS_V}/mesos-${MESOS_V}.tar.gz
RUN tar -zxf mesos-*.tar.gz
RUN rm mesos-*.tar.gz
RUN mv mesos-* mesos

# build and install from within the build directory
# (a plain `cd` does not persist across separate RUN steps)
RUN mkdir mesos/build
RUN cd mesos/build && ../configure && make && make install

RUN ldconfig

This just builds and installs Mesos.

The leader Dockerfile:

FROM mesos_base

# the Mesos leader (master) port
EXPOSE 5050

ADD docker/assets/start_leader start_leader
ENTRYPOINT ["./start_leader"]

It exposes the Mesos leader port and copies over a start_leader script, which contains:


# get "external" docker IP
LEADER_IP=$(ifconfig eth0 | grep 'inet addr:' | cut -d: -f2 | awk '{print $1}')
mesos-master --registry=in_memory --ip=${LEADER_IP} --zk=zk://${ZOOKEEPER}/mesos

All this does is tell the leader to use its "external" IP, which is necessary so that the Mesos followers and the client can properly communicate with it.

It also requires a ZOOKEEPER env variable to be set; it is specified when the Docker container is run (see below).

The follower Dockerfile:

FROM mesos_base

# the Mesos follower (slave) port
EXPOSE 5051

ADD docker/assets/start_follower start_follower

# permissions fix
ENV MESOS_SWITCH_USER 0

# use python3 for pyspark
RUN apt-get install -y python3
ENV PYSPARK_PYTHON /usr/bin/python3

ENTRYPOINT ["./start_follower"]

There is a bit more going on here. The Mesos follower port is exposed and a few env variables are set. The MESOS_SWITCH_USER variable is a fix for a permissions issue, and the PYSPARK_PYTHON lets Spark know that we will use Python 3.

Like the leader image, there is a start_follower script here, which is simple:

mesos-slave --master=zk://${ZOOKEEPER}/mesos

Again, it uses a ZOOKEEPER env variable which is specified when the container is run.

Building the images

Finally, we can build all the images:

sudo docker build -f Dockerfile.mesos -t mesos_base .
sudo docker build -f Dockerfile.follower -t mesos_follower .
sudo docker build -f Dockerfile.leader -t mesos_leader .
sudo docker build -f Dockerfile.zookeeper -t mesos_zookeeper .
sudo docker build -f Dockerfile.hadoop -t hadoop .

Running the cluster

With all the images built, we can start the necessary Docker containers.

First, start a Zookeeper container:

sudo docker run --name mesos_zookeeper -itP mesos_zookeeper

When it's running, make a note of its IP:

ZOOKEEPER_IP=$(sudo docker inspect --format '{{.NetworkSettings.IPAddress }}' $(sudo docker ps -aq --filter=name=mesos_zookeeper))

Then, start the Hadoop container:

sudo docker run --name hadoop -itP hadoop

Note that our container name here should not have underscores in it, because Java can't handle hostnames with underscores.

Then, start a Mesos leader container:

sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_leader -itP mesos_leader

Note that we set the ZOOKEEPER env variable here.

Finally, start a Mesos follower container in the same fashion:

sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_follower -itP mesos_follower

Using the cluster

With the client setup and the cluster containers running, we can start using PySpark from the client machine.

We'll do the classic word count example to demonstrate the process.

First, open a shell in the Hadoop container:

sudo docker exec -it hadoop bash

From this container, grab a text file to work with and put it in the HDFS so the Mesos followers can access it:

wget http://www.gutenberg.org/cache/epub/4300/pg4300.txt
hadoop fs -put pg4300.txt /sample.txt

Now, back in the client machine, we can put together a Python script to count the words in this file.

First, we need to know the Zookeeper host, so PySpark knows where to find the cluster, and the Hadoop IP, so PySpark knows where to grab the file from. We'll pass them in as command-line arguments and grab them using the sys library:

import sys
import pyspark

zookeeper = sys.argv[1]
hadoop_ip = sys.argv[2]

Then we can specify where to find the text:

src = 'hdfs://{}:8020/sample.txt'.format(hadoop_ip)

And configure PySpark:

conf = pyspark.SparkConf()

# point Spark at the Mesos cluster, via Zookeeper
conf.setMaster('mesos://zk://{}/mesos'.format(zookeeper))

One important configuration option is spark.executor.uri, which tells Mesos followers where they can get the Spark binary to properly execute the tasks. This must be a prebuilt Spark archive, i.e. a Spark binary package. You can build it and host it yourself if you like.

conf.set('spark.executor.uri', 'http://d3kbcqa49mib13.cloudfront.net/spark-1.5.0-bin-hadoop2.6.tgz')

Then we can create the SparkContext with our config and define the task:

sc = pyspark.SparkContext(conf=conf)

lines = sc.textFile(src)
words = lines.flatMap(lambda x: x.split(' '))
word_count = (words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y))

# trigger the computation and show a few of the counts
print(word_count.take(10))

Save this file as example.py.

There are a couple gotchas when running this script.

We cannot run it with a simple python example.py. If we do so, PySpark will use the client's local network IP, which the Docker containers can't properly reach. We want PySpark to use the client's Docker IP instead so that it can communicate with the other Docker containers, and we specify this as an env variable called LIBPROCESS_IP:

export LIBPROCESS_IP=$(ifconfig docker0 | grep 'inet addr:' | cut -d: -f2 | awk '{print $1}')

Then, we must also specify the proper Python version for the client's Spark install:

export PYSPARK_PYTHON=/usr/bin/python3

Because we're also passing in the Zookeeper connection string and the Hadoop IP, let's get those too:

ZOOKEEPER=${ZOOKEEPER_IP}:2181
HADOOP_IP=$(sudo docker inspect --format '{{.NetworkSettings.IPAddress }}' $(sudo docker ps -aq --filter=name=hadoop))

And now we can run the script:

python example.py $ZOOKEEPER $HADOOP_IP

Multi-node/high-availability setup

So far we only have one follower, but to better emulate a multi-node setup, we want many followers. This is easy to do, just spin up more follower Docker containers with the proper ZOOKEEPER variable:

sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_follower1 -itP mesos_follower
sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_follower2 -itP mesos_follower
sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_follower3 -itP mesos_follower
# etc

For a high-availability setup, we can also create many leaders in a similar way:

sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_leader1 -itP mesos_leader
sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_leader2 -itP mesos_leader
sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_leader3 -itP mesos_leader
# etc

These leaders will all register with Zookeeper and Zookeeper will elect one to be the "active" leader. The followers will coordinate with Zookeeper to figure out which leader they should be talking to. If one leader goes down, Zookeeper will elect a new active leader in its place.

We can even have multiple Zookeeper containers, but I haven't yet tried it out.


This repo has all of the files mentioned with a script that makes it easy to spin up this entire setup.




Broca's Area

At this year's OpenNews Code Convening, Alex Spangher of the New York Times and I worked on broca, which is a Python library for rapidly experimenting with new NLP approaches.

Conventional NLP methods - bag-of-words vector space representations of documents, for example - generally work well, but sometimes not well enough, or worse yet, not well at all. At that point, you might want to try out a lot of different methods that aren't available in popular NLP libraries.

Prior to the Code Convening, broca was little more than a hodgepodge of algorithms I'd implemented for various projects. During the Convening, we restructured the library, added some examples and tests, and implemented the key piece of broca: pipelines.


The core of broca is organized around pipes, which take some input and produce some output; pipes are then chained together into pipelines.

Pipes represent different stages of an NLP process - for instance, your first stage may involve preprocessing or cleaning up the document, the next may be vectorizing it, and so on.

In broca, this would look like:

from broca.pipeline import Pipeline
from broca.preprocess import Cleaner
from broca.vectorize import BoW

docs = [
    # ...
    # some string documents
    # ...
]

pipeline = Pipeline(
    Cleaner(),
    BoW()
)

vectors = pipeline(docs)

Since a key part of broca is rapid prototyping, it makes it very easy to simultaneously try different pipelines which may vary in only a few components:

from broca.vectorize import DCS

pipeline = Pipeline(
    Cleaner(),
    [BoW(), DCS()]
)
This would produce a multi-pipeline consisting of two pipelines: one which vectorizes using BoW, the other using DCS.

Multi-pipelines often have shared components. In the example above, Cleaner() is in both pipelines. To avoid redundant processing, a key part of broca's pipelines is that the output for each pipe is "frozen" to disk.

These frozen outputs are identified by a hash derived from the input data and other factors. If frozen output exists for a pipe and its input, that frozen output is "defrosted" and returned, saving unnecessary processing time.

broca's Cryo

This way, you can tweak different components of the pipeline without worrying about needing to re-compute a lot of data. Only the parts that have changed will be re-computed.
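The exact freezing scheme is internal to broca, but the general idea is ordinary content-addressed caching. A purely illustrative toy version (not broca's actual code) might look like:

import os
import pickle
import hashlib

def frozen_call(pipe, input_data, cache_dir='/tmp/cryo'):
    # identify this (pipe, input) combination by a hash
    key = hashlib.md5(pickle.dumps((pipe.__class__.__name__, input_data))).hexdigest()
    path = os.path.join(cache_dir, key)

    if os.path.exists(path):
        # "defrost" a previously frozen output
        with open(path, 'rb') as f:
            return pickle.load(f)

    output = pipe(input_data)

    # "freeze" the output to disk for next time
    os.makedirs(cache_dir, exist_ok=True)
    with open(path, 'wb') as f:
        pickle.dump(output, f)
    return output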

Included pipes

broca includes a few pipes:

  • broca.tokenize includes various tokenization methods, using lemmas and a few different keyword extractors.
  • broca.vectorize includes a traditional bag-of-words vectorizer, an implementation of "disambiguated core semantics", and Doc2Vec.
  • broca.preprocess includes common preprocessors - cleaning punctuation, HTML, and a few others.

Other tools

Not everything in broca is a pipe. Also included are:

  • broca.similarity includes similarity methods for terms and documents.
  • broca.distance includes string distance methods (this may be renamed later).
  • broca.knowledge includes some tools for dealing with external knowledge sources (e.g. other corpora or Wikipedia).

Though at some point these may also become pipes.

Give us your pipes!

We made it really easy to implement your own pipes. Just inherit from the Pipe class, specify the class's input and output types, and implement the __call__ method (that's what's called for each pipe).

For example:

from broca.pipeline import Pipe

class MyPipe(Pipe):
    input = Pipe.type.docs
    output = Pipe.type.vecs

    def __init__(self, some_param):
        self.some_param = some_param

    def __call__(self, docs):
        # do something with docs to get vectors
        vecs = make_vecs_func(docs, self.some_param)
        return vecs
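Assuming the hypothetical MyPipe above, it can then be dropped into a pipeline alongside the built-in pipes:

pipeline = Pipeline(
    Cleaner(),
    MyPipe(some_param=10)
)
vecs = pipeline(docs)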

We hope that others will implement their own pipes and submit them as pull requests - it would be great if broca becomes a repository of sundry NLP methods which makes it super easy to quickly try a battery of techniques on a problem.

broca is available on GitHub and also via pip:

pip install broca