Merge Two DataFrames With Different Schema in Spark

Requirement

In the last post, we saw how to merge two DataFrames in Spark when both sources had the same schema. Now, suppose a few columns have been added to one of the sources, so the two sources have a different number of columns. In this post, we are going to merge two DataFrames with different schemas.

Sample Data

Emp_data1.csv

empno,ename,designation,manager,hire_date,sal,deptno,location
9369,SMITH,CLERK,7902,12/17/1980,800,20,BANGALORE
9499,ALLEN,SALESMAN,7698,2/20/1981,1600,30,HYDERABAD
9521,WARD,SALESMAN,7698,2/22/1981,1250,30,PUNE
9566,TURNER,MANAGER,7839,4/2/1981,2975,20,MUMBAI
9654,MARTIN,SALESMAN,7698,9/28/1981,1250,30,CHENNAI
9369,SMITH,CLERK,7902,12/17/1980,800,20,KOLKATA

Emp_data2.csv

empno,ename,designation,manager,hire_date,sal,deptno
8369,SMITH,CLERK,7902,12/17/1980,800,20
8499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
8521,WARD,SALESMAN,7698,2/22/1981,1250,30
8566,TURNER,MANAGER,7839,4/2/1981,2975,20
8654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
8369,SMITH,CLERK,7902,12/17/1980,800,20

Here, the Emp_data1.csv file has an additional column named location.

Components Involved

  • Spark 2.x
  • CSV

Solution

Step 1: Read CSV file data

 val emp_dataDf1 = spark.read.format("csv")
                             .option("header", "true")
                             .load("./My-Work-Book/BDP/data/emp_data1.csv)

 val emp_dataDf2 = spark.read.format("csv")
                             .option("header", "true")
                             .load("./My-Work-Book/BDP/data/emp_data2.csv")

Step 2: Merging Two DataFrames

We have loaded both CSV files into two DataFrames. Let's try to merge these DataFrames using the union function below:

val mergeDf = emp_dataDf1.union(emp_dataDf2)

We will get the exception below, saying that a union can only be performed on tables with the same number of columns.
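In Spark 2.x this surfaces as an AnalysisException; the message looks roughly like the following (the exact wording can vary between versions):

 org.apache.spark.sql.AnalysisException: Union can only be performed on tables
 with the same number of columns, but the first table has 8 columns and the
 second table has 7 columns;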

Approach 1: When you know the missing column name

You can add the missing column to the DataFrame that lacks it, with a dummy (null) value:

import org.apache.spark.sql.functions.lit

val emp_dataWithColDf = emp_dataDf2.withColumn("location", lit(null))

val mergeDf = emp_dataDf1.union(emp_dataWithColDf)
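A quick way to sanity-check the merge (a sketch; the expected count of 12 follows from the six rows in each sample file):

 import org.apache.spark.sql.functions.col

 mergeDf.printSchema()
 println(mergeDf.count())                          // 12 = 6 rows from each file
 mergeDf.filter(col("location").isNull).show()     // rows that came from emp_data2.csv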

Approach 2: Generic way to find missing columns and add them with a dummy value in a DataFrame

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

// Build a select list: keep the columns that exist, add a null placeholder for the rest
def customSelect(availableCols: List[String], requiredCols: List[String]): List[Column] = {
      requiredCols.map {
            case column if availableCols.contains(column) => col(column)
            case column => lit(null).as(column)
      }
}

val mergeDf2 = emp_dataDf2.select(customSelect(
                                   emp_dataDf2.columns.toList,
                                   emp_dataDf1.columns.toList)
                                 :_*)
mergeDf2.show()
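Note that mergeDf2 is just emp_dataDf2 reshaped to emp_dataDf1's column list; to finish the merge, union it with emp_dataDf1 (mergeDf2Full is an illustrative name):

 val mergeDf2Full = emp_dataDf1.union(mergeDf2)
 mergeDf2Full.show()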

Approach 3: Generic way to find missing columns and add them with a dummy value in all DataFrames

val emp_data1Cols = emp_dataDf1.columns.toSet
val emp_data2Cols = emp_dataDf2.columns.toSet
val requiredColumns = emp_data1Cols ++ emp_data2Cols // union of both column sets

// Same idea as before, but working on column sets
def customSelect(availableCols: Set[String], requiredCols: Set[String]): List[Column] = {
  requiredCols.toList.map {
    case column if availableCols.contains(column) => col(column)
    case column => lit(null).as(column)
  }
}

val mergeDf3 = emp_dataDf1.select(customSelect(emp_data1Cols, requiredColumns): _*)
  .union(emp_dataDf2.select(customSelect(emp_data2Cols, requiredColumns): _*))
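A quick look at the result (rows that originate from emp_dataDf2 will show null in the location column):

 mergeDf3.printSchema()
 mergeDf3.show(false)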

Wrapping Up

In this post, we have learned how to merge DataFrames that have different schemas: either by adding the missing column explicitly with withColumn and a dummy value, or by deriving the missing columns dynamically. This kind of scenario is very practical and comes into the picture when you are working with multiple data sources.
