Requirement
In the last post, we saw how to merge two DataFrames in Spark when both sources have the same schema. Now, suppose a few columns are added to one of the sources. The two sources then have different schemas, with a different number of columns. In this post, we are going to merge two DataFrames with different schemas.
Sample Data
Emp_data1.csv
empno,ename,designation,manager,hire_date,sal,deptno,location
9369,SMITH,CLERK,7902,12/17/1980,800,20,BANGALORE
9499,ALLEN,SALESMAN,7698,2/20/1981,1600,30,HYDERABAD
9521,WARD,SALESMAN,7698,2/22/1981,1250,30,PUNE
9566,TURNER,MANAGER,7839,4/2/1981,2975,20,MUMBAI
9654,MARTIN,SALESMAN,7698,9/28/1981,1250,30,CHENNAI
9369,SMITH,CLERK,7902,12/17/1980,800,20,KOLKATA
Emp_data2.csv
empno,ename,designation,manager,hire_date,sal,deptno
8369,SMITH,CLERK,7902,12/17/1980,800,20
8499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
8521,WARD,SALESMAN,7698,2/22/1981,1250,30
8566,TURNER,MANAGER,7839,4/2/1981,2975,20
8654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
8369,SMITH,CLERK,7902,12/17/1980,800,20
Here, the Emp_data1.csv file has an additional column named location.
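With wider files it helps to find the extra columns programmatically rather than by eye. Below is a minimal plain-Scala sketch (no Spark needed) that compares two header lines. `columnsOf` and `missingFrom` are hypothetical helper names, and the sketch assumes the header fields are unquoted and contain no embedded commas.

```scala
object HeaderDiff {
  // Split a CSV header line into trimmed column names.
  // Assumption: header fields are not quoted and contain no embedded commas.
  def columnsOf(header: String): List[String] =
    header.split(",").map(_.trim).toList

  // Columns that appear in `left` but not in `right`, kept in `left` order.
  def missingFrom(left: String, right: String): List[String] = {
    val rightCols = columnsOf(right).toSet
    columnsOf(left).filterNot(rightCols.contains)
  }

  def main(args: Array[String]): Unit = {
    val header1 = "empno,ename,designation,manager,hire_date,sal,deptno,location"
    val header2 = "empno,ename,designation,manager,hire_date,sal,deptno"
    println(missingFrom(header1, header2)) // List(location)
    println(missingFrom(header2, header1)) // List()
  }
}
```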
Components Involved
- Spark 2.x
- CSV
Solution
Step 1: Read CSV file data
val emp_dataDf1 = spark.read.format("csv") .option("header", "true") .load("./My-Work-Book/BDP/data/emp_data1.csv")
val emp_dataDf2 = spark.read.format("csv") .option("header", "true") .load("./My-Work-Book/BDP/data/emp_data2.csv")
Step 2: Merging Two DataFrames
We have loaded both CSV files into two DataFrames. Let's try to merge them using the union function:
val mergeDf = emp_dataDf1.union(emp_dataDf2)
This fails with an AnalysisException stating that Union can only be performed on tables with the same number of columns: emp_dataDf1 has 8 columns, while emp_dataDf2 has 7.
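The failure happens because `union` resolves columns by position, not by name, so both sides must have the same number of columns (and compatible types). The toy model below is plain Scala, not Spark: the `Table` case class and `unionByPosition` are hypothetical names, and only the column-count check is imitated, not Spark's type checks.

```scala
object PositionalUnion {
  // Toy stand-in for a DataFrame: ordered column names plus rows.
  // Option[String] models a nullable value (None plays the role of SQL NULL).
  final case class Table(columns: List[String], rows: List[List[Option[String]]])

  // Imitates the arity check of DataFrame.union: columns are paired by position,
  // so both tables must have the same number of columns. The result keeps the
  // first table's column names, as Spark's union does.
  def unionByPosition(first: Table, second: Table): Table = {
    require(
      first.columns.length == second.columns.length,
      s"the first table has ${first.columns.length} columns and the second table has ${second.columns.length} columns"
    )
    Table(first.columns, first.rows ++ second.rows)
  }
}
```

Calling `unionByPosition` with an 8-column table and a 7-column table throws an `IllegalArgumentException`, which plays the role of the `AnalysisException` Spark raises.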
Approach 1: When you know the missing column name
If you know the name of the missing column, add it to the DataFrame that lacks it, filled with null, and then perform the union:
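import org.apache.spark.sql.functions.lit

// withColumn appends "location" as the last column, with null in every row
val emp_dataWithColDf = emp_dataDf2.withColumn("location", lit(null))
val mergeDf = emp_dataDf1.union(emp_dataWithColDf)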
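Approach 1 works only because `withColumn` appends the new column at the end and `location` is also the last column of `emp_dataDf1`. Since `union` matches by position, a column missing from the middle of the schema would additionally need a `select` to reorder the columns. The plain-Scala sketch below models this step; `Table`, `withNullColumn` and `unionByPosition` are hypothetical names, not Spark APIs.

```scala
object AddMissingColumn {
  // Toy DataFrame: ordered column names plus rows (None models SQL NULL).
  final case class Table(columns: List[String], rows: List[List[Option[String]]])

  // Like withColumn(name, lit(null)): append a column at the END, null in every row.
  def withNullColumn(t: Table, name: String): Table =
    Table(t.columns :+ name, t.rows.map(_ :+ None))

  // Positional union: values are paired by column position, not by name.
  def unionByPosition(a: Table, b: Table): Table = {
    require(a.columns.length == b.columns.length, "column counts differ")
    Table(a.columns, a.rows ++ b.rows)
  }

  def main(args: Array[String]): Unit = {
    val emp1 = Table(List("empno", "ename", "location"),
      List(List(Some("9369"), Some("SMITH"), Some("BANGALORE"))))
    val emp2 = Table(List("empno", "ename"), List(List(Some("8369"), Some("SMITH"))))
    // "location" lands last in both tables, so positions line up
    val merged = unionByPosition(emp1, withNullColumn(emp2, "location"))
    merged.rows.foreach(println)
  }
}
```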
Approach 2: Generic way to find the missing columns and add them with a dummy value to one DataFrame
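import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

// For each required column, select it if the DataFrame has it, otherwise add it as null
def customSelect(availableCols: List[String], requiredCols: List[String]): List[Column] = {
  requiredCols.map {
    case column if availableCols.contains(column) => col(column)
    case column => lit(null).as(column)
  }
}

// Reshape emp_dataDf2 to emp_dataDf1's columns, in emp_dataDf1's order
val mergeDf2 = emp_dataDf2.select(customSelect(emp_dataDf2.columns.toList, emp_dataDf1.columns.toList): _*)
mergeDf2.show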
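`customSelect` walks the required column list in order and emits either the existing column or a null placeholder, so the result has exactly `emp_dataDf1`'s columns in `emp_dataDf1`'s order and can then be passed to `emp_dataDf1.union(...)`. The same projection, modeled in plain Scala without Spark (`Table` and `alignTo` are hypothetical names):

```scala
object AlignToColumns {
  // Toy DataFrame: ordered column names plus rows (None models SQL NULL).
  final case class Table(columns: List[String], rows: List[List[Option[String]]])

  // Project `t` onto `required`, in `required` order: keep a column's values when
  // `t` has it, otherwise fill with None (the lit(null).as(column) case).
  def alignTo(t: Table, required: List[String]): Table = {
    val index = t.columns.zipWithIndex.toMap // column name -> position in t
    val rows = t.rows.map { row =>
      required.map(name => index.get(name).flatMap(i => row(i)))
    }
    Table(required, rows)
  }
}
```

Because the output order comes from `required`, the source table's own column order does not matter, which is what makes the later positional `union` safe.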
Approach 3: Generic way to find the missing columns and add them with a dummy value to all DataFrames
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

val emp_data1Cols = emp_dataDf1.columns.toSet
val emp_data2Cols = emp_dataDf2.columns.toSet
val requiredColumns = emp_data1Cols ++ emp_data2Cols // union of both column sets

// Select each required column if present, otherwise add it as null
def customSelect(availableCols: Set[String], requiredCols: Set[String]): List[Column] = {
  requiredCols.toList.map {
    case column if availableCols.contains(column) => col(column)
    case column => lit(null).as(column)
  }
}

// Both selects iterate the same requiredColumns set, so column positions line up for union
val mergeDf3 = emp_dataDf1.select(customSelect(emp_data1Cols, requiredColumns): _*).union(
  emp_dataDf2.select(customSelect(emp_data2Cols, requiredColumns): _*))
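In Approach 3, `requiredColumns` is a `Set`, so the column order of the merged result is unspecified; the union still lines up because both `select` calls iterate the same `Set` instance. If a stable order matters, an ordered, de-duplicated list such as `(columns1 ++ columns2).distinct` keeps the first DataFrame's order and appends any new columns after it. A plain-Scala sketch of that variant, with hypothetical names and no Spark:

```scala
object MergeAllColumns {
  // Toy DataFrame: ordered column names plus rows (None models SQL NULL).
  final case class Table(columns: List[String], rows: List[List[Option[String]]])

  // Project onto `required` in order; missing columns become None.
  def alignTo(t: Table, required: List[String]): Table = {
    val index = t.columns.zipWithIndex.toMap // column name -> position in t
    Table(required, t.rows.map(row => required.map(c => index.get(c).flatMap(row(_)))))
  }

  // Ordered union of both column lists (first-seen order), then align both
  // sides to it and concatenate their rows.
  def mergeWithAllColumns(a: Table, b: Table): Table = {
    val required = (a.columns ++ b.columns).distinct
    val left = alignTo(a, required)
    val right = alignTo(b, required)
    Table(required, left.rows ++ right.rows)
  }
}
```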
Wrapping Up
In this post, we learned how to merge DataFrames that have different schemas: either add the missing column directly with withColumn and a dummy (null) value, or find and add the missing columns dynamically. This scenario is common in practice when you work with data from multiple sources.