Requirement
Suppose we have a dataset in CSV format and we want to read it in Spark using Scala. The requirement, therefore, is to create a Spark application that reads a CSV file into a Spark DataFrame using Scala.
Components Involved
The following components are involved:
- Spark RDD/Data Frame
- Scala
- IntelliJ
- SBT
Sample Data
Let’s have a look at the sample dataset which we will use for this requirement:
empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,7902,12/17/1980,800,20
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7566,TURNER,MANAGER,7839,4/2/1981,2975,20
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
7698,MILLER,MANAGER,7839,5/1/1981,2850,30
7782,CLARK,MANAGER,7839,6/9/1981,2450,10
7788,SCOTT,ANALYST,7566,12/9/1982,3000,20
7839,KING,PRESIDENT,NULL,11/17/1981,5000,10
7844,TURNER,SALESMAN,7698,9/8/1981,1500,30
7876,ADAMS,CLERK,7788,1/12/1983,1100,20
7900,JAMES,CLERK,7698,12/3/1981,950,30
7902,FORD,ANALYST,7566,12/3/1981,3000,20
7934,MILLER,CLERK,7782,1/23/1982,1300,10
Download the sample dataset from here: emp_data
Solution
We are going to create a Spark application using the IntelliJ IDE. Follow the steps mentioned below:
Step 1: Create Spark Application
The first step is to create a Spark project in the IntelliJ IDE using SBT.
Open IntelliJ. Once it opens, go to File -> New -> Project and choose SBT.
Click Next and provide the details such as the project name, and choose the Scala version. In my case, I have named the project ReadCSVFileInSpark and selected 2.10.4 as the Scala version.
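For reference, assuming the values entered above, the build.sbt generated for the project would start with settings roughly like the following (the version string is only a placeholder; keep whatever the wizard generated):

name := "ReadCSVFileInSpark"

version := "1.0" // placeholder; keep the value IntelliJ generated

scalaVersion := "2.10.4"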
Step 2: Resolve Dependency
Add the dependencies below to the project's build.sbt file:
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.0",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.0"
)
It will download all the required packages.
Step 3: Write Code
In this step, we will write the code to read the CSV file and load the data into a Spark RDD/DataFrame.
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object ReadCSVFile {

  // Models one record of the CSV file
  case class Employee(empno: String, ename: String, designation: String, manager: String,
                      hire_date: String, sal: String, deptno: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Read CSV File").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val textRDD = sc.textFile("src\\main\\resources\\emp_data.csv")
    // textRDD.foreach(println)

    val empRdd = textRDD.map { line =>
      val col = line.split(",")
      Employee(col(0), col(1), col(2), col(3), col(4), col(5), col(6))
    }

    val empDF = empRdd.toDF()
    empDF.show()

    /* Spark 2.0 or up
    val empDF = sqlContext.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("src\\main\\resources\\emp_data.csv")
    */
  }
}
Let’s understand each line of the code:
case class – Defines the Employee model; it maps the fields of each record to the schema of the DataFrame.
textFile – Loads the dataset into an RDD as plain text, one line per record.
map – Splits each line on commas and maps the values onto the Employee case class.
toDF – Converts the RDD of Employee objects into a DataFrame.
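One thing to note: since textFile reads every line of the file, the header row is also mapped into an Employee record, which is why it shows up as a data row in the output under Step 5. A minimal sketch of skipping the header before the map, reusing the textRDD and Employee names from the code above:

// Capture the first line (the header) and filter it out before mapping
val header = textRDD.first()
val empRdd = textRDD
  .filter(line => line != header)
  .map { line =>
    val col = line.split(",")
    Employee(col(0), col(1), col(2), col(3), col(4), col(5), col(6))
  }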
You can download the full Spark application code from the codebase page.
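The commented block in the code hints at the Spark 2.x approach. For completeness, here is a minimal sketch using SparkSession and the built-in CSV reader (this needs Spark 2.0 or later dependencies, not the 1.6.0 ones used in this project; the object name ReadCSVFileSpark2 is only illustrative):

import org.apache.spark.sql.SparkSession

object ReadCSVFileSpark2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Read CSV File")
      .master("local[*]")
      .getOrCreate()

    // Built-in CSV source: the header row becomes column names and types are inferred
    val empDF = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("src/main/resources/emp_data.csv")

    empDF.show()
  }
}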
Step 4: Execution
Once all the above steps are complete, we are ready to execute and test our application. To run it, right-click on the script in IntelliJ and choose Run.
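If you prefer the command line instead of IntelliJ, the application can also be run with SBT from the project root, or packaged and submitted to a local Spark installation. The jar path below is only a guess based on the project name and Scala version; check your target folder for the actual file name:

sbt run

sbt package
spark-submit --class ReadCSVFile --master "local[*]" target/scala-2.10/readcsvfileinspark_2.10-1.0.jar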
Step 5: Output
While executing, you will be able to see the following output in the console:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/11/28 23:29:19 INFO SparkContext: Running Spark version 1.6.0
+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
| 7369| SMITH|      CLERK|   7902|12/17/1980| 800|    20|
| 7499| ALLEN|   SALESMAN|   7698| 2/20/1981|1600|    30|
| 7521|  WARD|   SALESMAN|   7698| 2/22/1981|1250|    30|
| 7566|TURNER|    MANAGER|   7839|  4/2/1981|2975|    20|
| 7654|MARTIN|   SALESMAN|   7698| 9/28/1981|1250|    30|
| 7698|MILLER|    MANAGER|   7839|  5/1/1981|2850|    30|
| 7782| CLARK|    MANAGER|   7839|  6/9/1981|2450|    10|
| 7788| SCOTT|    ANALYST|   7566| 12/9/1982|3000|    20|
| 7839|  KING|  PRESIDENT|   NULL|11/17/1981|5000|    10|
| 7844|TURNER|   SALESMAN|   7698|  9/8/1981|1500|    30|
| 7876| ADAMS|      CLERK|   7788| 1/12/1983|1100|    20|
| 7900| JAMES|      CLERK|   7698| 12/3/1981| 950|    30|
| 7902|  FORD|    ANALYST|   7566| 12/3/1981|3000|    20|
| 7934|MILLER|      CLERK|   7782| 1/23/1982|1300|    10|
+-----+------+-----------+-------+----------+----+------+
Wrapping Up
In this post, we have created a Spark application using the IntelliJ IDE with SBT. We loaded the CSV file into a Spark RDD/DataFrame without using any external package, and used a case class to transform the RDD into a DataFrame. CSV is a common file format that is used as a source file in most cases.