Requirement
In our Read JSON file in Spark post, we read a simple JSON file into a Spark DataFrame. In this post, we move on to a more advanced shape of JSON data: we will read nested JSON into a Spark DataFrame.
Sample Data
We are going to use the sample data set below for this exercise.
{ "filename": "orderDetails", "datasets": [ { "orderId": "ord1001", "customerId": "cust5001", "orderDate": "2021-12-24 00.00.00.000", "shipmentDetails": { "street": "M.G.Road", "city": "Delhi", "state": "New Delhi", "postalCode": "110040", "country": "India" }, "orderDetails": [ { "productId": "prd9001", "quantity": 2, "sequence": 1, "totalPrice": { "gross": 550, "net": 500, "tax": 50 } }, { "productId": "prd9002", "quantity": 3, "sequence": 2, "totalPrice": { "gross": 300, "net": 240, "tax": 60 } } ] }, { "orderId": "ord1002", "customerId": "cust5002", "orderDate": "2021-12-25 00.00.00.000", "shipmentDetails": { "street": "Malad", "city": "Mumbai", "state": "Maharastra", "postalCode": "400064", "country": "India" }, "orderDetails": [ { "productId": "prd9001", "quantity": 1, "sequence": 1, "totalPrice": { "gross": 275, "net": 250, "tax": 25 } }, { "productId": "prd9004", "quantity": 4, "sequence": 2, "totalPrice": { "gross": 1000, "net": 900, "tax": 100 } } ] } ] }
Solution
In this exercise, we will work through the JSON step by step, one layer at a time. This keeps the data and the parsing logic easy to follow together.
Step 1: Load the JSON data into a Spark DataFrame
In this step, we load the JSON file using Spark's built-in JSON reader.
val ordersDf = spark.read.format("json")
  .option("inferSchema", "true")
  .option("multiLine", "true")
  .load("/FileStore/tables/orders_sample_datasets.json")
After loading the JSON, the DataFrame contains complex data types such as arrays and structs. We will extract each element and make it available as a regular column. Let's perform the following steps to achieve this.
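Before flattening anything, it helps to confirm what Spark actually inferred. Running printSchema on the loaded DataFrame should produce something close to the following (the nested types are inferred from the sample data above, so your output may differ slightly):

ordersDf.printSchema()

// root
//  |-- datasets: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- customerId: string (nullable = true)
//  |    |    |-- orderDate: string (nullable = true)
//  |    |    |-- orderDetails: array (nullable = true)
//  |    |    |    |-- element: struct (containsNull = true)
//  |    |    |    |    |-- productId: string (nullable = true)
//  |    |    |    |    |-- quantity: long (nullable = true)
//  |    |    |    |    |-- sequence: long (nullable = true)
//  |    |    |    |    |-- totalPrice: struct (nullable = true)
//  |    |    |    |    |    |-- gross: long (nullable = true)
//  |    |    |    |    |    |-- net: long (nullable = true)
//  |    |    |    |    |    |-- tax: long (nullable = true)
//  |    |    |-- orderId: string (nullable = true)
//  |    |    |-- shipmentDetails: struct (nullable = true)
//  |    |    |    |-- city: string (nullable = true)
//  |    |    |    |-- country: string (nullable = true)
//  |    |    |    |-- postalCode: string (nullable = true)
//  |    |    |    |-- state: string (nullable = true)
//  |    |    |    |-- street: string (nullable = true)
//  |-- filename: string (nullable = true)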
Step 2: Explode Array datasets in Spark Dataframe
In this step, we use Spark's explode function, which flattens an array by producing one output row per array element.
To use explode, we need to import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

var parseOrdersDf = ordersDf.withColumn("orders", explode($"datasets"))
Here, we apply explode to the datasets column because it has an array type. After exploding, each element of the array gets its own row.
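As a side note, the same step can be written with select instead of withColumn if you do not want to carry the original array column along. A minimal equivalent sketch (ordersExplodedDf is an illustrative name):

// Keep only filename plus the exploded array element
val ordersExplodedDf = ordersDf.select($"filename", explode($"datasets").as("orders"))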
Step 3: Fetch each order using getItem on the exploded column
Once exploded, we can retrieve individual attributes from the resulting struct using getItem.
// Step 3: Fetch each order attribute using getItem on the exploded column
parseOrdersDf = parseOrdersDf
  .withColumn("customerId", $"orders".getItem("customerId"))
  .withColumn("orderId", $"orders".getItem("orderId"))
  .withColumn("orderDate", $"orders".getItem("orderDate"))
  .withColumn("orderDetails", $"orders".getItem("orderDetails"))
  .withColumn("shipmentDetails", $"orders".getItem("shipmentDetails"))
Here, we have used getItem to retrieve the customerId, orderId, orderDate, orderDetails, and shipmentDetails attributes from the exploded orders column.
Now, looking at the output after extracting each element from orders, orderDetails still holds an array of data and shipmentDetails holds a struct. Let's work on these two columns in the next steps.
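Incidentally, getItem is not the only way to reach into a struct column; Spark also resolves dot notation on struct columns, so an equivalent form of the step above would be (a sketch, using the same column names):

// Dot notation on the struct column resolves the same nested fields as getItem
parseOrdersDf = parseOrdersDf
  .withColumn("customerId", $"orders.customerId")
  .withColumn("orderId", $"orders.orderId")
  .withColumn("orderDate", $"orders.orderDate")
  .withColumn("orderDetails", $"orders.orderDetails")
  .withColumn("shipmentDetails", $"orders.shipmentDetails")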
Step 4: Explode Order details Array Data
In this step, we apply explode to orderDetails, since it also holds array data.
parseOrdersDf = parseOrdersDf.withColumn("orderDetails", explode($"orderDetails"))
After exploding, each order line item has been flattened into its own row.
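One caveat worth knowing: explode silently drops rows whose array is null or empty. If some orders could legitimately have no orderDetails and you still want to keep them, Spark's explode_outer keeps the parent row with a null instead. A hedged sketch:

// explode_outer emits a row with a null orderDetails when the array
// is null or empty, instead of dropping the parent row entirely
parseOrdersDf = parseOrdersDf.withColumn("orderDetails", explode_outer($"orderDetails"))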
Step 5: Fetch Orders Details and Shipment Details
In this step, we extract fields from the struct-typed orderDetails and shipmentDetails columns using the getItem function.
parseOrdersDf = parseOrdersDf.withColumn("productId", $"orderDetails".getItem("productId")) .withColumn("quantity", $"orderDetails".getItem("quantity")) .withColumn("sequence", $"orderDetails".getItem("sequence")) .withColumn("totalPrice", $"orderDetails".getItem("totalPrice")) .withColumn("city", $"shipmentDetails".getItem("city")) .withColumn("country", $"shipmentDetails".getItem("country")) .withColumn("postalcode", $"shipmentDetails".getItem("postalCode")) .withColumn("street", $"shipmentDetails".getItem("street")) .withColumn("state", $"shipmentDetails".getItem("state"))
After fetching the order details, totalPrice is still a struct with three fields: gross, net, and tax. Let's make these available as columns in the data frame.
Step 6: Convert totalPrice to columns
In this step, we extract the gross, net, and tax amounts from totalPrice using the code below.
parseOrdersDf = parseOrdersDf.withColumn("gross", $"totalprice".getItem("gross")) .withColumn("net", $"totalprice".getItem("net")) .withColumn("tax", $"totalprice".getItem("tax"))
We have now parsed the JSON data into a flat data frame. So far, all columns have been kept, including the intermediate ones. We can either drop unwanted columns such as datasets and filename (sketched just below), or select only the required columns from the data frame, which is what Step 7 does.
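A minimal sketch of the drop route (cleanedDf is an illustrative name; the column list matches the raw and intermediate columns created above):

// Drop the raw and intermediate columns, keeping only the flattened ones
val cleanedDf = parseOrdersDf.drop("filename", "datasets", "orders",
  "orderDetails", "shipmentDetails", "totalPrice")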
Step 7: Final DataFrame with selected columns
Here, we retrieve the required columns from the DataFrame using select. This is the same selection that appears in the full snippet at the end:
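// Step 7: Select the required columns from the DataFrame
val jsonParseOrdersDf = parseOrdersDf.select($"orderId", $"customerId", $"orderDate",
  $"productId", $"quantity", $"sequence", $"gross", $"net", $"tax",
  $"street", $"city", $"state", $"postalcode", $"country")

display(jsonParseOrdersDf)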
We can also check the data type of each column. If any column needs a different type, we can cast it into the required type.
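For example, orderDate was inferred as a string. Given the sample's yyyy-MM-dd HH.mm.ss.SSS pattern, it could be converted with to_timestamp. A minimal sketch (typedDf is an illustrative name, and the format string assumes the sample data above):

// Convert the string orderDate into a proper timestamp column;
// the pattern matches the "2021-12-24 00.00.00.000" values in the sample
val typedDf = jsonParseOrdersDf.withColumn(
  "orderDate", to_timestamp($"orderDate", "yyyy-MM-dd HH.mm.ss.SSS"))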
Full Code Snippet
import org.apache.spark.sql.functions._

// Step 1: Load the nested JSON data into a Spark DataFrame
val ordersDf = spark.read.format("json")
  .option("inferSchema", "true")
  .option("multiLine", "true")
  .load("/FileStore/tables/orders_sample_datasets.json")

// Step 2: Explode the datasets array
var parseOrdersDf = ordersDf.withColumn("orders", explode($"datasets"))

// Step 3: Fetch each order attribute using getItem on the exploded column
parseOrdersDf = parseOrdersDf
  .withColumn("customerId", $"orders".getItem("customerId"))
  .withColumn("orderId", $"orders".getItem("orderId"))
  .withColumn("orderDate", $"orders".getItem("orderDate"))
  .withColumn("orderDetails", $"orders".getItem("orderDetails"))
  .withColumn("shipmentDetails", $"orders".getItem("shipmentDetails"))

// Step 4: Explode the orderDetails column to flatten it into rows
parseOrdersDf = parseOrdersDf.withColumn("orderDetails", explode($"orderDetails"))

// Step 5: Fetch attributes from the structs into their own columns
parseOrdersDf = parseOrdersDf
  .withColumn("productId", $"orderDetails".getItem("productId"))
  .withColumn("quantity", $"orderDetails".getItem("quantity"))
  .withColumn("sequence", $"orderDetails".getItem("sequence"))
  .withColumn("totalPrice", $"orderDetails".getItem("totalPrice"))
  .withColumn("city", $"shipmentDetails".getItem("city"))
  .withColumn("country", $"shipmentDetails".getItem("country"))
  .withColumn("postalcode", $"shipmentDetails".getItem("postalCode"))
  .withColumn("street", $"shipmentDetails".getItem("street"))
  .withColumn("state", $"shipmentDetails".getItem("state"))

// Step 6: Fetch gross, net, and tax from the totalPrice struct
parseOrdersDf = parseOrdersDf
  .withColumn("gross", $"totalPrice".getItem("gross"))
  .withColumn("net", $"totalPrice".getItem("net"))
  .withColumn("tax", $"totalPrice".getItem("tax"))

// Step 7: Select the required columns from the DataFrame
val jsonParseOrdersDf = parseOrdersDf.select($"orderId", $"customerId", $"orderDate",
  $"productId", $"quantity", $"sequence", $"gross", $"net", $"tax",
  $"street", $"city", $"state", $"postalcode", $"country")

display(jsonParseOrdersDf)
Wrapping Up
JSON is a widely used file format, and having a clear understanding of how to parse nested JSON and load it into a data frame is important, as this is usually the first step of the process. In this post, we explained, step by step, how to deal with nested JSON data in a Spark DataFrame.