Exploring Scalable Data Processing with Apache Hadoop

Exploring Scalable Data Processing with Apache Hadoop

By Tom Wheeler, OCI Principal Software Engineer

November 2008

As the undisputed Web search leader, there's no question that Google scales to meet increasing demand. I've been intrigued by the architecture that makes this possible since first reading of it in The Anatomy of a Large-Scale Hypertextual Web Search Engine published several years ago at Stanford University by Google's founders.

What appealed to me most about this paper was that Brin and Page described processing vast amounts of data without relying on the "big iron" hardware typically associated with high-volume data processing. Instead of using mainframes, supercomputers or huge multiprocessor machines with expensive fibre-channel attached storage systems, the design relies on spreading both computation and storage across an array of commodity PCs.

Such cheap hardware is likely to fail, but that's exactly the point. There's far more potential for growth when you scale out instead of scale up. Besides, you'll be able to replace the machine with something faster and cheaper when it finally does fail.

Google's architecture has likely changed a lot since that paper was published, but these core concepts remain the same. In fact, company engineers have since published a few more interesting papers describing the Google File System (GFS) and MapReduce. These ideas have inspired developers to create Apache Hadoop, an open source project that lets you take advantage of this scalable architecture for your own projects.

In this article, I explain some interesting ways organizations are using Hadoop, introduce many of its fundamental concepts and connect you with the best information I've found for learning more about it.

The project's unusual name comes from the name of stuffed elephant belonging to the son of Hadoop's creator, Doug Cutting. Although Hadoop has been getting a lot of attention recently, it's not new. Hadoop can trace its roots back a few years to the Nutch project, which itself came from the popular Lucene search engine library. Hadoop became a top-level Apache project this January and has since spawned several new projects of its own. I will mention a few of these later in the article.

Who's Using Hadoop?

Google may have pioneered this architecture, but the fact that Hadoop is open source means that anyone can use or improve it. And lots of companies do, including Facebook, ImageShack, Last.fm and Rackspace. Yahoo! has been the greatest contributor to Hadoop and is also probably its largest user. This summer, they used a Hadoop cluster to win the speed record for sorting and serializing a terabyte of data, which they were able to do in just 209 seconds using a cluster of more than 900 machines. More recently, their developer blog described what is thought to be the world's largest Hadoop cluster, which contains 4,000-nodes and 16-petabytes of raw disk capacity.

Derek Gottfrid explained how the New York Times used Hadoop to convert 11 million vintage articles to PDF format. Using an army of machines on Amazon's EC2 cluster, they completed the job in a single day at an estimated cost of just $240! This conversion allowed them to build the very interesting TimesMachine site which gives us a chance to see the original stories about many important historical events like the assassination of President Lincoln and the sinking of the Titanic.

Now that I've shown why Hadoop is an interesting project, I'll explain how it works.


MapReduce is a programming model pioneered by Google but which has strong roots in functional languages like Lisp and Haskell. It makes use of two functions: map and reduce.

The map function takes data as key/value pairs and performs some operation on it to produce zero or more intermediate key/value pairs. There's no requirement that the data type for the input match that of the output and it's not necessary for the output keys be unique.
The reduce function (known as fold or foldr in some functional languages) is called once for each unique key from the map function's output. Along with this key, it is passed a list of all values associated with that key so it can perform some merge operation to ultimately yield a smaller set of values.

The classic example of a word counting program can help put this into perspective. We'll keep it simple and concentrate on what data actually is used for input and output of the map and reduce functions. First, assume that we have a plain text file named file1.txt as input:

This is a file
It is a very simple file

The configuration of the MapReduce job will determine exactly what values are passed to the map function as a key/value pair. The key is ignored in this example, but it could represent the current file name or position in that file. The value in this example is a list of all lines in that file. The map function can process this by iterating over the lines and splitting them into individual words, perhaps using java.util.StringTokenizer. Remember that the goal of map is simply to produce a series of intermediate key/value pairs to be processed by the reduce function, but these need not be unique key/value pairs. Therefore, for each word we find, we'll use that word as the key and something arbitrary (such as the number 1) as the value. You'll see in a moment why the value is not important for this example.

I'll use a colon as a delimiter to illustrate the key/value pairs that would be output from the map function when using the file shown earlier as input:

This   : 1
is     : 1
a      : 1
file   : 1
It     : 1
is     : 1
a      : 1
very   : 1
simple : 1
file   : 1

The reduce function will be called once for each unique key. In addition to that unique key, it will be passed a list of values associated with that key. Therefore, given the output from the map function shown above, the input for the first few calls to our reduce function would be:

This   :  (1)
is     :  (1, 1)
a      :  (1, 1)
file   :  (1, 1)
It     :  (1)
very   :  (1)
simple :  (1)

We'll be able to count the number of word occurrences by simply counting the elements in the list, which is why the value for these elements does not matter for this example. We'd almost certainly want to use a meaningful value when developing a more complex job. For example, a job which builds an index of pages on an intranet might return keywords from its map function.

Here's the output from the word count MapReduce program when run on the previous input file:

This   1
is     2
a      2
file   2
It     1
very   1
simple 1

While this word count example is a simple illustration of how MapReduce works, it should also help to show that MapReduce programs are inherently simple. The difficult part is the infrastructure which distributes the MapReduce calculations and associated data across a series of machines in a cluster, monitors the progress of active jobs and transparently handles machine failure. Luckily, Hadoop provides the infrastructure to do all of those things for us.

The Hadoop APIs

In practice, writing a program to run a Hadoop job is a little more involved than what I've shown, but the concepts are the same. I did not explain how the values were passed into map function or how either map or reduce should supply the key/value pairs they produce. Hadoop provides an API for programming with the MapReduce model which addresses these concerns. There are three major components to a MapReduce job: the MapperReducer and the JobConf. These classes will likely reference additional API classes such as InputFormatOutputFormatOutputCollectorReporter and RecordWriter to help read or write the data, report on job progress or provide final job output.

The JobConf describes the pertinent details of the job itself, including its name, the data types for the key and value, the class to use for the map and reduce functions, and information about the input and output data used for the job.

The Mapper class does the work described by the map function. It has a single method map which is passed a key, a value, an OutputCollector for accepting the key/value pairs created as output, and a Reporter which is used for indicating progress. The key and value are both defined using generics and the key type generally implements the Comparable interface. The Hadoop API defines the Writable interface, typically implemented by both the key and the value, which allows Hadoop to gain extra performance by using a custom serialization protocol. To aid in implementing both the Comparable and Writable interfaces, Hadoop also provides the WritableComparable interface along with a number of convenience classes which implement it for common data types.

The Reducer class does the work described by the reduce function. It has a single method reducewhich is passed a key, an Iterator which supplies values, an OutputCollector for gathering output pairs, and a Reporter for indicating progress. Like the Mapper, the key and value types are defined using generics and the key typically implements the Comparable interface. The values provided by the Iterator typically implement both Comparable and Writable.

Putting it all together

The Hadoop MapReduce tutorial shows a complete program for counting words in text files similar to the one I've described.

Hadoop Architecture

Hadoop's scalability comes from the fact that the map and reduce operations can be run in parallel across several machines by breaking the input into smaller chunks. This is mainly handled by the machine playing the role of JobTracker node in the cluster. The calculations themselves are handled by the machines acting as the TaskTracker. In any Hadoop cluster, there will be exactly one JobTracker and one or more TaskTrackers.

One of the most important components of Hadoop's infrastructure comes from the distributed file system which is based on concepts from the Google filesystem paper.

The Hadoop Distributed File System

All modern computers have some sort of filesystem; some of the most common are NTFS (Microsoft Windows), ext3 (GNU/Linux), HFS Plus (Mac OS X) and ZFS (Solaris). The Hadoop Distributed File System (HDFS) is similar to these filesystems in that its purpose is to organize files in a hierarchical namespace for storage and retrieval. But HDFS also has two fundamental differences with those filesystems. Although the typical filesystems mentioned above can span multiple disks, they are not intended to span multiple computers as HDFS does. Also, HDFS runs in user space, as contrasted to the other filesystems which are inextricably linked to their operating systems's kernel. Therefore, you could conceivably run HDFS on any operating system supported by Java and avoid the risk that a filesystem problem could crash the machine(s) on which it runs. The tradeoff is that you won't be able to use the tools you typically use for working with the filesystem, such as Windows Explorer, Mac OS X Finder or UNIX shell commands like cpmv or rm. Instead, Hadoop provides a series of user commands for working with HDFS which are likely to make UNIX users feel right at home.

HDFS assumes that hardware is unreliable and will eventually fail. It is somewhat similar in concept to certain RAID levels which seek to offset this risk by replicating data blocks throughout the system. ButRAID merely replicates data across disks on a single machine: HDFS can replicate data across multiple machines in a cluster. This scheme provides not only fault tolerance, but also the potential for extremely high capacity storage given that the overall capacity will be based on all usable space of all disks across all machines. HDFS also assumes that the data will be written only once and is able to gain extra performance by optimizing for subsequent reads while disallowing subsequent writes.

Because Hadoop deals with large volumes of data and because moving large amounts of data will be constrained by either network transfer speed or disk write speed, HDFS operates on the principle that "moving computation is cheaper than moving data." In other words, HDFS makes it possible for calculations to be run on the machine where the data resides, rather than moving data to where the calculations take place. HDFS is said to be "rack aware" meaning that it can be configured to know about the proximity of machines to one another and replicate data near the nodes which might need it.

Just as with other filesystems, the data block is the logical unit of storage. All files — regardless of size — take up at least one data block in the filesystem. The data comprising larger files will be spread across multiple data blocks (and in this case all but the final block will be filled to capacity). HDFS can replicate these blocks across machines. Hadoop jobs tend to use very large files as input and HDFS is tuned to handle these efficiently. An example of this is the default block size, which ranges from 4 KB to 8 KB for various UNIX filesystems. HDFS has a default block size of 64 MB.

HDFS defines two roles for computers to play: NameNode and DataNode. Put simply, a DataNode is responsible for low-level operations including block creation, deletion, reads and writes. A NameNode keeps track of which DataNodes have which blocks of data and uses this information to manage the hierarchy of the overall filesystem. Each cluster will have exactly one NameNode (plus a secondary NameNode for checkpointing), but will have many DataNodes. One machine may fill several of these roles simultaneously in a very small Hadoop installation.

Installing and Running Hadoop

Most large Hadoop installations tend to use GNU/Linux servers, perhaps because the licensing cost of proprietary operating systems would be prohibitive. Hadoop has been heavily used in GNU/Linux and so it's the recommended environment, along with a Sun JDK, for production use. Although the Hadoop quickstart page lists GNU/Linux as the only supported production platform, Hadoop is known to also work under Microsoft Windows, Mac OS X and BSD UNIX. There's even an OpenSolaris-based Hadoop Live CD available.

Hadoop supports three distinct modes of operation. A simple Hadoop installation is likely to run in the default "non-distributed" mode in which Hadoop runs as a single process on a single machine. Running in non-distributed mode is useful when first learning Hadoop since you won't need to worry about communication between machines. For the same reason, running in non-distributed mode can be helpful for isolating problems or debugging. The second mode, known as "pseudo-distributed operation," helps to simulate a larger installation with a single machine by running multiple Hadoop processes on that machine. The final mode, called "fully-distributed mode," is the most useful since it's the only one of these three nodes which allows Hadoop to run across multiple machines.

Installing Hadoop is fairly straightforward for any Java developer who has some experience with managing a GNU/Linux system, since it also uses Secure Shell (ssh) to control services across the cluster. I'll list the basic steps here and then follow this with links that explain the process in greater detail.

  1. Install Java 
    The Hadoop quickstart page lists Java 1.5.x as the required version, though I had no problem using Java 1.6.0. Technically Hadoop nodes only need the JRE rather than the entire JDK, but I find using the JDK is better; it provides the jps tool for viewing Java process information which comes in handy for checking the status of a multinode cluster.
  2. Install and Configure ssh / rsync 
    You need to install ssh and rsync on all machines in your Hadoop cluster. Many Linux distributions come with these tools already installed, but installing Cygwin (a UNIX-style environment) is the most common way of getting these tools for Microsoft Windows. Each machine should be configured with publickey authentication for ssh so that the commands can run without the need for interactive login. In other words, you should be able to type ssh machine1 from a shell prompt on the master node and immediately get a shell prompt on a machine named machine1 without having to type a password. You should be able to repeat this for every other node in your cluster with the same results.
  3. Install and Configure Hadoop 
    Download the most recent stable release of Hadoop and unpack it to some directory. Although there are many configuration options available, Hadoop is set to run as a single Java process on a single node by default. Therefore, the only configuration change you mustmake to get it running is to edit the conf/hadoop-env.sh script and set JAVA_HOME to point to the directory containing your JRE or JDK.
  4. Format the NameNode 
    You must initialize HDFS before you can use it for the first time. To do this, run the following command from the Hadoop installation directory on the machine which is acting as your namenode:
bin/hadoop namenode -format

Don't be alarmed by the name of this command — while it will erase any data in Hadoop's filesystem such as input and output from previous Hadoop jobs, it won't erase any data on your computer's filesystem such as your resume, MP3 collection or vacation pictures.

    5.  Start Hadoop 

      Run the following command from the Hadoop installation directory on the machine which is acting as your namenode. This will start the HDFS process:


Run the following command from the Hadoop installation directory on the machine which is acting as your JobTracker. This will start the JobTracker process on this host and will also start the TaskTracker process on all machines you have configured, via the conf/slaves file, to work on MapReduce problems:


Installation under Linux and Microsoft Windows

Michael Noll has written the most thorough instructions I've found for installing Hadoop in Linux. He describes how to set up both a single-node Hadoop cluster and a multi-node Hadoop cluster. If you're running Microsoft Windows, then Hayes Davis' article about running Hadoop on Windows will give you a lot of helpful tips on getting your cluster going.

Running Your First Hadoop Job

Once you've installed and configured Hadoop, the easiest way to test your installation is to run a job based on the example programs distributed with Hadoop. Perhaps the simplest of these is the Pi estimator, which seeks to estimate the value of Pi using the Monte Carlo method. It requires no file input and provides no file output, it just requires two command line arguments to tell it the number of maps and the number of samples to use for the estimation. To run it, type the following command from the root directory of your Hadoop installation:

bin/hadoop jar hadoop-0.18.1-examples.jar pi 20 10                

You'll see some output from Hadoop as it describes the current state of the job. Eventually you'll see the result printed to standard output similar to this:

Job Finished in 39.741 seconds
Estimated value of PI is 3.16

Related Projects

As I mentioned earlier, Hadoop has inspired several interesting projects, many of which have equally interesting names:

HBase is a distributed system for managing the storage, indexing and retrieval of structured data. This project was inspired by Google's Bigtable paper. The main focus of HBase is scalability — it claims to support billions of rows and millions of columns. A recent discussion on the Hadoop mailing list estimated that HBase could support up to 27 petabytes of data. Unfortunately, I lack the disk space to verify this claim.
Hive is an infrastructure based on Hadoop which provides data warehousing capabilities. It supports table-like structures with support for metadata and ad hoc queries using a SQL-like language. Hive was developed by Facebook but has been submitted to Hadoop as a contributed project.
Pig, named in honor of Pig Latin, was created by researchers at Yahoo! who sought to provide a high-level language for data analysis. A program written in this language gets compiled down to MapReduce programs which are run by Hadoop. Yahoo! has since contributed Pig to the Apache Software Foundation where it can grow alongside Hadoop.
The Mahout project, whose name derives from the Hindi word for a person who drives an elephant, aims to provide a suite of scalable libraries for machine learning. The project's scalability comes from parallelizable operation using the Hadoop framework. The Taste Collaborative Filtering project recently donated its codebase to Mahout, which might allow you to implement a very effective recommendation engine like those you may have seen at Amazon, NetFlix or Pandora.
The Cascading project allows developers to define potentially very large data processing jobs, which can be run across a cluster of machines in a fault tolerant manner. Although Hadoop is the current means of storage and execution, Cascading can be seen as an abstraction of Hadoop. In other words, a program which makes use of Cascading could run on other computing frameworks supported by future versions of Cascading. One compelling feature of this projects is the ability to define data processing applications in Groovy.
ZooKeeper allows nodes in a cluster to coordinate with one another through the shared hierarchy of metadata. This hierarchy is replicated across some — possibly all — nodes within that cluster such that no node can be a single point of failure. Although the structure of this hierarchy is conceptually somewhat similar to a UNIX filesystem, the focus of ZooKeeper is on the synchronization of this data. ZooKeeper might therefore be a good choice for implementing things like locks or event notification in a distributed application. The project seems to have been inspired by Google's The Chubby Lock Service for Loosely-Coupled Distributed Systems paper.


Apache Hadoop is an exciting project which makes a low-cost, high-performance architecture available to everyone. This article explained how companies are using it, introduced the core concepts of MapReduce and the Hadoop infrastructure, and referenced some of the best documentation available for learning more about it. Although Hadoop is not appropriate for solving every problem, it's certainly worth considering when you need a scalable and reliable batch-oriented approach for processing large volumes of data. And since most developers probably have access to a few idle machines, perhaps the most exciting aspect of the Hadoop is imagining the new ways we can use it.

Tom Wheeler would like to thank Michael Easter, Lance Finney, Mafish Liu and Amit Kumar Saha for their help in reviewing this article.