Requirement
In the last post, we demonstrated how to load JSON data into a non-partitioned Hive table. This time we use the same sample JSON data, but the requirement is to load it into a partitioned Hive table using Spark. The Hive table will be partitioned by one or more columns. The tasks below fulfill the requirement.
- Parse and read the JSON data.
- Apply business logic to the data (if any).
- Store the result in a Hive partitioned table.
Components Involved
To fulfill the requirement, the following components are used:
- Hive – stores the data in a partitioned table in ORC format.
- Spark SQL – loads the JSON data, processes it, and writes it into the Hive table.
Solution
Step 1: Get Sample JSON Data
First, let's have a look at the sample data in JSON format:
[{"id":1,"first_name":"Steven","last_name":"Garrett","email":"sgarrett0@amazon.co.uk","gender":"Male","designation":"Design Engineer","phone":"62-(454)694-4544","country":"Indonesia"},{"id":2,"first_name":"Brandon","last_name":"Green","email":"bgreen1@fc2.com","gender":"Male","designation":"Graphic Designer","phone":"7-(587)965-1714","country":"Russia"},{"id":3,"first_name":"Martha","last_name":"Bennett","email":"mbennett2@abc.net.au","gender":"Female","designation":"Recruiting Manager","phone":"48-(582)234-3809","country":"Poland"},{"id":4,"first_name":"Samuel","last_name":"Lopez","email":"slopez3@usgs.gov","gender":"Male","designation":"Automation Specialist I","phone":"33-(654)376-1795","country":"France"},{"id":5,"first_name":"Judy","last_name":"Bishop","email":"jbishop4@163.com","gender":"Female","designation":"Librarian","phone":"81-(509)760-1241","country":"Japan"},{"id":6,"first_name":"Anna","last_name":"Morales","email":"amorales5@eventbrite.com","gender":"Female","designation":"Assistant Professor","phone":"33-(675)922-1030","country":"France"},{"id":7,"first_name":"Benjamin","last_name":"Walker","email":"bwalker6@furl.net","gender":"Male","designation":"Computer Systems Analyst II","phone":"86-(249)310-6467","country":"China"},{"id":8,"first_name":"Sean","last_name":"Perkins","email":"sperkins7@usatoday.com","gender":"Male","designation":"Nurse","phone":"1-(504)398-8997","country":"Canada"}]
Download the sample data from the link below:
sample json data
Step 2: Create Hive Database
First, open the Hive CLI and create a database using the command below. If the database already exists, select it with the USE command.
hive> CREATE DATABASE bdp_db;
OK
Time taken: 0.024 seconds
hive> use bdp_db;
OK
Time taken: 0.018 seconds
Step 3: Create Hive Partitioned Table
This Hive table will be used to load the JSON data. Here, we create a table partitioned by the country column.
hive> CREATE TABLE cust_partition (
          designation STRING,
          email STRING,
          first_name STRING,
          gender STRING,
          id INT,
          last_name STRING,
          phone STRING
      )
      PARTITIONED BY (country STRING)
      STORED AS ORC
      TBLPROPERTIES ('orc.compress'='SNAPPY');
Here, in the CREATE statement, country is the partition column, so it appears only in the PARTITIONED BY clause and not in the main column list. Note that hive.exec.dynamic.partition and hive.exec.dynamic.partition.mode are session-level settings rather than table properties, so they are set from Spark in Step 4 instead of in TBLPROPERTIES.
Step 4: Execution in the PySpark Shell
Start PySpark using the command:
pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.12 (default, Jun 29 2016 11:08:50)
SparkContext available as sc, HiveContext available as sqlContext.
Notice the last line: the SparkContext is available as sc and the HiveContext as sqlContext.
Load JSON into a DataFrame
>>> from pyspark.sql import HiveContext
>>> hiveContext = HiveContext(sc)
First, import HiveContext and create an instance from the SparkContext; this gives Spark access to the Hive metastore and tables. (The standard json module is not needed here, since Spark parses the JSON itself.)
>>> hiveContext.setConf("hive.exec.dynamic.partition", "true")
>>> hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
In addition, override these two Hive properties. They enable dynamic partitioning, which the insert requires; otherwise, you will get an error like "Dynamic partition strict mode requires at least one static partition column".
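Equivalently (a small variation, not from the original post), the same two settings can be applied through SQL:

>>> # Session-level settings; same effect as the setConf calls above.
>>> hiveContext.sql("SET hive.exec.dynamic.partition=true")
>>> hiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")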
>>> jsonDF = hiveContext.read.json('file:///home/bdp/My_Work_Book/Spark/jsondata.json')
Now load the JSON file. The read returns a DataFrame, which is held in the variable jsonDF. Let's check the schema of the DataFrame using the printSchema() function.
>>> jsonDF.printSchema()
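With the eight sample records shown in Step 1, the output should look roughly like the following (Spark sorts JSON-inferred fields alphabetically and infers whole numbers as long):

root
 |-- country: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- phone: string (nullable = true)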
Show DataFrame Data
The show() function displays the DataFrame's records. It takes an optional argument that limits the number of records shown.
>>> jsonDF.show(3)
Here, only 3 records are shown because 3 was passed to the show() function.
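As an extra sanity check (not part of the original steps), you can preview how many rows will land in each partition before writing:

>>> # Count records per country, i.e. per future partition.
>>> jsonDF.groupBy('country').count().show()

With the eight sample records, France should show a count of 2 and every other country a count of 1.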
Insert Dataframe to Hive Table
Now that we have the DataFrame, the next step is to insert it into the Hive table created above.
>>>jsonDF.write.mode("append").partitionBy('country').insertInto("bdp_db.cust_partition")
Here, the insertInto function stores the DataFrame in the Hive table. Alternatively, the saveAsTable function can be used, but note that it creates a new table rather than inserting into an existing one. Since the table is partitioned by country, that column is passed to partitionBy while inserting the DataFrame.
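One caveat: insertInto resolves columns by position rather than by name, and Hive expects the partition column to come last. Because Spark infers JSON fields alphabetically, country ends up first in jsonDF. If the inserted rows look misaligned, reorder the columns explicitly before inserting; a minimal sketch:

>>> # Match the table's column order, with the partition column last.
>>> ordered = jsonDF.select('designation', 'email', 'first_name', 'gender',
...                         'id', 'last_name', 'phone', 'country')
>>> ordered.write.mode("append").insertInto("bdp_db.cust_partition")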
Step 5: Verify the data in Hive
Finally, we have populated the Hive partitioned table with the data. Now let's check the data and see how many partitions have been created.
hive> select * from cust_partition limit 3;
Let's check the partitions of the table.
hive> show partitions cust_partition;
As a result, one partition has been created for each distinct country value; for the eight sample records shown in Step 1, that is seven partitions.
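If you loaded exactly the eight sample records from Step 1, the output should list one entry per distinct country, similar to:

country=Canada
country=China
country=France
country=Indonesia
country=Japan
country=Poland
country=Russia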
Wrapping Up
Here, we have covered how to load JSON data into a Hive partitioned table. The table here is partitioned by a single column, but it can also be partitioned by multiple columns. In that case, update the table's DDL so that the PARTITIONED BY clause lists all the partition columns with their data types, and pass the same columns, comma-separated and in the same order, to Spark's partitionBy function, as sketched below.
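For example, here is a minimal sketch of a two-column partition by country and gender; cust_partition2 is a hypothetical table name, not from the original post:

>>> # Hypothetical table partitioned by two columns: country, then gender.
>>> hiveContext.sql("""
...     CREATE TABLE bdp_db.cust_partition2 (
...         designation STRING, email STRING, first_name STRING,
...         id INT, last_name STRING, phone STRING
...     )
...     PARTITIONED BY (country STRING, gender STRING)
...     STORED AS ORC
... """)
>>> # Partition columns go last, in the same order as in the DDL.
>>> ordered2 = jsonDF.select('designation', 'email', 'first_name', 'id',
...                          'last_name', 'phone', 'country', 'gender')
>>> ordered2.write.mode("append").partitionBy('country', 'gender').insertInto("bdp_db.cust_partition2")

Note that gender moves out of the main column list in the DDL, since a column cannot be both a data column and a partition column.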