Spark examples: how to work with CSV / TSV files (performing selection and projection operations)

One of the simplest formats your files can have when you start playing with Spark is CSV (comma-separated values, or TSV for tab-separated ones). Let’s see how to perform some operations over a set of these files. As usual, I suggest you create a Scala Maven project in Eclipse, compile a jar and execute it on the cluster with the spark-submit command.
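A quick note on the TSV case before we start: all the snippets below split on a comma, and the only change needed for tab-separated files is the delimiter passed to split(). For example (the sample line here is made up):

```scala
// The same row as TSV: the only difference is the delimiter
val tsvLine = "132\t0\tlocalhost\t192.168.0.100\t80\t8234"

// For tab-separated files, split on "\t" instead of ","
val fields = tsvLine.split("\t")
```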

See this previous article for detailed instructions on how to set up Eclipse for developing in Spark Scala, and this other article to see how to build a Spark fat jar and submit a job.

Say we have a folder in HDFS that contains the partial result of a previous computation.

[meniluca@hadoopnode: ~ ] $ hadoop fs -ls /user/meniluca/wireshark-csv-files/
-rw-r--r--   3 lmeniche supergroup          0 2015-03-13 23:00 _SUCCESS
-rw-r--r--   3 lmeniche supergroup          0 2015-03-13 22:57 part-00000
-rw-r--r--   3 lmeniche supergroup          0 2015-03-13 22:58 part-00001
-rw-r--r--   3 lmeniche supergroup          0 2015-03-13 22:57 part-00002
-rw-r--r--   3 lmeniche supergroup          0 2015-03-13 22:58 part-00003
...

[meniluca@hadoopnode: ~ ] $ hadoop fs -text /user/meniluca/wireshark-csv-files/part-00000 | head
132,0,localhost,192.168.0.100,80,8234
0,3420,192.168.1.24,192.168.0.100,8888,9000
8538,0,192.168.0.100,nosqlnocry.com,80,45600
...

Let’s imagine our directory “wireshark-csv-files” contains CSV files coming from some sort of processing of Wireshark data, with the following schema:

read bytes, write bytes, source ip/name, destination ip/name, outgoing port, ingoing port
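As a side note (not part of the original pipeline), it can help to model a row with a small case class so the field positions stay readable; the field names here are my own assumption based on the schema above:

```scala
// Hypothetical model of one wireshark CSV row (field names are assumptions)
case class Connection(readBytes: Long, writeBytes: Long,
                      source: String, destination: String,
                      outgoingPort: Int, ingoingPort: Int)

// Parse one comma-separated line into a Connection
def parseLine(line: String): Connection = {
  val f = line.split(",")
  Connection(f(0).toLong, f(1).toLong, f(2), f(3), f(4).toInt, f(5).toInt)
}
```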

Now let’s perform an operation: say we want to “grep” (select) all the lines referring to the host “192.168.0.100” as the source ip of the connection. Our main Scala object will look like this:

import org.apache.spark.{SparkConf, SparkContext}

object CSVOperations {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("CSV wireshark selection")
    val sc = new SparkContext(conf)
    val pathToFiles = "/user/meniluca/wireshark-csv-files/*"
    val outputPath = "/user/meniluca/selection-result"
    val files = sc.textFile(pathToFiles)
    val rowsWithoutSpaces =
      files.filter(line => "192.168.0.100".equals(line.split(",")(2)))
    rowsWithoutSpaces.saveAsTextFile(outputPath)
  }
}

This keeps only the lines whose value at position 2 of the array produced by split() is “192.168.0.100”. The result will look like:
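Keep in mind that split(",") returns a 0-based array, so index 2 is the third column, the source ip/name. A quick local check:

```scala
val line = "8538,0,192.168.0.100,nosqlnocry.com,80,45600"
val fields = line.split(",")
// fields(2) is the third value: "192.168.0.100"
```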

[meniluca@hadoopnode: ~ ] $ hadoop fs -text /user/meniluca/selection-result/* | head
134,0,192.168.0.100,google.com,80,7546
3489735,0,192.168.0.100,nosqlnocry.com,80,4444
8538,0,192.168.0.100,192.168.3.42,8088,23400
...

Now let’s imagine we want to perform a “projection” over a column of the CSV files, which means dropping all the other values and keeping just the column we need. As a result we will have one value per line.

import org.apache.spark.{SparkConf, SparkContext}

object CSVOperations {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("CSV wireshark projection")
    val sc = new SparkContext(conf)
    val pathToFiles = "/user/meniluca/wireshark-csv-files/*"
    val outputPath = "/user/meniluca/projection-result"
    val files = sc.textFile(pathToFiles)
    // Keep only the third column (index 2): the source ip/name
    val rowsWithoutSpaces = files.map(_.split(",")(2))
    rowsWithoutSpaces.saveAsTextFile(outputPath)
  }
}

[meniluca@hadoopnode: ~ ] $ hadoop fs -text /user/meniluca/projection-result/* | head
192.168.0.100
192.168.0.100
localhost
192.168.1.24
192.168.0.100
...
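Since the RDD operations filter, map and distinct have the same shape on plain Scala collections, the projection logic can be sanity-checked locally before submitting a job. For instance, a natural follow-up (my addition, not in the original job) is deduplicating the projected column, which on the RDD would simply be an extra .distinct():

```scala
// Local sample mirroring the HDFS files above
val sampleLines = Seq(
  "132,0,localhost,192.168.0.100,80,8234",
  "0,3420,192.168.1.24,192.168.0.100,8888,9000",
  "8538,0,192.168.0.100,nosqlnocry.com,80,45600",
  "134,0,192.168.0.100,google.com,80,7546")

// Same projection as in the job, plus distinct to drop duplicate sources
val distinctSources = sampleLines.map(_.split(",")(2)).distinct
```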

Now, instead, we want to select all the lines in which the ip “192.168.0.100” has read at least 1000 bytes. With this approach it is really easy to implement any combination of selection criteria.

import org.apache.spark.{SparkConf, SparkContext}

object CSVOperations {
  // Predicate over the split line: at least 1000 bytes read
  // and "192.168.0.100" as source
  def f(csv: Array[String]): Boolean =
    csv(0).toInt >= 1000 && csv(2).equals("192.168.0.100")

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("CSV wireshark selection if rb>1000")
    val sc = new SparkContext(conf)
    val pathToFiles = "/user/meniluca/wireshark-csv-files/*"
    val outputPath = "/user/meniluca/selection-rb-gt-1000-result"
    val files = sc.textFile(pathToFiles)
    val rowsWithoutSpaces = files.filter(line => f(line.split(",")))
    rowsWithoutSpaces.saveAsTextFile(outputPath)
  }
}
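One caveat worth adding: csv(0).toInt throws a NumberFormatException on a malformed or empty line, and a single bad record will fail the whole Spark job. A defensive variant of the predicate (my own sketch, using scala.util.Try) could drop such rows instead:

```scala
import scala.util.Try

// Returns true only for well-formed rows matching both criteria;
// malformed rows (non-numeric read-bytes, too few columns) yield false
def safeSelect(csv: Array[String]): Boolean =
  Try(csv.length > 2 && csv(0).toInt >= 1000 && csv(2) == "192.168.0.100")
    .getOrElse(false)
```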

Another useful example is when you have to project more than one field, for example if you want to project the pair “source ip, destination ip”:

import org.apache.spark.{SparkConf, SparkContext}

object CSVOperations {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("CSV double projection")
    val sc = new SparkContext(conf)
    val pathToFiles = "/user/meniluca/wireshark-csv-files/*"
    val outputPath = "/user/meniluca/double-projection-result"
    val files = sc.textFile(pathToFiles)
    val rowsWithoutSpaces = files.map(line => line.split(",")(2) + "," + line.split(",")(3))
    rowsWithoutSpaces.saveAsTextFile(outputPath)
  }
}

This is not a smart way to map the line, though, since it executes split(",") twice per line. This is a better way to do it:

  val rowsWithoutSpaces = files.map(_.split(",")).map(csv => csv(2)+","+csv(3))
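A stylistic alternative (just a sketch, not from the original post) is to destructure the array with a pattern match, which makes the selected positions self-documenting; note it assumes every row really has six fields, otherwise it throws a MatchError:

```scala
// Destructure the six fields and keep only source and destination
val pairs = Seq("8538,0,192.168.0.100,192.168.3.42,8088,23400")
  .map(_.split(","))
  .map { case Array(_, _, src, dst, _, _) => s"$src,$dst" }
```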

Finally, here is an example combining a selection and a double projection:

import org.apache.spark.{SparkConf, SparkContext}

object CSVOperations {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("CSV selection and double projection")
    val sc = new SparkContext(conf)
    val pathToFiles = "/user/meniluca/wireshark-csv-files/*"
    val outputPath = "/user/meniluca/last-test-result"
    val files = sc.textFile(pathToFiles)
    val rowsWithoutSpaces = files
      .map(_.split(","))
      .filter(csv => csv(0).toInt >= 1000 && csv(2).equals("192.168.0.100"))
      .map(csv => csv(2) + "," + csv(3))
    rowsWithoutSpaces.saveAsTextFile(outputPath)
  }
}
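The combined pipeline can again be checked locally on a few of the sample lines, since apart from sc.textFile and saveAsTextFile the chain of map/filter/map is plain collection code:

```scala
// A few sample rows taken from the listings above
val sample = Seq(
  "134,0,192.168.0.100,google.com,80,7546",
  "0,3420,192.168.1.24,192.168.0.100,8888,9000",
  "3489735,0,192.168.0.100,nosqlnocry.com,80,4444")

// Same selection + double projection as in the job
val result = sample
  .map(_.split(","))
  .filter(csv => csv(0).toInt >= 1000 && csv(2).equals("192.168.0.100"))
  .map(csv => csv(2) + "," + csv(3))
```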

Hope this helps! Write a comment for any request or to report a problem (and like the post)!

meniluca 😀
