Load data from MySQL in Spark using JDBC

Requirement

We have data in an RDBMS table, in this case a MySQL table. The requirement is to load that data into Spark over a JDBC connection.

Sample Data

We will use the sample data below, which contains basic employee details: employee number, employee name, designation, manager, hire date, salary, and department.

empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,7902,12/17/1980,800,20
7499,ALLEN,SALESMAN,7698,2/20/1981,1600,30
7521,WARD,SALESMAN,7698,2/22/1981,1250,30
7566,JONES,MANAGER,7839,4/2/1981,2975,20
7654,MARTIN,SALESMAN,7698,9/28/1981,1250,30

Components Involved

  • MySQL
  • Spark 2.x

Solution

We will first create the source table with the sample data and then read it into Spark over a JDBC connection.

Step 1: Data Preparation

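Let’s create a table named employee in MySQL and load the sample data using the statements below. Note that the hire dates are inserted as ISO-style strings (yyyy-MM-dd) rather than in the M/d/yyyy form shown in the sample, and that hire_date is stored as a VARCHAR, so Spark will read it as a string column.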

CREATE TABLE employee (
    empno INT,
    ename VARCHAR(100),
    designation VARCHAR(100),
    manager INT,
    hire_date VARCHAR(50),
    sal INT,
    deptno INT
);

INSERT INTO employee (empno, ename, designation, manager, hire_date, sal, deptno)
VALUES (7369,'SMITH','CLERK',7902,'1980-12-17',800.00,20),
(7499,'ALLEN','SALESMAN',7698,'1981-02-20',1600.00,30),
(7521,'WARD','SALESMAN',7698,'1981-02-22',1250.00,30),
(7566,'JONES','MANAGER',7839,'1981-04-02',2975.00,20),
(7654,'MARTIN','SALESMAN',7698,'1981-09-28',1250.00,30);
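
The sample data lists hire dates as M/d/yyyy, while the INSERT statement above uses yyyy-MM-dd strings. As a minimal sketch of that conversion, the helper below uses java.time from the standard library. The object and method names (HireDateFormat, toIso) are hypothetical and are not part of the original post.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object HireDateFormat {
  // "M/d/yyyy" accepts one- or two-digit months and days,
  // so both 2/20/1981 and 12/17/1980 parse correctly.
  private val sampleFormat = DateTimeFormatter.ofPattern("M/d/yyyy")

  // Converts a date from the sample file into the ISO form used in the INSERT.
  def toIso(sampleDate: String): String =
    LocalDate.parse(sampleDate, sampleFormat).toString

  def main(args: Array[String]): Unit =
    Seq("12/17/1980", "2/20/1981", "4/2/1981")
      .foreach(d => println(s"$d -> ${toIso(d)}"))
}
```

Malformed input makes LocalDate.parse throw a DateTimeParseException, so a real loader would probably want to catch that per row.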

Step 2: Spark Code

Now, let’s write the Spark code to establish the connection and load the data from MySQL into a Spark DataFrame.

import org.apache.spark.sql.SparkSession
import scala.util.control.NonFatal

object ReadDataFromJdbc {
  def main(args: Array[String]): Unit = {
    // Name of the MySQL table to read, passed as the first argument
    val sourceTable = args(0)

    // Reuse the active session (for example the one spark-shell provides)
    // or create a new one when the job runs on its own
    val spark = SparkSession.builder()
      .appName("Read JDBC Data: " + sourceTable)
      .getOrCreate()

    try {
      println("Started.......")

      // JDBC connection details
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://localhost:3306/bdp"
      val user = "root"
      val pass = "Password"

      // Open the JDBC connection and load the table into a DataFrame
      val sourceDf = spark.read.format("jdbc")
        .option("driver", driver)
        .option("url", url)
        .option("dbtable", sourceTable)
        .option("user", user)
        .option("password", pass)
        .load()

      // Display the rows read from MySQL
      sourceDf.show()

    } catch {
      // Report non-fatal failures such as a wrong URL, bad credentials, or a missing table
      case NonFatal(e) => println(s"Connectivity failed for table $sourceTable: $e")
    }
  }
}

Here, we have established a JDBC connection to MySQL and loaded a table into a DataFrame. We supplied all the required options: driver, url, dbtable, and the credentials (user and password).
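
On the read path, the dbtable option accepts anything that is valid in a FROM clause, so a parenthesised subquery can be passed instead of a table name to let MySQL filter rows before they reach Spark. The helper below is a small sketch of how such a value could be built. The names JdbcSubquery, asDbTable, and the alias emp_dept30 are hypothetical.

```scala
object JdbcSubquery {
  // Wraps a SELECT statement so it can be used as the "dbtable" option.
  // MySQL requires an alias on a derived table, hence the "AS <alias>" suffix.
  // A trailing semicolon is dropped because it is not valid inside parentheses.
  def asDbTable(query: String, alias: String): String =
    s"(${query.trim.stripSuffix(";")}) AS $alias"

  def main(args: Array[String]): Unit = {
    val dbtable = asDbTable(
      "SELECT empno, ename, sal FROM employee WHERE deptno = 30;",
      "emp_dept30"
    )
    println(dbtable)
  }
}
```

Passing the resulting string as .option("dbtable", dbtable) in place of the plain table name should return only the department 30 rows, assuming the employee table created in Step 1.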

Note: Connection details such as the user name and password should not be hard-coded as they are here. We will show in another post how to keep these details out of the code.
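
Until then, one possible approach (an assumption on our part, not necessarily the method the follow-up post will use) is to read the connection details from environment variables and build the JDBC options from them. The variable names MYSQL_URL, MYSQL_USER, and MYSQL_PASSWORD and the JdbcSettings object are hypothetical.

```scala
object JdbcSettings {
  // Hypothetical environment variable names; choose whatever suits your setup.
  val RequiredKeys: Seq[String] = Seq("MYSQL_URL", "MYSQL_USER", "MYSQL_PASSWORD")

  // Builds the JDBC reader options from an environment map,
  // or returns a message listing the variables that are missing.
  def fromEnv(env: Map[String, String], table: String): Either[String, Map[String, String]] = {
    val missing = RequiredKeys.filterNot(env.contains)
    if (missing.nonEmpty)
      Left("Missing environment variables: " + missing.mkString(", "))
    else
      Right(Map(
        "driver"   -> "com.mysql.jdbc.Driver",
        "url"      -> env("MYSQL_URL"),
        "user"     -> env("MYSQL_USER"),
        "password" -> env("MYSQL_PASSWORD"),
        "dbtable"  -> table
      ))
  }

  def main(args: Array[String]): Unit =
    fromEnv(sys.env, "employee") match {
      // Print only the option names so the password never reaches the console
      case Right(opts) => println("Options ready: " + opts.keys.toSeq.sorted.mkString(", "))
      case Left(error) => println(error)
    }
}
```

The resulting map can be handed to the reader in one call with spark.read.format("jdbc").options(opts).load(), which replaces the five separate option calls.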

Step 3: Job Execution

Here, we will execute the Spark code in spark-shell. We also need the MySQL connector JAR to connect to MySQL.

The command used for execution:

spark-shell
scala> :require /home/bdp/jars/mysql-connector-java-5.1.30-bin.jar
scala> :load /home/bdp/codebase/ReadDataFromJdbc.scala

Here, we started spark-shell. Once it opened, we loaded the MySQL connector JAR with the :require command.

We then used the :load command to load the Spark code and ran the main method of the object, passing the table name as an argument.

scala> ReadDataFromJdbc.main(Array("employee"))

There are multiple ways to execute your Spark code without creating a JAR; loading the source file in spark-shell, as shown here, is one of them.

Wrapping Up

In this post, we created a JDBC connection to MySQL and fetched the data into Spark. The same approach works for other RDBMS sources such as SQL Server and Oracle.
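
For other sources, the main things that change are the driver class, the URL format, and the connector JAR. The lookup below is a rough sketch of those differences. The JdbcSources object and its helper are hypothetical, the ports are the usual defaults, and the exact driver class can vary with the connector version you install.

```scala
object JdbcSources {
  // Driver class plus a function that builds the URL from host, port, and database.
  final case class JdbcSource(driver: String, url: (String, Int, String) => String)

  val sources: Map[String, JdbcSource] = Map(
    // Matches the 5.1.x connector used in this post
    "mysql" -> JdbcSource(
      "com.mysql.jdbc.Driver",
      (host, port, db) => s"jdbc:mysql://$host:$port/$db"),
    "sqlserver" -> JdbcSource(
      "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      (host, port, db) => s"jdbc:sqlserver://$host:$port;databaseName=$db"),
    // For Oracle the last argument is the service name rather than a database name
    "oracle" -> JdbcSource(
      "oracle.jdbc.OracleDriver",
      (host, port, service) => s"jdbc:oracle:thin:@//$host:$port/$service")
  )

  // Returns the JDBC URL for a known source type, or None for an unknown one.
  def urlFor(kind: String, host: String, port: Int, database: String): Option[String] =
    sources.get(kind).map(_.url(host, port, database))

  def main(args: Array[String]): Unit = {
    println(urlFor("mysql", "localhost", 3306, "bdp"))
    println(urlFor("sqlserver", "localhost", 1433, "bdp"))
  }
}
```

The driver and url values from this lookup slot into the same driver and url options used in Step 2; the rest of the read code stays unchanged.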
