Convert Schema to DataFrame in Spark

Requirement

In this post, we will learn how to convert a table’s schema into a DataFrame in Spark.

Sample Data

| empno | ename  | designation | manager | hire_date  | sal  | deptno | location  |
|-------|--------|-------------|---------|------------|------|--------|-----------|
| 9369  | SMITH  | CLERK       | 7902    | 12/17/1980 | 800  | 20     | BANGALORE |
| 9499  | ALLEN  | SALESMAN    | 7698    | 2/20/1981  | 1600 | 30     | HYDERABAD |
| 9521  | WARD   | SALESMAN    | 7698    | 2/22/1981  | 1250 | 30     | PUNE      |
| 9566  | TURNER | MANAGER     | 7839    | 4/2/1981   | 2975 | 20     | MUMBAI    |
| 9654  | MARTIN | SALESMAN    | 7698    | 9/28/1981  | 1250 | 30     | CHENNAI   |
| 9369  | SMITH  | CLERK       | 7902    | 12/17/1980 | 800  | 20     | KOLKATA   |

Solution

Step 1: Load the data into a DataFrame

val empDf = spark.read.option("header", "true").csv("./src/main/resources/emp_data1.csv")
empDf.createOrReplaceTempView("empTbl")
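Before converting the schema, it can help to confirm what Spark actually inferred. A quick sanity check (a sketch, assuming the same CSV path as above):

```scala
// Read the sample CSV again and print the inferred schema as a tree.
val empDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true") // without this, every column is read as string
  .csv("./src/main/resources/emp_data1.csv")

empDf.printSchema()
// With inferSchema enabled, numeric columns such as empno and sal come back
// as integer types; with header only, every column would be a string.
```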

Step 2: Extract the Schema as a Complex Data Type

import spark.implicits._ // provides the String encoder needed by createDataset

val metaSchema = empDf.schema.prettyJson
val schemaDataset = spark.createDataset(metaSchema :: Nil)
val schemaDf = spark.read.json(schemaDataset)
schemaDf.createOrReplaceTempView("schemaTempView")
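For reference, `prettyJson` returns the schema as a JSON document. For the sample data read with the header option only (no schema inference), it looks roughly like this, truncated here to the first two fields; the `fields` array is what Step 3 explodes:

```json
{
  "type" : "struct",
  "fields" : [ {
    "name" : "empno",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "ename",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  } ]
}
```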

Step 3: Explode the Complex Data Type

val schemaInTbl = spark.sql("SELECT exp.name, exp.type FROM schemaTempView" +
  " LATERAL VIEW explode(fields) explodeTbl AS exp")
schemaInTbl.show()
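The same flattening can also be written with the DataFrame API instead of SQL (a sketch; `schemaDf` is the DataFrame created in Step 2):

```scala
import org.apache.spark.sql.functions.{col, explode}

// explode turns the fields array into one row per column of the original
// table; the nested struct fields are then selected out by name.
val schemaViaApi = schemaDf
  .select(explode(col("fields")).as("exp"))
  .select("exp.name", "exp.type")

schemaViaApi.show()
```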

Full Code

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ConvertSchemaToDF {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)

    val spark = SparkSession.builder().appName("Convert Schema to DF")
      .config("spark.master", "local")
      .getOrCreate()

    try {
      // Step 1: load the data into a DataFrame
      val empDf = spark.read.option("header", "true").option("inferSchema", "true")
        .csv("./src/main/resources/emp_data1.csv")
      empDf.createOrReplaceTempView("empTbl")

      // Step 2: extract the schema as JSON and load it back as a DataFrame
      import spark.implicits._
      val metaSchema = empDf.schema.prettyJson
      val schemaDataset = spark.createDataset(metaSchema :: Nil)
      val schemaDf = spark.read.json(schemaDataset)
      schemaDf.createOrReplaceTempView("schemaTempView")

      // Step 3: explode the fields array into one row per column
      val schemaInTbl = spark.sql("SELECT exp.name, exp.type FROM schemaTempView" +
        " LATERAL VIEW explode(fields) explodeTbl AS exp")
      schemaInTbl.show()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}

Wrapping Up

In this post, we have learned how to extract a DataFrame’s schema in JSON format and convert it into a table with two columns: the name and the data type of each column.
