Compare Data Frame in Spark

Requirement

In this post, we are going to learn how to compare the data of two data frames in Spark. Consider a scenario where your daily job consumes data from a source system and appends it to a target table as a delta/incremental load.

Running the job more than once can produce duplicate records. To prevent this, we can compare the new data with the data that has already been ingested and ingest only the records that are new.

Solution

Here, we are going to use the same sample data that we used in our last post.

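// Sample employee records that play the role of the existing target table
val dummyRDD = sc.parallelize(Seq(
  ("1001", "Ename1", "Designation1", "Manager1"),
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1003", "Ename3", "Designation3", "Manager3")
))

// Column names, applied in order when the RDD is converted to a data frame
val schemaSeq = Seq("empno", "ename", "designation", "manager")
val dfMaster = dummyRDD.toDF(schemaSeq: _*)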

First, we created an RDD named dummyRDD. Next, we defined the column names in schemaSeq: empno, ename, designation, manager.

No casting is needed here because every field in the sample is a string. Finally, we converted the RDD to a data frame named dfMaster by calling toDF with those column names.
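The snippet above relies on `sc` and on the implicit conversions that spark-shell sets up for you. As a rough sketch of the same step outside the shell, assuming a local SparkSession (the app name and the `local[*]` master are illustrative choices, not part of the original job), it could look like this:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: a local session for experimenting; spark-shell already provides `spark`.
val spark = SparkSession.builder()
  .appName("compare-data-frames")
  .master("local[*]")
  .getOrCreate()

// Brings toDF into scope for local collections and RDDs of tuples
import spark.implicits._

val dfMaster: DataFrame = Seq(
  ("1001", "Ename1", "Designation1", "Manager1"),
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1003", "Ename3", "Designation3", "Manager3")
).toDF("empno", "ename", "designation", "manager")

// Every column is inferred as a string, so no explicit cast is involved
dfMaster.printSchema()
dfMaster.show()
```

If you are working in spark-shell, the session-creation lines can be skipped.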

Now, we can treat this data frame, dfMaster, as the target table. Let’s assume you are getting a new set of data. Sample of new data:

1002, Ename2, Designation2, Manager2

1004, Ename4, Designation4, Manager4

1005, Ename5, Designation5, Manager5

1003, Ename3, Designation3, Manager1

If you take a closer look at the sample data, we are getting:

  • 1 Duplicate Record (i.e. 1002)
  • 2 New Records (i.e. 1004 & 1005)
  • 1 Updated Record (i.e. 1003)
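One way to check this classification in code is with joins on empno. The sketch below is only an illustration and not the approach used in the rest of this post. It assumes a local SparkSession and that empno uniquely identifies a record; the names `master`, `daily`, `duplicates`, `newRecords` and `updated` are made up for the example.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .appName("classify-records")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val cols = Seq("empno", "ename", "designation", "manager")

val master = Seq(
  ("1001", "Ename1", "Designation1", "Manager1"),
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1003", "Ename3", "Designation3", "Manager3")
).toDF(cols: _*)

val daily = Seq(
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1004", "Ename4", "Designation4", "Manager4"),
  ("1005", "Ename5", "Designation5", "Manager5"),
  ("1003", "Ename3", "Designation3", "Manager1")
).toDF(cols: _*)

// Duplicates: rows that are identical in both data frames
val duplicates = daily.intersect(master)

// New: empno values that do not exist in the target yet
val newRecords = daily.join(master, Seq("empno"), "left_anti")

// Updated: the empno exists in the target, but the row as a whole differs
val updated = daily.join(master, Seq("empno"), "left_semi").except(master)

duplicates.show()
newRecords.show()
updated.show()
```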

Load the New Data in New Data Frame

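// New batch of records received from the source system
val dailyDummyRDD = sc.parallelize(Seq(
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1004", "Ename4", "Designation4", "Manager4"),
  ("1005", "Ename5", "Designation5", "Manager5"),
  ("1003", "Ename3", "Designation3", "Manager1")
))

// Same column names as the master data frame
val schemaSeq = Seq("empno", "ename", "designation", "manager")
// Build the daily data frame from dailyDummyRDD (not from dummyRDD)
val dfDaily = dailyDummyRDD.toDF(schemaSeq: _*)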

Note: You do not need to duplicate this code in a real job. We repeated it here only to load the sample data.

Extract Incremental Data between 2 Data Frames using EXCEPT

// Rows present in dfDaily but absent from dfMaster, i.e. the new and updated records
val incrementalDf = dfDaily.exceptAll(dfMaster)
incrementalDf.show
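Spark offers both `except` and `exceptAll` (the latter is available since Spark 2.4). They differ in how they treat repeated rows: `except` returns distinct rows, while `exceptAll` subtracts one occurrence for each matching row on the right and keeps the rest. A small sketch of the difference, assuming a local SparkSession and two throwaway data frames named `left` and `right` that are not part of the sample data:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .appName("except-vs-exceptAll")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical single-column data frames with a repeated value
val left = Seq("a", "a", "a", "b").toDF("value")
val right = Seq("a").toDF("value")

// except: set difference, so the result contains distinct rows only
val distinctDiff = left.except(right)

// exceptAll: removes one "a" per matching row in `right` and keeps the others
val allDiff = left.exceptAll(right)

distinctDiff.show()
allDiff.show()
```

For an incremental load where repeated rows in the daily data are meaningful, `exceptAll` keeps them; if you only care about distinct rows, `except` may be enough.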

// Reverse direction: rows present in dfMaster but absent from dfDaily
val removedOrChangedDf = dfMaster.exceptAll(dfDaily)
removedOrChangedDf.show
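The direction of the call matters, so it is worth checking what each one returns before appending anything to the target. The following sketch assumes a local SparkSession and rebuilds the sample data under the made-up names `master` and `daily`; appending with `union` is one possible way to finish the load, not something this post prescribes.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .appName("incremental-load")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val cols = Seq("empno", "ename", "designation", "manager")

val master = Seq(
  ("1001", "Ename1", "Designation1", "Manager1"),
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1003", "Ename3", "Designation3", "Manager3")
).toDF(cols: _*)

val daily = Seq(
  ("1002", "Ename2", "Designation2", "Manager2"),
  ("1004", "Ename4", "Designation4", "Manager4"),
  ("1005", "Ename5", "Designation5", "Manager5"),
  ("1003", "Ename3", "Designation3", "Manager1")
).toDF(cols: _*)

// daily minus master: 1004, 1005 and the changed version of 1003
val incremental = daily.exceptAll(master)

// master minus daily: 1001 and the old version of 1003
val onlyInMaster = master.exceptAll(daily)

// Appending the increment skips the exact duplicate 1002,
// but the target now holds both versions of 1003
val newTarget = master.union(incremental)
newTarget.orderBy("empno").show()
```

Because the whole row is compared, an updated record such as 1003 is treated as a new row. If the target should hold only the latest version, the update needs separate handling.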

Wrapping Up

In this post, we have learned how to compare the data of two data frames using the exceptAll function. Both data frames must have the same schema: the same number of columns, with compatible types, in the same order.
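Column order matters because `exceptAll`, like `except` and `union`, resolves columns by position rather than by name. A minimal sketch of what can go wrong and one way to guard against it, assuming a local SparkSession and a hypothetical daily feed whose columns arrive in a different order:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .appName("align-columns")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val master = Seq(
  ("1001", "Ename1", "Designation1", "Manager1"),
  ("1002", "Ename2", "Designation2", "Manager2")
).toDF("empno", "ename", "designation", "manager")

// Hypothetical feed: same columns, but ename comes before empno
val daily = Seq(
  ("Ename2", "1002", "Designation2", "Manager2"),
  ("Ename4", "1004", "Designation4", "Manager4")
).toDF("ename", "empno", "designation", "manager")

// Compared by position, ename is matched against empno, so nothing is recognised as a duplicate
val misaligned = daily.exceptAll(master)

// Reorder the daily columns to the master's order before comparing
val aligned = daily.select(master.columns.map(col): _*).exceptAll(master)

misaligned.show()
aligned.show()
```

Selecting the columns in the target's order before the comparison keeps the check independent of how the source happens to order its fields.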
