Merge into Delta Table using Spark SQL

Requirement

Suppose we have a process that consumes data from an upstream source. The data contains both new and updated records, and we need to ingest it into a table accordingly: insert the new records and update the modified ones. This post gives an overview of how to merge data into a Delta table.

Solution

An earlier post created some Delta tables, one of which, testdb.testdeltatable, is used in this post for daily ingestion.

The table holds five records with basic information about each user: id, name, location, and contact.

Suppose that today we received data and it has been loaded into a dataframe. Here is the data in the dataframe:

val dailyDf = Seq((1400, "Person4", "Location4", "Contact4")
                 ,(1500, "Person5", "Location5", "Contact5")
                 ,(1600, "Person6", "Location6", "Contact6")).toDF("id", "name", "location", "contact")
display(dailyDf)

There are three records in this data frame. Compared with the existing Delta table, ids 1400 and 1500 carry updated data (modified location and contact information), while id 1600 is a new record.

Merge into a Delta Table in Databricks

The merge operation updates, inserts, and deletes rows in a target Delta table by comparing it with a source on a join condition. In this case, testdb.testdeltatable is the target, and the data frame, registered as the temporary view dailyTable, is the source.
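To make the matching logic concrete, here is a minimal plain-Scala sketch of what an upsert does, using in-memory collections instead of Spark. The User case class and the "Old..." values in the target are hypothetical and stand in for the existing table contents, which are not reproduced in this post. It also assumes each id appears at most once in the source.

```scala
// Plain-Scala sketch of upsert semantics; no Spark required.
final case class User(id: Int, name: String, location: String, contact: String)

// Hypothetical target rows (assumption: not the real table contents).
val target = Seq(
  User(1400, "Person4", "OldLocation4", "OldContact4"),
  User(1500, "Person5", "OldLocation5", "OldContact5")
)

// Source rows, matching the daily data frame in this post.
val source = Seq(
  User(1400, "Person4", "Location4", "Contact4"),
  User(1500, "Person5", "Location5", "Contact5"),
  User(1600, "Person6", "Location6", "Contact6")
)

// Index the source by the merge key (id).
val sourceById: Map[Int, User] = source.map(u => u.id -> u).toMap
val targetIds: Set[Int] = target.map(_.id).toSet

// Matched rows: replace the target row with its source row.
val updated: Seq[User] = target.map(t => sourceById.getOrElse(t.id, t))

// Not-matched rows: append source rows whose id is absent from the target.
val inserted: Seq[User] = source.filterNot(s => targetIds.contains(s.id))

val merged: Seq[User] = updated ++ inserted
merged.foreach(println)
```

This is only an illustration of the semantics. The real merge runs as a distributed join inside Delta, and it raises an error if several source rows match the same target row.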

Syntax

MERGE INTO <target_table> [AS target]
USING <source_table> [AS source]
ON <merge_condition>
WHEN MATCHED [AND CONDITION] THEN <match_action>
WHEN NOT MATCHED [AND CONDITION] THEN <notmatch_action>

Here,

<merge_condition>: The condition used to match source rows to target rows

[AND CONDITION]: An optional additional condition that must hold for the action to be performed

Actions: <match_action> is an UPDATE or a DELETE; <notmatch_action> is an INSERT
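As an illustration of the optional conditions and the different actions, the sketch below runs a more selective merge from a Scala cell. It assumes a Databricks notebook where spark is predefined and the temporary view dailyTable is already registered. The rule that an empty contact marks a row for deletion is a hypothetical convention invented for this example, not something the source data defines.

```scala
// Assumes a Databricks notebook: `spark` is predefined and the temp view
// `dailyTable` already exists.
//
// Clause 1 (hypothetical rule): delete matched rows whose incoming contact is empty.
// Clause 2: for the remaining matches, update only location and contact.
// Clause 3: insert unmatched rows with an explicit column list.
spark.sql("""
  MERGE INTO testdb.testdeltatable AS target
  USING dailyTable AS source
  ON target.id = source.id
  WHEN MATCHED AND source.contact = '' THEN DELETE
  WHEN MATCHED THEN
    UPDATE SET target.location = source.location,
               target.contact  = source.contact
  WHEN NOT MATCHED THEN
    INSERT (id, name, location, contact)
    VALUES (source.id, source.name, source.location, source.contact)
""")
```

The WHEN MATCHED clauses are evaluated in the order written, so the conditional DELETE has to come before the unconditional UPDATE.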

MERGE INTO testdb.testdeltatable AS target
USING dailyTable AS source
ON target.id = source.id
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

Data Validation

We can now query the Delta table to see what was inserted and updated.

Rows 1400 and 1500 have been updated with the most recent location and contact information, while 1600 has been added.
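One possible way to run this check from a Scala cell is sketched below. It assumes a Databricks notebook where spark is predefined, and that the merge was the most recent operation on the table. The history lookup is an optional extra: for a merge, the operationMetrics column is expected to report how many rows were inserted and updated.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

// Show the three ids touched by the merge.
spark.table("testdb.testdeltatable")
  .filter(col("id").isin(1400, 1500, 1600))
  .orderBy("id")
  .show(truncate = false)

// Inspect the latest entry in the table history
// (assumption: the merge was the last operation on this table).
DeltaTable.forName(spark, "testdb.testdeltatable")
  .history(1)
  .select("version", "operation", "operationMetrics")
  .show(truncate = false)
```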

Full Source Code

%scala
// Daily data: ids 1400 and 1500 are updates, id 1600 is new
val dailyDf = Seq((1400, "Person4", "Location4", "Contact4")
                 ,(1500, "Person5", "Location5", "Contact5")
                 ,(1600, "Person6", "Location6", "Contact6")).toDF("id", "name", "location", "contact")
// Register the data frame as a temporary view so that SQL can reference it
dailyDf.createOrReplaceTempView("dailyTable")

%sql
-- Upsert: update rows whose id matches, insert the rest
MERGE INTO testdb.testdeltatable AS target
USING dailyTable AS source
ON target.id = source.id
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *
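For readers who prefer to stay in Scala, the same upsert can also be sketched with the Delta Lake Scala API instead of a SQL cell. This is an alternative formulation rather than the approach used in this post. It assumes a Databricks notebook where spark is predefined and the Delta Lake library is available.

```scala
import io.delta.tables.DeltaTable
import spark.implicits._

// Same daily data as in the SQL version.
val dailyDf = Seq(
  (1400, "Person4", "Location4", "Contact4"),
  (1500, "Person5", "Location5", "Contact5"),
  (1600, "Person6", "Location6", "Contact6")
).toDF("id", "name", "location", "contact")

// Upsert: update all columns on a match, insert all columns otherwise.
DeltaTable.forName(spark, "testdb.testdeltatable")
  .as("target")
  .merge(dailyDf.as("source"), "target.id = source.id")
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()
```

With this form no temporary view is needed, because the data frame is passed to merge directly.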

Wrapping Up

The merge operation is very useful when working with incremental data. A single statement can insert, update, and delete records, and each action can be made conditional to suit our requirements.
