Simulating a multi-node (Py)Spark cluster using Docker

09.30.2015 | code
finally
finally

I'm working on a set of tools for the Coral Project to make building data analysis pipelines easy and, perhaps one day, accessible to even non-technical folks. Part of what will be offered is a way of easily toggling between running pipelines on a in parallel on a local machine or on a distributed computing cluster. That way, the pipelines that a small organization uses for their data can be adapted to a larger organization just by spinning up the setup described below and changing a configuration option.

I wanted to simulate a multi-node cluster for developing these tools, and couldn't find any guides for doing so. So after some research, here is one.

The setup that follows runs all on one machine (remember, it just simulates a multi-node cluster), but it should be easily adaptable to a real multi-node cluster by appropriately changing the IPs that the containers use the communicate.

I have made available a repo with the Dockerfiles and scripts described below.

The Stack

A lot goes into the cluster stack:

  • Spark - used to define tasks
  • Mesos - used for cluster management
  • Zookeeper - used for electing Mesos leaders
  • Hadoop - used for HDFS (Hadoop Distributed File System)
  • Docker - for containerizing the above

The Setup

There will be a client machine (or "control node"), which is the machine we're working from. In this walkthrough, the client machine also functions as the Docker host (where the Docker containers are run).

Docker containers are spun up for each other part of the stack, and they all communicate via their "external" Docker IPs.

Setting up the client

I'm assuming a Linux environment because that's what Docker works best with (on OSX you are probably running it in a Linux VM anyways). The following instructions are for Ubuntu but should be replicable on other distros.

The client needs to have Spark and Mesos installed to properly interact with the cluster.

Spark has precompiled binaries available on their downloads page which are easily installed:

# go to <https://spark.apache.org/downloads.html>
# select and download the version you want
tar -xzvf spark-*.tgz
rm spark-*.tgz
sudo mv spark* /usr/local/share/spark

Add the following to your ~/.bash_profile as well:

export SPARK_HOME=/usr/local/share/spark
export PATH=$SPARK_HOME/bin:$PATH

# so pyspark can be imported in python
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH

PySpark has one final requirement, the py4j library:

pip install py4j

Mesos does not have any precompiled binaries, so you must compile it yourself:

MESOS_V=0.24.0

# sources available at <https://mesos.apache.org/gettingstarted/>
wget http://www.apache.org/dist/mesos/${MESOS_V}/mesos-${MESOS_V}.tar.gz
tar -zxf mesos-*.tar.gz
rm mesos-*.tar.gz

# dependencies
sudo apt-get install -y openjdk-7-jdk build-essential python-dev python-boto libcurl4-nss-dev libsasl2-dev maven libapr1-dev libsvn-dev

# by default, this installs to /usr/local
cd mesos*
mkdir build
cd build
../configure
make
sudo make install

Finally, we need to configure Spark to use a Mesos cluster:

cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
echo 'export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos.so' >> $SPARK_HOME/conf/spark-env.sh

That's all for the client.

Setting up the Docker images

Our "cluster" will consist of several Docker containers, with one (or more) for each part of the stack, so we create images for each.

Zookeeper

The Zookeeper image is straightforward:

FROM ubuntu:14.04

ENV ZOOKEEPER_V 3.4.6
ENV ZOOKEEPER_PATH /usr/local/share/zookeeper

# update
RUN apt-get update
RUN apt-get upgrade -y

# dependencies
RUN apt-get install -y wget openjdk-7-jre-headless

# zookeeper
RUN wget http://apache.arvixe.com/zookeeper/zookeeper-${ZOOKEEPER_V}/zookeeper-${ZOOKEEPER_V}.tar.gz
RUN tar -zxf zookeeper-*.tar.gz
RUN rm zookeeper-*.tar.gz
RUN mv zookeeper-* $ZOOKEEPER_PATH
RUN mv $ZOOKEEPER_PATH/conf/zoo_sample.cfg $ZOOKEEPER_PATH/conf/zoo.cfg

ENV PATH $PATH:$ZOOKEEPER_PATH/bin

EXPOSE 2181

ENTRYPOINT ["zkServer.sh"]
CMD ["start-foreground"]

A Zookeeper binary is downloaded and installed, then the default config is copied over. We start the Zookeeper service in the foreground so the Docker container does not immediately exit.

Hadoop

The Hadoop image is more involved:

FROM ubuntu:14.04

ENV HADOOP_V 2.7.1
ENV HADOOP_HOME /usr/local/hadoop
ENV HADOOP_PREFIX $HADOOP_HOME
ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64
ENV PATH $PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
ENV HADOOP_TMP /var/hadoop/tmp

# update
RUN apt-get update
RUN apt-get upgrade -y

# dependencies
RUN apt-get install -y openssh-server openjdk-7-jdk wget

# disable ipv6 since hadoop does not support it
RUN echo 'net.ipv6.conf.all.disable_ipv6 = 1' >> /etc/sysctl.conf
RUN echo 'net.ipv6.conf.default.disable_ipv6 = 1' >> /etc/sysctl.conf
RUN echo 'net.ipv6.conf.lo.disable_ipv6 = 1' >> /etc/sysctl.conf

# hadoop
RUN wget http://apache.arvixe.com/hadoop/core/hadoop-${HADOOP_V}/hadoop-${HADOOP_V}.tar.gz
RUN tar -zxf hadoop-*.tar.gz
RUN rm hadoop-*.tar.gz
RUN mv hadoop-* $HADOOP_HOME

# hadoop tmp directory
RUN mkdir -p $HADOOP_TMP
RUN chmod 750 $HADOOP_TMP

# configs
RUN echo "export JAVA_HOME=$JAVA_HOME" >> $HADOOP_HOME/etc/hadoop/hadoop-env.sh
ADD docker/assets/core-site.xml $HADOOP_HOME/etc/hadoop/core-site.xml

# auth
# the provided config saves us from having
# to accept each new host key on connect
RUN ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa
RUN cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ADD docker/assets/ssh_config /root/.ssh/config

# format the hdfs
RUN hdfs namenode -format

ADD docker/assets/start_hadoop start_hadoop

EXPOSE 8020 50010 50020 50070 50075 50090

CMD ["-d"]
ENTRYPOINT ["./start_hadoop"]

It does the following:

  • A Hadoop binary is downloaded and installed
  • IPV6 is disabled because Hadoop does not support it
  • SSH auth is setup because Hadoop uses it for connections
  • Hadoop is configured with the proper Java install

For SSH, a config which frees us from having to manually accept new hosts is copied over:

Host *
    UserKnownHostsFile /dev/null
    StrictHostKeyChecking no

A core-site.xml config file is also added, which includes:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/var/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>

<property>
  <name>fs.defaultFS</name>
  <value>hdfs://localhost:8020</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>

The important part here is the fs.defaultFS property which describes how others can access the HDFS. Here, the value is localhost, but that is replaced by the start_hadoop script (see below) with the container's "external" IP.

And finally, a start_hadoop script is copied over, which includes:

#!/bin/bash

# get "external" docker ip
HDFS_IP=$(ifconfig eth0 | grep 'inet addr:' | cut -d: -f2 | awk '{print $1}')

# set the proper ip in the HDFS config
sed -i 's/localhost/'${HDFS_IP}'/g' $HADOOP_HOME/etc/hadoop/core-site.xml

/etc/init.d/ssh restart
start-dfs.sh

if [[ $1 == "-d" ]]; then
    while true; do sleep 1000; done
fi

if [[ $1 == "-bash" ]]; then
    /bin/bash
fi

As mentioned, it replaces the localhost value in the core-site.xml config with the "external" IP so that others can connect to the HDFS.

It also starts the SSH service, then starts the HDFS, and, with the -d flag (which is passed in the above Dockerfile), emulates a foreground service so that the Docker container does not exit.

Mesos

For the Mesos leader and followers, we first create a base Mesos image and then use that to create the leader and follower images.

The base Mesos image Dockerfile:

FROM ubuntu:14.04

ENV MESOS_V 0.24.0

# update
RUN apt-get update
RUN apt-get upgrade -y

# dependencies
RUN apt-get install -y wget openjdk-7-jdk build-essential python-dev python-boto libcurl4-nss-dev libsasl2-dev maven libapr1-dev libsvn-dev

# mesos
RUN wget http://www.apache.org/dist/mesos/${MESOS_V}/mesos-${MESOS_V}.tar.gz
RUN tar -zxf mesos-*.tar.gz
RUN rm mesos-*.tar.gz
RUN mv mesos-* mesos
WORKDIR mesos
RUN mkdir build
RUN ./configure
RUN make
RUN make install

RUN ldconfig

This just builds and installs Mesos.

The leader Dockerfile:

FROM mesos_base
ADD docker/assets/start_leader start_leader
EXPOSE 5050
ENTRYPOINT ["./start_leader"]

It exposes the Mesos leader port and copies over a start_leader script, which contains:

#!/bin/bash

# get "external" docker IP
LEADER_IP=$(ifconfig eth0 | grep 'inet addr:' | cut -d: -f2 | awk '{print $1}')
mesos-master --registry=in_memory --ip=${LEADER_IP} --zk=zk://${ZOOKEEPER}/mesos

All this does is tell the leader to use its "external" IP, which is necessary so that the Mesos followers and the client can properly communicate with it.

It also requires a ZOOKEEPER env variable to be set; it is specified when the Docker container is run (see below).

The follower Dockerfile:

FROM mesos_base

ADD docker/assets/start_follower start_follower

EXPOSE 5051

# permissions fix
ENV MESOS_SWITCH_USER 0

# use python3 for pyspark
RUN apt-get install python3
ENV PYSPARK_PYTHON /usr/bin/python3

ENTRYPOINT ["./start_follower"]

There is a bit more going on here. The Mesos follower port is exposed and a few env variables are set. The MESOS_SWITCH_USER variable is a fix for a permissions issue, and the PYSPARK_PYTHON lets Spark know that we will use Python 3.

Like the leader image, there is a start_follower script here, which is simple:

#!/bin/bash
mesos-slave --master=zk://${ZOOKEEPER}/mesos

Again, it uses a ZOOKEEPER env variable which is specified when the container is run.

Building the images

Finally, we can build all the images:

sudo docker build -f Dockerfile.mesos -t mesos_base .
sudo docker build -f Dockerfile.follower -t mesos_follower .
sudo docker build -f Dockerfile.leader -t mesos_leader .
sudo docker build -f Dockerfile.zookeeper -t mesos_zookeeper .
sudo docker build -f Dockerfile.hadoop -t hadoop .

Running the cluster

With all the images built, we can start the necessary Docker containers.

First, start a Zookeeper container:

sudo docker run --name mesos_zookeeper -itP mesos_zookeeper

When it's running, make a note of its IP:

ZOOKEEPER_IP=$(sudo docker inspect --format '{{.NetworkSettings.IPAddress }}' $(sudo docker ps -aq --filter=name=mesos_zookeeper))

Then, start the Hadoop container:

sudo docker run --name hadoop -itP hadoop

Note that our container name here should not have underscores in it, because Java can't handle hostnames with underscores.

Then, start a Mesos leader container:

sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_leader -itP mesos_leader

Note that we set the ZOOKEEPER env variable here.

Finally, start a Mesos follower container in the same fashion:

sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_follower -itP mesos_follower

Using the cluster

With the client setup and the cluster containers running, we can start using PySpark from the client machine.

We'll do the classic word count example to demonstrate the process.

First, open a shell in the Hadoop container:

sudo docker exec -it hadoop bash

From this container, grab a text file to work with and put it in the HDFS so the Mesos followers can access it:

wget http://www.gutenberg.org/cache/epub/4300/pg4300.txt
hadoop fs -put pg4300.txt /sample.txt

Now, back in the client machine, we can put together a Python script to count the words in this file.

First, we need to know the Zookeeper host, so PySpark knows where to find the cluster, and the Hadoop IP, so PySpark knows where to grab the file from. We'll pass them in as command-line arguments and grab them using the sys library:

import sys
import pyspark

zookeeper = sys.argv[1]
hadoop_ip = sys.argv[2]

Then we can specify where to find the text:

src = 'hdfs://{}:8020/sample.txt'.format(hadoop_ip)

And configure PySpark:

conf = pyspark.SparkConf()
conf.setMaster('mesos://zk://{}/mesos'.format(zookeeper))
conf.setAppName('my_test_app')

One important configuration option is spark.executor.uri, which tells Mesos followers where they can get the Spark binary to properly execute the tasks. This must be a prebuilt Spark archive, i.e. a Spark binary package. You can build it and host it yourself if you like.

conf.set('spark.executor.uri', 'http://d3kbcqa49mib13.cloudfront.net/spark-1.5.0-bin-hadoop2.6.tgz')

Then we can create the SparkContext with our config and define the task:

sc = pyspark.SparkContext(conf=conf)

lines = sc.textFile(src)
words = lines.flatMap(lambda x: x.split(' '))
word_count = (words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y))
print(word_count.collect())

Save this file as example.py.

There are a couple gotchas when running this script.

We cannot run it with a simple python example.py. If we do so, then PySpark will use the client's local IP, e.g. something like 192.168.1.2. We want PySpark to use the client's Docker IP so that it can properly communicate with the other Docker containers, and specify this as an env variable called LIBPROCESS_IP:

export LIBPROCESS_IP=$(ifconfig docker0 | grep 'inet addr:' | cut -d: -f2 | awk '{print $1}')

Then, we must also specify the proper Python version for the client's Spark install:

export PYSPARK_PYTHON=/usr/bin/python3

Because we're also passing in the Zookeeper connection string and the Hadoop IP, let's get those too:

ZOOKEEPER=$ZOOKEEPER_IP:2181
HADOOP_IP=$(sudo docker inspect --format '{{.NetworkSettings.IPAddress }}' $(sudo docker ps -aq --filter=name=hadoop))

And now we can run the script:

python example.py $ZOOKEEPER $HADOOP_IP

Multi-node/high-availability setup

So far we only have one follower, but to better emulate a multi-node setup, we want many followers. This is easy to do, just spin up more follower Docker containers with the proper ZOOKEEPER variable:

sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_follower1 -itP mesos_follower
sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_follower2 -itP mesos_follower
sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_follower3 -itP mesos_follower
# etc

For a high-availability setup, we can also create many leaders in a similar way:

sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_leader1 -itP mesos_leader
sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_leader2 -itP mesos_leader
sudo docker run -e ZOOKEEPER=${ZOOKEEPER_IP}:2181 --name mesos_leader3 -itP mesos_leader
# etc

These leaders will all register with Zookeeper and Zookeeper will elect one to be the "active" leader. The followers will coordinate with Zookeeper to figure out which leader they should be talking to. If one leader goes down, Zookeeper will elect a new active leader in its place.

We can even have multiple Zookeeper containers, but I haven't yet tried it out.

Repo

This repo has all of the files mentioned with a script that makes it easy to spin up this entire setup.

References