Read CSV file in Spark Scala

Requirement

Suppose we have a dataset in CSV format and we want to read it in Spark using Scala. So the requirement is to create a Spark application that reads the CSV file into a Spark data frame using Scala.

Components Involved

The following components are involved:

  • Spark RDD/Data Frame
  • Scala
  • IntelliJ
  • SBT

Sample Data

Let’s have a look at the sample dataset which we will use for this requirement:

emp_data
 
empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,7902,12/17/1980,800,20
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7566,TURNER,MANAGER,7839,4/2/1981,2975,20
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
7698,MILLER,MANAGER,7839,5/1/1981,2850,30
7782,CLARK,MANAGER,7839,6/9/1981,2450,10
7788,SCOTT,ANALYST,7566,12/9/1982,3000,20
7839,KING,PRESIDENT,NULL,11/17/1981,5000,10
7844,TURNER,SALESMAN,7698,9/8/1981,1500,30
7876,ADAMS,CLERK,7788,1/12/1983,1100,20
7900,JAMES,CLERK,7698,12/3/1981,950,30
7902,FORD,ANALYST,7566,12/3/1981,3000,20
7934,MILLER,CLERK,7782,1/23/1982,1300,10


Solution

We are going to create a Spark application using the IntelliJ IDE. Follow the steps below:

Step 1: Create Spark Application

The first step is to create a Spark project in IntelliJ IDE using SBT.

Open IntelliJ. Once it has opened, go to File -> New -> Project -> Choose SBT.

Click Next and provide the details such as the project name, and choose the Scala version. In my case, I have given the project name ReadCSVFileInSpark and selected 2.10.4 as the Scala version.

Step 2: Resolve Dependency

Add the below dependencies in build.sbt:

build.sbt
 
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.0",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.0"
)

It will download all the required packages.
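
For reference, a complete minimal build.sbt might look like the following. The project name and Scala version are taken from Step 1 of this example and are only an assumption; adjust them to match your own project.

name := "ReadCSVFileInSpark"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.0",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.0"
)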

Step 3: Write Code

In this step, we will write the code to read the CSV file and load the data into a Spark RDD/data frame.

ReadCSVFile.scala
 
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object ReadCSVFile {

  // Case class that models one record of the CSV file
  case class Employee(empno: String, ename: String, designation: String, manager: String, hire_date: String, sal: String, deptno: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Read CSV File").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Load the CSV file as a plain text RDD, one line per record
    val textRDD = sc.textFile("src\\main\\resources\\emp_data.csv")
    //textRDD.foreach(println)

    // Split each line on the comma and map the columns to the Employee case class.
    // Note: the header line is mapped like any other line.
    val empRdd = textRDD.map {
      line =>
        val col = line.split(",")
        Employee(col(0), col(1), col(2), col(3), col(4), col(5), col(6))
    }

    // Convert the RDD of case class objects into a data frame
    val empDF = empRdd.toDF()
    empDF.show()

    /* Spark 2.0 or up
      val empDF = sqlContext.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("src\\main\\resources\\emp_data.csv")
     */
  }
}
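
The commented block at the end of main hints at the built-in CSV reader that is available from Spark 2.0 onward. As a rough sketch (not part of this project, and assuming Spark 2.x dependencies in build.sbt instead of the 1.6.0 ones above), the same file could be read through SparkSession like this:

import org.apache.spark.sql.SparkSession

object ReadCSVFileSpark2 {
  def main(args: Array[String]): Unit = {
    // SparkSession replaces SparkContext/SQLContext as the entry point in Spark 2.x
    val spark = SparkSession.builder()
      .appName("Read CSV File")
      .master("local[*]")
      .getOrCreate()

    // The csv data source is built in from Spark 2.0; header and schema inference
    // remove the need for a hand-written case class
    val empDF = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("src/main/resources/emp_data.csv")

    empDF.show()
    spark.stop()
  }
}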

Let’s understand the key parts of the code:

case class – Created to model the data, i.e. to map each record to a schema.

textFile – Loads the dataset into an RDD as plain text, one line per record.

map – Maps each line of the dataset to the Employee case class. Note that the header line is mapped like any other line, which is why it shows up as a data row in the output below; see the sketch after this list.

toDF – Transforms the RDD into a data frame.
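
Because textFile treats the header line like any other record, the column names end up as a data row in the output of Step 5. A possible way to drop the header before mapping, shown here only as a sketch on top of the code above:

// Skip the first line (the header) of the first partition before mapping to Employee
val dataRDD = textRDD.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 0) iter.drop(1) else iter
}

val empRdd = dataRDD.map { line =>
  val col = line.split(",")
  Employee(col(0), col(1), col(2), col(3), col(4), col(5), col(6))
}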

You can download the full Spark application code from the codebase page.

Step 4: Execution

Once all the above steps are complete, we are ready to execute and test our application. To execute the application, right-click on the script and choose Run.

Step 5: Output

While executing, you will be able to see the below content in the console:

 
 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/11/28 23:29:19 INFO SparkContext: Running Spark version 1.6.0
+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
| 7369| SMITH|      CLERK|   7902|12/17/1980| 800|    20|
| 7499| ALLEN|   SALESMAN|   7698| 2/20/1981|1600|    30|
| 7521|  WARD|   SALESMAN|   7698| 2/22/1981|1250|    30|
| 7566|TURNER|    MANAGER|   7839|  4/2/1981|2975|    20|
| 7654|MARTIN|   SALESMAN|   7698| 9/28/1981|1250|    30|
| 7698|MILLER|    MANAGER|   7839|  5/1/1981|2850|    30|
| 7782| CLARK|    MANAGER|   7839|  6/9/1981|2450|    10|
| 7788| SCOTT|    ANALYST|   7566| 12/9/1982|3000|    20|
| 7839|  KING|  PRESIDENT|   NULL|11/17/1981|5000|    10|
| 7844|TURNER|   SALESMAN|   7698|  9/8/1981|1500|    30|
| 7876| ADAMS|      CLERK|   7788| 1/12/1983|1100|    20|
| 7900| JAMES|      CLERK|   7698| 12/3/1981| 950|    30|
| 7902|  FORD|    ANALYST|   7566| 12/3/1981|3000|    20|
| 7934|MILLER|      CLERK|   7782| 1/23/1982|1300|    10|
+-----+------+-----------+-------+----------+----+------+

Wrapping Up

In this post, we have created a Spark application using IntelliJ IDE with SBT and loaded the CSV file into a Spark RDD/data frame without using any external package. We also used a case class to transform the RDD into the data frame. CSV is one of the most common file formats used as a source file.
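
Once loaded, the data frame can be queried like any other. A small illustrative sketch using the column names from the Employee case class above (this runs inside main, where sqlContext.implicits._ has already been imported; the exact query is only an example):

// Employees in department 20, highest paid first; sal is a String in the case class,
// so it is cast to int for ordering
empDF.filter($"deptno" === "20")
  .orderBy($"sal".cast("int").desc)
  .select("empno", "ename", "sal")
  .show()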
