Requirement
In this post, we will learn how to convert a table's schema into a DataFrame in Spark.
Sample Data
empno | ename | designation | manager | hire_date | sal | deptno | location
----- | ----- | ----------- | ------- | ---------- | ---- | ------ | ---------
9369 | SMITH | CLERK | 7902 | 12/17/1980 | 800 | 20 | BANGALORE
9499 | ALLEN | SALESMAN | 7698 | 2/20/1981 | 1600 | 30 | HYDERABAD
9521 | WARD | SALESMAN | 7698 | 2/22/1981 | 1250 | 30 | PUNE
9566 | TURNER | MANAGER | 7839 | 4/2/1981 | 2975 | 20 | MUMBAI
9654 | MARTIN | SALESMAN | 7698 | 9/28/1981 | 1250 | 30 | CHENNAI
9369 | SMITH | CLERK | 7902 | 12/17/1980 | 800 | 20 | KOLKATA
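If you want to follow along, the rows above can be saved as a plain CSV file; the file name and path (emp_data1.csv under src/main/resources) are simply what the code below assumes.
empno,ename,designation,manager,hire_date,sal,deptno,location
9369,SMITH,CLERK,7902,12/17/1980,800,20,BANGALORE
9499,ALLEN,SALESMAN,7698,2/20/1981,1600,30,HYDERABAD
9521,WARD,SALESMAN,7698,2/22/1981,1250,30,PUNE
9566,TURNER,MANAGER,7839,4/2/1981,2975,20,MUMBAI
9654,MARTIN,SALESMAN,7698,9/28/1981,1250,30,CHENNAI
9369,SMITH,CLERK,7902,12/17/1980,800,20,KOLKATA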
Solution
Step 1: Load data into DF
val empDf = spark.read.option("header", "true").csv("./src/main/resources/emp_data1.csv")
empDf.createOrReplaceTempView("empTbl")
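As a quick sanity check you can print the schema of the loaded DataFrame; since only the header option is set here (no inferSchema), every column comes back as a string:
// Quick check: without inferSchema, all columns are read as StringType
empDf.printSchema()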
Step 2: Extract the Schema as a Complex Data Type
import spark.implicits._   // needed for the String encoder used by createDataset
val metaSchema = empDf.schema.prettyJson
val schemaDataset = spark.createDataset(metaSchema :: Nil)
val schemaDf = spark.read.json(schemaDataset)
schemaDf.createOrReplaceTempView("schemaTempView")
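The JSON produced by prettyJson has a top-level fields array in which each element carries a column's name, type, nullable flag, and metadata; after spark.read.json parses it, schemaDf has that nested shape, which you can inspect with:
// Inspect the parsed schema-of-the-schema: `fields` is an array of structs
// holding name, type, nullable and metadata for every column of empTbl
schemaDf.printSchema()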
Step 3: Explode the Complex Data Type
val schemaInTbl = spark.sql("SELECT exp.name, exp.type FROM schemaTempView" +
" LATERAL VIEW explode(fields) explodeTbl as exp")
schemaInTbl.show
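The same explode can also be written with the DataFrame API instead of Spark SQL; this is just an equivalent sketch (the schemaInDf name is mine), not part of the original code:
import org.apache.spark.sql.functions.{col, explode}
// Explode the `fields` array so each column of empTbl becomes one row,
// then keep only the column name and its data type
val schemaInDf = schemaDf
  .select(explode(col("fields")).as("field"))
  .select(col("field.name"), col("field.type"))
schemaInDf.show()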
Full Code
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ConvertSchemaToDF {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.OFF)

    val spark = SparkSession.builder().appName("Convert Schema to DF")
      .config("spark.master", "local")
      .getOrCreate()

    try {
      // Step 1: load the CSV into a DataFrame and register it as a temp view
      val empDf = spark.read.option("header", "true").option("inferSchema", "true").csv("./src/main/resources/emp_data1.csv")
      empDf.createOrReplaceTempView("empTbl")

      import spark.implicits._

      // Step 2: get the schema as pretty JSON and read it back as a DataFrame
      val metaSchema = empDf.schema.prettyJson
      val schemaDataset = spark.createDataset(metaSchema :: Nil)
      val schemaDf = spark.read.json(schemaDataset)
      schemaDf.createOrReplaceTempView("schemaTempView")

      // Step 3: explode the `fields` array to get one row per column
      val schemaInTbl = spark.sql("SELECT exp.name, exp.type FROM schemaTempView" +
        " LATERAL VIEW explode(fields) explodeTbl as exp")
      schemaInTbl.show
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }
}
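As a side note, the same name/type table can be built directly from the StructType without the JSON round-trip; a minimal sketch under the same session (variable names are mine, and it relies on the import spark.implicits._ shown above):
// Build (name, type) pairs straight from the schema object
val schemaPairs = empDf.schema.fields.map(f => (f.name, f.dataType.simpleString))
val schemaDirectDf = schemaPairs.toSeq.toDF("name", "type")
schemaDirectDf.show()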
Wrapping Up
In this post, we learned how to get a DataFrame's schema in JSON format and convert it into a table containing each column's name and data type.