Merge Multiple Data Frames in Spark

Requirement

Let’s say we are getting data from multiple sources, but we need to ingest it into a single target table. The sources may have different schemas. We want to merge these data frames and load/save the result into a table.

Sample Data

Emp_data1.csv

empno,ename,designation,manager,hire_date,sal,deptno,location

9369,SMITH,CLERK,7902,12/17/1980,800,20,BANGALORE

9499,ALLEN,SALESMAN,7698,2/20/1981,1600,30,HYDERABAD

9521,WARD,SALESMAN,7698,2/22/1981,1250,30,PUNE

9566,TURNER,MANAGER,7839,4/2/1981,2975,20,MUMBAI

9654,MARTIN,SALESMAN,7698,9/28/1981,1250,30,CHENNAI

9369,SMITH,CLERK,7902,12/17/1980,800,20,KOLKATA


Emp_data2.csv

empno,ename,designation,manager,hire_date,sal,deptno

8369,SMITH,CLERK,7902,12/17/1980,800,20

8499,ALLEN,SALESMAN,7698,2/20/1981,1600,30

8521,WARD,SALESMAN,7698,2/22/1981,1250,30

8566,TURNER,MANAGER,7839,4/2/1981,2975,20

8654,MARTIN,SALESMAN,7698,9/28/1981,1250,30

8369,SMITH,CLERK,7902,12/17/1980,800,20


Emp_data3.csv

empno,ename,designation,manager,hire_date,sal,deptno,location

7369,SMITH,CLERK,7902,12/17/1980,800,20,BANGALORE

7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30,HYDERABAD

7521,WARD,SALESMAN,7698,2/22/1981,1250,30,PUNE

7566,TURNER,MANAGER,7839,4/2/1981,2975,20,MUMBAI

7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30,CHENNAI

7369,SMITH,CLERK,7902,12/17/1980,800,20,KOLKATA

Solution

Step 1: Load CSV in DataFrame

val emp_dataDf1 = spark.read.format("csv")
                            .option("header", "true")
                            .load("./src/main/resources/emp_data1.csv")

val emp_dataDf2 = spark.read.format("csv")
                            .option("header", "true")
                            .load("./src/main/resources/emp_data2.csv")

val emp_dataDf3 = spark.read.format("csv")
                            .option("header", "true")
                            .load("./src/main/resources/emp_data3.csv")
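With only the `header` option set, every column is read as a string. If the target table needs typed columns, Spark can also infer the types for you; a minimal sketch for the first file (`typedDf1` is an illustrative name, not part of the original code):

```scala
// Optional: let Spark infer column types (e.g. empno and sal as
// numbers) instead of reading everything as strings.
val typedDf1 = spark.read.format("csv")
                         .option("header", "true")
                         .option("inferSchema", "true")
                         .load("./src/main/resources/emp_data1.csv")
typedDf1.printSchema()
```

Note that type inference requires an extra pass over the data, so it has a read-time cost.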

Step 2: Validate Schemas and Add Missing Columns

As the data comes from different sources, it is good practice to compare the schemas and bring all the data frames to a common schema before merging.

// Build a select list covering requiredCols, substituting null
// for any column the source data frame does not have.
def customSelect(availableCols: Set[String], requiredCols: Set[String]) = {
  requiredCols.toList.map {
    case column if availableCols.contains(column) => col(column)
    case column => lit(null).as(column)
  }
}

val emp_data1Cols = emp_dataDf1.columns.toSet
val emp_data2Cols = emp_dataDf2.columns.toSet
val emp_data3Cols = emp_dataDf3.columns.toSet
val requiredColumns = emp_data1Cols ++ emp_data2Cols ++ emp_data3Cols // union

val empDf1 = emp_dataDf1.select(customSelect(emp_data1Cols,
                                              requiredColumns):_*)
val empDf2 = emp_dataDf2.select(customSelect(emp_data2Cols,
                                             requiredColumns): _*) 
val empDf3 = emp_dataDf3.select(customSelect(emp_data3Cols,
                                             requiredColumns): _*)
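After these selects, all three data frames should expose the same columns in the same order. A quick sanity check (assuming the `empDf1`–`empDf3` variables above are in scope):

```scala
// All three aligned data frames should now share an identical
// column list; union in Step 3 relies on matching positions.
assert(empDf1.columns.sameElements(empDf2.columns))
assert(empDf2.columns.sameElements(empDf3.columns))
empDf1.printSchema()
```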

Step 3: Merge All Data Frames

Now, we have all the Data Frames with the same schemas.

Approach 1: Merge DataFrames One by One

val mergeDf = empDf1.union(empDf2).union(empDf3)
mergeDf.show()

Here, we have merged the first two data frames and then merged the result with the last data frame.

Approach 2: Merging All DataFrames Together

val dfSeq = Seq(empDf1, empDf2, empDf3)
val mergeSeqDf = dfSeq.reduce(_ union _)
mergeSeqDf.show()

Here, we have created a sequence of data frames and then used the reduce function to union them all.
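If you are on Spark 3.1 or later, there is also a built-in shortcut: `unionByName` with `allowMissingColumns = true` resolves columns by name and fills missing ones with nulls, so the manual schema alignment in Step 2 can be skipped. A sketch using the raw data frames from Step 1:

```scala
// Spark 3.1+: union by column name, padding missing columns
// with null, so customSelect is not needed.
val mergedByName = emp_dataDf1
  .unionByName(emp_dataDf2, allowMissingColumns = true)
  .unionByName(emp_dataDf3, allowMissingColumns = true)
mergedByName.show()
```

Plain `union` matches columns by position, which is why the schemas had to be aligned first; `unionByName` matches them by name instead.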

Full Code

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object MergeMultipleDataframe {
  // Build a select list covering requiredCols, substituting null
  // for any column the source data frame does not have.
  def customSelect(availableCols: Set[String], requiredCols: Set[String]) = {
    requiredCols.toList.map {
      case column if availableCols.contains(column) => col(column)
      case column => lit(null).as(column)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
                            .appName("Merge Multiple Dataframes")
                            .config("spark.master", "local")
                            .getOrCreate()
    try {
      val emp_dataDf1 = spark.read.format("csv")
                                  .option("header", "true")
                                  .load("./src/main/resources/emp_data1.csv")
      val emp_dataDf2 = spark.read.format("csv")
                                  .option("header", "true")
                                  .load("./src/main/resources/emp_data2.csv")
      val emp_dataDf3 = spark.read.format("csv")
                                  .option("header", "true")
                                  .load("./src/main/resources/emp_data3.csv")
      emp_dataDf1.show()
      emp_dataDf2.show()
      emp_dataDf3.show()

      // Schema fixes
      val emp_data1Cols = emp_dataDf1.columns.toSet
      val emp_data2Cols = emp_dataDf2.columns.toSet
      val emp_data3Cols = emp_dataDf3.columns.toSet
      val requiredColumns = emp_data1Cols ++ emp_data2Cols ++ emp_data3Cols // union

      val empDf1 = emp_dataDf1.select(customSelect(
        emp_data1Cols,
        requiredColumns): _*)
      val empDf2 = emp_dataDf2.select(customSelect(
        emp_data2Cols,
        requiredColumns): _*)
      val empDf3 = emp_dataDf3.select(customSelect(
        emp_data3Cols,
        requiredColumns): _*)

      // Approach 1: union one by one
      val mergeDf = empDf1.union(empDf2).union(empDf3)
      mergeDf.show()

      // Approach 2: reduce over a sequence
      val dfSeq = Seq(empDf1, empDf2, empDf3)
      val mergeSeqDf = dfSeq.reduce(_ union _)
      mergeSeqDf.show()
    } finally {
      spark.stop()
    }
  }
}

Here, we have merged all the source data into a single data frame. We can now write this data frame to any HDFS path or save it into a table.
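Persisting the result is a single write call. For example (a sketch — the path and table name below are placeholders, not from the original code):

```scala
// Save the merged result as Parquet files on HDFS...
mergeDf.write.mode("overwrite")
       .parquet("hdfs:///data/emp/merged")

// ...or register it as a managed table in the metastore.
mergeDf.write.mode("overwrite")
       .saveAsTable("emp_merged")
```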

Wrapping Up

In this post, we have learned different approaches to merging multiple data frames, even when they have different schemas. You can also extend the code to accept and process any number of sources and load them into a single target table.
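One way to extend the code to any number of sources is to collapse the per-file steps into operations over a list of paths. A sketch, assuming the `spark` session, the file paths, and the `customSelect` helper from above:

```scala
// Hypothetical list of source files; add as many as needed.
val paths = Seq(
  "./src/main/resources/emp_data1.csv",
  "./src/main/resources/emp_data2.csv",
  "./src/main/resources/emp_data3.csv"
)

// Read every file, compute the union of all column sets,
// align each data frame to it, then reduce with union.
val dfs = paths.map(p => spark.read.format("csv")
                                   .option("header", "true")
                                   .load(p))
val requiredColumns = dfs.map(_.columns.toSet).reduce(_ ++ _)
val merged = dfs.map(df => df.select(customSelect(df.columns.toSet,
                                                  requiredColumns): _*))
                .reduce(_ union _)
merged.show()
```

The same `reduce` pattern from Approach 2 handles both the column-set union and the final merge, so nothing in the code depends on the number of inputs.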
