Requirement
Suppose we have a process that consumes data from upstream. This data includes both new and updated records, and both kinds need to be ingested into the same table: new records must be inserted and modified records must be updated in place. The aim of this post is to give an overview of how to merge into a Delta table using Spark Scala.
Solution
Our earlier post created the Delta table used in this post. It serves as the target for daily ingestion. The data in the Delta table looks like this:
The following five records contain basic information about a user, such as an id, name, location, and contact.
Suppose that today we received data and it has been loaded into a dataframe. Here is the data in the dataframe:
val dailyDf = Seq(
  (1400, "Person4", "Location4", "Contact4"),
  (1500, "Person5", "Location5", "Contact5"),
  (1600, "Person6", "Location6", "Contact6")
).toDF("id", "name", "location", "contact")

display(dailyDf)
This dataframe holds three records. Compared with the table above, ids 1400 and 1500 carry updated location and contact information, while id 1600 is a new record.
Merge in Delta Table Databricks
The merge operation compares the rows of a source with the rows of a target on a join condition, and then updates, inserts, or deletes target rows depending on whether a match was found. In this case, the Delta table testdb.testDeltaTable is the target, while the dataframe dailyDf is the source.
%scala
import io.delta.tables._

val target_table = DeltaTable.forName("testdb.testDeltaTable")

target_table.as("target")
  .merge(dailyDf.as("source"), "source.id = target.id")
  .whenMatched().updateAll()     // id exists in the target: overwrite every column
  .whenNotMatched().insertAll()  // id missing from the target: insert the row
  .execute()
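To make the matched and not-matched branches concrete, here is a minimal plain-Scala sketch that mimics the upsert on in-memory collections. It is only a model of the semantics, not how Delta implements merge. The names `UpsertSketch`, `User`, and `upsert` are made up for illustration, and the "before" values in `main` are hypothetical placeholders rather than the real table contents.

```scala
// Plain-Scala model of the upsert; no Spark or Delta dependency.
object UpsertSketch {
  // Mirrors the columns used in the post: id, name, location, contact.
  final case class User(id: Int, name: String, location: String, contact: String)

  // For every source row: overwrite the target row with the same id
  // (the whenMatched / updateAll branch) or add it when the id is absent
  // (the whenNotMatched / insertAll branch).
  // Target rows without a matching source row are left untouched.
  def upsert(target: Seq[User], source: Seq[User]): Seq[User] = {
    val byId = target.map(u => u.id -> u).toMap
    val merged = source.foldLeft(byId)((acc, row) => acc.updated(row.id, row))
    merged.values.toSeq.sortBy(_.id)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical "before" rows, invented for this sketch.
    val target = Seq(
      User(1400, "Person4", "OldLocation4", "OldContact4"),
      User(1500, "Person5", "OldLocation5", "OldContact5")
    )
    // Same rows as dailyDf.
    val source = Seq(
      User(1400, "Person4", "Location4", "Contact4"),
      User(1500, "Person5", "Location5", "Contact5"),
      User(1600, "Person6", "Location6", "Contact6")
    )
    upsert(target, source).foreach(println)
  }
}
```

One difference worth keeping in mind: this sketch silently keeps the last source row when an id appears twice, whereas a Delta merge is expected to fail if several source rows match the same target row, so the source should be deduplicated on the join key first.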
Data Validation
We can now check the Delta table to see what was inserted and what was updated.
Rows 1400 and 1500 have been updated with the most recent location and contact information, while 1600 has been added.
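The same check can be expressed programmatically. The sketch below is a plain-Scala illustration, assuming the rows of the table before and after the merge have been collected into memory; it classifies each id as inserted or updated. `MergeCheckSketch`, `Diff`, and `diff` are hypothetical names, and the "before" values are placeholders rather than the actual table contents.

```scala
// Plain-Scala sketch: compare two snapshots of the table to see what a merge did.
object MergeCheckSketch {
  final case class User(id: Int, name: String, location: String, contact: String)

  // inserted: ids present only after the merge.
  // updated: ids present in both snapshots whose row content differs.
  final case class Diff(inserted: Set[Int], updated: Set[Int])

  def diff(before: Seq[User], after: Seq[User]): Diff = {
    val old = before.map(u => u.id -> u).toMap
    val inserted = after.filterNot(u => old.contains(u.id)).map(_.id).toSet
    val updated = after.filter(u => old.get(u.id).exists(_ != u)).map(_.id).toSet
    Diff(inserted, updated)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical snapshot taken before the merge.
    val before = Seq(
      User(1400, "Person4", "OldLocation4", "OldContact4"),
      User(1500, "Person5", "OldLocation5", "OldContact5")
    )
    // Snapshot after the merge, restricted to the ids touched by dailyDf.
    val after = Seq(
      User(1400, "Person4", "Location4", "Contact4"),
      User(1500, "Person5", "Location5", "Contact5"),
      User(1600, "Person6", "Location6", "Contact6")
    )
    // Reports 1600 as inserted and 1400, 1500 as updated.
    println(diff(before, after))
  }
}
```

For a large table, collecting full snapshots is impractical; restricting both snapshots to the ids present in the daily dataframe keeps the comparison small.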
Full Source Code
%scala
import io.delta.tables._

// Daily source data: two changed rows (1400, 1500) and one new row (1600)
val dailyDf = Seq(
  (1400, "Person4", "Location4", "Contact4"),
  (1500, "Person5", "Location5", "Contact5"),
  (1600, "Person6", "Location6", "Contact6")
).toDF("id", "name", "location", "contact")

val target_table = DeltaTable.forName("testdb.testDeltaTable")

// Upsert: update rows whose id matches, insert the rest
target_table.as("target")
  .merge(dailyDf.as("source"), "source.id = target.id")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
Wrapping Up
The merge operation is well suited to incremental data. A single merge can insert, update, and delete records, and each of these actions can be included or left out according to our requirements.