Requirement
Let’s say we are getting data from two different sources (an RDBMS table and a file), and we need to merge them into a single dataframe. Both sources have the same schema.
Sample Data
MySQL Table Data:
empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,7902,12/17/1980,800,20
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7566,TURNER,MANAGER,7839,4/2/1981,2975,20
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
CSV File Data:
empno,ename,designation,manager,hire_date,sal,deptno
8369,SMITH,CLERK,7902,12/17/1980,800,20
8499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
8521,WARD,SALESMAN,7698,2/22/1981,1250,30
8566,TURNER,MANAGER,7839,4/2/1981,2975,20
8654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
7369,SMITH,CLERK,7902,12/17/1980,800,20
Components Involved
- Spark 2.x
- CSV
- MySQL
Solution
Step 1: Read data from RDBMS Table
We have a MySQL table containing some dummy data. We will load this table into a dataframe. Below is the code snippet:
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://localhost:3306/bdp"
val user = "root"
val pass = "Password"
val sourceTable = "employee"
// JDBC Connection and load table in Dataframe
val mysqlDf = spark.read.format("jdbc")
.option("driver", driver)
.option("url", url)
.option("dbtable", sourceTable)
.option("user", user)
.option("password", pass)
.load()
Note: The MySQL connector JAR is required on Spark’s classpath (e.g. passed via spark-submit’s --jars option).
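Incidentally, the `dbtable` option also accepts a parenthesized subquery in place of a table name, which lets MySQL filter rows before they ever reach Spark. A minimal sketch, assuming the same connection values as above (the query text and alias are illustrative only):

```scala
// Push a filter down to MySQL by passing a subquery as "dbtable".
// The subquery must be wrapped in parentheses and given an alias.
val dept30Df = spark.read.format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", "(SELECT * FROM employee WHERE deptno = 30) AS emp")
  .option("user", user)
  .option("password", pass)
  .load()
```

This way only the matching rows travel over the JDBC connection, which matters when the source table is large.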
Step 2: Read CSV file data
val csvDf = spark.read.format("csv")
.option("header", "true")
.load("file:///home/bdp/data/employee_details.csv")
Here, we are loading a local file into a dataframe. If your file is at an HDFS path, there is no need to specify the “file://” prefix.
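One caveat before merging: with only `header` set, the CSV reader returns every column as a string, whereas the JDBC read comes back with the table’s actual column types, and `union` requires compatible types on both sides. You can either let Spark infer the types or declare them yourself. A sketch, assuming the column types implied by the sample data (adjust to your actual table):

```scala
import org.apache.spark.sql.types._

// Option 1: let Spark sample the file and guess column types
val inferredCsvDf = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("file:///home/bdp/data/employee_details.csv")

// Option 2: declare the schema explicitly (no extra sampling pass over the file)
val empSchema = StructType(Seq(
  StructField("empno", IntegerType),
  StructField("ename", StringType),
  StructField("designation", StringType),
  StructField("manager", IntegerType),
  StructField("hire_date", StringType),
  StructField("sal", IntegerType),
  StructField("deptno", IntegerType)))

val typedCsvDf = spark.read.format("csv")
  .option("header", "true")
  .schema(empSchema)
  .load("file:///home/bdp/data/employee_details.csv")
```

The explicit schema is generally preferable for production jobs, since inference depends on whatever happens to be in the file.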
Step 3: Merging Two Dataframes
We now have two dataframes, mysqlDf and csvDf, with the same schema. Let’s merge them:
val mergeDf = mysqlDf.union(csvDf)
mergeDf.show()
Here, we have used the union method to merge the dataframes. Note that union matches columns by position and, like SQL’s UNION ALL, does not remove duplicate rows. You can load this final dataframe into the target table.
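Two related variations are worth knowing. Since union is positional, unionByName (available from Spark 2.3) is safer when the two dataframes might list their columns in a different order; and dropDuplicates removes the overlapping rows that union keeps. A sketch using the dataframes from the steps above:

```scala
// Match columns by name rather than by position;
// safer if the two sources order their columns differently.
val mergedByName = mysqlDf.unionByName(csvDf)

// Drop exact duplicate rows; in the sample data, empno 7369
// appears in both the MySQL table and the CSV file.
val dedupedDf = mergedByName.dropDuplicates()
dedupedDf.show()
```

Use dropDuplicates only if duplicates across sources are actually unwanted; union’s UNION ALL behavior is often what you need for append-style loads.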
Source Code:
import org.apache.spark.sql.SparkSession

/** Merges data from a MySQL table and a CSV file into a single dataframe. */
object MergeTwoDataframe {
  def main(args: Array[String]): Unit = {
    val sourceTable = args(0)
    val filePath = args(1)
    // Spark session set up (Hive support is needed for saveAsTable on a Hive table)
    val spark = SparkSession.builder()
      .appName("Merge Two Dataframes")
      .enableHiveSupport()
      .getOrCreate()
    try {
      println("Started.......")
      // JDBC connection details
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://localhost:3306/bdp"
      val user = "root"
      val pass = "Password"
      // JDBC connection: load the source table into a dataframe
      val sourceDf = spark.read.format("jdbc")
        .option("driver", driver)
        .option("url", url)
        .option("dbtable", sourceTable)
        .option("user", user)
        .option("password", pass)
        .load()
      sourceDf.show()
      // Load data from the CSV file
      val csvDf = spark.read.format("csv").option("header", "true").load(filePath)
      csvDf.show()
      // Merge both dataframes
      val mergeDf = sourceDf.union(csvDf)
      mergeDf.show()
      // Write the final dataframe into the Hive target table
      mergeDf.write.mode("append").saveAsTable("bdp.employee")
    } finally {
      spark.stop()
    }
  }
}
Output
+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
+-----+------+-----------+-------+----------+----+------+
| 7369| SMITH|      CLERK|   7902|1980-12-17| 800|    20|
| 7499| ALLEN|   SALESMAN|   7698|1981-02-20|1600|    30|
| 7521|  WARD|   SALESMAN|   7698|1981-02-22|1250|    30|
| 7566| JONES|    MANAGER|   7839|1981-04-02|2975|    20|
| 7654|MARTIN|   SALESMAN|   7698|1981-09-28|1250|    30|
| 8369| SMITH|      CLERK|   7902|12/17/1980| 800|    20|
| 8499| ALLEN|   SALESMAN|   7698| 2/20/1981|1600|    30|
| 8521|  WARD|   SALESMAN|   7698| 2/22/1981|1250|    30|
| 8566|TURNER|    MANAGER|   7839| 4/2/1981|2975|    20|
| 8654|MARTIN|   SALESMAN|   7698| 9/28/1981|1250|    30|
| 7369| SMITH|      CLERK|   7902|12/17/1980| 800|    20|
+-----+------+-----------+-------+----------+----+------+
Wrapping Up
In this post, we read data from two different sources, an RDBMS table and a CSV file, loaded each into a dataframe, and then merged the two dataframes into a single dataframe.