Requirement
Let’s say we are receiving data from multiple sources, but we need to ingest it into a single target table. These sources can have different schemas. We want to merge the data and load/save it into one table.
Sample Data
Emp_data1.csv
empno,ename,designation,manager,hire_date,sal,deptno,location
9369,SMITH,CLERK,7902,12/17/1980,800,20,BANGALORE
9499,ALLEN,SALESMAN,7698,2/20/1981,1600,30,HYDERABAD
9521,WARD,SALESMAN,7698,2/22/1981,1250,30,PUNE
9566,TURNER,MANAGER,7839,4/2/1981,2975,20,MUMBAI
9654,MARTIN,SALESMAN,7698,9/28/1981,1250,30,CHENNAI
9369,SMITH,CLERK,7902,12/17/1980,800,20,KOLKATA
Emp_data2.csv
empno,ename,designation,manager,hire_date,sal,deptno
8369,SMITH,CLERK,7902,12/17/1980,800,20
8499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
8521,WARD,SALESMAN,7698,2/22/1981,1250,30
8566,TURNER,MANAGER,7839,4/2/1981,2975,20
8654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
8369,SMITH,CLERK,7902,12/17/1980,800,20
Emp_data3.csv
empno,ename,designation,manager,hire_date,sal,deptno,location
7369,SMITH,CLERK,7902,12/17/1980,800,20,BANGALORE
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30,HYDERABAD
7521,WARD,SALESMAN,7698,2/22/1981,1250,30,PUNE
7566,TURNER,MANAGER,7839,4/2/1981,2975,20,MUMBAI
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30,CHENNAI
7369,SMITH,CLERK,7902,12/17/1980,800,20,KOLKATA
Solution
Step 1: Load the CSV Files into DataFrames
val emp_dataDf1 = spark.read.format("csv")
  .option("header", "true")
  .load("./src/main/resources/emp_data1.csv")

val emp_dataDf2 = spark.read.format("csv")
  .option("header", "true")
  .load("./src/main/resources/emp_data2.csv")

val emp_dataDf3 = spark.read.format("csv")
  .option("header", "true")
  .load("./src/main/resources/emp_data3.csv")
Step 2: Validate Schemas and Add Missing Columns
As the data comes from different sources, it is good practice to compare the schemas and bring all the DataFrames to a common schema, adding any missing columns as nulls.
import org.apache.spark.sql.functions.{col, lit}

def customSelect(availableCols: Set[String], requiredCols: Set[String]) =
  requiredCols.toList.map {
    case column if availableCols.contains(column) => col(column)
    case column => lit(null).as(column) // fill missing columns with null
  }

val emp_data1Cols = emp_dataDf1.columns.toSet
val emp_data2Cols = emp_dataDf2.columns.toSet
val emp_data3Cols = emp_dataDf3.columns.toSet
val requiredColumns = emp_data1Cols ++ emp_data2Cols ++ emp_data3Cols

val empDf1 = emp_dataDf1.select(customSelect(emp_data1Cols, requiredColumns): _*)
val empDf2 = emp_dataDf2.select(customSelect(emp_data2Cols, requiredColumns): _*)
val empDf3 = emp_dataDf3.select(customSelect(emp_data3Cols, requiredColumns): _*)
Step 3: Merge All Data Frames
Now all the DataFrames share the same schema.
Approach 1: Merge One-By-One DataFrames
val mergeDf = empDf1.union(empDf2).union(empDf3)
mergeDf.show()
Here, we have merged the first two DataFrames and then merged the result with the last DataFrame.
Approach 2: Merging All DataFrames Together
val dfSeq = Seq(empDf1, empDf2, empDf3)
val mergeSeqDf = dfSeq.reduce(_ union _)
mergeSeqDf.show()
Here, we have created a sequence of DataFrames and then used the reduce function to union them all.
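If you are running Spark 3.1 or later (an assumption about your environment), unionByName with allowMissingColumns = true performs the schema alignment for you, so the customSelect step above can be skipped entirely. A minimal sketch:

```scala
// Spark 3.1+ only: unionByName matches columns by name (not position)
// and fills columns missing from one side with nulls.
val mergeByNameDf = Seq(emp_dataDf1, emp_dataDf2, emp_dataDf3)
  .reduce(_.unionByName(_, allowMissingColumns = true))
mergeByNameDf.show()
```

This also protects against sources that list the same columns in a different order, which plain union would silently mismatch.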
Full Code
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object MergeMultipleDataframe {

  def customSelect(availableCols: Set[String], requiredCols: Set[String]) =
    requiredCols.toList.map {
      case column if availableCols.contains(column) => col(column)
      case column => lit(null).as(column) // fill missing columns with null
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Merge Multiple Dataframes")
      .config("spark.master", "local")
      .getOrCreate()

    try {
      val emp_dataDf1 = spark.read.format("csv")
        .option("header", "true")
        .load("./src/main/resources/emp_data1.csv")
      val emp_dataDf2 = spark.read.format("csv")
        .option("header", "true")
        .load("./src/main/resources/emp_data2.csv")
      val emp_dataDf3 = spark.read.format("csv")
        .option("header", "true")
        .load("./src/main/resources/emp_data3.csv")

      emp_dataDf1.show()
      emp_dataDf2.show()
      emp_dataDf3.show()

      // Schema fixes: bring all DataFrames to a common schema
      val emp_data1Cols = emp_dataDf1.columns.toSet
      val emp_data2Cols = emp_dataDf2.columns.toSet
      val emp_data3Cols = emp_dataDf3.columns.toSet
      val requiredColumns = emp_data1Cols ++ emp_data2Cols ++ emp_data3Cols

      val empDf1 = emp_dataDf1.select(customSelect(emp_data1Cols, requiredColumns): _*)
      val empDf2 = emp_dataDf2.select(customSelect(emp_data2Cols, requiredColumns): _*)
      val empDf3 = emp_dataDf3.select(customSelect(emp_data3Cols, requiredColumns): _*)

      // Approach 1: merge the DataFrames one by one
      val mergeDf = empDf1.union(empDf2).union(empDf3)
      mergeDf.show()

      // Approach 2: merge all DataFrames together
      val dfSeq = Seq(empDf1, empDf2, empDf3)
      val mergeSeqDf = dfSeq.reduce(_ union _)
      mergeSeqDf.show()
    } finally {
      spark.stop()
    }
  }
}
Here, we have merged data from all the sources into a single DataFrame, which we can now save to any HDFS path or into a table.
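As a sketch of that last step (the output path and table name below are illustrative, not from the original code), the merged DataFrame could be written out like this:

```scala
// Write the merged DataFrame out; "emp_merged" path/table names are examples.
mergeDf.write
  .mode("overwrite")
  .parquet("/user/hive/warehouse/emp_merged") // any HDFS or local path

// Or save it as a managed table (requires Hive support on the session):
// mergeDf.write.mode("overwrite").saveAsTable("emp_merged")
```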
Wrapping Up
In this post, we have learned how to merge multiple DataFrames, even ones with different schemas, using different approaches. You can also extend the code to accept and process any number of sources and load them into a single target table.
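As a starting point for that extension, here is a hedged sketch of a helper that takes any number of CSV paths, aligns their schemas with the same null-padding logic as customSelect, and unions the results. The function name mergeCsvSources is my own, not part of the original code:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

// Read any number of CSV sources and merge them into one DataFrame,
// padding columns that are missing from individual sources with null.
def mergeCsvSources(spark: SparkSession, paths: Seq[String]): DataFrame = {
  val dfs = paths.map { path =>
    spark.read.format("csv").option("header", "true").load(path)
  }
  // Union of all column names across every source, in one fixed order
  val requiredColumns = dfs.flatMap(_.columns).distinct
  val aligned = dfs.map { df =>
    val available = df.columns.toSet
    df.select(requiredColumns.map {
      case c if available.contains(c) => col(c)
      case c => lit(null).as(c)
    }: _*)
  }
  aligned.reduce(_ union _)
}
```

Usage would look like `mergeCsvSources(spark, Seq("emp_data1.csv", "emp_data2.csv", "emp_data3.csv"))`; because every source is projected onto the same column list in the same order, the final union is safe regardless of how many sources you pass.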