Hadoop MapReduce wordcount example in Java. An introduction to Hadoop jobs.

In this article we are going to review the classic Hadoop word count example, customizing it a little bit. As usual, I suggest using Eclipse with Maven to create a project that can be modified, compiled and easily executed on the cluster. First of all, clone the Maven boilerplate project from here: https://github.com/H4ml3t/maven-hadoop-java-wordcount-template

$ git clone git@github.com:H4ml3t/maven-hadoop-java-wordcount-template.git

If you want to compile it directly, run:

$ cd maven-hadoop-java-wordcount-template
$ mvn package

The resulting fat jar will be found in the target folder with the name “maven-hadoop-java-wordcount-template-0.0.1-SNAPSHOT-jar-with-dependencies.jar”.

Alternatively, if you want to modify the code (like we are about to do now), open Eclipse and go to [File] -> [Import] -> [Existing Maven Projects] -> browse for the project directory -> [Finish]. To compile, right-click on the project folder, then [Run As] -> [Maven build…] -> specify “clean package” as goals -> [Run]. As before, you will find the resulting “with-dependencies” jar in the target folder. Of course you can use any other Java IDE that supports Maven, or even a plain text editor together with the Maven command line.

Let’s first review what doing a word count (or, in general, executing a job) in Hadoop means: there is a first phase in which the input data is read, parsed and processed for counting; then a middle phase in which elements are delivered and sorted across components; and a last phase in which the actual counting takes place and the result is written.

[Figure: MapReduce job execution flow]

The first phase is called “map”, and thanks to the Hadoop environment you don’t have to do anything special to read files from HDFS. You just have to specify which files you want to process (I’ll show you how) and they are automatically given as input to the Mapper class. This class, defined by the user, is in charge of the first stage of processing. Each Mapper reads parts of the input files and, after processing them, emits (key, value) pairs. Then the “shuffle and sort” phase brings the partial results from the Mappers to the Reducers for the “reduce” phase, delivering pairs with the same key to the same Reducer. The “shuffle and sort” phase is totally transparent to the user. In the “reduce” phase, the Reducers (implemented by the user) receive as input the pairs sharing the same key and can iterate over those values to produce another result, which is written to HDFS. To recap: only two classes need to be implemented by the user, a Mapper and a Reducer, but the job needs another class (a Driver) that describes to Hadoop which Mapper and Reducer to execute and on which files. Now let’s see how.
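Before looking at the actual Hadoop classes, here is a minimal plain-Java sketch of the same map / shuffle-and-sort / reduce flow, with no Hadoop involved at all; the class and variable names are mine and serve only to illustrate the idea:

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy, single-machine illustration of the word count flow:
// "map" emits (word, 1) pairs, "shuffle and sort" groups the values by key,
// and "reduce" sums them up.
public class WordCountSketch {
  public static void main(String[] args) {
    String[] lines = { "no woman no cry", "no woman no cry" };

    // "Map" phase: emit a (word, 1) pair for every word of every line.
    List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
    for (String line : lines) {
      for (String word : line.split(" ")) {
        mapped.add(new AbstractMap.SimpleEntry<>(word, 1));
      }
    }

    // "Shuffle and sort" phase: collect all values belonging to the same key.
    Map<String, List<Integer>> grouped = new TreeMap<>();
    for (Map.Entry<String, Integer> pair : mapped) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }

    // "Reduce" phase: sum the list of values of each key and print the result.
    for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
      int sum = 0;
      for (int one : entry.getValue()) {
        sum += one;
      }
      System.out.println(entry.getKey() + "\t" + sum);
    }
  }
}

In Hadoop the same three steps happen, but the map and reduce parts run in parallel on different nodes and the shuffle is handled entirely by the framework.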

This is a simplified version of the Driver you can find in the GitHub repository (linked above); do not use this class as it is, since logging and imports are omitted, use the one in the repo:

public class Driver extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    try {
      // ToolRunner parses the generic Hadoop options (-D, -files, ...) before calling run()
      int res = ToolRunner.run(new Configuration(), new Driver(), args);
      System.exit(res);
    } catch (Exception e) {
      e.printStackTrace();
      System.exit(255);
    }
  }

  public int run(String[] args) throws Exception {
    // use the Configuration prepared by ToolRunner instead of creating a new one
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "Hadoop wordcount nailed");
    job.setJarByClass(Driver.class);

    // the classes implementing the map and reduce phases
    job.setMapperClass(WordcountMapper.class);
    job.setReducerClass(WordcountReducer.class);

    // output (key, value) types, valid here for both Mapper and Reducer
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // how the input is read from HDFS and how the output is written back
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // submit the job, wait for it to finish and return a non-zero exit code on failure
    return job.waitForCompletion(true) ? 0 : 1;
  }
}

The goal of the Driver class is to talk to the Hadoop environment, specifying which classes have to be executed, the input files, the output folder and, very importantly, the output types. The calls to setOutputKeyClass() and setOutputValueClass() specify the types of the output of the Mappers and of the Reducers. Both the Mapper and the Reducer emit pairs, so we need to declare a “key” and a “value” class type. If the output types of Mappers and Reducers are the same, as in this case, a single declaration is enough; otherwise the Mapper output types can be declared separately (see the sketch below). In Hadoop, instead of the normal primitive types (int, long, char, boolean, …), you have to use Writable wrappers that can be serialized across the cluster (IntWritable, LongWritable, …), and Strings are replaced by the Text wrapper. Now let’s see why we have (Text, IntWritable) as output.
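For completeness, if the Mapper emitted different types than the Reducer (say the map value were a LongWritable while the Reducer still wrote an IntWritable, a purely hypothetical scenario for illustration), the map output types would be declared separately inside run() with setMapOutputKeyClass() and setMapOutputValueClass(); a minimal sketch:

// Hypothetical variant: Mapper emits (Text, LongWritable), Reducer writes (Text, IntWritable)
job.setMapOutputKeyClass(Text.class);           // key type emitted by the Mapper
job.setMapOutputValueClass(LongWritable.class); // value type emitted by the Mapper
job.setOutputKeyClass(Text.class);              // key type written by the Reducer
job.setOutputValueClass(IntWritable.class);     // value type written by the Reducer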

I have a file in my HDFS folder and I want to count the occurrences of each word.

$ hadoop fs -text /user/meniluca/nowomannocry
Said, said, said, I remember when we used to sit

In the government yard in Trenchtown,
Oba - obaserving the 'ypocrites. . .

[Figure: wordcount MapReduce example]

As you can see from the image, the Mapper takes as input a file (or parts of files) and its map() method is executed once for each line of the file assigned to that Mapper. We pass the only file we want to analyze to the Driver by specifying its path while submitting the jar (see below). The map() function then parses the line, splitting the string on spaces, and emits a pair (word, 1) for each word found; this is why the output types are (String, int), or rather (Text, IntWritable). In this way we are saying that the current word appeared one time. This is the code for the Mapper:

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  // reuse the same IntWritable instance for every emitted pair
  public static final IntWritable ONE = new IntWritable(1);

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // emit a (word, 1) pair for every space-separated token of the line
    for (String word : line.toString().split(" ")) {
      context.write(new Text(word), ONE);
    }
  }
}

Note how the last two type parameters of the extended Mapper class define the output types used in the write() call (Text, IntWritable). The first two, on the other hand, define the input types, which correspond to the first two parameters of the map() function (LongWritable, Text). In particular, the ‘line’ parameter contains the entire line that the Mapper is currently processing, while ‘offset’ is the byte offset of that line within the file. If we want to improve our output, here we can clean each word (since we are splitting on spaces, all the other characters remain attached) or choose to write only certain kinds of words (for example, words longer than three characters):

protected void map(LongWritable offset, Text line, Context context)
    throws IOException, InterruptedException {
  for (String word : line.toString().split(" ")) {
    // strip some punctuation and normalize the case before filtering
    word = word.replace(",", "").replace("!", "").toLowerCase();
    if (word.length() > 3) {
      context.write(new Text(word), ONE);
    }
  }
}
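If you want a more thorough clean-up than removing just commas and exclamation marks, a possible variation (my own sketch, not part of the original repository) is to split on any run of non-letter characters with a regular expression:

protected void map(LongWritable offset, Text line, Context context)
    throws IOException, InterruptedException {
  // split on anything that is not a letter, so punctuation never sticks to the words;
  // toLowerCase() merges "Said" and "said" into a single key
  for (String word : line.toString().toLowerCase().split("[^a-z]+")) {
    if (word.length() > 3) {
      context.write(new Text(word), ONE);
    }
  }
}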

This will emit only the words of interest (note also the lower-casing: otherwise “Said” and “said” would be counted as different words). After the map phase, in which each Mapper processes all the lines of its share of the input files, there is the shuffle and sort phase, in which all the pairs generated by the Mappers are sent as input to the Reducers. This step is really important because it guarantees that all the pairs with the same key are processed by the same Reducer: that is what the shuffle part is all about. Then, since more than one key is usually assigned to the same Reducer, the sort part organizes the pairs, collecting the key-values with the same key together and transforming them into a pair (key, list of values), where the list in this case is a list of IntWritable containing all the 1s. Imagine that, after the shuffle phase, a Reducer has been assigned all the pairs with the keys “said” and “woman”:

(said,1)(woman,1)(said,1)(woman,1)(woman,1)... -> SORT -> (said,[1,1,..])(woman,[1,1,1,...])

The aim of the Reducer is now to iterate over the list of each word and sum the values. Here is the code:

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // sum all the 1s collected for this word and emit (word, total)
    int count = 0;
    for (IntWritable current : values) {
      count += current.get();
    }
    context.write(key, new IntWritable(count));
  }
}

Note also here how the first two type parameters of the extended Reducer match the output of the Mapper and are the input parameters of the reduce() function, while the last two are the output types of the Reducer and the types passed to the write() call. Now that we have modified our code and built a jar (as shown above), we can run it and check the result with the following commands:

$ hadoop jar maven-hadoop-java-wordcount-template-0.0.1-SNAPSHOT-jar-with-dependencies.jar  \
/user/meniluca/nowomannocry  \
/user/meniluca/result-wordcount

$ hadoop fs -text /user/meniluca/result-wordcount/*
woman	18
said	7
remember	3
...

If you have any questions, please write a comment. Hope it helps!

meniluca 😀
