HelloWorld Spark? Smart (selective) wordcount Scala example!

In the previous post I showed how to build a Spark Scala jar and submit a job using spark-submit; now let’s customize our main Scala Spark object a little. You can find the project for the following example here on github.

Let’s imagine we’ve collected a series of messages about football (tweets or whatever) and we want to count words, but not simply every word, only those of interest. Say we have a “dictionary” of football players’ names, and we want to see which of them appears most often in those messages.

Example time!

Imagine we have a file (called names) with a list of names (one per line):

Kane
Sirigu
Neymar
Totti

And in another file (called messages) we have a list of messages:

I’m obviously with Harry Kane (Hurricane) today… Let’s go Tottenham!!!
Kid Kane with Arsenal jersey, lol
Another top save from Sirigu to keep Toulouse out. PSG seconds away from top spot.
Why Sirigu and Verratti are laughing so much?
Francesco Totti Scores With Flying Kung Fu Kick, Celebrates With Selfie
What would Roma do without Totti? See his great goal & celebration HERE
! #Totti #Goals
Wenger wasn’t the Arsenal coach when Totti started to play in Serie A

The result of analyzing these lines has to be (Kane, 2), (Sirigu, 2), (Totti, 3). To achieve this, all we have to do is parse each line, split it into words, and for each word check whether it is the name of a player in the list. Players that don’t appear in the list (like Verratti in the example) won’t be taken into consideration. Moreover, since we use the “equals()” function to check whether the current word is a player’s name, without any particular treatment the hashtag “#Totti” won’t be counted (and the same goes for an uppercase word like “TOTTI”, and so on).
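This exact-match behavior is easy to verify with plain Scala collections (a quick sketch, not part of the project code):

val players = Array("Kane", "Sirigu", "Neymar", "Totti")
players.contains("Totti")  // true: exact match via equals()
players.contains("#Totti") // false: the hashtag makes it a different string
players.contains("TOTTI")  // false: the comparison is case-sensitive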

[meniluca@myclusternode ~]$ hadoop fs -ls /user/meniluca
    Found 2 items
    -rw-r--r--   3 meniluca sgroup    28 2015-03-02 14:50 names
    -rw-r--r--   3 meniluca sgroup  4440 2015-03-02 14:51 messages

These files are already in HDFS and we want to create a Spark Scala jar to be executed like this:

[meniluca@myclusternode ~]$ spark-submit --class com.example.SelectiveWordcount \
    --master yarn-cluster \
    spark-selective-wordcount-example.jar \
    /user/meniluca/names \
    /user/meniluca/messages \
    /user/meniluca/output-selective-wordcount
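For reference, the entry point that receives these three arguments would look roughly like this (a sketch based on the command above; the exact SparkConf setup is an assumption, check the repository for the real code):

package com.example

import org.apache.spark.{SparkConf, SparkContext}

object SelectiveWordcount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("selective-wordcount"))
    // args(0) = names file, args(1) = messages file, args(2) = output directory
    // ... the logic described below goes here ...
    sc.stop()
  }
}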

If the messages are scattered across several files inside a folder in HDFS, we can use the star “*” to select all of them, like this: /user/meniluca/dir-with-messages/*
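For example, with the same textFile() function, just passing a glob pattern:

val messages = sc.textFile("/user/meniluca/dir-with-messages/*")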

You can find the full code of the Spark Scala maven project with the solution in this github repository. Download the code and build the project to create the jar with the command mvn package (the right jar is the one with “with-dependencies” in its name).

Now let’s go through the Scala object code you can find in the repository. The input files can be loaded with the textFile() Spark function, which reads directly from HDFS and returns an RDD[String].

val names = sc.textFile(args(0))
val files = sc.textFile(args(1))

Now the first variable contains all the names. With the following command we collect them into an Array[String] on the driver; when the array is referenced later inside a transformation, Spark ships a copy of it to every executor.

val namesArray = names.collect()
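For a small dictionary like ours this is fine; if the list of names were very large, a Spark broadcast variable would be the more efficient way to ship it to the executors (a sketch of the alternative, not what the repository does):

val namesBroadcast = sc.broadcast(names.collect())
// in the filter below you would then call namesBroadcast.value.contains(_)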

And then we can easily perform the selective wordcount like this:

val selectiveWordCount = files
        .flatMap(_.split(" "))          // split each message into words
        .filter(namesArray.contains(_)) // keep only words that are players' names
        .map((_, 1))                    // pair each name with the integer 1
        .reduceByKey(_ + _)             // sum the 1s for each distinct name

Let’s comment on it line by line, considering the result type of each step. At the beginning, files is an RDD[String] (you can think of an RDD as a set or a list, something you can iterate over) in which each String is a line of the parsed file (a message, in our case).

Using the map (or flatMap) function you iterate over each element of the RDD, applying the function given in the parentheses; think of the underscore as “the current element”, which is replaced by its form split on spaces (splitting a string yields an array). Each message (String) is thus transformed into an Array[String], and RDD[Array[String]] would be the result type after the first function if it were a map(); but since it is a flatMap(), the RDD is flattened to give back an RDD[String]. Example:

RDD[String] = { “Totti #oneclubman is the best”, “Neymar what a goal!” }
–> MAP –>
RDD[Array[String]] = { Array(“Totti”, “#oneclubman”, “is”, “the”, “best”), Array(“Neymar”, “what”, “a”, “goal!”)  }
–> FLAT –>
RDD[String] = { “Totti”, “#oneclubman”, “is”, “the”, “best”, “Neymar”, “what”, “a”, “goal!” }
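You can reproduce the same behavior with plain Scala collections in the REPL, without Spark:

val lines = Seq("Totti #oneclubman is the best", "Neymar what a goal!")
lines.map(_.split(" "))     // Seq[Array[String]]: one array per message
lines.flatMap(_.split(" ")) // Seq[String]: all the words in a single flat sequence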

The next operation is the filter() function, which returns an RDD with the elements that satisfy the condition “is it contained in namesArray?”. This is where the “equals” method is used (inside “contains”) and why the matching between words is exact (including letter case). So the resulting RDD[String] contains only the names of interest. Here the wordcount part starts: for each name in the RDD we create a pair containing the name and the integer 1, with the map function. Remember, what is in the parentheses is executed for each member of the RDD. The underscore is a shortcut for parameter substitution, equivalent to map(name => (name, 1)). After the map we have:

RDD[(String,Int)] = { (Totti, 1), (Totti, 1), (Sirigu, 1), (Kane, 1), (Totti, 1), (Sirigu, 1), (Kane, 1) }

Now, reduceByKey() conceptually works in two phases. In the first it gathers all the pairs with the same key (the same first value), creating a list of the remaining values:

(Totti, [1,1,1]) (Sirigu, [1,1]) (Kane, [1,1])

and in the second, the function specified in the parentheses is applied to the elements of each list. “_ + _” is the shortcut for (prev, curr) => prev + curr: the first underscore is the previous result and the second is the current value. In short, it sums all the values in the list, returning the final result:

RDD[(String,Int)] = { (Totti, 3), (Kane, 2), (Sirigu, 2) }
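If you want to convince yourself of the semantics, both phases can be mimicked locally with plain Scala collections (just an illustration, not Spark code):

val pairs = Seq(("Totti", 1), ("Totti", 1), ("Sirigu", 1), ("Kane", 1),
                ("Totti", 1), ("Sirigu", 1), ("Kane", 1))
val grouped = pairs.groupBy(_._1)                                  // phase 1: gather by key
val counts = grouped.map { case (k, vs) => (k, vs.map(_._2).reduce(_ + _)) } // phase 2: sum
// counts: Map(Totti -> 3, Sirigu -> 2, Kane -> 2)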

This final RDD can then easily be saved with the Spark function:

selectiveWordCount.saveAsTextFile(args(2))

After launching the execution as described above, use the hadoop -text command to see the result, or -getmerge to copy the merged output to the local filesystem:

hadoop fs -text /user/meniluca/output-selective-wordcount/*
hadoop fs -getmerge /user/meniluca/output-selective-wordcount output-selective-wordcount

Please leave a comment if you find an error (English errors too) or if something is not clear. Hope it helps.

meniluca 😀
