Load JSON Data into Hive Partitioned table using PySpark

Requirement

In the last post, we demonstrated how to load JSON data into a non-partitioned Hive table. This time, using the same sample JSON data, the requirement is to load it into a Hive table partitioned by one or more columns, using Spark. Fulfilling the requirement involves the following tasks:

  • Parse and read the JSON data.
  • Process the data with business logic (if any).
  • Store it in a Hive partitioned table.

Components Involved

To achieve the requirement, the following components are used:

  • Hive – used to store the data in a partitioned table, in ORC format.
  • Spark SQL – used to load the JSON data, process it, and store it into the Hive table.

Solution

Step 1: Get JSON Sample data

First of all, let’s have a look at the sample data in JSON format:

Json Sample Data
 
  1. [{"id":1,"first_name":"Steven","last_name":"Garrett","email":"sgarrett0@amazon.co.uk","gender":"Male","designation":"Design Engineer","phone":"62-(454)694-4544","country":"Indonesia"},{"id":2,"first_name":"Brandon","last_name":"Green","email":"bgreen1@fc2.com","gender":"Male","designation":"Graphic Designer","phone":"7-(587)965-1714","country":"Russia"},{"id":3,"first_name":"Martha","last_name":"Bennett","email":"mbennett2@abc.net.au","gender":"Female","designation":"Recruiting Manager","phone":"48-(582)234-3809","country":"Poland"},{"id":4,"first_name":"Samuel","last_name":"Lopez","email":"slopez3@usgs.gov","gender":"Male","designation":"Automation Specialist I","phone":"33-(654)376-1795","country":"France"},{"id":5,"first_name":"Judy","last_name":"Bishop","email":"jbishop4@163.com","gender":"Female","designation":"Librarian","phone":"81-(509)760-1241","country":"Japan"},{"id":6,"first_name":"Anna","last_name":"Morales","email":"amorales5@eventbrite.com","gender":"Female","designation":"Assistant Professor","phone":"33-(675)922-1030","country":"France"},{"id":7,"first_name":"Benjamin","last_name":"Walker","email":"bwalker6@furl.net","gender":"Male","designation":"Computer Systems Analyst II","phone":"86-(249)310-6467","country":"China"},{"id":8,"first_name":"Sean","last_name":"Perkins","email":"sperkins7@usatoday.com","gender":"Male","designation":"Nurse","phone":"1-(504)398-8997","country":"Canada"}]

Download the sample data from the link below:

sample json data
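
Before moving to Spark, a quick local check can confirm that the file parses. This is a minimal sketch, assuming the sample is saved at /home/bdp/My_Work_Book/Spark/jsondata.json (the same path used later in the Spark shell):

import json

# Parse the sample file and print the record count and the keys of the
# first record; the sample shown above contains 8 records.
with open('/home/bdp/My_Work_Book/Spark/jsondata.json') as f:
    records = json.load(f)

print(len(records))
print(sorted(records[0].keys()))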

Step 2: Verify hive database

Let’s first verify that the database exists. This is the Hive database where the target table will be created:

Hive DB
 
hive> use bdp_db;
OK
Time taken: 0.024 seconds

So the Hive database named ‘bdp_db’ exists. Now let’s see whether any tables are present:

Hive Show Table
 
hive> show tables;
OK
Time taken: 0.274 seconds

The database does not contain any tables yet, so let’s create our Hive table under it.
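
As a side note, if the database had not existed, it could have been created with a one-liner first:

hive> CREATE DATABASE IF NOT EXISTS bdp_db;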

Step 3: Create Hive Partitioned Table

The next step is to create the Hive table that the JSON data will be loaded into. Here, we create a table partitioned by COUNTRY.

Create Hive Partitioned Table
 
hive> DROP TABLE IF EXISTS cust_partition;
CREATE TABLE cust_partition(
designation STRING,
email STRING,
first_name STRING,
gender STRING,
id INT,
last_name STRING,
phone STRING
)
PARTITIONED BY
(country STRING)
STORED AS ORC
TBLPROPERTIES
('orc.compress'='SNAPPY');

Here, the drop statement removes the table if it already exists. In the create statement, the country column is declared as the partition column. Note that hive.exec.dynamic.partition and hive.exec.dynamic.partition.mode are session-level settings, not table properties, so they do not belong in TBLPROPERTIES; we will set them in the Spark session in Step 4.
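
To double-check the partitioning, Hive can describe the table; the country column should appear under the “# Partition Information” section rather than among the regular columns:

hive> DESCRIBE cust_partition;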

Step 4: Execution on Spark-shell

We are done with the Hive table creation. Moving to the next step, start PySpark using the pyspark command.

 
 
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/
Using Python version 2.7.12 (default, Jun 29 2016 11:08:50)
SparkContext available as sc, HiveContext available as sqlContext.

As the last line shows, the SparkContext is available as sc and the HiveContext as sqlContext.

Load JSON into a Dataframe

Create HiveContext
 
>>> from pyspark.sql import HiveContext
>>> hiveContext = HiveContext(sc)

First of all, a HiveContext is created to access the Hive table. Spark’s built-in JSON reader handles the parsing, so Python’s json module is not needed for the load itself.
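
This transcript is from Spark 1.6, where HiveContext is the Hive entry point. For reference only, on Spark 2.x the equivalent setup goes through SparkSession (a sketch, not part of the transcript above):

# Spark 2.x equivalent of the HiveContext setup (sketch; this post uses 1.6).
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("json-to-hive-partitioned") \
    .enableHiveSupport() \
    .getOrCreate()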

 
 
>>> hiveContext.setConf("hive.exec.dynamic.partition", "true")
>>> hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

In addition, two Hive properties have been overridden. These are required for dynamic partitioning; without them, the insert fails with an error like “Dynamic partition strict mode requires at least one static partition column”.
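
The same two properties can also be set through SQL, which is convenient inside a saved script:

>>> hiveContext.sql("SET hive.exec.dynamic.partition=true")
>>> hiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")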

 
 
>>> jsonDF = hiveContext.read.json('file:///home/bdp/My_Work_Book/Spark/jsondata.json')

Now load the JSON file. The reader returns a dataframe, which the variable jsonDF holds. Let’s check the schema of the created dataframe using the printSchema() function.

df schema
 
>>> jsonDF.printSchema()
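
With the sample data above, the output should look roughly like the following. Note that Spark infers id as a long and lists the fields in alphabetical order, which matters later when inserting into the table:

root
 |-- country: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- phone: string (nullable = true)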

Show Dataframe data

The show() function displays the dataframe’s records. It takes an optional argument that limits how many records are shown.

 
 
>>> jsonDF.show(3)

Here, only 3 records are shown because 3 was passed to the show() function.
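
The task list at the top mentioned applying business logic before the load. This post applies none, but as an illustration, a simple transformation could normalize the gender column before writing:

>>> from pyspark.sql.functions import upper
>>> cleanDF = jsonDF.withColumn('gender', upper(jsonDF['gender']))
>>> cleanDF.show(3)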

Insert Dataframe to Hive Table

Now we have the dataframe. Next, insert it into the Hive table created earlier.

insert into hive partitioned table
 
>>> jsonDF.select('designation', 'email', 'first_name', 'gender', 'id',
...               'last_name', 'phone', 'country') \
...       .write.mode('append').partitionBy('country').insertInto('bdp_db.cust_partition')

Here, the insertInto function stores the dataframe into the Hive table. One caveat: insertInto matches columns by position, not by name, so the select() call reorders the dataframe’s alphabetical columns to the table’s column order, keeping the partition column country last. With the dynamic-partition properties set earlier, Hive creates a partition for each country value. Alternatively, the saveAsTable function can be used when the table should be created from the dataframe itself.
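
For completeness, if the target table did not already exist, saveAsTable could create and populate it in one step. The table name cust_partition_sat below is only illustrative:

>>> jsonDF.write.mode('overwrite') \
...       .format('orc') \
...       .partitionBy('country') \
...       .saveAsTable('bdp_db.cust_partition_sat')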

Step 5: Verify the data in Hive

Finally, we have populated the Hive partitioned table with the data. Now let’s verify the data and see how many partitions have been created.

select data
 
hive>  select * from cust_partition limit 3;

Let’s check partitions of the table.

Get Partitions
 
hive> show partitions cust_partition;

As a result, one partition has been created for each distinct country value in the data.
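
Since the table is partitioned, a filter on country only reads the matching partition. For example, France appears in the sample data:

hive> SELECT first_name, last_name FROM cust_partition WHERE country = 'France';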

Wrapping Up

Here we have covered how to load JSON data into a Hive partitioned table. The table here is partitioned by a single column, but it can be partitioned by more. In that case, the table’s DDL needs updating: list all the partition columns with their data types in the PARTITIONED BY clause, and make sure the dataframe’s columns end with those same partition columns, in the same order, before calling insertInto. The same columns, separated by ‘,’ (comma), get passed to the partitionBy function in Spark.
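
As a sketch of the multi-column case (the table name cust_multi_part and the choice of gender as a second partition column are purely illustrative), the DDL and the Spark insert change together, with the partition columns kept last and in the same order:

>>> hiveContext.sql("""
...     CREATE TABLE IF NOT EXISTS bdp_db.cust_multi_part(
...       designation STRING, email STRING, first_name STRING,
...       id INT, last_name STRING, phone STRING)
...     PARTITIONED BY (country STRING, gender STRING)
...     STORED AS ORC""")
>>> jsonDF.select('designation', 'email', 'first_name', 'id',
...               'last_name', 'phone', 'country', 'gender') \
...       .write.mode('append').partitionBy('country', 'gender') \
...       .insertInto('bdp_db.cust_multi_part')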
