Merging Two Dataframes in Spark

Requirement

Let’s say we are getting data from two different sources (for example, an RDBMS table and a file), and we need to merge this data into a single dataframe. Both sources have the same schema.

Sample Data

MySQL Table Data:

 empno,ename,designation,manager,hire_date,sal,deptno
 7369,SMITH,CLERK,7902,12/17/1980,800,20
 7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
 7521,WARD,SALESMAN,7698,2/22/1981,1250,30
 7566,TURNER,MANAGER,7839,4/2/1981,2975,20
 7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30

CSV File Data:

 empno,ename,designation,manager,hire_date,sal,deptno
 8369,SMITH,CLERK,7902,12/17/1980,800,20
 8499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
 8521,WARD,SALESMAN,7698,2/22/1981,1250,30
 8566,TURNER,MANAGER,7839,4/2/1981,2975,20
 8654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
 7369,SMITH,CLERK,7902,12/17/1980,800,20

Components Involved

  • Spark 2.x
  • CSV
  • MySQL

Solution

Step 1: Read data from RDBMS Table

A MySQL table holds some dummy data. We will load this table into a dataframe. Below is the code snippet for the same:

 val driver = "com.mysql.jdbc.Driver"
 val url = "jdbc:mysql://localhost:3306/bdp" 
 val user = "root"
 val pass = "Password"
 val sourceTable = "employee"

 // JDBC Connection and load table in Dataframe
 val mysqlDf = spark.read.format("jdbc")
                         .option("driver", driver)
                         .option("url", url)
                         .option("dbtable", sourceTable)
                         .option("user", user)
                         .option("password", pass).load()

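Note: The MySQL connector jar (Connector/J) must be on the Spark classpath, for example by passing it with --jars to spark-shell or spark-submit. With Connector/J 8.x the driver class is "com.mysql.cj.jdbc.Driver"; the older "com.mysql.jdbc.Driver" name is deprecated there.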
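The snippet above hard-codes the user name and password. A minimal sketch of one alternative, assuming the credentials are exported as environment variables, is shown below. The variable names MYSQL_USER and MYSQL_PASSWORD and the helpers jdbcOptions and readTable are hypothetical and not part of the original setup.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcOptionsSketch {
  // Hypothetical helper: collects the JDBC options in one map.
  // MYSQL_USER and MYSQL_PASSWORD are assumed environment variable names.
  def jdbcOptions(table: String, env: Map[String, String] = sys.env): Map[String, String] =
    Map(
      "driver"   -> "com.mysql.jdbc.Driver",
      "url"      -> "jdbc:mysql://localhost:3306/bdp",
      "dbtable"  -> table,
      "user"     -> env.getOrElse("MYSQL_USER", "root"),
      "password" -> env.getOrElse("MYSQL_PASSWORD", "")
    )

  // Loads the table by passing the whole map to DataFrameReader.options.
  def readTable(spark: SparkSession, table: String): DataFrame =
    spark.read.format("jdbc").options(jdbcOptions(table)).load()
}
```

With this in place, JdbcOptionsSketch.readTable(spark, "employee") would take the place of the chained option calls, and the password no longer needs to appear in the source.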

Step 2: Read CSV file data

 val csvDf = spark.read.format("csv")
                       .option("header", "true")
                       .load("file:///home/bdp/data/employee_details.csv")

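Here, we are loading a local file into a dataframe. If the file is on HDFS and HDFS is the default file system, there is no need to specify “file://” in the file path. Because no schema is supplied and schema inference is off by default, every CSV column is read as a string.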
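As a sketch of how the CSV columns could be given types at read time, the example below supplies an explicit schema and a date pattern. The column types are assumptions, since the MySQL table definition is not shown in this post, and the names CsvSchemaSketch, employeeSchema and readEmployees are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

object CsvSchemaSketch {
  // Assumed column types; adjust them to match the actual MySQL table definition.
  val employeeSchema: StructType = StructType(Seq(
    StructField("empno", IntegerType),
    StructField("ename", StringType),
    StructField("designation", StringType),
    StructField("manager", IntegerType),
    StructField("hire_date", DateType),
    StructField("sal", IntegerType),
    StructField("deptno", IntegerType)
  ))

  // Reads the CSV with typed columns; "M/d/yyyy" matches dates such as 2/20/1981.
  def readEmployees(spark: SparkSession, path: String): DataFrame =
    spark.read.format("csv")
      .option("header", "true")
      .option("dateFormat", "M/d/yyyy")
      .schema(employeeSchema)
      .load(path)
}
```

Reading the file this way means hire_date arrives as a date rather than as the raw text from the file, so it lines up with a date column coming from the JDBC source.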

Step 3: Merging Two Dataframes

We have two dataframes, mysqlDf and csvDf, with a similar schema. Let’s merge them:

 val mergeDf = mysqlDf.union(csvDf)
 mergeDf.show()

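Here, we have used the union function to merge the dataframes. Note that union matches columns by position, not by name, and keeps duplicate rows (like UNION ALL in SQL), which is why empno 7369 appears twice in the output. You can load this final dataframe to the target table.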
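If the two sources list their columns in a different order, or duplicates should not survive the merge, unionByName (available from Spark 2.3) and distinct() are worth a look. The sketch below uses two small in-memory dataframes rather than the MySQL and CSV sources; UnionSketch and mergedCounts are illustrative names only.

```scala
import org.apache.spark.sql.SparkSession

object UnionSketch {
  // Returns (row count after unionByName, row count after removing duplicates).
  def mergedCounts(spark: SparkSession): (Long, Long) = {
    import spark.implicits._

    val a = Seq((7369, "SMITH"), (7499, "ALLEN")).toDF("empno", "ename")
    // Same columns as `a` but in a different order, with one row repeating 7369.
    val b = Seq(("SMITH", 7369), ("WARD", 8521)).toDF("ename", "empno")

    // unionByName matches columns by name rather than by position.
    val merged = a.unionByName(b)

    // Duplicates are kept by the union; distinct() removes the repeated SMITH row.
    (merged.count(), merged.distinct().count())
  }
}
```

With a local session, UnionSketch.mergedCounts(spark) returns (4, 3): four rows after the merge, three once the repeated row is dropped.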

Source Code:

import org.apache.spark.sql.SparkSession

/**
  * Merges a MySQL table and a CSV file that share the same schema
  * and appends the result to a Hive table.
  * Arguments: <sourceTable> <filePath>
  */

object MergeTwoDataframe {
  def main(args: Array[String]): Unit = {
    val sourceTable = args(0)
    val filePath = args(1)

    // Spark session set up; Hive support is needed to write to a Hive table
    val spark = SparkSession.builder()
      .appName("Merge Two Dataframes")
      .enableHiveSupport()
      .getOrCreate()

    try {
      println("Started.......")
      // JDBC connection details
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://localhost:3306/bdp"
      val user = "root"
      val pass = "Password"

      // JDBC Connection and load table in Dataframe
      val sourceDf = spark.read.format("jdbc")
        .option("driver", driver)
        .option("url", url)
        .option("dbtable", sourceTable)
        .option("user", user)
        .option("password", pass)
        .load()

      // Display the data loaded from the MySQL table
      sourceDf.show()

      // Load data from CSV file
      val csvDf = spark.read.format("csv").option("header", "true").load(filePath)
      csvDf.show()

      // Merge both dataframes
      val mergeDf = sourceDf.union(csvDf)
      mergeDf.show()

      // Append the final dataframe to the Hive target table
      mergeDf.write.mode("append").saveAsTable("bdp.employee")
    } finally {
      // Release Spark resources even if a step above fails
      spark.stop()
    }
  }
}
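Since union pairs columns by position, it can be useful to confirm that both dataframes list the same columns in the same order before merging. The following is a minimal sketch of such a guard; columnMismatches and safeUnion are hypothetical helper names, and the case-insensitive comparison assumes Spark's default case-insensitive column resolution.

```scala
import org.apache.spark.sql.DataFrame

object SchemaGuardSketch {
  // Hypothetical helper: returns the column name pairs that differ, position by position.
  def columnMismatches(left: DataFrame, right: DataFrame): Seq[(String, String)] =
    left.columns.toSeq
      .zipAll(right.columns.toSeq, "<missing>", "<missing>")
      .filter { case (l, r) => !l.equalsIgnoreCase(r) }

  // Fails fast instead of letting union silently pair up unrelated columns.
  def safeUnion(left: DataFrame, right: DataFrame): DataFrame = {
    val diff = columnMismatches(left, right)
    require(diff.isEmpty, s"Column mismatch: ${diff.mkString(", ")}")
    left.union(right)
  }
}
```

In the program above, SchemaGuardSketch.safeUnion(sourceDf, csvDf) could stand in for sourceDf.union(csvDf). The check covers column names and order only, not column types.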

Output

+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
+-----+------+-----------+-------+----------+----+------+
| 7369| SMITH|      CLERK|   7902|1980-12-17| 800|    20|
| 7499| ALLEN|   SALESMAN|   7698|1981-02-20|1600|    30|
| 7521|  WARD|   SALESMAN|   7698|1981-02-22|1250|    30|
| 7566|TURNER|    MANAGER|   7839|1981-04-02|2975|    20|
| 7654|MARTIN|   SALESMAN|   7698|1981-09-28|1250|    30|
| 8369| SMITH|      CLERK|   7902|12/17/1980| 800|    20|
| 8499| ALLEN|   SALESMAN|   7698| 2/20/1981|1600|    30|
| 8521|  WARD|   SALESMAN|   7698| 2/22/1981|1250|    30|
| 8566|TURNER|    MANAGER|   7839|  4/2/1981|2975|    20|
| 8654|MARTIN|   SALESMAN|   7698| 9/28/1981|1250|    30|
| 7369| SMITH|      CLERK|   7902|12/17/1980| 800|    20|
+-----+------+-----------+-------+----------+----+------+

Wrapping Up

In this post, we read data from two different sources, an RDBMS table and a CSV file, loaded it into dataframes, and then merged both dataframes into a single dataframe.
