Requirement
We have data in an RDBMS table, say a MySQL table. The requirement is to load this data from MySQL into Spark using a JDBC connection.
Sample Data
We will use the sample data below, which contains the basic details of an employee: employee number, name, designation, manager, hire date, salary, and department.
empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,7902,1980-12-17,800,20
7499,ALLEN,SALESMAN,7698,1981-02-20,1600,30
7521,WARD,SALESMAN,7698,1981-02-22,1250,30
7566,JONES,MANAGER,7839,1981-04-02,2975,20
7654,MARTIN,SALESMAN,7698,1981-09-28,1250,30
Components Involved
- MySQL
- Spark 2.x
Solution
We will first create the source table with the sample data and then read the data into Spark using a JDBC connection.
Step 1: Data Preparation
Let’s create a table named employee in MySQL and load the sample data using the query below:
CREATE TABLE employee (
  empno INT,
  ename VARCHAR(100),
  designation VARCHAR(100),
  manager INT,
  hire_date VARCHAR(50),
  sal INT,
  deptno INT
);

INSERT INTO employee (empno, ename, designation, manager, hire_date, sal, deptno) VALUES
  (7369, 'SMITH',  'CLERK',    7902, '1980-12-17',  800, 20),
  (7499, 'ALLEN',  'SALESMAN', 7698, '1981-02-20', 1600, 30),
  (7521, 'WARD',   'SALESMAN', 7698, '1981-02-22', 1250, 30),
  (7566, 'JONES',  'MANAGER',  7839, '1981-04-02', 2975, 20),
  (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250, 30);
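Before moving on, it is worth confirming the load. A quick check, assuming the table was created in the bdp database that the Spark code connects to later:

USE bdp;
SELECT COUNT(*) FROM employee;  -- should return 5
SELECT * FROM employee;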
Step 2: Spark Code
Now, let’s write the Spark code to establish the connection and load the data from MySQL into a Spark DataFrame.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object ReadDataFromJdbc {

  def main(args: Array[String]): Unit = {

    val sourceTable = args(0)

    // Spark configuration set-up; getOrCreate reuses the existing session
    // when the code is run inside spark-shell
    val config = new SparkConf().setAppName("Read JDBC Data: " + sourceTable)
    val spark = SparkSession.builder().config(config).getOrCreate()

    try {
      println("Started.......")

      // JDBC connection details
      val driver = "com.mysql.jdbc.Driver"
      val url    = "jdbc:mysql://localhost:3306/bdp"
      val user   = "root"
      val pass   = "Password"

      // Establish the JDBC connection and load the table into a DataFrame
      val sourceDf = spark.read.format("jdbc")
        .option("driver", driver)
        .option("url", url)
        .option("dbtable", sourceTable)
        .option("user", user)
        .option("password", pass)
        .load()

      // Display the data read from the DataFrame
      sourceDf.show()
    } catch {
      case e: Throwable => println(s"Connectivity failed for table $sourceTable: $e")
    }
  }
}
Here, we have established a JDBC connection to a MySQL table. We provided all the required details: the URL, the driver class, the table name (dbtable), and the credentials (user and password).
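By default this read happens over a single JDBC connection, which is fine for a small table like employee but slow for large ones. Spark's JDBC source can parallelize the read when you supply partitioning options. Below is a minimal sketch, assuming the numeric empno column is a sensible split key and that the bounds roughly cover the data; it reuses the driver, url, user, and pass values from the snippet above:

// Sketch of a partitioned JDBC read; the partition column and bounds
// are illustrative and should match your actual data distribution.
val partitionedDf = spark.read.format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", sourceTable)
  .option("user", user)
  .option("password", pass)
  .option("partitionColumn", "empno") // numeric column to split on
  .option("lowerBound", "7000")       // assumed minimum of empno
  .option("upperBound", "8000")       // assumed maximum of empno
  .option("numPartitions", "4")       // parallel connections to MySQL
  .load()

Spark splits the empno range into four slices and issues one query per slice; rows outside the bounds are still read, they simply land in the first or last partition.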
Note: The connection details should not be hard-coded as they are here. We will show in another post how to keep these details out of the code.
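Until that post, one simple way to keep credentials out of the source is to read them from environment variables. A sketch, assuming hypothetical MYSQL_USER and MYSQL_PASS variables are exported in the shell that launches spark-shell:

// MYSQL_USER and MYSQL_PASS are hypothetical variable names; fail fast if unset.
val user = sys.env.getOrElse("MYSQL_USER", sys.error("MYSQL_USER is not set"))
val pass = sys.env.getOrElse("MYSQL_PASS", sys.error("MYSQL_PASS is not set"))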
Step 3: Job Execution
Here, we will execute the Spark code in spark-shell. We also need the MySQL connector JAR to connect to the MySQL table.
The command used for execution:
spark-shell
scala> :require /home/bdp/jars/mysql-connector-java-5.1.30-bin.jar
scala> :load /home/bdp/codebase/ReadDataFromJdbc.scala
Here, we started spark-shell. Once the shell was open, we loaded the MySQL connector JAR with the :require command.
We then used the :load command to compile the Spark code and executed the main method by passing the table name as an argument:
scala> ReadDataFromJdbc.main(Array("employee"))
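If the connection succeeds, sourceDf.show() prints the loaded rows. With the sample data above, the output should look something like this (row order is not guaranteed):

+-----+------+-----------+-------+----------+----+------+
|empno| ename|designation|manager| hire_date| sal|deptno|
+-----+------+-----------+-------+----------+----+------+
| 7369| SMITH|      CLERK|   7902|1980-12-17| 800|    20|
| 7499| ALLEN|   SALESMAN|   7698|1981-02-20|1600|    30|
| 7521|  WARD|   SALESMAN|   7698|1981-02-22|1250|    30|
| 7566| JONES|    MANAGER|   7839|1981-04-02|2975|    20|
| 7654|MARTIN|   SALESMAN|   7698|1981-09-28|1250|    30|
+-----+------+-----------+-------+----------+----+------+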
You can check here multiple ways to execute your Spark code without creating a JAR.
Wrapping Up
In this post, we created a JDBC connection to MySQL and fetched the data. The same approach works for other RDBMS sources such as SQL Server and Oracle.