Suppose the source data is in a text file, and the requirement is to load that file into a Hive table using Spark, and then read the data back from the Hive table using Spark. Let’s break the task into sub-tasks:
- Load the text file into Hive table.
- Read the data from the Hive table.
To achieve the requirement, the following components are involved:
- Hive: used to store the data
- Spark: used to parse the file and load it into the Hive table
We will use the PySpark API to load and process the text data into Hive.
The sample data which has been taken for this demonstration looks like:
- 1,Lillian,Slater,purus.Nullam.email@example.com,0800 1111,Muridke,Andorra
- 2,Donna,Pennington,Donec@euaugue.ca,0959 976 7759,Salzwedel,Bermuda
- 3,Hashim,Sharpe,firstname.lastname@example.org,(011391) 91113,Huntsville,Heard Island and Mcdonald Islands
- 4,Finn,Chang,email@example.com,0800 1111,Stokrooie,Rwanda
- 5,Harrison,Charles,firstname.lastname@example.org,0941 490 8683,Lutsel K'e,Nicaragua
- 6,Jack,Roth,email@example.com,055 7666 2472,Pievepelago,Macedonia
- 7,Yvette,Mullins,firstname.lastname@example.org,0900 938 9309,Falkirk,Brazil
- 8,Genevieve,Lamb,vel.sapien.imperdiet@Etiam.net,070 9080 4993,Nieuwkerken-Waas,Cook Islands
- 9,Nero,Chandler,velit.Sed@Quisque.org,0845 46 45,Alcobendas,Brunei
- 10,Lee,Richard,email@example.com,0896 175 3049,Sommariva Perno,Bosnia and Herzegovina
You can download the entire sample file from the download link below: sample_data
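Before loading anything into Spark, it helps to see the record layout. Here is a minimal plain-Python sketch (no Spark required) of how one sample line splits into the seven columns that the post assigns names to later; the column list itself is taken from the mapping step further down.

```python
# One line from the sample data: seven comma-separated fields.
sample_line = "1,Lillian,Slater,purus.Nullam.email@example.com,0800 1111,Muridke,Andorra"

fields = sample_line.split(",")
columns = ["id", "first_name", "last_name", "email", "phone", "city", "country"]

# Pair each field with its column name.
record = dict(zip(columns, fields))
print(record["first_name"])  # Lillian
print(record["country"])     # Andorra
```

Note that commas appear only as field separators in this data (the phone numbers contain spaces, not commas), so a plain split is enough here.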
Step 1: Initialization of Spark Context and Hive Context
The first step is to initialize the Spark Context and the Hive Context. The Spark Context is used to work with Spark Core abstractions such as RDDs, whereas the Hive Context is used to work with Data Frames. As we are using the PySpark shell, both contexts are initialized automatically, so no extra effort is needed to create them.
Start the PySpark:
[bdp ~]$ pyspark
Step 2: Set Path of Sample Data
The second step is to download the sample data (if you have not already downloaded it from the link provided at the beginning). Once you have the data, move it to or keep it on a local path; this local path is used while loading the data. In my case, the local path of the sample data is /home/bdp/Spark/Load_Text_file_Into_Hive_Table_Using_Spark/sample_data.txt.
Step 3: Load Data into RDD
In step 3, we will load the data available on the local path. Here, we will use the Spark Context.
# Load sample text file into an RDD
>>> textDataRDD = sc.textFile("file:///home/bdp/Spark/Load_Text_file_Into_Hive_Table_Using_Spark/sample_data.txt")
In the above command, sc is the SparkContext. We are using the textFile function, passing the path of the file location as an argument, to load the data into an RDD.
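As a mental model, textFile behaves like reading a file and yielding one string per line. A plain-Python sketch of that behavior, using a throwaway stand-in file since the real sample_data.txt path is environment-specific:

```python
import os
import tempfile

# Create a small stand-in for sample_data.txt in a temporary directory.
tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, "sample_data.txt")
with open(path, "w") as f:
    f.write("1,Lillian,Slater,purus.Nullam.email@example.com,0800 1111,Muridke,Andorra\n")
    f.write("2,Donna,Pennington,Donec@euaugue.ca,0959 976 7759,Salzwedel,Bermuda\n")

# Read it back as a list of lines, much like textFile yields an RDD of lines.
with open(path) as f:
    lines = [line.rstrip("\n") for line in f]

print(len(lines))              # 2
print(lines[1].split(",")[1])  # Donna
```

The important point the sketch illustrates: each element is a whole line as a single string, which is why the columns have to be split out explicitly later.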
Let’s verify whether the returned variable is an RDD:
# Check the type of the RDD
>>> type(textDataRDD)
Let’s see the data in the RDD. We can use the take function on the RDD to view a few records.
# Display RDD values
>>> textDataRDD.take(5)
Step 4: Remove the Header from the RDD
If you notice, the RDD contains the header of the file. We need to remove the header from the RDD; otherwise, it will become a record when converting the RDD to a Data Frame.
# Select Header from RDD
>>> header = textDataRDD.first()
Check the type of the header:
# Check Header type
>>> type(header)
We have taken the first record from the RDD as the header. Let’s check its value:
# Print header
>>> header
# Remove Header from RDD
>>> textDataRDD = textDataRDD.filter(lambda x: x != header)
The above filter removes the header from the RDD records. Check the RDD records:
# Display RDD values
>>> textDataRDD.take(5)
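The first()/filter() pattern above can be sketched in plain Python on an ordinary list. The header text shown here is hypothetical (the actual header comes from the downloaded file); the point is the pattern: take the first element, then keep every element that is not equal to it.

```python
lines = [
    "id,first_name,last_name,email,phone,city,country",  # hypothetical header row
    "1,Lillian,Slater,purus.Nullam.email@example.com,0800 1111,Muridke,Andorra",
    "2,Donna,Pennington,Donec@euaugue.ca,0959 976 7759,Salzwedel,Bermuda",
]

header = lines[0]                            # like RDD.first()
records = [x for x in lines if x != header]  # like RDD.filter(lambda x: x != header)

print(header.split(",")[0])  # id
print(len(records))          # 2
```

One caveat this makes visible: the filter drops every line equal to the header, which is fine as long as no data row is byte-for-byte identical to it.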
Step 5: Convert RDD to Data Frame
The requirement is to load the data into a Hive table. This is where the Data Frame comes into the picture: it provides the facility to interact with Hive through Spark. The Hive Context is used here.
# Convert RDD to Data Frame
>>> textDataDF = textDataRDD.map(lambda x: x.split(",")).toDF()
Note that each line is first split into its fields, because a Data Frame schema cannot be inferred from an RDD of plain strings.
Check the type:
# Check the type of the Data Frame
>>> type(textDataDF)
# Display the Data Frame
>>> textDataDF.show(5)
Step 6: Map function in Data Frame
In step 5, the data was displayed without proper column names. Let’s define the column names.
# Map in Data Frame
>>> from pyspark.sql import Row
>>> textDataDF = textDataDF.map(lambda x: Row(id = x[0], first_name = x[1], last_name = x[2], email = x[3], phone = x[4], city = x[5], country = x[6])).toDF()
Here, we import the Row class to construct the rows of the Data Frame and define a name for each column.
Let’s check the data after defining column name:
# Display the Data Frame
>>> textDataDF.show()
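The mapping logic above can be sketched in plain Python without Spark, with dicts standing in for Row objects: split each record on commas and attach the column names used in this post.

```python
records = [
    "1,Lillian,Slater,purus.Nullam.email@example.com,0800 1111,Muridke,Andorra",
    "2,Donna,Pennington,Donec@euaugue.ca,0959 976 7759,Salzwedel,Bermuda",
]
columns = ["id", "first_name", "last_name", "email", "phone", "city", "country"]

# Each record becomes a name -> value mapping, like a named Row.
rows = [dict(zip(columns, r.split(","))) for r in records]

print(rows[0]["city"])   # Muridke
print(rows[1]["email"])  # Donec@euaugue.ca
```

The sketch also shows why field order matters: the names are attached purely by position, so the column list must match the order of fields in the file.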
Step 7: Load Data into Hive table
This is the step where we load the Data Frame into the Hive table. The below command is used to achieve this (it assumes the database db_bdp already exists in Hive):
# Store the Data Frame in a Hive table
>>> textDataDF.write.format("ORC").saveAsTable("db_bdp.textData")
Here, the write.format function defines the storage format of the data in the Hive table (ORC in this case), and the saveAsTable function stores the Data Frame into the given Hive table.
Step 8: Read data from Hive Table using Spark
Lastly, we can verify the data in the Hive table. The below command is used to get the data from the Hive table:
>>> result = sqlContext.sql("FROM db_bdp.textData SELECT *")
>>> result.show()
The result variable holds the data fetched from the Hive table.
In this requirement, we worked with both RDDs and Data Frames. We loaded the data directly into an RDD from the local path using the Spark Context, then converted the RDD to a Data Frame. The Hive Context was used to interact with Hive from Spark.