Requirement
In this post, we will learn how to convert a table's schema into a DataFrame in Spark, that is, produce a DataFrame whose rows are the column names and data types of the source table.
Sample Data
empno | ename  | designation | manager | hire_date  | sal  | deptno | location  |
9369  | SMITH  | CLERK       | 7902    | 12/17/1980 | 800  | 20     | BANGALORE |
9499  | ALLEN  | SALESMAN    | 7698    | 2/20/1981  | 1600 | 30     | HYDERABAD |
9521  | WARD   | SALESMAN    | 7698    | 2/22/1981  | 1250 | 30     | PUNE      |
9566  | TURNER | MANAGER     | 7839    | 4/2/1981   | 2975 | 20     | MUMBAI    |
9654  | MARTIN | SALESMAN    | 7698    | 9/28/1981  | 1250 | 30     | CHENNAI   |
9369  | SMITH  | CLERK       | 7902    | 12/17/1980 | 800  | 20     | KOLKATA   |
Solution
Step 1: Load the Data into a DataFrame
val empDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true") // infer data types, consistent with the full code below
  .csv("./src/main/resources/emp_data1.csv")
empDf.createOrReplaceTempView("empTbl")
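As a quick sanity check (optional, and a sketch assuming the CSV path above), we can confirm the load before moving on:

// Inspect the inferred layout and a few rows
empDf.printSchema()
spark.sql("SELECT * FROM empTbl LIMIT 3").show()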
Step 2: Extract the Schema as a Complex Data Type
// needed for the String encoder used by createDataset
import spark.implicits._

val metaSchema = empDf.schema.prettyJson
val schemaDataset = spark.createDataset(metaSchema :: Nil)
val schemaDf = spark.read.json(schemaDataset)
schemaDf.createOrReplaceTempView("schemaTempView")
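Under the hood, schema.prettyJson returns a JSON object with a top-level fields array, one entry per column carrying name, type, nullable, and metadata keys; that array is what Step 3 explodes. If the JSON round-trip is not needed, the same name/type table can be built directly from the schema. A minimal sketch (the variable name schemaDirectDf is illustrative):

// Build the name/type table straight from the StructType,
// skipping the JSON serialize/deserialize step
import spark.implicits._
val schemaDirectDf = empDf.schema.fields
  .map(f => (f.name, f.dataType.simpleString))
  .toSeq
  .toDF("name", "type")
schemaDirectDf.show()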
Step 3: Explode the Complex Data Type
val schemaInTbl = spark.sql(
  "SELECT exp.name, exp.type FROM schemaTempView" +
    " LATERAL VIEW explode(fields) explodeTbl AS exp")
schemaInTbl.show()
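The same result can be produced with the DataFrame API instead of SQL. A minimal equivalent sketch (assuming import spark.implicits._ is in scope for the $ column syntax):

// DataFrame API equivalent of the LATERAL VIEW explode
import org.apache.spark.sql.functions.explode
val schemaViaApi = schemaDf
  .select(explode($"fields").as("exp"))
  .select($"exp.name", $"exp.type")
schemaViaApi.show()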
Full Code
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ConvertSchemaToDF {

  def main(args: Array[String]): Unit = {

    // Keep the console output readable
    Logger.getLogger("org").setLevel(Level.OFF)

    val spark = SparkSession.builder()
      .appName("Convert Schema to DF")
      .config("spark.master", "local")
      .getOrCreate()

    try {
      // Step 1: load the CSV into a DataFrame
      val empDf = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("./src/main/resources/emp_data1.csv")
      empDf.createOrReplaceTempView("empTbl")

      import spark.implicits._

      // Step 2: serialize the schema to JSON and read it back as a DataFrame
      val metaSchema = empDf.schema.prettyJson
      val schemaDataset = spark.createDataset(metaSchema :: Nil)
      val schemaDf = spark.read.json(schemaDataset)
      schemaDf.createOrReplaceTempView("schemaTempView")

      // Step 3: explode the fields array into one row per column
      val schemaInTbl = spark.sql(
        "SELECT exp.name, exp.type FROM schemaTempView" +
          " LATERAL VIEW explode(fields) explodeTbl AS exp")
      schemaInTbl.show()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}
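With inferSchema enabled, the final show() should print something roughly like the following (hire_date typically stays a string, since the M/d/yyyy format is not inferred as a date by default):

+-----------+-------+
|       name|   type|
+-----------+-------+
|      empno|integer|
|      ename| string|
|designation| string|
|    manager|integer|
|  hire_date| string|
|        sal|integer|
|     deptno|integer|
|   location| string|
+-----------+-------+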
Wrapping Up
In this post, we have learned how to get a DataFrame's schema in JSON format and convert it into a table containing the name and data type of each column.