Joe Crobak's Website


Migration to Jekyll

Posted on Sep 1, 2014

I've migrated my blog from self-hosted WordPress to Jekyll on S3. The site is more responsive (and much less of a security threat). Comments are now hosted on Disqus.

These articles were particularly helpful to me, and you might find them useful if you're doing a similar migration:


using jarjar to solve hive and pig antlr conflicts

Posted on Oct 20, 2012

Pig 0.9+ and Hive 0.7+ (and maybe older versions, too) both use antlr. Unfortunately, they use incompatible versions, which causes problems if you try to pull in both pig and hive via ivy or maven. Oozie has come up with a deployment workaround for this problem, but HCatalog is still on Pig 0.8 because of the above issue.

Per the recommendation of a co-worker (thanks johng!), I checked out jarjar to shade the pig jar to avoid the conflicts of antlr and all the other dependencies in the pig "fat jar." Here are the steps that I used:

  1. Download jarjar 1.3 (note that the 1.4 release of jarjar seems to have a bad artifact).
  2. Generate a list of "rules" to rewrite the non-pig classes in the pig-withouthadoop jar:
    1. jar tf pig-0.10.0-cdh4.1.0-withouthadoop.jar | \
        grep "\.class" | grep -v org.apache.pig | \
        python -c "import sys;[sys.stdout.write('.'.join(arg.split('/')[:-1]) +'\n') for arg in sys.stdin]" | \
        sort | uniq | \
        awk '{ print "rule " $1 ".* org.apache.pig.jarjar.@0" }' > \
        pig-jarjar-automated.rules
    2. The above command generates one rule per package containing a class file, rewriting the class with a prefix of org.apache.pig.jarjar. The rules are stored in pig-jarjar-automated.rules. See below for the rules file that I generated.

  3. Run jarjar with the rules listed to generate a new jar:
    1. java -jar jarjar-1.3.jar process \
        pig-jarjar-automated.rules \
        pig-0.10.0-cdh4.1.0-withouthadoop.jar \
        pig-0.10.0-cdh4.1.0-withouthadoop-jarjar.jar
    2. Check out jarjar's command line docs for more info, including the rules file format.
  4. Check the contents of your new jar.
    1.  jar tf pig-0.10.0-cdh4.1.0-withouthadoop-jarjar.jar | \
        egrep "\.class$" | grep -c -v "org/apache/pig"
    2. The above command should return 0 to show that all classes have been rewritten under org/apache/pig.
That's it! We uploaded this jar into our internal nexus repo to use within tests. We're still using the vanilla pig jar for our pig CLI and within oozie, but we may do further testing to see what happens once we start rolling out hcatalog.
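
The rule generation in step 2 can also be sketched in Python for anyone who would rather not chain grep, python, and awk. This is only a minimal sketch, not the command that was actually run: the function names `package_of`, `generate_rules`, and `rules_for_jar` are made up for illustration, and unlike the shell pipeline it skips classes that live in the default package.

```python
import zipfile


def package_of(entry):
    """Return the dotted package of a jar entry like 'org/antlr/runtime/Lexer.class'."""
    return ".".join(entry.split("/")[:-1])


def generate_rules(entries, keep_prefix="org.apache.pig",
                   shade_prefix="org.apache.pig.jarjar"):
    """Build one jarjar rule per non-pig package that contains a class file."""
    packages = set()
    for entry in entries:
        entry = entry.strip()
        if not entry.endswith(".class"):
            continue  # skip manifests, resources, directories
        package = package_of(entry)
        # Assumption: classes in the default package are left alone.
        if package and not package.startswith(keep_prefix):
            packages.add(package)
    return ["rule %s.* %s.@0" % (p, shade_prefix) for p in sorted(packages)]


def rules_for_jar(path):
    """Read the entry names straight from a jar (a jar is a zip file)."""
    with zipfile.ZipFile(path) as jar:
        return generate_rules(jar.namelist())
```

Writing `"\n".join(rules_for_jar("pig-0.10.0-cdh4.1.0-withouthadoop.jar"))` to a file should give something equivalent to the rules file used in step 3.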

For pig-0.10.0-cdh4.1, the rules file looks like this:

rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule dk.brics.automaton.* org.apache.pig.jarjar.@0
rule jline.* org.apache.pig.jarjar.@0
rule org.antlr.runtime.* org.apache.pig.jarjar.@0
rule org.antlr.runtime.debug.* org.apache.pig.jarjar.@0
rule org.antlr.runtime.misc.* org.apache.pig.jarjar.@0
rule org.antlr.runtime.tree.* org.apache.pig.jarjar.@0
rule* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.compiler.* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.debug.* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.gui.* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.misc.* org.apache.pig.jarjar.@0


Workflow Engines for Hadoop

Posted on Jul 5, 2012

Over the past 2 years, I've had the opportunity to work with two open-source workflow engines for Hadoop. I used and contributed to Azkaban, written and open-sourced by LinkedIn, for over a year while I worked at Adconion. Recently, I've been working with Oozie, which is bundled as part of Cloudera's CDH3. Both systems have a lot of great features but also a number of weaknesses. The strengths and weaknesses of both systems don't always overlap, so I hope that each can learn from the other to improve the tools available for Hadoop.

In that vein, I'm going to produce a head-to-head comparison of the two systems considering a number of different features. In the following comparisons, I'm considering the version of Azkaban found in master on GitHub (with exceptions noted) and Oozie from CDH3u3.

Job Definition

Both systems support defining a workflow as a DAG (directed acyclic graph) made up of individual steps.


In Azkaban, a "job" is defined as a java properties file. You specify a job type, any parameters, and any dependencies that job has. Azkaban doesn't have any notion of a self-contained workflow -- a job can depend on any other job in the system. Each job has a unique identifier which is used to reference dependent jobs.


In Oozie, "jobs" are referred to as "actions". A workflow is defined in an XML file, which specifies a start action. There are special actions such as fork and join (which fork and join the dependency graph), as well as the ability to reference a "sub-workflow" defined in another XML file.

Job Submission


To submit a job to Azkaban, one creates a tar.gz or zip archive and uploads it via Azkaban's web interface. The archive contains any jars necessary to run the workflow, which are automatically added to the classpath of the job at launch time.

It's possible to bypass the archive upload (this is what we did at Adconion) by placing the files directly on the filesystem and then telling Azkaban to reload the workflow definitions. I liked this approach because we were able to use RPMs to install workflows, which gave us the ability to roll back to a previous version.


Oozie comes with a command-line program for submitting jobs. This command-line program interacts with the Oozie server via REST. Unfortunately, the REST API (at least in our version of Oozie) doesn't have very good error reporting. It's actually very easy to cause the server to return a 500, in which case you have to investigate Oozie's logs to guess at the problem.

Before submitting a job, the job definition, which is a folder containing XML and jar files, must be uploaded to HDFS. Any jars that are needed by the workflow should be placed in the "lib" directory of the workflow folder. Optionally, Oozie can include "system" libraries by setting a system library path in oozie-site and adding a property setting. Note that *only* HDFS is supported, which makes testing an Oozie workflow cumbersome since you must spin up a MiniDFS cluster.

Running a Job


Azkaban provides a simple web interface for running a job. Each job is given a name in its definition, and one can choose the appropriate job in the UI and click "Run Now". It's also easy to construct an HTTP POST to kick off a job via curl or some other tool.

Determining which job to launch, though, can be quite confusing. With Azkaban, you don't launch via the "first" or "start" node in your DAG, but rather, you find the last node in your DAG and run it. This causes all the (recursive) dependent jobs to run.  This model means that you sometimes have to jump through some hoops to prevent duplicate work from occurring if you have multiple sinks with a common DAG.
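
To make the "run the last node" model concrete, here is a minimal sketch of how launching a sink pulls in its dependencies recursively. It is not Azkaban's code: `execution_order` and the job names are hypothetical, and the dependency map simply mirrors the idea of each job listing the jobs it depends on.

```python
def execution_order(target, dependencies):
    """Return the jobs that must run, in order, to complete `target`.

    `dependencies` maps a job name to the list of jobs it depends on.
    """
    order = []
    visited = set()

    def visit(job):
        if job in visited:
            return  # already scheduled within this run
        visited.add(job)
        for dep in dependencies.get(job, []):
            visit(dep)  # dependencies run before the job itself
        order.append(job)

    visit(target)
    return order


# Hypothetical DAG with two sinks ("load" and "report") sharing upstream jobs.
deps = {
    "extract": [],
    "clean": ["extract"],
    "load": ["clean"],
    "report": ["clean"],
}
print(execution_order("load", deps))
print(execution_order("report", deps))
```

Both calls include `extract` and `clean`, which is the duplicate-work problem described above: running each sink separately repeats the shared part of the DAG unless something outside this sketch prevents it.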

Azkaban runs the driver program as a child process of the Azkaban process. This means that you're resource-constrained by the memory on the box, which caused us to DOS our box a few times. (Azkaban does have a feature to limit the number of simultaneous jobs, which we used to alleviate this problem, but then your job submission turns into a FIFO queue.)


Once a workflow is uploaded to HDFS, one submits or runs a job using the Oozie client. You must give Oozie the full path to your workflow.xml file in HDFS as a parameter to the client. This can be cumbersome since the path changes if you version your workflows (and if you don't version your workflow, a re-submission could cause a running job to fail). Job submission typically references a java properties file that contains a number of parameters for the workflow.

Oozie runs the "driver" program (e.g. PigMain or HiveMain or your MapReduce program's main) as a MapTask. This has a few implications:

  1. If you have the wrong scheduler configuration, it's possible to end up with all map task slots occupied only by these "driver" tasks.
  2. If a map task dies (disable preemption and hope that your TaskTrackers don't die), you end up with an abandoned mapreduce job. Unless you kill the job, retrying at the Oozie level will likely fail.
  3. There's another level of indirection to determine what's happened if your job failed. You have to navigate from Oozie to Hadoop to the Hadoop job to the map task to the map task's output to see what happened.
  4. But also, you don't have a single box that you might DOS.

Scheduling a Job


Azkaban provides a WebUI for scheduling a job with cron-like precision. It's rather easy to recreate this HTTP POST from the command-line.


Oozie has a great feature called "coordinators". A coordinator is an XML file that optionally describes datasets that a workflow consumes, and also describes the frequency of your dataset. For example, you can tell it that your dataset should be created daily at 1am. If there are input datasets described, then your workflow will only be launched when those datasets are available.

A coordinator requires a "startDate", which is rather annoying in the usual case (I just want to launch this workflow today and going forward… we have taken to making the startDate a parameter since we don't necessarily know when the coordinator will be released), but also makes it very easy to do a backfill of your data. E.g. if you have a new workflow that you want to run over all data from the first of the year onwards, just specify a startDate of Jan 1st.
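
The backfill behavior falls out of simple date arithmetic: every run has a nominal time derived from the start date and the frequency. The sketch below only illustrates that arithmetic and is not Oozie's implementation; `nominal_times` is a made-up helper, and the dates are examples.

```python
from datetime import datetime, timedelta


def nominal_times(start, end, frequency):
    """List every nominal run time from `start` through `end`, inclusive."""
    times = []
    current = start
    while current <= end:
        times.append(current)
        current += frequency
    return times


# A daily 1am workflow released on Jan 5th but started "as of" Jan 1st
# has five runs to materialize: Jan 1st through Jan 5th.
runs = nominal_times(
    start=datetime(2012, 1, 1, 1, 0),
    end=datetime(2012, 1, 5, 12, 0),
    frequency=timedelta(days=1),
)
for run in runs:
    print(run.isoformat())
```

Moving the start date earlier just makes the list longer, which is why a backfill needs nothing more than an older start date.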

Azkaban doesn't include anything like Oozie's coordinators. At Adconion, we wrote our own version of it, which also supported some nice features like reruns when data arrive late.



Azkaban doesn't support secure Hadoop. That means, if you're running CDH3 or Hadoop 0.20.200+, that all of your jobs will be submitted to Hadoop as a single user. There have been discussions about fixing this, and I know that Adconion was working on something. Even so, with the fair scheduler it's possible to assign jobs to different pools.


Oozie has built-in support for secure Hadoop including kerberos. We haven't used this, but it does mean that you have to configure Hadoop to allow Oozie to proxy as other users. Thus, jobs are submitted to the cluster as the user that submitted the job to Oozie (although it's possible to override this in a non-kerberos setting).

Property Management


Azkaban has a notion of "global properties" that are embedded within Azkaban itself. These global properties can be referenced from within a workflow, and thus a generic workflow can be built as long as different values for these global properties are specified in different environments (e.g. testing, staging, production). Typical examples of global properties are things like the location of the pig global props and database usernames and passwords.

Azkaban determines which Hadoop cluster to talk to by checking for HADOOP_HOME and HADOOP_CONF_DIR directories containing core-site.xml and mapred-site.xml entries. This also allows you to specify things like the default number of reducers very easily.

Global properties are nice, because if you need to tweak one you don't have to redeploy the workflows that depend on them.


Oozie doesn't have a notion of global properties. All properties must be submitted as part of every job run. This includes the jobtracker and the namenode (so make sure you have CNAMEs set up for those in case they ever change!). Also, Oozie doesn't let you refer to anything with a relative path (including sub-workflows!), so we've taken to setting a property called workflowBase that our tooling provides.

At foursquare, we've had to build a bunch of tooling around job submission so that we don't have to keep all of these properties around in each of our workflows. We're still stuck with resubmitting all coordinators, though, if we have to make a global change. Also, the jobtracker/namenode settings are extra annoying because you *must* specify these in each and every workflow action. Talk about boilerplate. I assume that Yahoo has use-cases for supporting multiple clusters from a particular Oozie instance, but the design over-complicates things for the typical case.



A neat feature of Azkaban is partial reruns, i.e. if your 10-step workflow fails on step 8, then you can pick up from step 8 and just run the last 3 steps. This was possible to do via the UI. This was an attractive feature of Azkaban, but we didn't use it.


In order to get a similar feature in Oozie, each action in your workflow must be a sub-workflow, then you can run the individual sub-workflows. At least in theory -- it turns out that you have to set so many properties that it becomes untenable, and even with the right magic incantation, I couldn't get this to work well.

Reruns of failed days in a coordinator are easy, but only in an all-or-nothing sense -- if the last step of the workflow failed, there's no easy way to rerun it.



Azkaban has a phenomenal UI for viewing workflows (including visualizing the DAG!), run histories, submitting workflows, creating schedules, and more. The UI does have some bugs, such as when you run multiple instances of the same workflow, the history page gets confused. But in general, it's very easy to tell what the state of the system is.


The Oozie UI, on the other hand, is not very useful. It's all Ajax, but is formatted in a window sized for a 1999 monitor. It's laggy, double-clicks don't always work, and things that should be links aren't. It's nearly impossible to navigate once you have a non-trivial number of jobs because jobs aren't named in any human-readable form and the UI doesn't support proper sorting.



Azkaban supports a global email notification whenever a job finishes. This is a nice and cheap mechanism to detect failures. Also, my Adconion-colleague Don Pazel contributed a notification system that can be stitched up to detect failures, run times, etc and expose these via JMX or HTTP. That's what we did at Adconion, but that piece wasn't open-sourced.


With Oozie, it's possible to have an email action that mails on success or failure, but an action has to be defined for each workflow. Since there's no good way to detect failure, we've written a workflow that uses the Oozie REST API to check the status of jobs and then sends us a daily email. This is far from ideal since we sometimes don't learn about a failure until hours after it occurred.
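
A status check along those lines could look roughly like the sketch below. It is not the workflow described above. It assumes the jobs endpoint lives at `/v1/jobs` under the Oozie base URL, accepts a `filter=status=...` query parameter, and returns JSON with a `workflows` list whose entries carry `id`, `appName`, and `status`; verify all of that against the docs for your Oozie version. The host name in the usage note is a placeholder.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

# Workflow states treated as failures in this sketch.
FAILED_STATES = {"FAILED", "KILLED"}


def jobs_url(base_url, status, length=50):
    """Build the (assumed) jobs-listing URL filtered by a single status."""
    query = urlencode({"filter": "status=%s" % status, "len": length})
    return "%s/v1/jobs?%s" % (base_url.rstrip("/"), query)


def failed_workflows(payload):
    """Pick (id, appName, status) for failed jobs out of a JSON response body."""
    jobs = json.loads(payload).get("workflows", [])
    return [
        (job.get("id"), job.get("appName"), job.get("status"))
        for job in jobs
        if job.get("status") in FAILED_STATES
    ]


def fetch_failed(base_url):
    """Query the server once per failure state and collect the results."""
    found = []
    for status in sorted(FAILED_STATES):
        with urlopen(jobs_url(base_url, status)) as response:
            found.extend(failed_workflows(response.read().decode("utf-8")))
    return found
```

Calling `fetch_failed("http://oozie.example.com:11000/oozie")` from cron every few minutes and mailing any non-empty result would shorten the hours-long detection gap, at the cost of running yet another piece of tooling.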



Testing with Azkaban can be achieved by instantiating the Azkaban JobRunner and using the java api to submit a job. We had a lot of success with this at Adconion, and tests ran in a matter of seconds.


Oozie has a LocalOozie utility, but it requires spinning up an HDFS cluster (since Oozie has lots of hard-coded checks that data lives in HDFS). Thus, integration testing is slow (on the order of a minute for a single workflow).

Oozie also has a class that can validate a workflow against the schema, which we've incorporated into our build. But that doesn't catch things like parameter typos or references to non-existent actions.

Custom Job Types


Writing a custom job is fairly straightforward. The Azkaban API has some abstract classes you can subclass. Unfortunately, you must recompile Azkaban to expose a new job type.


Admittedly, I haven't tried this with Oozie. But the action classes that I've seen are well into the hundreds of lines of code.

Baked-in support


Azkaban has baked-in support for Pig, java, shell, and mapreduce jobs.


Oozie has baked-in support for Pig, Hive, and Java. The shell and ssh actions have been deprecated. In addition, though, an Oozie action can have a "prepare" statement to clean up directories to which you might want to write. But these directories *must* be in HDFS, which means that if you use <prepare> then your code is less testable.

Storing State


Azkaban uses JSON files on the filesystem to store state. It caches these files in memory using an LRU cache, which makes the UI responsive. The JSON files are also easy to investigate if you want to poke around while bypassing the UI. Creating a backup is as easy as taking a snapshot of a single directory on a filesystem.
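
For readers who haven't seen the pattern, an LRU cache in front of JSON files can be sketched in a few lines. This is not Azkaban's implementation (Azkaban is written in Java); `JsonFileCache` and its `disk_reads` counter are hypothetical names used only to show the idea.

```python
import json
from collections import OrderedDict


class JsonFileCache:
    """Keep the most recently used JSON files parsed in memory."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._entries = OrderedDict()  # path -> parsed JSON, oldest first
        self.disk_reads = 0            # how often we had to hit the filesystem

    def get(self, path):
        if path in self._entries:
            # Cache hit: mark as most recently used and skip the disk.
            self._entries.move_to_end(path)
            return self._entries[path]
        with open(path) as handle:
            value = json.load(handle)
        self.disk_reads += 1
        self._entries[path] = value
        if len(self._entries) > self.capacity:
            # Evict the least recently used entry.
            self._entries.popitem(last=False)
        return value
```

The files on disk stay the source of truth, so poking at them directly or snapshotting the directory for a backup works regardless of what happens to be cached.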


Oozie uses an RDBMS for storing state, and one must back up that RDBMS via whatever mechanism it supports in order to create a backup.


Both systems feature rich documentation. Oozie's documentation tends to be much longer and larger since it includes XML fragments as well as details of the XML schemas. Over the years, Azkaban's documentation has at times fallen out of sync with implementation, but the general docs are still maintained.

A note on Oozie versions

To be fair to Oozie, I haven't tried the latest version, yet. Hopefully many issues I've noted are fixed, but if not, it'll be easier to file bug reports once on the latest versions. A number of bugs I found in the CDH3u3 version of Oozie were either fixed or not applicable to trunk, so it became difficult to keep track of what was what.


Both Azkaban and Oozie offer substantial features and are powerful workflow engines. The weaknesses and strengths of the systems tend to complement one another, and it'd be fantastic if each system integrated the strengths of the other to improve.

It's worth noting that there are also a number of other workflow systems available that I haven't used. I am not discounting these systems, but I have zero authority to speak on them. Lots of folks also seem to be using in-house engines, and it'd be fantastic to see more work open-sourced.

Writing a general-purpose workflow engine is very hard, and there are certainly remnants of the "LinkedIn" or "Yahoo" way of doing things in each system. As communities grow, hopefully these engines will start to lose those types of annoyances.

The second half of 2012 could be very interesting for workflow-engine improvements. For example, there is talk of a new UI for building Oozie workflows, HCatalog and Oozie integration could be interesting, and YARN integration could make for a better solution to distributing the "driver" programs for workflows (I've heard rumblings that Oozie could go down this path).

Lastly, I realize that this post more than likely has mistakes or oversights. These are inadvertent -- if you find any, please make a note in the comments, and I will try to correct.


Getting Started with Apache Hadoop 0.23.0

Posted on Dec 4, 2011

Hadoop 0.23.0 was released November 11, 2011. Being the future of the Hadoop platform, it's worth checking out even though it is an alpha release.

Note: Many of the instructions in this article came from trial and error, and there are lots of alternative (and possibly better) ways to configure the systems. Please feel free to suggest improvements in the comments. Also, all commands were only tested on Mac OS X.


To get started, download the hadoop-0.23.0.tar.gz file from one of the mirrors here:

Once downloaded, decompress the file. The bundled documentation is available in share/doc/hadoop/index.html

Notes for Users of Previous Versions of Hadoop

The directory layout of the hadoop distribution changed in hadoop 0.23.0 and 0.20.204 vs. previous versions. In particular, there are now sbin, libexec, and etc directories in the root of distribution tarball.

scripts and executables

In hadoop 0.23.0, a number of commonly used scripts from the bin directory have been removed or drastically changed.  Specifically, the following scripts were removed (vs

  • hadoop-daemon(s).sh
  • and
  • and
  • and
  • and
  • task-controller

The start/stop mapred-related scripts have been replaced by "map-reduce 2.0" scripts called yarn-*.  The and scripts no longer start or stop HDFS, but they are used to start and stop the yarn daemons.  Finally, bin/hadoop has been deprecated. Instead, users should use bin/hdfs and bin/mapred.

Hadoop distributions now also include scripts in a sbin directory. The scripts include,, and (and the stop versions of those scripts).

configuration directories and files

The conf directory that comes with Hadoop is no longer the default configuration directory.  Rather, Hadoop looks in etc/hadoop for configuration files.  The libexec directory contains scripts and for configuring where Hadoop pulls configuration information, and it's possible to override the location of the configuration directory the following ways:

  • calls in $HADOOP_COMMON_HOME/libexec and $HADOOP_HOME/libexec
  • accepts a --config option for specifying a config directory, or the directory can be specified using $HADOOP_CONF_DIR.
    • This script also accepts a --hosts parameter to specify the hosts / slaves
    • This script uses variables typically set in, such as: $JAVA_HOME, $HADOOP_HEAPSIZE, $HADOOP_CLASSPATH, $HADOOP_LOG_DIR, $HADOOP_LOGFILE and more.  See the file for a full list of variables.

Configure HDFS

To start hdfs, we will use sbin/ which pulls configuration from etc/hadoop by default. We'll be putting configuration files in that directory, starting with core-site.xml.  In core-site.xml, we must specify a

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

Next, we want to override the locations that the NameNode and DataNode store data so that it's in a non-transient location. The two relevant parameters are and  We also set replication to 1, since we're using a single datanode.

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name></name>
    <value>file:/Users/joecrow/Code/hadoop-0.23.0/data/hdfs/namenode</value>
  </property>
  <property>
    <name></name>
    <value>file:/Users/joecrow/Code/hadoop-0.23.0/data/hdfs/datanode</value>
  </property>
</configuration>


  • as of HDFS-456 and HDFS-873, the namenode and datanode dirs should be specified with a full URI.
  • by default, hadoop starts up with 1000 megabytes of RAM allocated to each daemon. You can change this by adding a to etc/hadoop. There's a template that can be added with: $ cp ./share/hadoop/common/templates/conf/ etc/hadoop
    • The template sets up a bogus value for HADOOP_LOG_DIR
    • HADOOP_PID_DIR defaults to /tmp, so you might want to change that variable, too.

Start HDFS

Start the NameNode:

sbin/ start namenode

Start a DataNode:

sbin/ start datanode

(Optionally) start the SecondaryNameNode (this is not required for local development, but it definitely is for production).

sbin/ start secondarynamenode

To confirm that the processes are running, issue jps and look for lines for NameNode, DataNode and SecondaryNameNode:

$ jps
55036 Jps
55000 SecondaryNameNode
54807 NameNode
54928 DataNode
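
If you'd rather script this check than eyeball it, the jps output is easy to parse. The helper names below (`running_daemons`, `missing_daemons`) are made up for this sketch, which assumes jps prints one "pid name" pair per line as in the output above.

```python
def running_daemons(jps_output):
    """Return the set of process names reported by jps."""
    names = set()
    for line in jps_output.splitlines():
        parts = line.split()
        # Expect "<pid> <name>"; ignore anything else (e.g. a pid with no name).
        if len(parts) == 2 and parts[0].isdigit():
            names.add(parts[1])
    return names


def missing_daemons(jps_output,
                    expected=("NameNode", "DataNode", "SecondaryNameNode")):
    """Return the expected daemons that do not appear in the jps output."""
    running = running_daemons(jps_output)
    return [name for name in expected if name not in running]


sample = """55036 Jps
55000 SecondaryNameNode
54807 NameNode
54928 DataNode
"""
print(missing_daemons(sample))
```

To run it against a live machine, feed it the result of `subprocess.check_output(["jps"], text=True)`. An empty list means all three daemons are up.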


  • the hadoop daemons log to the "logs" dir.  Stdout goes to a file ending in ".out" and a logfile ends in ".log". If a daemon doesn't start up, check the file that includes the daemon name (e.g. logs/hadoop-joecrow-datanode-jcmba.local.out).
  • the commands might say "Unable to load realm info from SCDynamicStore" (at least on Mac OS X). This appears to be harmless output, see HADOOP-7489 for details.

Stopping HDFS

Eventually you'll want to stop HDFS. Here are the commands to execute, in the given order:

sbin/ stop secondarynamenode
sbin/ stop datanode
sbin/ stop namenode

Use jps to confirm that the daemons are no longer running.

Running an example MR Job

This section just gives the commands for configuring and starting the Resource Manager, Node Manager, and Job History Server, but it doesn't explain the details of those. Please refer to the References and Links section for more details.

The Yarn daemons use the conf directory in the distribution for configuration by default. Since we used etc/hadoop as the configuration directory for HDFS, it would be nice to use that as the config directory for mapreduce, too.  As a result, we update the following files:

In conf/, add the following lines under the definition of YARN_CONF_DIR:


In conf/yarn-site.xml, update the contents to:

<?xml version="1.0"?>

Set the contents of etc/hadoop/mapred-site.xml to:

<?xml version="1.0"?>
<?xml-stylesheet href="configuration.xsl"?>

Now, start up the yarn daemons:

 $ bin/ start resourcemanager
 $ bin/ start nodemanager
 $ bin/ start historyserver

A bunch of example jobs are available via the hadoop-examples jar. For example, to run the program that calculates pi:

$ bin/hadoop jar hadoop-mapreduce-examples-0.23.0.jar pi \ \
-libjars modules/hadoop-mapreduce-client-jobclient-0.23.0.jar 16 10000

The command will produce a lot of output, but towards the end you'll see:

Job Finished in 67.705 seconds
Estimated value of Pi is 3.14127500000000000000
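
For intuition about what the two trailing arguments (16 and 10000) control, here is a single-process sketch of the same kind of estimate: 16 "maps", each sampling 10000 points in the unit square and counting how many land inside the quarter circle. To be clear about the swap, the bundled Hadoop example uses a quasi-Monte Carlo method, while this sketch uses plain pseudo-random sampling, so the digits will not match the job's output.

```python
import random


def estimate_pi(num_maps, samples_per_map, seed=0):
    """Estimate pi from the fraction of random points inside the unit quarter circle."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_maps):            # one loop iteration per "map task"
        for _ in range(samples_per_map):
            x, y = rng.random(), rng.random()
            if x * x + y * y <= 1.0:
                inside += 1
    total = num_maps * samples_per_map
    # Quarter-circle area / square area = pi / 4.
    return 4.0 * inside / total


print(estimate_pi(16, 10000))
```

More maps or more samples per map tighten the estimate. In the real job the maps run in parallel and a single reducer combines the counts.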


  • By default, the resource manager uses a number of IPC ports, including 8025, 8030, 8040, and 8141.  The web UI is exposed on port 8088.
  • By default, the JobHistoryServer uses port 19888 for a web UI and port 10020 for IPC.
  • By default, the node manager uses port 9999 for a web UI and port 4344 for IPC. Port 8080 is used for something? Also some random port... 65176?
  • The resource manager has a "proxy" url that it uses to link-through to the JobHistoryServer UI. e.g.:
    $ curl -I
    HTTP/1.1 302 Found
    Content-Type: text/plain; charset=utf-8
    Content-Length: 0
    Server: Jetty(6.1.26)


While Hadoop 0.23 is an alpha release, getting it up and running in pseudo-distributed mode isn't too difficult.  The new architecture will take some getting used to for users of previous releases of Hadoop, but it's an exciting step forward.

Observations and Notes

There are a few bugs or gotchas that I discovered or verified to keep an eye on as you're going through these steps.  These include:

  • HADOOP-7837 log4j isn't setup correctly when using sbin/
  • HDFS-2574 Deprecated parameters appear in the hdfs-site.xml templates.
  • HDFS-2595 misleading message when not set and running sbin/
  • HDFS-2553 BlockPoolScanner spinning in a loop (causes DataNode to peg one cpu to 100%).
  • HDFS-2608 NameNode webui references missing hadoop.css

References and Links


Recap: Apache Flume (incubating) User Meetup, Hadoop World 2011 NYC Edition

Posted on Nov 13, 2011

The Apache Flume (incubating) User Meetup, Hadoop World 2011 NYC Edition was Wednesday, November 9. It was collocated with the Hive Meetup at Palantir's awesome office space in the meatpacking district in Manhattan.

The following are my notes from the two presentations about Flume NG and Artimon. Hopefully I've recapped details accurately, but please let me know if corrections are appropriate!

Flume NG

Speakers Arvind Prabhakar and Eric Sammer of Cloudera. Arvind is first.

Motivation: Flume was built piecemeal as use-cases arose, and FlumeNG is taking a step back and rearchitecting from scratch with these use-cases in mind.  To understand the flume-ng architecture, Arvind took us through a tour of terminology.

If Flume is a blackbox, then there are bunches of potential inputs (e.g. log4j, thrift, avro) that go into the flume black box. Likewise there are lots of potential destinations for data (e.g. hdfs, hbase) exiting the flume black box.

Clients are the entities that produce events, e.g. log4j appender. A terminal sink is the final sink where data exits flume. Like flume 0.9.x, there are sources and sinks, but the start and end points have special names--  clients and terminal sinks. A flow describes how data moves from a client to a terminal sink.

Agents are the physical processes that house bunches of flume entities to move the flow along (pass along data from a source to a sink).

There can be other situations, such as fanout flows, in which an event gets replicated to two different sinks. From the terminology perspective, each start-end pipeline is a flow, even if two flows have hops in common and overlap.

A channel is a persistent or transient store for buffering events on agent nodes. An agent reads from a source and puts data on a channel. The sink polls the channel, picks up the events that arrived, packages them together, and sends them along. In doing so, the sink can do batching, encryption, compression, etc.  A source can feed into multiple channels, but a sink only reads from a single channel.

Rather than a ChannelDriver (flume 0.9.x), the sinks and sources each have their own threads, and they collaborate using a producer-consumer paradigm with the channel as a queue/buffer.
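
The producer-consumer arrangement can be sketched with a bounded queue standing in for the channel and one thread each for the source and the sink. This is an illustration of the paradigm only, not Flume NG code: `run_source`, `run_sink`, `run_flow`, and the `STOP` marker are all invented for the example.

```python
import queue
import threading

STOP = object()  # marker the source sends when it has no more events


def run_source(events, channel):
    """Source thread: put each event on the channel."""
    for event in events:
        channel.put(event)  # blocks while the channel is full
    channel.put(STOP)


def run_sink(channel, delivered, batch_size):
    """Sink thread: poll the channel and ship events downstream in batches."""
    batch = []
    while True:
        event = channel.get()
        if event is STOP:
            break
        batch.append(event)
        if len(batch) == batch_size:
            delivered.append(batch)
            batch = []
    if batch:
        delivered.append(batch)  # flush the final partial batch


def run_flow(events, capacity=10, batch_size=4):
    channel = queue.Queue(maxsize=capacity)
    delivered = []
    source = threading.Thread(target=run_source, args=(events, channel))
    sink = threading.Thread(target=run_sink, args=(channel, delivered, batch_size))
    source.start()
    sink.start()
    source.join()
    sink.join()
    return delivered


print(run_flow(list(range(10)), capacity=3, batch_size=4))
```

The source and sink never call each other. The channel is the only thing they share, which is what lets the sink batch (or compress, or encrypt) at its own pace.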

Because the definition of a node doesn't fit well in fanout situations, this term is no longer used.

Transactional Guarantees

  • if an agent has a single source and a single sink, the source initiates a transaction with the channel. If the source can't accept the message, it invalidates the transaction.
  • likewise, sink has a transaction with its channel, so there is a channel -> sink -> src -> channel transaction, meaning you can have guaranteed delivery.
  • implementations of channels include:
    • memory, file, jdbc (with acid guarantees).


client (log4j) -> agent1 -> agent2 -> HDFS.

If agent2/HDFS link goes down, then the events will start accumulating on agent2. This is because channels are passive (the sink actively removes them). When the channel reaches its capacity, the message will get relayed as an exception and then events will start buffering on agent1. When agent1 reaches its capacity, then your client will start to fail. When link recovers, you'll start to see an immediate draining of events.
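
The failure scenario is easier to follow as a small simulation. The sketch below is single-threaded and heavily simplified, with invented names (`Channel`, `ChannelFullError`, `Pipeline`) and tiny capacities. It only demonstrates the ordering described above: agent2 fills first, then agent1, then the client fails, and everything drains once the link is back.

```python
class ChannelFullError(Exception):
    """Raised when a channel has reached its capacity."""


class Channel:
    def __init__(self, capacity):
        self.capacity = capacity
        self.events = []

    def put(self, event):
        if len(self.events) >= self.capacity:
            raise ChannelFullError("channel is full")
        self.events.append(event)


def forward(channel, deliver):
    """Sink side: move events out of `channel` until delivery fails."""
    while channel.events:
        try:
            deliver(channel.events[0])
        except (ChannelFullError, ConnectionError):
            break  # leave the event in the channel and retry later
        channel.events.pop(0)


class Pipeline:
    """client -> agent1 -> agent2 -> HDFS, with a switchable HDFS link."""

    def __init__(self, capacity1, capacity2):
        self.agent1 = Channel(capacity1)
        self.agent2 = Channel(capacity2)
        self.hdfs = []
        self.hdfs_up = True

    def _write_hdfs(self, event):
        if not self.hdfs_up:
            raise ConnectionError("agent2/HDFS link is down")
        self.hdfs.append(event)

    def pump(self):
        # Drain downstream first so freed capacity can be used upstream.
        forward(self.agent2, self._write_hdfs)
        forward(self.agent1, self.agent2.put)
        forward(self.agent2, self._write_hdfs)

    def send(self, event):
        self.agent1.put(event)  # raises ChannelFullError: the client fails
        self.pump()
```

With capacities of 2 and 3 and the link down, the client gets five events in before `send` raises. Flipping `hdfs_up` back to `True` and calling `pump()` delivers all five in order.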

At this point, Eric takes over to chat about the flume-ng (and flume in general) dev efforts:

The bug for flume-ng is FLUME-728, the branch is called FLUME-728. Per Eric:

  • alpha quality.
  • sources: avro source, unix (tail) source, netcat source (for testing), a few other things.
  • configuration is now plugin/user extendable (zookeeper implemented, but could use other stores).
  • hdfs sink supports append and sync, as well as bucketing.
  • avro client sink that lets you do tiered collection.
  • looking for committers that don't work for Cloudera.

Notes from breakout session run by Eric:

  • Avro is the new default RPC system (no more reliance on thrift binary).
  • Avro is a bit slower, so batching / buffering is built in to improve performance.
  • Not 100% api compatible, so custom flume decorators/sinks will have to have (minimal) changes.
  • lots more that I can't recall :).

Artimon - Arkéa Real Time Information Monitoring.

Mathias Herberts of Arkéa- Slides on Slideshare.

"we found out that none of the existing monitoring systems met our needs, so we created artimon"

ARTIMON -- last mast of a ship with at least 3 sails, and it's supposed to stabilize the ship.

Collecting data

Collect variable instances on machines (using flume) and store the metrics in HDFS and in-memory buckets.  Variable instances are similar to rowkeys in OpenTSDB, if you're familiar. Something like:

name, label=value

Support for types like:

  • Integer
  • Double
  • Boolean
  • String

The framework adds a few labels: datacenter, rack, module, context, environment (production, testing, etc), host. Retrieved via reverse dns, dc-rack-module.

Metrics are created by a library (mainly java), called ArtimonBookeeper. It has a very simple interface. To use ArtimonBookeeper:

    • build a singleton.
    • use a method like: addToIntegerVar(name, labels, offset).
      • creates (or adds to) a variable metric (counter) held in memory.
    • the bookkeeper is part of a context, and it registers itself in zookeeper.
      • part of the registration tells ZK the context, IP and port.
      • the ip and port are a thrift end point that you can retrieve variables/counters stored by the application.
    • metrics have been exported by java library using a thrift endpoint. The design allows you to expose metrics from any language supported by thrift.
    • if you can't provide a thrift endpoint to expose metrics, you can create a file in /var/run/artimon, and it'll read the data every minute.
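
To give a feel for the counter interface, here is a rough Python analogue of a bookkeeper that holds labeled counters in memory. It is not the Artimon library (which is mainly Java and exposes its counters over thrift). The class name `Bookkeeper`, the method names, and the formatting of the variable instance are assumptions based only on the notes above.

```python
class Bookkeeper:
    """Hold integer counters in memory, keyed by name plus labels."""

    def __init__(self):
        self._counters = {}

    def add_to_integer_var(self, name, labels, offset):
        """Create the counter if needed, then add `offset` to it."""
        # Sort labels so that label order does not create distinct counters.
        key = (name, tuple(sorted(labels.items())))
        self._counters[key] = self._counters.get(key, 0) + offset
        return self._counters[key]

    def snapshot(self):
        """Return counters keyed by 'name, label=value, ...' strings."""
        result = {}
        for (name, labels), value in self._counters.items():
            parts = [name] + ["%s=%s" % pair for pair in labels]
            result[", ".join(parts)] = value
        return result


# One shared instance per application, in the spirit of "build a singleton".
BOOKKEEPER = Bookkeeper()
BOOKKEEPER.add_to_integer_var("requests", {"dc": "jb", "host": "web1"}, 1)
BOOKKEEPER.add_to_integer_var("requests", {"host": "web1", "dc": "jb"}, 2)
print(BOOKKEEPER.snapshot())
```

A poller (the flume source, in Artimon's case) would read something like `snapshot()` once a minute. How the real library serves that over thrift is outside this sketch.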

Machines run apps that export the metrics. Can have several apps on each machine.


Each machine has flume running. Start several nodes and configure them without a master (patched flume because they had problems with the master). Special source "artimon source" looks in Zookeeper for stuff to read. Periodically polls (every minute) the thrift endpoints exposed by the apps. Retrieve all metrics that are on the same app. Push to collectors. Collectors push to sink:

  • put most granular data in HDFS.
  • VHMS - VarHistoryMemStore (several servers running this). Store in buckets with various resolutions, depth, etc.
    • 60 buckets of 60 seconds.
    • 48 buckets of 5 minutes
    • 96 buckets of 15 minutes.
    • 72 buckets of 60 minutes. (3 days worth of data with 1-hour granularity).
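
The retention of each resolution is just bucket count times bucket width, and a fixed number of buckets maps naturally onto a ring buffer. The sketch below works through that arithmetic. `BucketRing` is a hypothetical structure, and summing values within a bucket is an assumption (the talk did not say how VHMS aggregates).

```python
# (bucket count, bucket width in seconds) for the four resolutions above.
RESOLUTIONS = [(60, 60), (48, 5 * 60), (96, 15 * 60), (72, 60 * 60)]


def retention_seconds(count, width):
    """How far back a ring of `count` buckets of `width` seconds reaches."""
    return count * width


class BucketRing:
    """Fixed-size ring of time buckets; old buckets are overwritten."""

    def __init__(self, count, width):
        self.count = count
        self.width = width
        self.starts = [None] * count  # start timestamp held by each slot
        self.values = [0] * count

    def add(self, timestamp, value):
        start = timestamp - timestamp % self.width
        slot = (timestamp // self.width) % self.count
        if self.starts[slot] != start:
            # Slot still holds data from an earlier lap around the ring.
            self.starts[slot] = start
            self.values[slot] = 0
        self.values[slot] += value  # assumption: values in a bucket are summed


for count, width in RESOLUTIONS:
    hours = retention_seconds(count, width) / 3600
    print("%d buckets of %d s cover %g hours" % (count, width, hours))
```

The four rings cover 1 hour, 4 hours, 24 hours, and 72 hours respectively, which matches the "3 days worth of data" noted for the coarsest one.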

Worked well for a while, but hit a "flume-wall":

  • DFOs for failures, which works very well.
  • problems arise when you have a slowdown, but not a complete failure. Not going to benefit from the DFO. You're going to run into OOME because you fill up buffers. Had to put in place a heuristic to drop stuff.
    • not great because when there is congestion is when there is a problem.
  • Solution was to create a Kafka sink. Kafka is the buffer. Failure scenario is much better. Haven't had any slowdowns.
  • Wrote a Kafka source on the other side to collect metrics.

Stats and numbers

  • Collecting around 1.5 million metrics per minute.
    • 25K metrics/second. But they experience that all metrics are collected in about 10s of every minute.
  • 1.5B metrics per day.
  • Keeping all raw data since the end of June, collected around 80TB (divide by 3). 200-300GB/day, compressed.

The great thing about flume is that we have several dc, but when there's a colo partition, the data recovers. When the partition ends, the metrics flood in.

What do they do with the metrics?

VHMS has a thrift endpoint. You can query it like "Give me all metrics named memcached which have dc=jb for the last 600,000ms"

  • retrieves a matrix of values. memcached.varX x value
  • you can display and have a pretty quick view.
  • you can apply stuff on the matrices.
    • "Map paradigm" - historical data on one variable data. Library is in Groovy. DSL to play with metrics.
    • map_delta() -- take two values, create a new one. Creates a new dataset.
    • provides a bunch of builtin functions, but you can create your own using groovy closures.
  • labels are important. You can define equivalence classes. If you have the same names and values (or subset).
  • can do bulk operations.
    • group two operands by equivalence classes. e.g. compute available disk space. Works for one volume or thousands of volumes.
  • reduce paradigm to an equivalence class. e.g. could compute the average temperature of each rack in your datacenter with 1-line.
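
The equivalence-class idea is essentially a group-by on a subset of labels followed by a reduce. Artimon's DSL is in Groovy and was not shown in detail, so the following is only a Python sketch with invented names (`group_by_labels`, `reduce_mean`) and made-up temperature readings.

```python
from collections import defaultdict


def group_by_labels(metrics, label_names):
    """Group (labels, value) pairs into classes that share the given labels."""
    groups = defaultdict(list)
    for labels, value in metrics:
        key = tuple(labels.get(name) for name in label_names)
        groups[key].append(value)
    return dict(groups)


def reduce_mean(metrics, label_names):
    """Reduce each equivalence class to the mean of its values."""
    grouped = group_by_labels(metrics, label_names)
    return {key: sum(values) / len(values) for key, values in grouped.items()}


# Hypothetical per-host temperature readings.
temperatures = [
    ({"dc": "jb", "rack": "r1", "host": "a"}, 20.0),
    ({"dc": "jb", "rack": "r1", "host": "b"}, 24.0),
    ({"dc": "jb", "rack": "r2", "host": "c"}, 30.0),
]
print(reduce_mean(temperatures, ["dc", "rack"]))
```

Because the grouping is driven entirely by labels, the same call works for one rack or thousands, which is the point made about bulk operations.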

notes from breakout session:

  • have a JMX adapter that's used for pulling metrics from hadoop, hbase, etc.
  • using Kafka very much like the channels in flume-ng (flume releases use a push model, which is responsible for many of the problems).
  • Write pig scripts for custom / post-mortem analysis.
  • Groovy library has an email service for alerting.

Thanks again to everyone who helped make this event happen. It was a ton of fun.



Joe Crobak is a software engineer at Project Florida and runs Hadoop Weekly.
