don't worry, it's probably fine

Using Docker with Apache Flume - Part 2

24 Aug 2014

java docker flume

In the previous post of this series, we saw how to use Docker to set up a basic Apache Flume topology. Now we’ll take a closer look at achieving more reliable flows using shared volumes between containers. Specifically, we’ll see examples of using Docker to introduce replayability to a streaming application based on Flume.

In search of replayability

Replayability is the ability to easily “rewind” an application to a fixed point so that, upon resuming, it ends up in the same state it reached the first time. In this way the application behaves very much like a state machine, which makes it easy to reason about. It also lets us consider the case where we’re passed bad or malformed data: can we easily reproduce the problem and thus fix it?

In the example last time, if badly formed data breaks the sink during ingestion, the entire Flume agent has the potential to shut down. In this post, we’ll take the previous Flume example and implement replayability by decoupling the original Source/Sink and introducing a human-readable intermediate state (or spool) between them.

Sink or swim

One of Flume’s more reliable sources is the SpoolingDirectory source: it watches a directory for new files and emits the contents of these files line-by-line as events, then renames them so it doesn’t add the same content twice. Unfortunately, Flume does not include a sink that forms the opposite side of this: one that operates like a FileSink but rolls files into a new directory on completion. As part of our open-source program at Unruly, we have built such a sink and released it on GitHub (SpoolingDirectoryFileSink). With this new sink in our toolbox, we can start building our reliable flows.
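To make the source's contract concrete, here is a rough shell simulation of the behaviour described above: read each new file line by line, emit each line as an event, then rename the file so it is not ingested twice. This is not Flume code; `consume_spool` and the file name `batch-1` are hypothetical names chosen for illustration.

```shell
# Rough simulation of a spooling-directory reader (NOT Flume itself).
# consume_spool is a hypothetical helper: it emits every line of every
# not-yet-completed file in a directory, then marks the file as done.
consume_spool() (
  dir=$1
  for f in "$dir"/*; do
    [ -f "$f" ] || continue                     # empty directory: nothing to do
    case $f in (*.COMPLETED) continue ;; esac   # already ingested, skip
    while IFS= read -r line || [ -n "$line" ]; do
      printf 'event: %s\n' "$line"
    done < "$f"
    mv "$f" "$f.COMPLETED"                      # never read this file again
  done
)

# Usage: the second pass emits nothing because the file was renamed.
spool=$(mktemp -d)
printf 'ding\ndong\n' > "$spool/batch-1"
first_pass=$(consume_spool "$spool")
second_pass=$(consume_spool "$spool")
echo "$first_pass"
```

The rename is the only bookkeeping here, which is what makes the later rollback trick possible: undoing the rename makes the file eligible for ingestion again.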

The architecture for our flow is illustrated below:

A two part Flume flow

Rather than going from Source to Sink directly, we will set up the previously mentioned layer between them as files being spooled into a data directory - this directory is then accessible by both Docker containers as a shared volume.
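The writing side of that intermediate layer can be sketched in the same spirit. The idea is to write events to a working file first and only move the finished file into the shared directory, so a reader never sees a half-written file. I have not checked that the Unruly sink works exactly this way; `roll_into_spool` and the directory names are hypothetical.

```shell
# Hypothetical helper, not the Unruly sink's actual implementation.
# Reads events from stdin into a working file, then moves the finished
# file into the spool directory in a single rename.
roll_into_spool() (
  work=$1
  spool=$2
  name=$3
  cat > "$work/$name.tmp"
  mv "$work/$name.tmp" "$spool/$name"
)

# Usage: work/ and spool/ live under the same temporary directory.
base=$(mktemp -d)
mkdir "$base/work" "$base/spool"
printf 'ding\ndong\n' | roll_into_spool "$base/work" "$base/spool" batch-1
ls "$base/spool"
```

A rename is only atomic within a single filesystem, so in this sketch the working directory is assumed to sit on the same volume as the spool directory.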

Shared volumes in Docker

Shared volumes exist as a way of making data available to more than one container without the containers having direct knowledge of each other. As an example, consider a web server that writes access logs to a shared volume. We can mount that volume in another container to examine the log files, but the second container does not need to know where the files come from: their existence is sufficient. In the same way, we hope to introduce stability between the components of our multi-process Flume flow, so that if our sink fails it can pick up where it left off on restart, and the source is never blocked from spooling events.

To create container 1, we need a Flume configuration …

fsource.sinks = spoolSink
fsource.sources = netcatSource
fsource.channels = inMemoryChannel

fsource.sources.netcatSource.type = netcat
fsource.sources.netcatSource.bind = 0.0.0.0
fsource.sources.netcatSource.port = 44444
fsource.sources.netcatSource.channels = inMemoryChannel

fsource.channels.inMemoryChannel.type = memory
fsource.channels.inMemoryChannel.capacity = 1000
fsource.channels.inMemoryChannel.transactionCapacity = 100

# Set up the spooling directory sink
fsource.sinks.spoolSink.type = com.unrulymedia.flume.SpoolingDirectoryFileSink
fsource.sinks.spoolSink.channel = inMemoryChannel
fsource.sinks.spoolSink.sink.directory = /var/tmp/sink
fsource.sinks.spoolSink.sink.rollInterval = 5

… and a Dockerfile.

FROM probablyfine/flume

# Add the Spooling Directory Sink jar, built according to the GitHub readme
ADD target/flume-spooling-directory-sink.jar /opt/flume/lib/
ADD flume.conf /var/tmp/

# Set up a volume at /var/tmp/sink
VOLUME /var/tmp/sink

# Launch flume
CMD [ "flume-ng", "agent", "-c", "/opt/flume/conf", \
  "-f", "/var/tmp/flume.conf", "-n", "fsource", "-Dflume.root.logger=INFO,console"]

For this example, I’ve configured the spooling directory sink to roll files every 5 seconds. Building the image as before from this Dockerfile (with docker build -t fsource .) gives us the first half of our topology.

And to create container 2, we have the configuration for Flume …

fsink.sinks = logSink
fsink.sources = spoolSource
fsink.channels = inMemoryChannel

fsink.channels.inMemoryChannel.type = memory
fsink.channels.inMemoryChannel.capacity = 1000
fsink.channels.inMemoryChannel.transactionCapacity = 100

fsink.sinks.logSink.type = logger
fsink.sinks.logSink.channel = inMemoryChannel

fsink.sources.spoolSource.type = spooldir
fsink.sources.spoolSource.channels = inMemoryChannel
fsink.sources.spoolSource.spoolDir = /var/tmp/sink
fsink.sources.spoolSource.fileHeader = true

… and a Dockerfile.

FROM probablyfine/flume

ADD flume.conf /var/tmp/

CMD [ "flume-ng", "agent", "-c", "/opt/flume/conf", \
  "-f", "/var/tmp/flume.conf", "-n", "fsink", "-Dflume.root.logger=INFO,console"]

Notice that we set up the spoolDir as the same path that we set up the volume on for container 1. Build the image (docker build -t fsink .) and then we’re ready to start running our containers.

Using shared volumes

Sharing volumes between containers could hardly be simpler: we just need to use the --volumes-from flag.

  1. Start container1 and expose ports

    docker run -p 444:44444 --name container1 -t fsource

  2. Start container2 and mount volume

    docker run --name container2 --volumes-from container1 -t fsink

That’s it. Docker mounts the volume from container1 in the same place on container2 and starts feeding from the spool directory. As we did in the previous post, we’ll write to port 444 (using echo ding | nc localhost 444) and see the events being logged on container2.

...
2014-08-24 16:07:40,252 (SinkRunner-PollingRunner-DefaultSinkProcessor)
  [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)]
  Event: { headers:{file=/var/tmp/sink/1408896413071-6} body: 64 69 6E 67 ding }
...
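The hex digits in the logged body are simply the bytes of the payload. As a quick sanity check, assuming `od` and `tr` are available on your machine, you can reproduce them locally:

```shell
# Print the bytes of "ding" as upper-case hex, without separators.
body_hex=$(printf 'ding' | od -An -tx1 | tr -d ' \n' | tr 'a-f' 'A-F')
echo "$body_hex"   # 64696E67, matching "64 69 6E 67" in the log line
```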

There’s our data! There is more of a lag now that we are batching events in 5s intervals, but we’re compensated with additional reliability and a readable intermediate state. Here the human-readability really shines: if we have a breakage caused by bad data, then not only can we diagnose it by looking at a human-readable event format, we can also remove the offending data simply by deleting the line.

Replaying the input data to the sink half is also easy. Since the SpoolingDirectory source renames an input file from somefile to somefile.COMPLETED, to roll back the state by 30 minutes we just need to execute the following from inside the spool directory:

find . -name '*.COMPLETED' -mmin -30 | sed 's/\(.*\)\.\(COMPLETED\)$/mv \1.\2 \1/' | sh
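The one-liner builds each mv command as text and pipes it to sh, which can misbehave for file names containing spaces or shell metacharacters. A slightly more defensive sketch of the same idea lets find pass the names directly to a small script instead. `replay_completed` is a hypothetical helper name, and the files below are created only for the demonstration.

```shell
# Hypothetical helper: strip the .COMPLETED suffix from files in a spool
# directory that were modified within the last N minutes.
replay_completed() (
  dir=$1
  minutes=$2
  find "$dir" -type f -name '*.COMPLETED' -mmin "-$minutes" \
    -exec sh -c 'for f; do mv "$f" "${f%.COMPLETED}"; done' sh {} +
)

# Usage against a throwaway directory.
spool=$(mktemp -d)
: > "$spool/batch 1.COMPLETED"
: > "$spool/batch-2.COMPLETED"
replay_completed "$spool" 30
ls "$spool"
```

Note that -mmin tests when a file's contents were last modified, and a rename does not normally change that, so the 30-minute window refers to when the data was written rather than when it was ingested.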

This is Flume-specific, but when building replayability into an application it’s advisable to make your rollback mechanism easy: it could be as simple as an index stored in a flat file or database.
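As a minimal sketch of the flat-file index idea (unrelated to Flume's internals), the helper below processes lines of an input file starting from an offset stored in a checkpoint file, and rewinding is just a matter of writing a smaller number into that file. `process_from_checkpoint` and the file names are hypothetical, and the sketch assumes nothing appends to the input while a run is in progress.

```shell
# Hypothetical helper: emit the lines of $1 that come after the line
# number stored in checkpoint file $2, then record the new position.
process_from_checkpoint() (
  input=$1
  checkpoint=$2
  offset=0
  if [ -f "$checkpoint" ]; then
    offset=$(cat "$checkpoint")
  fi
  total=$(($(wc -l < "$input")))
  if [ "$offset" -lt "$total" ]; then
    sed -n "$((offset + 1)),${total}p" "$input"
  fi
  echo "$total" > "$checkpoint"
)

# Usage: two normal runs, then a rewind to line 1 and a replay.
work=$(mktemp -d)
printf 'a\nb\n' > "$work/events.log"
run1=$(process_from_checkpoint "$work/events.log" "$work/offset")
printf 'c\n' >> "$work/events.log"
run2=$(process_from_checkpoint "$work/events.log" "$work/offset")
echo 1 > "$work/offset"          # rewind: pretend only line 1 was processed
run3=$(process_from_checkpoint "$work/events.log" "$work/offset")
printf '%s\n' "$run3"
```

Because the only state is a single number in a readable file, rolling back needs no special tooling, which is the property the spool directory gives us in the Flume case.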

Summary

Through these two blog posts, we’ve used Docker both to containerise a simple application like Apache Flume and to decouple parts of an application effectively, gaining resiliency and replayability along the way. Both are useful techniques for running a distributed application.