Read Nested JSON in Spark DataFrame

Requirement

In our Read JSON file in Spark post, we read a simple JSON file into a Spark DataFrame. In this post, we move on to a more advanced JSON structure: we will read nested JSON into a Spark DataFrame.

Sample Data

We are going to use the sample data set below for this exercise.

{
    "filename": "orderDetails",
    "datasets": [
        {
            "orderId": "ord1001",
            "customerId": "cust5001",
            "orderDate": "2021-12-24 00.00.00.000",
            "shipmentDetails": {
                "street": "M.G.Road",
                "city": "Delhi",
                "state": "New Delhi",
                "postalCode": "110040",
                "country": "India"
            },
            "orderDetails": [
              {
                "productId": "prd9001",
                "quantity": 2,
                "sequence": 1,
                "totalPrice": {
                  "gross": 550,
                  "net": 500,
                  "tax": 50
                }
              },
              {
                "productId": "prd9002",
                "quantity": 3,
                "sequence": 2,
                "totalPrice": {
                  "gross": 300,
                  "net": 240,
                  "tax": 60
                }
              }
            ]
        },
        {
            "orderId": "ord1002",
            "customerId": "cust5002",
            "orderDate": "2021-12-25 00.00.00.000",
            "shipmentDetails": {
                "street": "Malad",
                "city": "Mumbai",
                "state": "Maharastra",
                "postalCode": "400064",
                "country": "India"
            },
            "orderDetails": [
              {
                "productId": "prd9001",
                "quantity": 1,
                "sequence": 1,
                "totalPrice": {
                  "gross": 275,
                  "net": 250,
                  "tax": 25
                }
              },
              {
                "productId": "prd9004",
                "quantity": 4,
                "sequence": 2,
                "totalPrice": {
                  "gross": 1000,
                  "net": 900,
                  "tax": 100
                }
              }
            ]
        }
    ]
}

Solution

In this exercise, we will work step by step through each layer of the JSON data. This makes it easier to follow the data and the logic together.

Step 1: Load JSON data into Spark Dataframe using API

In this step, we first load the JSON file using the built-in Spark reader.

val ordersDf = spark.read.format("json")
                         .option("inferSchema", "true")
                         .option("multiLine", "true")
                         .load("/FileStore/tables/orders_sample_datasets.json")

After loading the JSON data into the DataFrame, some columns hold complex data types such as arrays and structs. We will extract these elements and expose them as top-level columns. Let's perform the following steps to achieve this.
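To see these complex types, we can print the inferred schema. The sketch below shows the general shape of the output for our sample file; exact formatting may vary slightly by Spark version.

```scala
// Inspect the inferred schema of the loaded DataFrame
ordersDf.printSchema()

// root
//  |-- datasets: array
//  |    |-- element: struct
//  |    |    |-- customerId: string
//  |    |    |-- orderDate: string
//  |    |    |-- orderDetails: array
//  |    |    |-- orderId: string
//  |    |    |-- shipmentDetails: struct
//  |-- filename: string
```

Note that datasets is an array of structs, and each struct nests a further array (orderDetails) and struct (shipmentDetails).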

Step 2: Explode Array datasets in Spark Dataframe

In this step, we use Spark's explode function, which produces one output row per element of an array column.

To use explode, we need to import org.apache.spark.sql.functions._

import org.apache.spark.sql.functions._
var parseOrdersDf = ordersDf.withColumn("orders", explode($"datasets"))

Here, we applied explode to the datasets column because it has an array type. After exploding, the data is flattened into multiple rows, one per order.

Step 3: Fetch Each Order using getItem on Exploded Columns

Once exploded, we can retrieve individual attributes from the exploded data using getItem.


// Step 3: Fetch each order using getItem on the exploded column
parseOrdersDf = parseOrdersDf.withColumn("customerId", $"orders".getItem("customerId"))
                             .withColumn("orderId", $"orders".getItem("orderId"))
                             .withColumn("orderDate", $"orders".getItem("orderDate"))
                             .withColumn("orderDetails", $"orders".getItem("orderDetails"))
                             .withColumn("shipmentDetails", $"orders".getItem("shipmentDetails"))

Here, we used getItem to retrieve the customerId, orderId, orderDate, orderDetails, and shipmentDetails attributes from the exploded orders column.
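As a side note, getItem on a struct column is equivalent to dot notation on the column name, so the same extraction could also be written like this (a sketch against the same DataFrame as above):

```scala
// Equivalent to getItem: reference nested struct fields with dot notation
parseOrdersDf = parseOrdersDf.withColumn("customerId", $"orders.customerId")
                             .withColumn("orderId", $"orders.orderId")
```

Both forms produce the same columns; getItem is convenient when the field name is held in a variable.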

Looking at the output after extracting each element from orders, we can see that orderDetails holds an array and shipmentDetails holds a struct. Let's work on these two columns in the next steps.

Step 4: Explode Order details Array Data

In this step, we explode orderDetails as well, since it is also an array.

parseOrdersDf = parseOrdersDf.withColumn("orderDetails", explode($"orderDetails"))

After exploding, each order line item appears in its own row.

Step 5: Fetch Orders Details and Shipment Details

In this step, we fetch the attributes of the struct-typed orderDetails and shipmentDetails columns using the getItem function.

parseOrdersDf = parseOrdersDf.withColumn("productId", $"orderDetails".getItem("productId"))
                             .withColumn("quantity", $"orderDetails".getItem("quantity"))
                             .withColumn("sequence", $"orderDetails".getItem("sequence"))
                             .withColumn("totalPrice", $"orderDetails".getItem("totalPrice"))
                             .withColumn("city", $"shipmentDetails".getItem("city"))
                             .withColumn("country", $"shipmentDetails".getItem("country"))
                             .withColumn("postalcode", $"shipmentDetails".getItem("postalCode"))
                             .withColumn("street", $"shipmentDetails".getItem("street"))
                             .withColumn("state", $"shipmentDetails".getItem("state"))

After fetching the order details, totalPrice still holds a struct with three fields: gross, net, and tax. Let's make them available as columns in the DataFrame.

Step 6: Convert totalPrice to column

In this step, we will fetch the gross, net, and tax amount from totalPrice using the below code.

parseOrdersDf = parseOrdersDf.withColumn("gross", $"totalPrice".getItem("gross"))
                             .withColumn("net", $"totalPrice".getItem("net"))
                             .withColumn("tax", $"totalPrice".getItem("tax"))

Running display(parseOrdersDf) shows the parsed DataFrame (datasets: array, filename: string ... 18 more fields). A sample of the output for the first order:

productId | quantity | sequence | totalPrice                            | city  | country | postalcode | street   | state
prd9001   | 2        | 1        | {"gross": 550, "net": 500, "tax": 50} | Delhi | India   | 110040     | M.G.Road | New Delhi
prd9002   | 3        | 2        | {"gross": 300, "net": 240, "tax": 60} | Delhi | India   | 110040     | M.G.Road | New Delhi

We have now parsed the JSON data into a DataFrame. So far, we have kept all the columns. We can either drop unwanted columns such as datasets and filename, or select only the required columns from the DataFrame.
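If we prefer dropping to selecting, a minimal sketch using drop (the column names are the ones created in the steps above):

```scala
// Drop the raw and intermediate columns, keeping only the flattened ones
val cleanedDf = parseOrdersDf.drop("datasets", "filename", "orders",
                                   "orderDetails", "shipmentDetails", "totalPrice")
```

drop silently ignores column names that do not exist, so it is safe to re-run.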

Step 7: Final DataFrame with selected columns

Here, we retrieve the required columns from the DataFrame using the select function.

We can also check the data type of each column. If any column needs a different data type, we can cast it to the required type.
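For example, orderDate is inferred as a string because of its non-standard format. Assuming the pattern yyyy-MM-dd HH.mm.ss.SSS seen in the sample data, it could be converted to a timestamp like this (a sketch, not part of the original snippet):

```scala
import org.apache.spark.sql.functions.to_timestamp

// Cast the string orderDate (e.g. "2021-12-24 00.00.00.000") to a timestamp
val typedDf = parseOrdersDf.withColumn(
  "orderDate", to_timestamp($"orderDate", "yyyy-MM-dd HH.mm.ss.SSS"))
```

Quantity, sequence, and the price fields are already inferred as numeric types, so they usually need no cast.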

Full Code Snippet

import org.apache.spark.sql.functions._

// Step 1: Load Nested JSON data into Spark Dataframe
val ordersDf = spark.read.format("json")
                         .option("inferSchema", "true")
                         .option("multiLine", "true")
                         .load("/FileStore/tables/orders_sample_datasets.json")

// Step 2: Explode the datasets array to get one row per order
var parseOrdersDf = ordersDf.withColumn("orders", explode($"datasets"))

// Step 3: Fetch Each Order using getItem on explode column
parseOrdersDf = parseOrdersDf.withColumn("customerId", $"orders".getItem("customerId"))
                             .withColumn("orderId", $"orders".getItem("orderId"))
                             .withColumn("orderDate", $"orders".getItem("orderDate"))
                             .withColumn("orderDetails", $"orders".getItem("orderDetails"))
                             .withColumn("shipmentDetails", $"orders".getItem("shipmentDetails"))

// Step 4: Explode orderDetails column to flatten all the rows
parseOrdersDf = parseOrdersDf.withColumn("orderDetails", explode($"orderDetails"))

// Step 5: Fetch attributes from object and make it available in a column
parseOrdersDf = parseOrdersDf.withColumn("productId", $"orderDetails".getItem("productId"))
                             .withColumn("quantity", $"orderDetails".getItem("quantity"))
                             .withColumn("sequence", $"orderDetails".getItem("sequence"))
                             .withColumn("totalPrice", $"orderDetails".getItem("totalPrice"))
                             .withColumn("city", $"shipmentDetails".getItem("city"))
                             .withColumn("country", $"shipmentDetails".getItem("country"))
                             .withColumn("postalcode", $"shipmentDetails".getItem("postalCode"))
                             .withColumn("street", $"shipmentDetails".getItem("street"))
                             .withColumn("state", $"shipmentDetails".getItem("state"))

// Step 6: Fetch gross, net and tax from totalprice object
parseOrdersDf = parseOrdersDf.withColumn("gross", $"totalPrice".getItem("gross"))
                             .withColumn("net", $"totalPrice".getItem("net"))
                             .withColumn("tax", $"totalPrice".getItem("tax"))

// Step 7: Select required columns from the dataframe
val jsonParseOrdersDf = parseOrdersDf.select($"orderId"
                                           ,$"customerId"
                                           ,$"orderDate"
                                           ,$"productId"
                                           ,$"quantity"
                                           ,$"sequence"
                                           ,$"gross"
                                           ,$"net"
                                           ,$"tax"
                                           ,$"street"
                                           ,$"city"
                                           ,$"state"
                                           ,$"postalcode"
                                           ,$"country")

display(jsonParseOrdersDf)

Wrapping Up

JSON is a widely used file format. It is good to have a clear understanding of how to parse nested JSON and load it into a DataFrame, since this is usually the first step of the process. In this post, we explained step by step how to deal with nested JSON data in a Spark DataFrame.
