Requirement
In this post, we are going to learn how to compare the data of two DataFrames in Spark. Consider a scenario where your daily job consumes data from a source system and appends it to a target table as a delta (incremental) load.
If the job runs more than once, the same records can be ingested again and produce duplicates. To prevent this, we can compare the incoming data with the data already ingested and load only the records that are new.
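The following plain-Scala sketch illustrates the requirement. It models the target table and the incoming batch as local sequences rather than Spark DataFrames, which is an assumption made so the example runs without a cluster. The helper appendNew is hypothetical. It uses Seq.diff, a multiset difference that behaves like Spark's exceptAll, so loading the same batch twice leaves the target unchanged.

```scala
// Each row is modeled as (empno, ename, designation, manager)
type Emp = (String, String, String, String)

// Hypothetical helper: append only the incoming rows the target does not already hold.
// Seq.diff is a multiset difference, the local counterpart of Dataset.exceptAll.
def appendNew(target: Seq[Emp], incoming: Seq[Emp]): Seq[Emp] =
  target ++ incoming.diff(target)

val master: Seq[Emp] = Seq(
  ("1001", "Ename1", "Designation1", "Manager1"),
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1003", "Ename3", "Designation3", "Manager3")
)
val daily: Seq[Emp] = Seq(
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1004", "Ename4", "Designation4", "Manager4"),
  ("1005", "Ename5", "Designation5", "Manager5"),
  ("1003", "Ename3", "Designation3", "Manager1")
)

val afterFirstRun  = appendNew(master, daily)
val afterSecondRun = appendNew(afterFirstRun, daily) // the same batch loaded again
println(afterFirstRun.size)              // 6
println(afterSecondRun == afterFirstRun) // true
```

The comparison is on whole rows, so the changed 1003 record is appended alongside its old version rather than replacing it. Handling updates properly would need a key-based comparison or a merge.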
Solution
Here, we use the same sample data as in our previous post.
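import spark.implicits._ // needed for toDF (already imported in spark-shell)
// Sample target (master) data: one tuple per employee
val dummyRDD = sc.parallelize(Seq(
  ("1001", "Ename1", "Designation1", "Manager1"),
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1003", "Ename3", "Designation3", "Manager3")
))
// Column names for the DataFrame
val schemaSeq = Seq("empno", "ename", "designation", "manager")
// Convert the RDD of tuples to a DataFrame with those column names
val dfMaster = dummyRDD.toDF(schemaSeq: _*)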
First, we create an RDD named dummyRDD. Next, we define the column names in schemaSeq: empno, ename, designation and manager.
All fields are kept as strings, so no casting is performed. Finally, we convert the RDD to a DataFrame named dfMaster with those column names using toDF.
Now, we can treat this DataFrame, dfMaster, as the target table. Suppose you receive a new set of data like this:
1002, Ename2, Designation2, Manager2
1004, Ename4, Designation4, Manager4
1005, Ename5, Designation5, Manager5
1003, Ename3, Designation3, Manager1
If you look closely at the sample data, it contains:
- 1 duplicate record (1002)
- 2 new records (1004 and 1005)
- 1 updated record (1003, whose manager changed from Manager3 to Manager1)
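To tell these three categories apart, you need to compare on the key column as well as on whole rows. The sketch below is a plain-Scala model that assumes empno is the business key, which the post does not state explicitly. The classify function is a hypothetical helper, not a Spark API. In Spark, a comparable "new keys only" split could be written with a left anti join, for example dfDaily.join(dfMaster, Seq("empno"), "left_anti").

```scala
// Each row is modeled as (empno, ename, designation, manager)
type Emp = (String, String, String, String)

// Hypothetical helper: split incoming rows into exact duplicates, rows with a new key,
// and updated rows (key already in the target but at least one field differs)
def classify(target: Seq[Emp], incoming: Seq[Emp]): (Seq[Emp], Seq[Emp], Seq[Emp]) = {
  val targetRows = target.toSet
  val targetKeys = target.map(_._1).toSet
  val duplicates = incoming.filter(targetRows.contains)
  val newRows    = incoming.filterNot(r => targetKeys.contains(r._1))
  val updated    = incoming.filter(r => targetKeys.contains(r._1) && !targetRows.contains(r))
  (duplicates, newRows, updated)
}

val master: Seq[Emp] = Seq(
  ("1001", "Ename1", "Designation1", "Manager1"),
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1003", "Ename3", "Designation3", "Manager3")
)
val daily: Seq[Emp] = Seq(
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1004", "Ename4", "Designation4", "Manager4"),
  ("1005", "Ename5", "Designation5", "Manager5"),
  ("1003", "Ename3", "Designation3", "Manager1")
)

val (duplicates, newRows, updated) = classify(master, daily)
println(duplicates.map(_._1)) // List(1002)
println(newRows.map(_._1))    // List(1004, 1005)
println(updated.map(_._1))    // List(1003)
```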
Load the New Data into a New Data Frame
// Sample daily (incoming) data
val dailyDummyRDD = sc.parallelize(Seq(
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1004", "Ename4", "Designation4", "Manager4"),
  ("1005", "Ename5", "Designation5", "Manager5"),
  ("1003", "Ename3", "Designation3", "Manager1")
))
// Reuse schemaSeq from above and build dfDaily from dailyDummyRDD (not dummyRDD)
val dfDaily = dailyDummyRDD.toDF(schemaSeq: _*)
Note: A real job does not need this code twice. We repeat it here only to load the sample data.
Extract Incremental Data between Two Data Frames Using exceptAll
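dfDaily.exceptAll(dfMaster) returns the rows of dfDaily that do not appear in dfMaster. These are the records to append: 1004, 1005 and the changed 1003 row. The duplicate 1002 row is filtered out. Because exceptAll compares entire rows, an updated record is returned as if it were new.
val incrementalDf = dfDaily.exceptAll(dfMaster)
incrementalDf.show()
Swapping the operands returns the rows of dfMaster that are missing from dfDaily: 1001 and the old version of 1003 (with Manager3).
val masterOnlyDf = dfMaster.exceptAll(dfDaily)
masterOnlyDf.show()
exceptAll is available from Spark 2.4. It is the DataFrame equivalent of SQL EXCEPT ALL and keeps duplicate rows, whereas Dataset.except behaves like EXCEPT DISTINCT.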
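To make the semantics concrete, the sketch below models exceptAll and except on local Scala sequences instead of DataFrames. The function names are hypothetical. exceptAllModel uses Seq.diff, a multiset difference, and exceptDistinctModel uses a distinct set difference, mirroring SQL EXCEPT ALL and EXCEPT DISTINCT respectively. A repeated 1004 row shows where the two differ.

```scala
// Each row is modeled as (empno, ename, designation, manager)
type Emp = (String, String, String, String)

// Model of Dataset.exceptAll (SQL EXCEPT ALL): multiset difference, duplicates kept
def exceptAllModel(left: Seq[Emp], right: Seq[Emp]): Seq[Emp] = left.diff(right)

// Model of Dataset.except (SQL EXCEPT DISTINCT): distinct left rows absent from right
def exceptDistinctModel(left: Seq[Emp], right: Seq[Emp]): Seq[Emp] = {
  val rightRows = right.toSet
  left.distinct.filterNot(rightRows.contains)
}

val master: Seq[Emp] = Seq(
  ("1001", "Ename1", "Designation1", "Manager1"),
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1003", "Ename3", "Designation3", "Manager3")
)
val daily: Seq[Emp] = Seq(
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1004", "Ename4", "Designation4", "Manager4"),
  ("1005", "Ename5", "Designation5", "Manager5"),
  ("1003", "Ename3", "Designation3", "Manager1")
)

val toIngest   = exceptAllModel(daily, master) // 1004, 1005 and the changed 1003
val masterOnly = exceptAllModel(master, daily) // 1001 and the old 1003

// Add a second copy of 1004 to see how each operation treats duplicates
val dailyWithRepeat = daily :+ ("1004", "Ename4", "Designation4", "Manager4")
println(exceptAllModel(dailyWithRepeat, master).size)      // 4: both 1004 copies kept
println(exceptDistinctModel(dailyWithRepeat, master).size) // 3: 1004 kept once
```

Spark does not guarantee the row order of these results, so compare the contents rather than the order when checking them on a real cluster.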
Wrapping Up
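In this post, we learned how to compare the data of two DataFrames using exceptAll, Spark's equivalent of SQL EXCEPT ALL. Both DataFrames must have the same number of columns with compatible data types. Spark matches the columns by position, not by name, so keep the column order identical in both.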
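Spark resolves set operations such as exceptAll by column position rather than by name. As a result, two DataFrames holding the same data with their columns in a different order will not compare as equal. The plain-Scala sketch below models this with a tiny hypothetical Frame type (a header plus rows). The alignTo helper, also hypothetical, reorders columns by name. In Spark, a similar alignment could be done before calling exceptAll with something like dfDaily.select(dfMaster.columns.map(col).toSeq: _*), where col comes from org.apache.spark.sql.functions.

```scala
// Hypothetical minimal "data frame": column names plus rows of string values
final case class Frame(columns: Seq[String], rows: Seq[Seq[String]])

// Reorder the columns of `f` to follow `order`, matching columns by name
def alignTo(f: Frame, order: Seq[String]): Frame = {
  val missing = order.filterNot(c => f.columns.contains(c))
  require(missing.isEmpty, s"missing column(s): ${missing.mkString(", ")}")
  val idx = order.map(c => f.columns.indexOf(c))
  Frame(order, f.rows.map(row => idx.map(i => row(i))))
}

// Positional multiset difference, like exceptAll: column names are ignored
def exceptAllPositional(left: Frame, right: Frame): Seq[Seq[String]] = {
  require(left.columns.size == right.columns.size, "column counts differ")
  left.rows.diff(right.rows)
}

val master = Frame(
  Seq("empno", "ename", "designation", "manager"),
  Seq(
    Seq("1001", "Ename1", "Designation1", "Manager1"),
    Seq("1002", "Ename2", "Designation2", "Manager2")
  )
)
// Same data, but with empno and ename swapped in the column order
val reordered = Frame(
  Seq("ename", "empno", "designation", "manager"),
  Seq(
    Seq("Ename1", "1001", "Designation1", "Manager1"),
    Seq("Ename2", "1002", "Designation2", "Manager2")
  )
)

val naive   = exceptAllPositional(reordered, master)                          // both rows look "new"
val aligned = exceptAllPositional(alignTo(reordered, master.columns), master) // nothing new
println(naive.size)   // 2
println(aligned.size) // 0
```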