Read CSV file in Spark Scala

Requirement

Suppose we have a dataset in CSV format and we want to read it in Spark using Scala. So the requirement is to create a Spark application which reads the CSV file into a Spark data frame using Scala.

Components Involved

The following components are involved:

  • Spark RDD/Data Frame
  • Scala
  • IntelliJ
  • SBT

Sample Data

Let’s have a look at the sample dataset which we will use for this requirement:

 empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,7902,12/17/1980,800,20
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7566,TURNER,MANAGER,7839,4/2/1981,2975,20
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
7698,MILLER,MANAGER,7839,5/1/1981,2850,30
7782,CLARK,MANAGER,7839,6/9/1981,2450,10
7788,SCOTT,ANALYST,7566,12/9/1982,3000,20
7839,KING,PRESIDENT,NULL,11/17/1981,5000,10
7844,TURNER,SALESMAN,7698,9/8/1981,1500,30
7876,ADAMS,CLERK,7788,1/12/1983,1100,20
7900,JAMES,CLERK,7698,12/3/1981,950,30
7902,FORD,ANALYST,7566,12/3/1981,3000,20
7934,MILLER,CLERK,7782,1/23/1982,1300,10

Download the sample dataset from here:

emp_data

Solution

We are going to create a Spark application using the IntelliJ IDE. Follow the steps mentioned below:

Step 1: Create Spark Application

The first step is to create a Spark project in the IntelliJ IDE using SBT.

Open IntelliJ. Once it has opened, go to File -> New -> Project and choose SBT.

Click Next and provide the details, such as the project name, and choose the Scala version. In my case, I have given the project name ReadCSVFileInSpark and selected 2.10.4 as the Scala version.
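
For reference, the corresponding settings in build.sbt would look roughly like the sketch below. The project name and Scala version match the choices above; the version value is just a placeholder assumption:

// build.sbt - project settings (name and scalaVersion match the choices above)
name := "ReadCSVFileInSpark"

version := "1.0"

scalaVersion := "2.10.4"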

Step 2: Resolve Dependency

Add the below dependencies to the build.sbt file:

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.0",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.0"
)

It will download all the required packages.
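
As a side note (not from the original post), the same dependencies can also be declared with %%, which lets SBT append the Scala binary version (here 2.10) automatically:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0",
  "org.apache.spark" %% "spark-sql" % "1.6.0"
)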

Step 3: Write Code

In this step, we will write the code to read the CSV file and load the data into a Spark RDD/data frame.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object ReadCSVFile {

  case class Employee(empno:String, ename:String, designation:String, manager:String, hire_date:String, sal:String , deptno:String)

  def main(args : Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Read CSV File").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val textRDD = sc.textFile("src\\main\\resources\\emp_data.csv")
    //textRDD.foreach(println)

    // Note: the header line is not filtered out, so it is also mapped below
    val empRdd = textRDD.map {
      line =>
        val col = line.split(",")
        Employee(col(0), col(1), col(2), col(3), col(4), col(5), col(6))
    }

    val empDF = empRdd.toDF()
    empDF.show()

    /* Spark 2.0 or above:
    val empDF = sqlContext.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("src\\main\\resources\\emp_data.csv")
    */
  }
}

Let’s understand each line of the code:

case class – Created for modeling the data; in other words, for mapping the data to a schema.

textFile – Function to load the dataset into an RDD as lines of text.

map – Function used to map each record of the dataset to the Employee case class. Note that the header line is mapped too; see the sketch below for skipping it.

toDF – Function used to transform the RDD into a data frame.
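
One detail worth calling out: the code maps every line of the file, including the header, which is why the header shows up as a data row in the output below. A minimal sketch of how the header line could be skipped before mapping (the startsWith("empno") check is an assumption based on the sample data):

// Skip the header line before mapping to the Employee case class.
// Assumes the header starts with "empno", as in the sample dataset.
val empRdd = textRDD
  .filter(line => !line.startsWith("empno"))
  .map { line =>
    val col = line.split(",")
    Employee(col(0), col(1), col(2), col(3), col(4), col(5), col(6))
  }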

You can download the full Spark application code from the codebase page.
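
For readers on Spark 2.0 or above (as hinted in the commented block inside the code), the built-in CSV reader can be used through SparkSession instead of SQLContext. A rough sketch, assuming Spark 2.x dependencies are on the classpath:

import org.apache.spark.sql.SparkSession

object ReadCSVFileSpark2 {
  def main(args: Array[String]): Unit = {
    // SparkSession is the Spark 2.x entry point, replacing SparkContext/SQLContext
    val spark = SparkSession.builder()
      .appName("Read CSV File")
      .master("local[*]")
      .getOrCreate()

    val empDF = spark.read
      .option("header", "true")      // treat the first line as the header
      .option("inferSchema", "true") // infer column types
      .csv("src\\main\\resources\\emp_data.csv")

    empDF.show()
    spark.stop()
  }
}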

Step 4: Execution

Once all the above steps are complete, we are ready to execute and test our application. To run the application, right-click on the script and choose Run.

Step 5: Output

While executing, you will be able to see the content below in the console. The header row appears as the first data row because the code does not filter it out (see the header-skip sketch in Step 3):

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/11/28 23:29:19 INFO SparkContext: Running Spark version 1.6.0
+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
| 7369| SMITH|      CLERK|   7902|12/17/1980| 800|    20|
| 7499| ALLEN|   SALESMAN|   7698| 2/20/1981|1600|    30|
| 7521|  WARD|   SALESMAN|   7698| 2/22/1981|1250|    30|
| 7566|TURNER|    MANAGER|   7839|  4/2/1981|2975|    20|
| 7654|MARTIN|   SALESMAN|   7698| 9/28/1981|1250|    30|
| 7698|MILLER|    MANAGER|   7839|  5/1/1981|2850|    30|
| 7782| CLARK|    MANAGER|   7839|  6/9/1981|2450|    10|
| 7788| SCOTT|    ANALYST|   7566| 12/9/1982|3000|    20|
| 7839|  KING|  PRESIDENT|   NULL|11/17/1981|5000|    10|
| 7844|TURNER|   SALESMAN|   7698|  9/8/1981|1500|    30|
| 7876| ADAMS|      CLERK|   7788| 1/12/1983|1100|    20|
| 7900| JAMES|      CLERK|   7698| 12/3/1981| 950|    30|
| 7902|  FORD|    ANALYST|   7566| 12/3/1981|3000|    20|
| 7934|MILLER|      CLERK|   7782| 1/23/1982|1300|    10|
+-----+------+-----------+-------+----------+----+------+

Wrapping Up

In this post, we have created a Spark application using the IntelliJ IDE with SBT. We loaded the CSV file into a Spark RDD/data frame without using any external package, and used a case class to transform the RDD into a data frame. CSV is one of the most common file formats used as a source file.

1 Comment

  1. Lawrance Amburose

    I have tried the above scenario and am getting the following error.

    Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.mutable.Buffer$.empty()Lscala/collection/GenTraversable;
    at org.apache.spark.sql.SparkSessionExtensions.<init>(SparkSessionExtensions.scala:69)
    at org.apache.spark.sql.SparkSession$Builder.<init>(SparkSession.scala:765)
    at org.apache.spark.sql.SparkSession$.builder(SparkSession.scala:970)
    at broadcastrights$.main(broadcastrights.scala:11)
    at broadcastrights.main(broadcastrights.scala)
