using jarjar to solve hive and pig antlr conflicts

Pig 0.9+ and Hive 0.7+ (and maybe older versions, too) both use antlr. Unfortunately, they use incompatible versions, which causes problems if you try to pull in both Pig and Hive via Ivy or Maven. Oozie has come up with a deployment workaround for this problem, but HCatalog is still on Pig 0.8 because of this issue.

Per the recommendation of a co-worker (thanks johng!), I checked out jarjar to shade the pig jar to avoid the conflicts of antlr and all the other dependencies in the pig “fat jar.” Here are the steps that I used:

  1. Download jarjar 1.3 (note that the 1.4 release of jarjar seems to have a bad artifact).
  2. Generate a list of “rules” to rewrite the non-pig classes in the pig-withouthadoop jar:
    1. jar tf pig-0.10.0-cdh4.1.0-withouthadoop.jar | \
        grep "\\.class" | grep -v org.apache.pig | \
        python -c "import sys;[sys.stdout.write('.'.join(arg.split('/')[:-1]) +'\n') for arg in sys.stdin]" | \
        sort | uniq | \
        awk '{ print "rule " $1 ".* org.apache.pig.jarjar.@0" }' > \
        pig-jarjar-automated.rules
    2. The above command generates one rule per package containing a class file, rewriting the class with a prefix of org.apache.pig.jarjar. The rules are stored in pig-jarjar-automated.rules. See below for the rules file that I generated.
  3. Run jarjar with the rules listed to generate a new jar:
    1. java -jar jarjar-1.3.jar process \
        pig-jarjar-automated.rules \
        pig-0.10.0-cdh4.1.0-withouthadoop.jar \
        pig-0.10.0-cdh4.1.0-withouthadoop-jarjar.jar
    2. Check out jarjar’s command-line docs for more info, including the rules file format.
  4. Check the contents of your new jar.
    1.  jar tf pig-0.10.0-cdh4.1.0-withouthadoop-jarjar.jar | \
        egrep "\.class$" | grep -c -v "org/apache/pig"
    2. The above command should return 0 to show that all classes have been rewritten under org/apache/pig.
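For readers who prefer to skip the shell pipeline, here is a hedged Python sketch of the same rule-generation logic from step 2 (the function name and the sample jar entries are my own, for illustration):

```python
def make_rules(jar_entries, keep_prefix="org/apache/pig",
               shade_prefix="org.apache.pig.jarjar"):
    """Generate one jarjar rule per package containing a class file,
    skipping packages already under keep_prefix (mirrors the grep -v)."""
    packages = set()
    for entry in jar_entries:
        if not entry.endswith(".class") or entry.startswith(keep_prefix):
            continue
        # drop the class name, join the path segments into a package name
        package = ".".join(entry.split("/")[:-1])
        if package:
            packages.add(package)
    return ["rule %s.* %s.@0" % (pkg, shade_prefix) for pkg in sorted(packages)]

# illustrative entries, as `jar tf` would list them
entries = [
    "org/antlr/runtime/Token.class",
    "org/apache/pig/Main.class",          # left alone: already under org.apache.pig
    "com/google/common/base/Joiner.class",
]
for rule in make_rules(entries):
    print(rule)
# prints:
# rule com.google.common.base.* org.apache.pig.jarjar.@0
# rule org.antlr.runtime.* org.apache.pig.jarjar.@0
```

In a real run you would feed it the full `jar tf` output and write the result to pig-jarjar-automated.rules.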
That’s it! We uploaded this jar into our internal nexus repo to use within tests. We’re still using the vanilla pig jar for our pig CLI and within oozie, but we may do further testing to see what happens once we start rolling out hcatalog.

For pig-0.10.0-cdh4.1, the rules file looks like this:

rule com.google.common.annotations.* org.apache.pig.jarjar.@0
rule com.google.common.base.* org.apache.pig.jarjar.@0
rule com.google.common.base.internal.* org.apache.pig.jarjar.@0
rule com.google.common.cache.* org.apache.pig.jarjar.@0
rule com.google.common.collect.* org.apache.pig.jarjar.@0
rule com.google.common.eventbus.* org.apache.pig.jarjar.@0
rule com.google.common.hash.* org.apache.pig.jarjar.@0
rule com.google.common.io.* org.apache.pig.jarjar.@0
rule com.google.common.math.* org.apache.pig.jarjar.@0
rule com.google.common.net.* org.apache.pig.jarjar.@0
rule com.google.common.primitives.* org.apache.pig.jarjar.@0
rule com.google.common.util.concurrent.* org.apache.pig.jarjar.@0
rule dk.brics.automaton.* org.apache.pig.jarjar.@0
rule jline.* org.apache.pig.jarjar.@0
rule org.antlr.runtime.* org.apache.pig.jarjar.@0
rule org.antlr.runtime.debug.* org.apache.pig.jarjar.@0
rule org.antlr.runtime.misc.* org.apache.pig.jarjar.@0
rule org.antlr.runtime.tree.* org.apache.pig.jarjar.@0
rule org.apache.tools.bzip2r.* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.compiler.* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.debug.* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.gui.* org.apache.pig.jarjar.@0
rule org.stringtemplate.v4.misc.* org.apache.pig.jarjar.@0

Workflow Engines for Hadoop

Over the past 2 years, I’ve had the opportunity to work with two open-source workflow engines for Hadoop. I used and contributed to Azkaban, written and open-sourced by LinkedIn, for over a year while I worked at Adconion. Recently, I’ve been working with Oozie, which is bundled as part of Cloudera’s CDH3. Both systems have a lot of great features but also a number of weaknesses. The strengths and weaknesses of both systems don’t always overlap, so I hope that each can learn from the other to improve the tools available for Hadoop.

In that vein, I’m going to produce a head-to-head comparison of the two systems, considering a number of different features. In the following comparisons, I’m considering the version of Azkaban found in master on GitHub (with exceptions noted) and Oozie from CDH3u3.

Job Definition

Both systems support defining a workflow as a DAG (directed acyclic graph) made up of individual steps.

Azkaban

In Azkaban, a “job” is defined in a Java properties file. You specify a job type, any parameters, and any dependencies the job has. Azkaban doesn’t have any notion of a self-contained workflow — a job can depend on any other job in the system. Each job has a unique identifier, which is used to reference dependent jobs.
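As a hedged illustration (the file name, script, and job id below are hypothetical, not from any real deployment), an Azkaban job file might look like:

```properties
# daily-report.job (hypothetical) -- one properties file per job
type=pig
pig.script=scripts/daily_report.pig
# references the unique id of another job in the system;
# there is no self-contained workflow wrapper around these files
dependencies=load-raw-logs
```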

Oozie

In Oozie, jobs are referred to as “actions”. A workflow is defined in an XML file, which specifies a start action. There are special actions such as fork and join (which fork and join the dependency graph), as well as the ability to reference a “sub-workflow” defined in another XML file.
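A minimal sketch of the shape of such a workflow (action names and schema version are illustrative; the action bodies are elided):

```xml
<!-- Hedged sketch of an Oozie workflow with a fork/join; names are hypothetical -->
<workflow-app name="example-wf" xmlns="uri:oozie:workflow:0.2">
  <start to="fork-node"/>
  <fork name="fork-node">
    <path start="pig-action"/>
    <path start="java-action"/>
  </fork>
  <action name="pig-action">
    <pig><!-- ... --></pig>
    <ok to="join-node"/>
    <error to="fail"/>
  </action>
  <action name="java-action">
    <java><!-- ... --></java>
    <ok to="join-node"/>
    <error to="fail"/>
  </action>
  <join name="join-node" to="end"/>
  <kill name="fail"><message>Action failed</message></kill>
  <end name="end"/>
</workflow-app>
```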

Job Submission

Azkaban

To submit a job to Azkaban, one creates a tar.gz or zip archive and uploads it via Azkaban’s web interface. The archive contains any jars necessary to run the workflow, which are automatically added to the classpath of the job at launch time.

It’s possible to bypass the archive upload (this is what we did at Adconion) and directly place the files on the filesystem, then tell Azkaban to reload the workflow definitions. I liked this approach because we were able to use RPMs to install workflows, which gave us the ability to roll back to a previous version.

Oozie

Oozie comes with a command-line program for submitting jobs. This command-line program interacts with the Oozie server via REST. Unfortunately, the REST API (at least in our version of Oozie) doesn’t have very good error reporting. It’s actually very easy to cause the server to 500, in which case you have to investigate Oozie’s logs to guess at the problem.

Before submitting a job, the job definition, which is a folder containing XML and jar files, must be uploaded to HDFS. Any jars that are needed by the workflow should be placed in the “lib” directory of the workflow folder. Optionally, Oozie can include “system” libraries by setting a system library path in oozie-site and adding a property setting. Note that *only* HDFS is supported, which makes testing an Oozie workflow cumbersome since you must spin up a MiniDFS cluster.

Running a Job

Azkaban

Azkaban provides a simple web interface for running a job. Each job is given a name in its definition, and one can choose the appropriate job in the UI and click “Run Now”. It’s also easy to construct an HTTP POST to kick off a job via curl or some other tool.

Determining which job to launch, though, can be quite confusing. With Azkaban, you don’t launch via the “first” or “start” node in your DAG, but rather, you find the last node in your DAG and run it. This causes all the (recursive) dependent jobs to run.  This model means that you sometimes have to jump through some hoops to prevent duplicate work from occurring if you have multiple sinks with a common DAG.

Azkaban runs the driver program as a child process of the Azkaban process. This means that you’re resource constrained by the memory on the box, which caused us to DOS our box a few times. (Azkaban does have a feature to limit the number of simultaneous jobs, which we used to alleviate this problem, but then job submission turns into FIFO.)

Oozie

Once a workflow is uploaded to HDFS, one submits or runs a job using the Oozie client. You must give Oozie the full path to your workflow.xml file in HDFS as a parameter to the client. This can be cumbersome since the path changes if you version your workflows (and if you don’t version your workflow, a re-submission could cause a running job to fail). Job submission typically references a java properties file that contains a number of parameters for the workflow.

Oozie runs the “driver” program (e.g. PigMain or HiveMain or your MapReduce program’s main) as a MapTask. This has a few implications:

  1. If you have the wrong scheduler configuration, it’s possible to end up with all map task slots occupied only by these “driver” tasks.
  2. If a map task dies (disable preemption and hope that your TaskTrackers don’t die), you end up with an abandoned mapreduce job. Unless you kill the job, retrying at the Oozie level will likely fail.
  3. There’s another level of indirection to determine what happened if your job failed. You have to navigate from Oozie to the Hadoop job to the map task to the map task’s output to see what happened.
  4. On the plus side, you don’t have a single box that you might DOS.

Scheduling a Job

Azkaban

Azkaban provides a WebUI for scheduling a job with cron-like precision. It’s rather easy to recreate this HTTP POST from the command-line.

Oozie

Oozie has a great feature called “coordinators”. A coordinator is an XML file that optionally describes datasets that a workflow consumes, and also describes the frequency of your dataset. For example, you can tell it that your dataset should be created daily at 1am. If there are input datasets described, then your workflow will only be launched when those datasets are available.
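A hedged sketch of what a coordinator looks like (the names, schema version, and parameter values are hypothetical):

```xml
<!-- Illustrative Oozie coordinator; values are made up -->
<coordinator-app name="daily-report-coord" frequency="${coord:days(1)}"
                 start="${startDate}" end="2100-01-01T00:00Z"
                 timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
  <action>
    <workflow>
      <!-- absolute HDFS path to the workflow definition -->
      <app-path>${workflowBase}/daily-report</app-path>
    </workflow>
  </action>
</coordinator-app>
```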

A coordinator requires a “startDate”, which is rather annoying in the usual case (I just want to launch this workflow today and going forward, so we have taken to making the startDate a parameter, since we don’t necessarily know when the coordinator will be released). On the other hand, it makes it very easy to do a backfill of your data: if you have a new workflow that you want to run over all data from the first of the year onwards, just specify a startDate of Jan 1st.

Azkaban doesn’t include anything like Oozie’s coordinators. At Adconion, we wrote our own version of it, which also supported some nice features like reruns when data arrive late.

Security

Azkaban

Azkaban doesn’t support secure Hadoop. That means that if you’re running CDH3 or Hadoop 0.20.200+, all of your jobs will be submitted to Hadoop as a single user. There have been discussions about fixing this, and I know that Adconion was working on something. Even so, with the fair scheduler it’s possible to assign jobs to different pools.

Oozie

Oozie has built-in support for secure Hadoop including kerberos. We haven’t used this, but it does mean that you have to configure Hadoop to allow Oozie to proxy as other users. Thus, jobs are submitted to the cluster as the user that submitted the job to Oozie (although it’s possible to override this in a non-kerberos setting).
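The proxy-user configuration lives in the cluster’s core-site.xml; a hedged sketch (host and group values are illustrative and should be restricted appropriately):

```xml
<!-- core-site.xml on the cluster: allow the oozie user to impersonate others -->
<property>
  <name>hadoop.proxyuser.oozie.hosts</name>
  <value>oozie-server.example.com</value>
</property>
<property>
  <name>hadoop.proxyuser.oozie.groups</name>
  <value>hadoop-users</value>
</property>
```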

Property Management

Azkaban

Azkaban has a notion of “global properties” that are embedded within Azkaban itself. These global properties can be referenced from within a workflow, and thus a generic workflow can be built as long as different values for these global properties are specified in different environments (e.g. testing, staging, production). Typical examples of global properties are things like the location of the pig global props and database usernames and passwords.

Azkaban determines which Hadoop cluster to talk to by checking for HADOOP_HOME and HADOOP_CONF_DIR directories containing core-site.xml and mapred-site.xml entries. This also allows you to specify things like the default number of reducers very easily.

Global properties are nice, because if you need to tweak one you don’t have to redeploy the workflows that depend on them.

Oozie

Oozie doesn’t have a notion of global properties. All properties must be submitted as part of every job run. This includes the jobtracker and the namenode (so make sure you have CNAMEs set up for those in case they ever change!). Also, Oozie doesn’t let you refer to anything with a relative path (including sub-workflows!), so we’ve taken to setting a property called workflowBase that our tooling provides.
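To make this concrete, a hedged sketch of the kind of job.properties that has to accompany every run (hostnames and paths are made up; workflowBase is our own convention, not an Oozie built-in):

```properties
# job.properties (hypothetical values) -- resubmitted with every run
nameNode=hdfs://namenode.example.com:8020
jobTracker=jobtracker.example.com:8021
# Oozie only accepts absolute paths; our tooling fills in workflowBase
workflowBase=${nameNode}/user/oozie/workflows
oozie.wf.application.path=${workflowBase}/daily-report
```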

At Foursquare, we’ve had to build a bunch of tooling around job submission so that we don’t have to keep all of these properties in each of our workflows. We’re still stuck with resubmitting all coordinators, though, if we have to make a global change. Also, the jobtracker/namenode settings are extra annoying because you *must* specify them in each and every workflow action. Talk about boilerplate. I assume that Yahoo has use-cases for supporting multiple clusters from a single Oozie instance, but the design over-complicates things for the typical case.

Reruns

Azkaban

A neat feature of Azkaban is partial reruns: if your 10-step workflow fails on step 8, you can pick up from step 8 and just run the last 3 steps via the UI. This was an attractive feature, but we didn’t use it.

Oozie

In order to get a similar feature in Oozie, each action in your workflow must be a sub-workflow, then you can run the individual sub-workflows. At least in theory — it turns out that you have to set so many properties that it becomes untenable, and even with the right magic incantation, I couldn’t get this to work well.

Reruns of failed days in a coordinator are easy, but only in an all-or-nothing sense — if the last step of the workflow failed, there’s no easy way to rerun it.

UI

Azkaban

Azkaban has a phenomenal UI for viewing workflows (including visualizing the DAG!), run histories, submitting workflows, creating schedules, and more. The UI does have some bugs, such as when you run multiple instances of the same workflow, the history page gets confused. But in general, it’s very easy to tell what the state of the system is.

Oozie

The Oozie UI, on the other hand, is not very useful. It’s all Ajax, but is formatted in a window sized for a 1999 monitor. It’s laggy, double-clicks don’t always work, and things that should be links aren’t. It’s nearly impossible to navigate once you have a non-trivial number of jobs, because jobs aren’t named in any human-readable form and the UI doesn’t support proper sorting.

Monitoring

Azkaban

Azkaban supports a global email notification whenever a job finishes. This is a nice and cheap mechanism to detect failures. Also, my Adconion-colleague Don Pazel contributed a notification system that can be stitched up to detect failures, run times, etc and expose these via JMX or HTTP. That’s what we did at Adconion, but that piece wasn’t open-sourced.

Oozie

With Oozie, it’s possible to have an email action that mails on success or failure, but an action has to be defined for each workflow. Since there’s no good way to detect failure, we’ve written a workflow that uses the Oozie REST api to check the status of jobs and then sends us a daily email. This is far from ideal since we sometimes don’t learn about a failure until hours after it occurred.

Testing

Azkaban

Testing with Azkaban can be achieved by instantiating the Azkaban JobRunner and using the java api to submit a job. We had a lot of success with this at Adconion, and tests ran in a matter of seconds.

Oozie

Oozie has a LocalOozie utility, but it requires spinning up a HDFS cluster (since Oozie has lots of hard-coded checks that data lives in HDFS). Thus, integration testing is slow (on the order of a minute for a single workflow).

Oozie also has a class that can validate a schema, which we’ve incorporated into our build. But that doesn’t catch things like parameter typos or references to non-existent actions.

Custom Job Types

Azkaban

Writing a custom job is fairly straightforward. The Azkaban API has some abstract classes you can subclass. Unfortunately, you must recompile Azkaban to expose a new job type.

Oozie

Admittedly, I haven’t tried this. But the action classes that I’ve seen are well into the hundreds of lines of code.

Baked-in support

Azkaban

Azkaban has baked-in support for Pig, java, shell, and mapreduce jobs.

Oozie

Oozie has baked-in support for Pig, Hive, and Java. The shell and ssh actions have been deprecated. In addition, though, an Oozie action can have a “prepare” statement to clean up directories to which you might want to write. But these directories *must* be in HDFS, which means that if you use <prepare> then your code is less testable.

Storing State

Azkaban

Azkaban uses JSON files on the filesystem to store state. It caches these files in memory using an LRU cache, which makes the UI responsive. The JSON files are also easy to investigate if you want to poke around, bypassing the UI. Creating a backup is an easy snapshot of a single directory on a filesystem.

Oozie

Oozie uses an RDBMS for storing state, and one must back up that RDBMS via whatever mechanism is appropriate in order to create a backup.

Documentation

Both systems feature rich documentation. Oozie’s documentation tends to be much longer and larger since it includes XML fragments as well as details of the XML schemas. Over the years, Azkaban’s documentation has at times fallen out of sync with implementation, but the general docs are still maintained.

A note on Oozie versions

To be fair to Oozie, I haven’t tried the latest version, yet. Hopefully many issues I’ve noted are fixed, but if not, it’ll be easier to file bug reports once on the latest versions. A number of bugs I found in the CDH3u3 version of Oozie were either fixed or not applicable to trunk, so it became difficult to keep track of what was what.

Summary

Both Azkaban and Oozie offer substantial features and are powerful workflow engines. The weaknesses and strengths of the systems tend to complement one another, and it’d be fantastic if each system integrated the strengths of the other to improve.

It’s worth noting that there are also a number of other workflow systems available that I haven’t used. I am not discounting these systems, but I have no authority to speak on them. Lots of folks also seem to be using in-house engines, and it’d be fantastic to see more of that work open-sourced.

Writing a general-purpose workflow engine is very hard, and there are certainly remnants of the “LinkedIn” or “Yahoo” way of doing things in each system. As communities grow, hopefully these engines will start to lose those types of annoyances.

The second half of 2012 could be very interesting for workflow-engine improvements. For example, there is talk of a new UI for building Oozie workflows, HCatalog and Oozie integration could be interesting, and YARN integration could make for a better solution to distributing the “driver” programs for workflows (I’ve heard rumblings that Oozie could go down this path).

Lastly, I realize that this post more than likely has mistakes or oversights. These are inadvertent — if you find any, please make a note in the comments, and I will try to correct.


Getting Started with Apache Hadoop 0.23.0

Hadoop 0.23.0 was released November 11, 2011. Being the future of the Hadoop platform, it’s worth checking out even though it is an alpha release.

Note: Many of the instructions in this article came from trial and error, and there are lots of alternative (and possibly better ways) to configure the systems. Please feel free to suggest improvements in the comments. Also, all commands were only tested on Mac OS X.

Download

To get started, download the hadoop-0.23.0.tar.gz file from one of the mirrors here: http://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-0.23.0.

Once downloaded, decompress the file. The bundled documentation is available in share/doc/hadoop/index.html

Notes for Users of Previous Versions of Hadoop

The directory layout of the hadoop distribution changed in hadoop 0.23.0 and 0.20.204 vs. previous versions. In particular, there are now sbin, libexec, and etc directories in the root of the distribution tarball.

scripts and executables

In hadoop 0.23.0, a number of commonly used scripts from the bin directory have been removed or drastically changed.  Specifically, the following scripts were removed (vs 0.20.205.0):

  • hadoop-config.sh
  • hadoop-daemon(s).sh
  • start-balancer.sh and stop-balancer.sh
  • start-dfs.sh and stop-dfs.sh
  • start-jobhistoryserver.sh and stop-jobhistoryserver.sh
  • start-mapred.sh and stop-mapred.sh
  • task-controller

The start/stop mapred-related scripts have been replaced by “map-reduce 2.0” scripts called yarn-*. The start-all.sh and stop-all.sh scripts no longer start or stop HDFS, but they are used to start and stop the yarn daemons. Finally, bin/hadoop has been deprecated. Instead, users should use bin/hdfs and bin/mapred.

Hadoop distributions now also include scripts in a sbin directory. The scripts include start-all.sh, start-dfs.sh, and start-balancer.sh (and the stop versions of those scripts).

configuration directories and files

The conf directory that comes with Hadoop is no longer the default configuration directory.  Rather, Hadoop looks in etc/hadoop for configuration files.  The libexec directory contains scripts hadoop-config.sh and hdfs-config.sh for configuring where Hadoop pulls configuration information, and it’s possible to override the location of the configuration directory the following ways:

  • hdfs-config.sh calls hadoop-config.sh in $HADOOP_COMMON_HOME/libexec and $HADOOP_HOME/libexec
  • hadoop-config.sh accepts a --config option for specifying a config directory, or the directory can be specified using $HADOOP_CONF_DIR.
    • This script also accepts a --hosts parameter to specify the hosts / slaves
    • This script uses variables typically set in hadoop-env.sh, such as: $JAVA_HOME, $HADOOP_HEAPSIZE, $HADOOP_CLASSPATH, $HADOOP_LOG_DIR, $HADOOP_LOGFILE and more.  See the file for a full list of variables.

Configure HDFS

To start hdfs, we will use sbin/start-dfs.sh which pulls configuration from etc/hadoop by default. We’ll be putting configuration files in that directory, starting with core-site.xml.  In core-site.xml, we must specify a fs.default.name:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

Next, we want to override the locations that the NameNode and DataNode store data so that it’s in a non-transient location. The two relevant parameters are dfs.namenode.name.dir and dfs.datanode.data.dir.  We also set replication to 1, since we’re using a single datanode.

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/Users/joecrow/Code/hadoop-0.23.0/data/hdfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/Users/joecrow/Code/hadoop-0.23.0/data/hdfs/datanode</value>
  </property>
</configuration>

Notes:

  • as of HDFS-456 and HDFS-873, the namenode and datanode dirs should be specified with a full URI.
  • by default, hadoop starts up with 1000 megabytes of RAM allocated to each daemon. You can change this by adding a hadoop-env.sh to etc/hadoop. There’s a template that can be added with: $ cp ./share/hadoop/common/templates/conf/hadoop-env.sh etc/hadoop
    • The template sets up a bogus value for HADOOP_LOG_DIR
    • HADOOP_PID_DIR defaults to /tmp, so you might want to change that variable, too.

Start HDFS

Start the NameNode:

sbin/hadoop-daemon.sh start namenode

Start a DataNode:

sbin/hadoop-daemon.sh start datanode

(Optionally) start the SecondaryNameNode (this is not required for local development, but it definitely is for production).

sbin/hadoop-daemon.sh start secondarynamenode

To confirm that the processes are running, issue jps and look for lines for NameNode, DataNode and SecondaryNameNode:

$ jps
55036 Jps
55000 SecondaryNameNode
54807 NameNode
54928 DataNode

Notes:

  • the hadoop daemons log to the “logs” dir.  Stdout goes to a file ending in “.out” and a logfile ends in “.log”. If a daemon doesn’t start up, check the file that includes the daemon name (e.g. logs/hadoop-joecrow-datanode-jcmba.local.out).
  • the commands might say “Unable to load realm info from SCDynamicStore” (at least on Mac OS X). This appears to be harmless output, see HADOOP-7489 for details.

Stopping HDFS

Eventually you’ll want to stop HDFS. Here are the commands to execute, in the given order:

sbin/hadoop-daemon.sh stop secondarynamenode
sbin/hadoop-daemon.sh stop datanode
sbin/hadoop-daemon.sh stop namenode

Use jps to confirm that the daemons are no longer running.

Running an example MR Job

This section just gives the commands for configuring and starting the Resource Manager, Node Manager, and Job History Server, but it doesn’t explain the details of those. Please refer to the References and Links section for more details.

The Yarn daemons use the conf directory in the distribution for configuration by default. Since we used etc/hadoop as the configuration directory for HDFS, it would be nice to use that as the config directory for mapreduce, too.  As a result, we update the following files:

In conf/yarn-env.sh, add the following lines under the definition of YARN_CONF_DIR:

export HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$YARN_HOME/etc/hadoop}"
export HADOOP_COMMON_HOME="${HADOOP_COMMON_HOME:-$YARN_HOME}"
export HADOOP_HDFS_HOME="${HADOOP_HDFS_HOME:-$YARN_HOME}"

In conf/yarn-site.xml, update the contents to:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce.shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
</configuration>

Set the contents of etc/hadoop/mapred-site.xml to:

<?xml version="1.0"?>
<?xml-stylesheet href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

Now, start up the yarn daemons:

 $ bin/yarn-daemon.sh start resourcemanager
 $ bin/yarn-daemon.sh start nodemanager
 $ bin/yarn-daemon.sh start historyserver

A bunch of example jobs are available via the hadoop-examples jar. For example, to run the program that calculates pi:

$ bin/hadoop jar hadoop-mapreduce-examples-0.23.0.jar pi \
-Dmapreduce.clientfactory.class.name=org.apache.hadoop.mapred.YarnClientFactory \
-libjars modules/hadoop-mapreduce-client-jobclient-0.23.0.jar 16 10000

The command will output a lot of output, but towards the end you’ll see:

Job Finished in 67.705 seconds
Estimated value of Pi is 3.14127500000000000000

Notes

  • By default, the resource manager uses a number of IPC ports, including 8025, 8030, 8040, and 8141.  The web UI is exposed on port 8088.
  • By default, the JobHistoryServer uses port 19888 for a web UI and port 10020 for IPC.
  • By default, the node manager uses port 9999 for a web UI and port 4344 for IPC. Port 8080 is also in use (for what, I’m not sure), along with a seemingly random port (65176 in my case).
  • The resource manager has a “proxy” url that it uses to link-through to the JobHistoryServer UI. e.g.:
    $ curl -I http://0.0.0.0:8088/proxy/application_1322622103371_0001/jobhistory/job/job_1322622103371_1_1
    HTTP/1.1 302 Found
    Content-Type: text/plain; charset=utf-8
    Location: http://192.168.1.12:19888/jobhistory/job/job_1322622103371_1_1/jobhistory/job/job_1322622103371_1_1
    Content-Length: 0
    Server: Jetty(6.1.26)

Conclusion

While Hadoop 0.23 is an alpha release, getting it up and running in pseudo-distributed mode isn’t too difficult. The new architecture will take some getting used to for users of previous releases of Hadoop, but it’s an exciting step forward.

Observations and Notes

There are a few bugs or gotchas that I discovered or verified while going through these steps. Keep an eye on these:

  • HADOOP-7837 log4j isn’t setup correctly when using sbin/start-dfs.sh
  • HDFS-2574 Deprecated parameters appear in the hdfs-site.xml templates.
  • HDFS-2595 misleading message when fs.default.name not set and running sbin/start-dfs.sh
  • HDFS-2553 BlockPoolScanner spinning in a loop (causes DataNode to peg one cpu to 100%).
  • HDFS-2608 NameNode webui references missing hadoop.css

References and Links


Recap: Apache Flume (incubating) User Meetup, Hadoop World 2011 NYC Edition

The Apache Flume (incubating) User Meetup, Hadoop World 2011 NYC Edition was Wednesday, November 9. It was collocated with the Hive Meetup at Palantir’s awesome office space in the meatpacking district in Manhattan.

The following are my notes from the two presentations about Flume NG and Artimon. Hopefully I’ve recapped details accurately, but please let me know if corrections are appropriate!

Flume NG

Speakers Arvind Prabhakar and Eric Sammer of Cloudera. Arvind is first.

Motivation: Flume was built piecemeal as use-cases arose, and FlumeNG is taking a step back and rearchitecting from scratch with these use-cases in mind.  To understand the flume-ng architecture, Arvind took us through a tour of terminology.

If Flume is a blackbox, then there are bunches of potential inputs (e.g. log4j, thrift, avro) that go into the flume black box. Likewise there are lots of potential destinations for data (e.g. hdfs, hbase) exiting the flume black box.

Clients are the entities that produce events, e.g. a log4j appender. A terminal sink is the final sink where data exits flume. Like flume 0.9.x, there are sources and sinks, but the start and end points have special names: clients and terminal sinks. A flow describes how data moves from a client to a terminal sink.

Agents are the physical processes that house bunches of flume entities to move the flow along (pass along data from a source to a sink).

There can be other situations, such as fanout flows, in which an event gets replicated to two different sinks. From the terminology perspective, each start-end pipeline is a flow, even if two flows have hops in common and overlap.

A channel is a persistent or transient store for buffering events on agent nodes. An agent reads from a source and puts data on a channel. The sink polls the channel, picks up the events that arrived, packages them together, and sends them along. In doing so, the sink can do batching, encryption, compression, etc.  A source can feed into multiple channels, but a sink only reads from a single channel.

Rather than a ChannelDriver (flume 0.9.x), the sinks and sources each have their own threads, and they collaborate using a producer-consumer paradigm with the channel as a queue/buffer.

Because the definition of a node doesn’t fit well in fanout situations, this term is no longer used.

Transactional Guarantees

  • if an agent has a single source and a single sink, the source initiates a transaction with the channel. If the source can’t accept the message, it invalidates the transaction.
  • likewise, the sink has a transaction with its channel, so there is a channel -> sink -> src -> channel transaction, meaning you can have guaranteed delivery.
  • implementations of channels include:
    • memory, file, jdbc (with acid guarantees).

Example:

client (log4j) -> agent1 -> agent2 -> HDFS.

If the agent2/HDFS link goes down, then events will start accumulating on agent2, because channels are passive (the sink actively removes events from them). When the channel reaches its capacity, the failure will get relayed as an exception and events will start buffering on agent1. When agent1 reaches its capacity, your client will start to fail. When the link recovers, you’ll see an immediate draining of events.

At this point, Eric takes over to chat about the flume-ng (and flume in general) dev efforts:

The bug for flume-ng is FLUME-728, the branch is called FLUME-728. Per Eric:

  • alpha quality.
  • sources: avro source, unix (tail) source, net cat source (for testing), a few other things.
  • configuration is now plugin/user extendable (zookeeper implemented, but it could use other stores).
  • hdfs sink supports append and sync, as well as bucketing.
  • avro client sink that lets you do tiered collection.
  • looking for committers that don’t work for Cloudera.

Notes from breakout session run by Eric:

  • Avro is the new default RPC system (no more reliance on thrift binary).
  • Avro is a bit slower, so batching / buffering is builtin to improve performance
  • Not 100% api compatible, so custom flume decorators/sinks will have to have (minimal) changes.
  • lots more that I can’t recall :).

Artimon – Arkéa Real Time Information Monitoring.

Mathias Herberts of Arkéa. Slides on Slideshare.

“we found out that none of the existing monitoring system meet our needs, so we created artimon”

ARTIMON — last mast of a ship with at least 3 sails, and it’s supposed to stabilize the ship.

Collecting data

Collect variable instances on machines (using flume) and store the metrics in HDFS and in-memory buckets.  Variable instances are similar to rowkeys in OpenTSDB, if you’re familiar. Something like:

name, label=value

Support for types like:

  • Integer
  • Double
  • Boolean
  • String

The framework adds a few labels of its own: datacenter, rack, module, context, environment (production, testing, etc), and host. These are retrieved via reverse DNS (hostnames encode dc-rack-module).

Metrics are created by a library (mainly java), called ArtimonBookeeper. It has a very simple interface. To use ArtimonBookeeper:

    • build a singleton.
    • use a method like: addToIntegerVar(name, labels, offset).
      • creates (or adds to) a variable metric (counter) held in memory.
    • the bookkeeper is part of a context, and it registers itself in zookeeper.
      • part of the registration tells ZK the context, IP and port.
      • the ip and port are a thrift end point that you can retrieve variables/counters stored by the application.
    • metrics are exported by the Java library via a thrift endpoint. The design allows you to expose metrics from any language supported by thrift.
    • if you can’t provide a thrift endpoint to expose metrics, you can create a file in /var/run/artimon, and it’ll read the data every minute.
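
Based only on the description above, the counter interface might look something like this. This is a hypothetical Python sketch (the real ArtimonBookeeper is a Java library, and the method and class names here are mine):

```python
class Bookkeeper:
    _instance = None

    @classmethod
    def get(cls):
        # build a singleton, per the usage described above
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        self.counters = {}

    def add_to_integer_var(self, name, labels, offset):
        # a variable instance is identified by its name plus its labels,
        # much like a rowkey in OpenTSDB
        key = (name, tuple(sorted(labels.items())))
        self.counters[key] = self.counters.get(key, 0) + offset

bk = Bookkeeper.get()
bk.add_to_integer_var("requests", {"dc": "jb"}, 1)
bk.add_to_integer_var("requests", {"dc": "jb"}, 2)
key = ("requests", (("dc", "jb"),))
print(bk.counters[key])  # → 3
```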

Machines run apps that export the metrics. Can have several apps on each machine.

Flume

Each machine has flume running. They start several nodes and configure them without a master (they patched flume because they had problems with the master). A special “artimon source” looks in Zookeeper for stuff to read and periodically polls (every minute) the thrift endpoints exposed by the apps, retrieving all metrics for each app and pushing them to collectors. Collectors push to the sink:

  • put most granular data in HDFS.
  • VHMS – VarHistoryMemStore (several servers running this). Store in buckets with various resolutions, depth, etc.
    • 60 buckets of 60 seconds.
    • 48 buckets of 5 minutes
    • 96 buckets of 15 minutes.
    • 72 buckets of 60 minutes. (3 days worth of data with 1-hour granularity).
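
A quick sanity check of the bucket math above (my own arithmetic, not from the talk): each tier’s coverage is bucket-count × bucket-width.

```python
# (count, width in seconds) for each VHMS tier
tiers = [(60, 60), (48, 5 * 60), (96, 15 * 60), (72, 60 * 60)]

for count, width_s in tiers:
    hours = count * width_s / 3600
    print(f"{count} x {width_s}s buckets = {hours:g}h of history")
# → the last tier covers 72h, i.e. the "3 days" of 1-hour data above
```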

Worked well for a while, but hit a “flume-wall”:

  • DFO (disk failover) handles failures, which works very well.
  • problems arise when you have a slowdown but not a complete failure: you don’t benefit from the DFO, and you run into OOMEs because the buffers fill up. They had to put a heuristic in place to drop stuff.
    • not great, because congestion is exactly when there is a problem.
  • Solution was to create a Kafka sink. Kafka is the buffer. Failure scenario is much better. Haven’t had any slowdowns.
  • Wrote a Kafka source on the other side to collect metrics.

Stats and numbers

  • Collecting around 1.5 million metrics per minute.
    • that averages to 25K metrics/second, though in practice all metrics arrive in a roughly 10-second window of each minute.
  • 1.5B metrics per day.
  • Keeping all raw data since the end of June: around 80TB collected (divide by 3 for replication), i.e. 200-300GB/day, compressed.

The great thing about flume: we have several datacenters, and when there’s a colo partition, the data recovers. When the partition ends, the metrics flood in.

What they do with metrics?

VHMS has a thrift endpoint. You can query it like “Give me all metrics named memcached which have dc=jb for the last 600,000ms”

  • retrieves a matrix of values (memcached.varX x value).
  • you can display and have a pretty quick view.
  • you can apply stuff on the matrices.
    • “Map paradigm”: apply a function over the historical data of a variable. The library is in Groovy, a DSL to play with metrics.
    • map_delta(): takes two consecutive values and creates a new one, producing a new dataset.
    • provides a bunch of builtin functions, but you can create your own using groovy closures.
  • labels are important. You can define equivalence classes over metrics that share the same label names and values (or a subset of them).
  • can do bulk operations.
    • group two operands by equivalence classes. e.g. compute available disk space. Works for one volume or thousands of volumes.
  • reduce paradigm over an equivalence class, e.g. you could compute the average temperature of each rack in your datacenter in one line.
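
The real DSL is Groovy; here’s a rough Python re-sketch of the map_delta() idea (my own reconstruction, not Arkéa’s code): walk a time series pairwise and emit a new dataset of deltas, turning a counter into a rate-like series.

```python
def map_delta(series):
    # series: list of (timestamp, value) pairs;
    # emits one point per consecutive pair, carrying the delta
    return [(t2, v2 - v1) for (t1, v1), (t2, v2) in zip(series, series[1:])]

counter = [(0, 100), (60, 130), (120, 145)]
print(map_delta(counter))  # → [(60, 30), (120, 15)]
```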

notes from breakout session:

  • have a JMX adapter that’s used for pulling metrics from hadoop, hbase, etc.
  • using Kafka very much like the channels in flume-ng (flume releases use a push model, which is responsible for many of the problems).
  • Write pig scripts for custom / post-mortem analysis.
  • Groovy library has an email service for alerting.

Thanks again to everyone who helped make this event happen. It was a ton of fun.

Posted in flume, hadoop | 1 Comment

Recap: April Puppet NYC Meetup

Last week, I attended my first Puppet NYC meetup, which was hosted at Gilt Groupe. As a fairly recent user of puppet, it was great to meet some folks from the community in NYC that are using it on a daily basis. Here are my notes from the two great presentations in hopes that they’re useful for other people.

Foreman

Presenter: Eric E. Moore (Brandorr Group LLC)

Foreman is an application that runs atop puppet to let you interact with and see reports of your puppet nodes.  In addition, Foreman can act as a provisioning system for imaging machines with kickstart, signing puppet certificates, and much more.  Foreman scales — it has been known to power a 4,000-node operation.

The main page of the Foreman UI is a list of all of your hosts, and it links off to a number of reports.  There is a dashboard/overview page which displays a pie chart of your Active/Error/Out of Sync instances as well as the run-distribution (and timings) of previous runs.

Foreman imports information about your hosts from the puppet storeconfigs, and there’s detailed output from the last puppet run (on a per-node basis).  Foreman also gives you a UI to access the lists of facts per server, and you can do things like search for all nodes matching a particular fact (i.e. see a list of all machines with processor XYZ).  This data is available via API, as well (more later).

Foreman lets you control which environment a node is in (if you’re using environments on the puppet server), but it also lets you set variables that are sent to the puppet client on run (somewhat like facter).  You can set these variables on four levels: global, domain-level, hostgroups, and host-level.  Internally, foreman lets you group machines by domain or into host-groups for setting variables like these (the talk at the puppet meetup was that this is an alternative to extlookup).

Foreman is designed for total-provisioning.  It supports provisioning via (and configuring), among others:

  • Kickstart, Jumpstart, and Preseed
  • PuppetCA
  • TFTP
  • DNS
  • DHCP
  • Virtual machines (VMWare)
  • Eric and Brian mentioned that they are planning on contributing ec2 provisioning to the project.

Foreman has full role-based access controls, meaning you can give your users access to particular views, reports, operations or subsets of nodes.  In addition, it provides an audit log of what has changed (including graphs of the number of changes, failures etc).  It provides a mechanism to initiate puppet runs from the dashboard, and also has a “destroy” button to clean out the storeconfigs for a particular node.

An interesting feature of Foreman is the REST API, which follows full REST / HTTP semantics for CRUD operations.  Eric mentioned using the API for provisioning nodes as well as for running searches over the nodes in the system.  It was mentioned that authentication for the REST API was less than ideal — suggestion was to use some sort of proxy in front of Foreman.

Foreman vs. Puppet dashboard: The conversation seemed to suggest that Foreman’s features were a super-set of those of Puppet Dashboard, with a few exceptions.  For example, Puppet Dashboard has support for viewing diffs of your configurations.

Change Management with Puppet

Presenter: Garrett Honeycutt (Puppet Labs) Slides

Ideally, you want all the environments to be exactly the same: Dev == QA == Staging == PROD so that you can catch issues early.  With that said, there’s typically some sort of approval criteria to move changes from one environment to the other (code review, QA procedures, etc).  Given all of these different environments, each environment often has different teams and sometimes there are conflicting goals.  For example, dev wants to do quick features whereas ops wants production to be stable.

It’s important to document the different environments you have and what the policies are.  For example, who owns what, what the order of precedence is, what the SLAs are per environment, etc.  Garrett has seen a flow like the following work well when doing puppet development: Puppet Test Area -> Dev -> QA -> Prod .  In addition, it’s important to document the gating factors between environments — who can approve migrations between environments and how are they approved.

Suggested SVN/git layout looks like this:

branches/123
branches/124
...
tags/2011041300
tags/2011041301
tags/2011041400
...
trunk

Breaking it down:

  • Branches are short-lived and topical (“feature” branches). For example, branches/123 is a branch to work on ticket 123. All work should be done on a branch and then merged to trunk after review.
  • Tags are immutable, and Garrett has found that BIND-style timestamps work the best. 2011041300 would be the first tag for April 13th, 2011 (the last two digits are an incrementing counter).
  • Trunk contains all of the best known working code, but it is not very well tested.
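
Tag naming like this is easy to automate. Here’s one possible sketch (my own, not Garrett’s tooling): YYYYMMDD plus a two-digit counter of how many tags already exist for that day.

```python
import datetime

def next_tag(existing_tags, today):
    # BIND-style serial: date prefix plus an incrementing two-digit counter
    prefix = today.strftime("%Y%m%d")
    todays = [t for t in existing_tags if t.startswith(prefix)]
    return f"{prefix}{len(todays):02d}"

tags = ["2011041300", "2011041301"]
print(next_tag(tags, datetime.date(2011, 4, 13)))  # → 2011041302
print(next_tag(tags, datetime.date(2011, 4, 14)))  # → 2011041400
```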

Development flow:

Consider these environments: Puppet Test Area -> Dev -> QA -> Prod

  1. When a ticket/change request comes in, create a branch off of trunk. Eventually, the change will get merged back to trunk.
  2. There should be an environment that is always running off of trunk so that you can verify that it works well-enough to create a tag. Tags should always be made off of trunk.
  3. Once a tag has been created, that tag should be deployed to each environment in turn.
  4. Testing is done per environment, and if any verifications fail (for example, there’s an issue in QA and it’s not a candidate for production), then create a new ticket to fix the issue and go back to #1.

Important: Avoid taking shortcuts, they will become more and more expensive.

Tag generation can be automated, and the selection of tags for environments can be automated as well (svn switch module).

Branch development:

It needs to be easy to stand up test instances from branches: you can either use puppet apply or have a puppet master that knows about each branch and has a separate environment for it. The person reviewing code can then spin up an instance to verify the module.

Release manager:

Garrett has very successfully used the role of “release manager” in the past to facilitate branch-management. The RM is responsible for merging all branches back to trunk once they are stable. This person should also monitor commits to branches so that they can offer constructive criticism early in the process (possibly pair-programming), particularly if there are people new to puppet. The RM can be a rotating role (e.g. he’s had it switch weekly in the past).

Multiple teams exchanging code:

  • Use multiple module paths.
  • Communication is super-important. Advocated using svn commit emails so that teams can see what other teams are doing.
  • Private github accounts are supposedly very useful, too.

Test driven development:

  • Use puppet apply to do manifest testing.
  • Recommends having a ${module}/test directory to put test files. The test file should show how to use the module you’re developing, so that it can be tested with puppet apply. The test should be written before the module. NOTE, this is not a unit test. You’re not verifying what puppet has done, only that it is exiting cleanly.
  • End-to-end testing should be done via monitoring. Machines add themselves to nagios and you can verify that services get started correctly that way. Suggestion is that all environments have monitoring for this end-end testing.

We briefly discussed a use-case of a company that runs hundreds of machines. This organization runs puppet via cron rather than as the puppet agent daemon in order to disable updates during busy hours. During those hours, puppet runs in noop mode so that reports are generated for Puppet Dashboard. When they are ready to roll-out changes during the change-window, they run puppet with noop to test what would happen and give a green light before actually applying the changes.

Summary

The April puppet meetup was great, and I definitely plan on attending future meetups to learn what people are doing.  I’d like to thank everyone that made the meetup possible and a great success!

Posted in Devops, Linux, Puppet | 1 Comment

Silently broken Gmail

At work, we have google apps, which comes with several gigs of gmail storage. For email, though, we use an Outlook server with a low quota. Rather than deleting email, I “archive” it to gmail via IMAP.

One day, though, gmail IMAP silently stopped syncing. I could login, but no mail was being transferred. I tried everything — synchronizing accounts, rebuilding the mailbox, but nothing worked. I was hoping to enable some more verbose logging when I came across an old article from eriklabs.com about Mail.app Logging.

After learning the magic incantation, I started up Mail.app to log all operations on port 993 (the SSL port that gmail IMAP uses).  A message in the logs immediately stood out:

5.1 BAD [ALERT] Message too large. http://mail.google.com/support/bin/answer.py?answer=8770

The prior message logged the timestamp of the offending email, which was over 25MB in size (the max size for Gmail).  Unfortunately, Mail.app had created a large number of copies of this email (over 1,000) and placed them in the “Recovered Items” folder.

Even after deleting all of these “recovered item” copies, Mail kept making new ones.  I tracked down a copy of this file in the .OfflineBackups directory. After removing the file that gmail rejects (see http://automatica.com.au/2010/01/mail-app-and-its-offline-cache/ — I chose the file with the large file size in that directory), I was finally able to resync with gmail!

Unfortunately, mail.app was not happy that I removed a file from .OfflineBackups, and it refused to process the rest of the files in that directory.  I wrote the following python script to convert the unprocessed files in .OfflineBackups into a single .mbox file suitable for import into Mail.app.
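
A sketch of what such a conversion can look like with Python’s mailbox module (my reconstruction, not the original script; the assumption that each file in .OfflineBackups holds a single raw message is mine, and the paths are illustrative):

```python
import email
import mailbox
import os
import tempfile

def offline_backups_to_mbox(src_dir, mbox_path):
    # read each per-message file and append it to a single mbox file
    # suitable for import into Mail.app
    box = mailbox.mbox(mbox_path)
    try:
        for name in sorted(os.listdir(src_dir)):
            with open(os.path.join(src_dir, name), "rb") as f:
                box.add(email.message_from_binary_file(f))
    finally:
        box.close()   # close() flushes any pending writes

# Tiny self-contained demo with two fake messages:
src = tempfile.mkdtemp()
for i in range(2):
    with open(os.path.join(src, "msg%d" % i), "wb") as f:
        f.write(b"From: a@example.com\nSubject: test\n\nbody\n")
out = os.path.join(tempfile.mkdtemp(), "recovered.mbox")
offline_backups_to_mbox(src, out)
print(len(mailbox.mbox(out)))  # → 2
```

In real use you would point src_dir at the .OfflineBackups directory and then import the resulting .mbox via Mail.app’s import dialog.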

Thankfully, in the end, everything is back to normal.

Posted in Apple, Programming | 1 Comment

two puppet tricks: combining arrays and local tests

  1. Joining Arrays
    I found myself wanting to join a bunch of arrays in my puppet manifests. I had 3 lists of ip addresses, but wanted to join all 3 lists together into a single list to provide all ips to a template. I found some good tricks for flattening nested arrays in an erb (http://weblog.etherized.com/posts/175), but I found those solutions to be too “magic” and hard to read. I ended up settling on using an inline_template and split, like so:

    $all_ips = split(inline_template("<%= (worker_ips+entry_point_ips+master_ips).join(',') %>"),',')

  2. Testing Puppet Locally
    To debug things like this, I like to use puppet locally to test a configuration by writing a test.pp file and passing that into puppet, e.g.:

    File: arrays-test.pp
    define tell_me() { notify{$name:}}

    $worker_ips = [ "192.168.0.1",
    "192.168.0.2",
    ]
    $entry_point_ips = ["10.0.0.1",
    "10.0.0.2",
    ]
    $master_ips = [ "192.168.15.1",
    ]

    $all_ips = split(inline_template("<%= (worker_ips+entry_point_ips+master_ips).join(',') %>"),',')
    tell_me{$all_ips:}

    Running puppet:

    $ puppet arrays-test.pp
    notice: 192.168.0.2
    notice: /Stage[main]//Tell_me[192.168.0.2]/Notify[192.168.0.2]/message: defined 'message' as '192.168.0.2'
    notice: 192.168.15.1
    notice: /Stage[main]//Tell_me[192.168.15.1]/Notify[192.168.15.1]/message: defined 'message' as '192.168.15.1'
    notice: 192.168.0.1
    notice: /Stage[main]//Tell_me[192.168.0.1]/Notify[192.168.0.1]/message: defined 'message' as '192.168.0.1'
    notice: 10.0.0.1
    notice: /Stage[main]//Tell_me[10.0.0.1]/Notify[10.0.0.1]/message: defined 'message' as '10.0.0.1'
    notice: 10.0.0.2
    notice: /Stage[main]//Tell_me[10.0.0.2]/Notify[10.0.0.2]/message: defined 'message' as '10.0.0.2'

    Note that for these kinds of tests, I like to use notify rather than building files.

Posted in Linux | Tagged | 6 Comments

Python setup.py bdist_rpm on CentOS 5.5

I recently learned that python setup.py can be used to build an RPM using the bdist_rpm command.  Since we’re using puppet to manage installed software, this makes it really easy to add python modules to a bunch of servers.

During the process of trying to build an RPM, I ran into a problem: “Installed (but unpackaged) file(s) found.” The details of this problem are explained here: https://bugzilla.redhat.com/show_bug.cgi?id=198877

Comment #1 in the above bug offers a work-around, which I simplified into the following two commands:

echo 'python setup.py install --optimize 1 --root=$RPM_BUILD_ROOT --record=INSTALLED_FILES' > install.spec
python setup.py bdist_rpm --install-script install.spec --packager="Joe Crobak"

RPMs are placed in the dist dir.

Posted in Linux | Tagged , , | Leave a comment

Moving wordpress blog to lighttpd

I’ve moved my wordpress blog from a hosted account on godaddy.com to a server that’s running lighttpd on ubuntu. The move was more complex than I expected, so I thought I’d share some details for others…

  1. I already had lighttpd installed, but I followed these instructions to add php support to lighttpd: Lighttpd+PHP – Ubuntu Wiki.
  2. I followed these great instructions for installing wordpress: Installing WordPress 3.0 on Ubuntu 10.04…
  3. I used wordpress import/export to export my data from the old wordpress install. The old install was version 2.8, but the file imported just fine into wordpress 3.0. Here are some good instructions from wordpress.com: Moving a Blog.
  4. I had to install all the plugins I wanted on my new install.  This required my wordpress install be owned by the www-data user, which lighttpd runs as.  I did a sudo chown -R www-data:www-data on the install directory.
  5. I migrated my links from the old wordpress install using the instructions from wordpress.com: Import-Export Links.
  6. Make index.php the 404 handler, i.e. server.error-handler-404 = "/index.php" per Setting up a WordPress Blog on lighttpd.
  7. Add rewrite rules so that most urls in / get handled by index.php per URL Rewriting for WordPress and lighttpd.
  8. Update the permalink settings to use /%year%/%monthnum%/%postname%/

In the end, my lighttpd config looks something like:

$HTTP["host"] == "www.crobak.org" {
  server.document-root = "/var/www/www.crobak.org"

  server.error-handler-404 = "/index.php"

  url.rewrite-final = (

    # Exclude some directories from rewriting
    "^/(wp-admin|wp-includes|wp-content|gallery2)/(.*)" => "$0",

    # Exclude .php files at root from rewriting
    "^/(.*.php)" => "$0",

    # Handle permalinks and feeds
    "^/(.*)$" => "/index.php/$1"
  )
}

I’ve also switched to the awesome new Twenty Ten 1.1 Theme by the WordPress team.

Posted in Linux | 3 Comments

JAVA_HOME on Mac OS X

I was working on configuring HBase to run on my Mac OS X machine, and I ran into a hiccup setting up the JAVA_HOME environment variable. Eventually, I determined that there’s a “Home” directory inside of each Java framework version. So, the full command is:

export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/1.6.0/Home

Posted in Apple | Leave a comment