Requirement
Let’s say we have a dataset in JSON format. The file may contain the data either on a single line or spread across multiple lines. The requirement is to process this data using a Spark data frame.
In addition, we will also see how to compare two data frames and apply a few other transformations.
Sample Data
JSON data in a single line:
[{"empno":"7369","ename":"SMITH","designation":"CLERK","manager":"7902","hire_date":"12/17/1980","sal":"800","deptno":"20" },{"empno":"7499","ename":"ALLEN","designation":"SALESMAN","manager":"7698","hire_date":"2/20/1981","sal":"1600","deptno":"30"},{"empno":"7521","ename":"WARD","designation":"SALESMAN","manager":"7698","hire_date":"2/22/1981","sal":"1250","deptno":"30" },{"empno":"7566","ename":"TURNER","designation":"MANAGER","manager":"7839","hire_date":"4/2/1981","sal":"2975","deptno":"20"},{"empno":"7654","ename":"MARTIN","designation":"SALESMAN","manager":"7698","hire_date":"9/28/1981","sal":"1250","deptno":"30"},{"empno":"7698","ename":"MILLER","designation":"MANAGER","manager":"7839","hire_date":"5/1/1981","sal":"2850","deptno":"30"},{"empno":"7782","ename":"CLARK","designation":"MANAGER","manager":"7839","hire_date":"6/9/1981","sal":"2450","deptno":"10"},{"empno":"7788","ename":"SCOTT","designation":"ANALYST","manager":"7566","hire_date":"12/9/1982","sal":"3000","deptno":"20"},{"empno":"7839","ename":"KING","designation":"PRESIDENT","manager":"NULL","hire_date":"11/17/1981","sal":"5000","deptno":"10"}]
JSON data in multiple lines:
[ { "empno":"7369", "ename":"SMITH", "designation":"CLERK", "manager":"7902", "hire_date":"12/17/1980", "sal":"800", "deptno":"20" }, { "empno":"7499", "ename":"ALLEN", "designation":"SALESMAN", "manager":"7698", "hire_date":"2/20/1981", "sal":"1600", "deptno":"30" }, { "empno":"7521", "ename":"WARD", "designation":"SALESMAN", "manager":"7698", "hire_date":"2/22/1981", "sal":"1250", "deptno":"30" }, { "empno":"7566", "ename":"TURNER", "designation":"MANAGER", "manager":"7839", "hire_date":"4/2/1981", "sal":"2975", "deptno":"20" }, { "empno":"7654", "ename":"MARTIN", "designation":"SALESMAN", "manager":"7698", "hire_date":"9/28/1981", "sal":"1250", "deptno":"30" }, { "empno":"7698", "ename":"MILLER", "designation":"MANAGER", "manager":"7839", "hire_date":"5/1/1981", "sal":"2850", "deptno":"30" }, { "empno":"7782", "ename":"CLARK", "designation":"MANAGER", "manager":"7839", "hire_date":"6/9/1981", "sal":"2450", "deptno":"10" }, { "empno":"7788", "ename":"SCOTT", "designation":"ANALYST", "manager":"7566", "hire_date":"12/9/1982", "sal":"3000", "deptno":"20" }, { "empno":"7839", "ename":"KING", "designation":"PRESIDENT", "manager":"NULL", "hire_date":"11/17/1981", "sal":"5000", "deptno":"10" } ]
Solution
Step 1: Setup
We will use the given sample data in the code. Keep the files at any location. In my case, I have kept the files at ‘/home/bdp/data/employees_singleLine.json’ and ‘/home/bdp/data/employees_multiLine.json’.
Step 2: Write Code and Execute
Launch spark-shell. Once the spark-shell is open, you can load the JSON data using the below command:
// Load JSON data
scala> val jsonData_1 = sqlContext.read.json("file:///home/bdp/data/employees_singleLine.json")

// Check schema
scala> jsonData_1.printSchema()
Here, we have loaded the JSON file from the local path. Now, load the other JSON file, whose data spans multiple lines.
scala> val jsonData_2 = sqlContext.read.json(sc.wholeTextFiles("file:///home/bdp/data/employees_multiLine.json").values)

scala> jsonData_2.printSchema
Compare both data frames:
scala> jsonData_1.except(jsonData_2).show
Here, the except function is used to compare the two data frames: it returns the rows that are present in jsonData_1 but not in jsonData_2, so an empty result means jsonData_1 has no rows missing from jsonData_2.
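Note that a one-way except only checks one direction. A minimal sketch of a full equality check, using a hypothetical helper name sameData, runs except both ways:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: two data frames hold the same rows only if
// except is empty in BOTH directions (duplicates aside).
def sameData(a: DataFrame, b: DataFrame): Boolean =
  a.except(b).count == 0 && b.except(a).count == 0

sameData(jsonData_1, jsonData_2)
```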
Check the data frame data.
// Check data
scala> jsonData_1.show()

// Get ename
scala> jsonData_1.select("ename").show

// Get distinct deptno
scala> jsonData_1.select("deptno").distinct.show
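Other transformations can be chained the same way. For example, a sketch of a per-department aggregation; note that sal was read from the JSON as a string, so it is cast to an integer before aggregating:

```scala
import org.apache.spark.sql.functions.avg

// Average salary per department; cast the string column sal to int first.
jsonData_1
  .selectExpr("deptno", "cast(sal as int) as sal")
  .groupBy("deptno")
  .agg(avg("sal").alias("avg_sal"))
  .show()
```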
Alternatively, we can create a temp table and execute the query on the table:
// Register a table
scala> jsonData_1.registerTempTable("employeeTbl")

// Get distinct deptno using a query on the table
scala> sqlContext.sql("select distinct deptno from employeeTbl").show
Note: use the same table name in the query as the one used when registering the temp table.
All the commands used for the processing:
// Load JSON data
scala> val jsonData_1 = sqlContext.read.json("file:///home/bdp/data/employees_singleLine.json")
scala> val jsonData_2 = sqlContext.read.json(sc.wholeTextFiles("file:///home/bdp/data/employees_multiLine.json").values)

// Check the schema
scala> jsonData_1.printSchema()
scala> jsonData_2.printSchema()

// Compare the data frames
scala> jsonData_1.except(jsonData_2).show

// Check data
scala> jsonData_1.show()

// Get ename
scala> jsonData_1.select("ename").show

// Get distinct deptno
scala> jsonData_1.select("deptno").distinct.show

// Register a table
scala> jsonData_1.registerTempTable("employeeTbl")

// Get distinct deptno using a query on the table
scala> sqlContext.sql("select distinct deptno from employeeTbl").show
In Spark 2.0:
Load the multi-line JSON file using the below command:
scala> spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("file:///home/bdp/data/employees_multiLine.json").show
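In Spark 2.0+, the SparkSession entry point (available as spark in spark-shell) replaces sqlContext, the multiLine reader option replaces the sc.wholeTextFiles workaround, and registerTempTable is superseded by createOrReplaceTempView. A sketch of the same flow:

```scala
// Spark 2.0+: read the multi-line JSON via the multiLine option.
val df = spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .json("file:///home/bdp/data/employees_multiLine.json")

df.printSchema()

// registerTempTable is deprecated in 2.0; use createOrReplaceTempView.
df.createOrReplaceTempView("employeeTbl")
spark.sql("select distinct deptno from employeeTbl").show()
```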
Wrapping Up
In this post, we have gone through how to parse JSON-format data, whether it sits on a single line or spans multiple lines. We have also seen how to fetch a specific column from the data frame directly, as well as by creating a temp table. The except function was used to compare two data frames and check whether they hold the same data. You can explore further by applying more transformations and actions to the created data frames.