Requirement
Suppose we have a process that consumes data from an upstream system. The feed contains both new and updated records, and we need to ingest it into an existing table accordingly: insert the new records and update the modified ones. The aim of this post is to give an overview of how to merge data into a Delta table.
Solution
Our earlier post created a Delta table, which will be used in this post as the target for daily ingestion. The table contains five records with basic user information: id, name, location, and contact.
Suppose that today we received new data from upstream and loaded it into a dataframe:
val dailyDf = Seq(
  (1400, "Person4", "Location4", "Contact4"),
  (1500, "Person5", "Location5", "Contact5"),
  (1600, "Person6", "Location6", "Contact6")
).toDF("id", "name", "location", "contact")

display(dailyDf)
There are three records in this dataframe: ids 1400 and 1500 carry updated data (modified location and contact information), while id 1600 is a new record.
Merge into a Delta Table in Databricks
The merge operation updates, inserts, or deletes rows in the target Delta table by comparing it with a source. In this case, testdb.testdeltatable is the target, while the dataframe acts as the source.
Syntax
MERGE INTO <target_table> [AS target]
USING <source_table> [AS source]
ON <merge_condition>
WHEN MATCHED [AND <condition>] THEN <matched_action>
WHEN NOT MATCHED [AND <condition>] THEN <not_matched_action>
Here,
<merge_condition>: The condition on which source and target rows are matched
[AND <condition>]: An optional additional condition that must hold for the action to be applied
Actions: UPDATE, INSERT, and DELETE
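Before running the merge, the dataframe has to be registered as a temporary view so the SQL statement can refer to it as dailyTable (the same step appears in the full source code below):

// Expose the dataframe to SQL as the merge source
dailyDf.createOrReplaceTempView("dailyTable")

With the view in place, the merge for our daily data looks like this: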
MERGE INTO testdb.testdeltatable AS target
USING dailyTable AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
Data Validation
We can now check the delta table to see what was inserted and what was updated.
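A quick query against the target table (using the same table name as above) is enough to verify the result:

SELECT * FROM testdb.testdeltatable ORDER BY id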
Rows with ids 1400 and 1500 have been updated with the most recent location and contact information, while id 1600 has been inserted as a new row.
Full Source Code
%scala
val dailyDf = Seq(
  (1400, "Person4", "Location4", "Contact4"),
  (1500, "Person5", "Location5", "Contact5"),
  (1600, "Person6", "Location6", "Contact6")
).toDF("id", "name", "location", "contact")

dailyDf.createOrReplaceTempView("dailyTable")
%sql
MERGE INTO testdb.testdeltatable AS target
USING dailyTable AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
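For reference, the same merge can also be expressed with the Delta Lake Scala API (io.delta.tables.DeltaTable), which works directly on the dataframe and does not need the temporary view. This is a minimal sketch equivalent to the SQL statement above:

%scala
import io.delta.tables.DeltaTable

// Load the target Delta table by name and merge the daily dataframe into it
DeltaTable.forName(spark, "testdb.testdeltatable")
  .as("target")
  .merge(dailyDf.as("source"), "target.id = source.id")
  .whenMatched().updateAll()     // equivalent to UPDATE SET *
  .whenNotMatched().insertAll()  // equivalent to INSERT *
  .execute()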
Wrapping Up
The merge operation can be very useful when working with incremental data. It offers all the actions needed for inserting, updating, and deleting records, and they can be combined according to our requirements.
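For instance, a WHEN MATCHED clause can carry an additional condition and delete the matching row instead of updating it. The condition below, where a null contact in the source marks a record for removal, is only an illustrative assumption and not part of the pipeline above:

MERGE INTO testdb.testdeltatable AS target
USING dailyTable AS source
ON target.id = source.id
WHEN MATCHED AND source.contact IS NULL THEN DELETE  -- hypothetical: a null contact flags the record for deletion
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *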