Requirement
Let’s say we are getting data from two different sources (i.e. an RDBMS table and a file), and we need to merge them into a single dataframe. Both sources have the same schema.
Sample Data
MySQL Table Data:
empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,7902,12/17/1980,800,20
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7566,TURNER,MANAGER,7839,4/2/1981,2975,20
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
CSV File Data:
empno,ename,designation,manager,hire_date,sal,deptno
8369,SMITH,CLERK,7902,12/17/1980,800,20
8499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
8521,WARD,SALESMAN,7698,2/22/1981,1250,30
8566,TURNER,MANAGER,7839,4/2/1981,2975,20
8654,MARTIN,SALESMAN,7698,9/28/1981,1250,30
7369,SMITH,CLERK,7902,12/17/1980,800,20
Components Involved
- Spark 2.x
- CSV
- MySQL
Solution
Step 1: Read data from RDBMS Table
There is a MySQL table containing some dummy data. We will load this table into a dataframe. Below is the code snippet:
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://localhost:3306/bdp"
val user = "root"
val pass = "Password"
val sourceTable = "employee"

// JDBC connection and load the table into a dataframe
val mysqlDf = spark.read.format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", sourceTable)
  .option("user", user)
  .option("password", pass)
  .load()
Note: The MySQL connector JAR must be available on the classpath.
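Incidentally, if you only need part of the table, the dbtable option also accepts a parenthesized subquery with an alias instead of a table name, so the filter runs in MySQL rather than in Spark. A minimal sketch, reusing the connection values above (the deptno filter is just an illustration):

// Push a filter down to MySQL by passing a subquery as dbtable
val dept20Df = spark.read.format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", "(SELECT * FROM employee WHERE deptno = 20) AS emp")
  .option("user", user)
  .option("password", pass)
  .load()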
Step 2: Read CSV file data
val csvDf = spark.read.format("csv")
  .option("header", "true")
  .load("file:///home/bdp/data/employee_details.csv")
Here, we are loading a local file into a dataframe. If your file is on an HDFS path, there is no need to prefix the path with “file://”.
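One thing to be aware of: with only the header option set, every CSV column is read as a string, while the JDBC dataframe carries the column types from MySQL. If you want both sides typed consistently before the union, you can set inferSchema to true or declare the schema yourself. A sketch, reusing the column names from the sample file (the types are assumptions based on the sample values):

import org.apache.spark.sql.types._

// Declare the schema up front instead of reading everything as strings
val employeeSchema = StructType(Seq(
  StructField("empno", IntegerType),
  StructField("ename", StringType),
  StructField("designation", StringType),
  StructField("manager", IntegerType),
  StructField("hire_date", StringType),
  StructField("sal", IntegerType),
  StructField("deptno", IntegerType)
))

val typedCsvDf = spark.read.format("csv")
  .option("header", "true")
  .schema(employeeSchema)
  .load("file:///home/bdp/data/employee_details.csv")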
Step 3: Merging Two Dataframes
We have two dataframes, mysqlDf and csvDf, with the same schema. Let’s merge them:
val mergeDf = mysqlDf.union(csvDf)
mergeDf.show()
Here, we have used the union function to merge the dataframes. Note that union matches columns by position, not by name, and it keeps duplicates (UNION ALL semantics), so the repeated 7369 row from the CSV survives the merge (see the sketch below). You can load this final dataframe into the target table.
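A couple of optional follow-ups, sketched below:

// Drop exact duplicate rows after the merge (the CSV repeats empno 7369)
val dedupDf = mergeDf.distinct()
dedupDf.show()

// Spark 2.3+: unionByName matches columns by name instead of position
// val mergeByNameDf = mysqlDf.unionByName(csvDf)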
Source Code:
import org.apache.spark.sql.SparkSession

/**
 * Reads same-schema employee data from a MySQL table and a CSV file,
 * merges the two dataframes, and appends the result to a Hive table.
 */
object MergeTwoDataframe {

  def main(args: Array[String]): Unit = {
    val sourceTable = args(0)
    val filePath = args(1)

    // Spark session set up (Hive support is needed for saveAsTable on a Hive table)
    val spark = SparkSession.builder()
      .appName("Merge Two Dataframes")
      .enableHiveSupport()
      .getOrCreate()

    try {
      println("Started.......")

      // JDBC connection details
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://localhost:3306/bdp"
      val user = "root"
      val pass = "Password"

      // JDBC connection and load the table into a dataframe
      val sourceDf = spark.read.format("jdbc")
        .option("driver", driver)
        .option("url", url)
        .option("dbtable", sourceTable)
        .option("user", user)
        .option("password", pass)
        .load()
      sourceDf.show()

      // Load data from the CSV file
      val csvDf = spark.read.format("csv")
        .option("header", "true")
        .load(filePath)
      csvDf.show()

      // Merge both dataframes
      val mergeDf = sourceDf.union(csvDf)
      mergeDf.show()

      // Dump the final dataframe into the Hive target table
      mergeDf.write.mode("append").saveAsTable("bdp.employee")
    } finally {
      spark.stop()
    }
  }
}
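To run the job, you would package it and submit it with the table name and file path as arguments, shipping the MySQL connector along with --jars. Something along these lines (the JAR names and paths are placeholders; adjust them to your environment):

spark-submit \
  --class MergeTwoDataframe \
  --jars /path/to/mysql-connector-java.jar \
  merge-two-dataframe.jar \
  employee \
  file:///home/bdp/data/employee_details.csv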
Output
+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
+-----+------+-----------+-------+----------+----+------+
| 7369| SMITH|      CLERK|   7902|1980-12-17| 800|    20|
| 7499| ALLEN|   SALESMAN|   7698|1981-02-20|1600|    30|
| 7521|  WARD|   SALESMAN|   7698|1981-02-22|1250|    30|
| 7566|TURNER|    MANAGER|   7839|1981-04-02|2975|    20|
| 7654|MARTIN|   SALESMAN|   7698|1981-09-28|1250|    30|
| 8369| SMITH|      CLERK|   7902|12/17/1980| 800|    20|
| 8499| ALLEN|   SALESMAN|   7698| 2/20/1981|1600|    30|
| 8521|  WARD|   SALESMAN|   7698| 2/22/1981|1250|    30|
| 8566|TURNER|    MANAGER|   7839|  4/2/1981|2975|    20|
| 8654|MARTIN|   SALESMAN|   7698| 9/28/1981|1250|    30|
| 7369| SMITH|      CLERK|   7902|12/17/1980| 800|    20|
+-----+------+-----------+-------+----------+----+------+
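Notice that hire_date is printed in two formats: the JDBC source surfaces it as a proper date (yyyy-MM-dd), while the CSV rows stay as raw MM/dd/yyyy strings, so the union coerces the whole column to string. If you want a uniform date column, you can normalize the CSV side before merging; a sketch using to_date (the format pattern is assumed from the sample data above):

import org.apache.spark.sql.functions.{col, to_date}

// Parse the CSV's MM/dd/yyyy strings into a real date column before the union
val csvWithDate = csvDf.withColumn("hire_date", to_date(col("hire_date"), "MM/dd/yyyy"))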
Wrapping Up
In this post, we read data from two different sources, an RDBMS table and a CSV file, loaded each into a dataframe, and then merged the two dataframes into a single dataframe.