Load Text file into Hive Table Using Spark

Requirement

Suppose the source data is in a file. The file format is a text format. The requirement is to load the text file into a hive table using Spark. In addition to this, read the data from the hive table using Spark. Therefore, let’s break the task into sub-tasks:

  • Load the text file into Hive table.
  • Read the data from the hive table.

Components Involved

To achieve the requirement, the following components are involved:

  • Hive: Used to Store data
  • Spark 1.6: Used to parse the file and load into hive table

Here, using PySpark API to load and process text data into the hive.

Sample Data

The sample data which has been taken for this demonstration looks like:

id,first_name,last_name,email,phone,city,country
1,Lillian,Slater,purus.Nullam.scelerisque@nonsollicitudin.com,0800 1111,Muridke,Andorra
2,Donna,Pennington,Donec@euaugue.ca,0959 976 7759,Salzwedel,Bermuda
3,Hashim,Sharpe,est@lacus.edu,(011391) 91113,Huntsville,Heard Island and Mcdonald Islands
4,Finn,Chang,consectetuer@eunequepellentesque.org,0800 1111,Stokrooie,Rwanda
5,Harrison,Charles,eleifend@nisi.net,0941 490 8683,Lutsel K'e,Nicaragua
6,Jack,Roth,euismod.urna@orciin.net,055 7666 2472,Pievepelago,Macedonia
7,Yvette,Mullins,enim@tellus.ca,0900 938 9309,Falkirk,Brazil
8,Genevieve,Lamb,vel.sapien.imperdiet@Etiam.net,070 9080 4993,Nieuwkerken-Waas,Cook Islands
9,Nero,Chandler,velit.Sed@Quisque.org,0845 46 45,Alcobendas,Brunei
10,Lee,Richard,porttitor@egetlaoreetposuere.ca,0896 175 3049,Sommariva Perno,Bosnia and Herzegovina

You can download the entire sample file from below link:

sample_data

Solution

Step 1: Initialization of Spark Context and Hive Context

The first step is to initialize the Spark Context and Hive Context. Spark Context will be used to work with spark core like RDD, whereas Hive Context is used to work with Data frame. As we are going to use PySpark API, both the context will get initialized automatically. So, it is not needed to create these.

Start the PySpark:

[bdp ~]$ pyspark

Here in the above picture, we can see SparkContext is available as sc and HiveContext is available as sqlContext. We will use these two for the rest steps of the demonstration.

Step 2: Set Path of Sample Data

The second step is to download the sample data (if not downloaded from the link provided at the beginning). Once you have data, move it or keep it to a local path. This local path is used while loading the data. In my case, the local path of sample data is

/home/bdp/Spark/Load_Text_file_Into_Hive_Table_Using_Spark/sample_data.txt

Step 3: Load Data into RDD

We will load the data which is available on the local path. Here, we will use the Spark Context.

# Load sample text file into a RDD
>>> textDataRDD = sc.textFile("file:///home/bdp/Spark/Load_Text_file_Into_Hive_Table_Using_Spark/sample_data.txt")

In the above command, sc is SparkContext. We are using a function called textFile and passing an argument (location of file) to load data into RDD.

Let’s verify that the returned variable is RDD or not.

# Check Header type
>>> type(textDataRDD)

Let’s see data of RDD. We can use take function on RDD to see the data.

# Display RDD value
>>> textDataRDD.take(5)

Step 4: Remove Header from RDD data

If you notice, RDD contains the header of the file. We need to remove the header from the RDD. Otherwise, it will become a record while converting RDD to Data Frame.

# Select Header from RDD
>>> header = textDataRDD.first()

Check what is the type of header:

# Check Header type
>>> type(header)

We have taken the first record from the RDD as a header. Let’s check the value:

# Print header
>>> header

Now, remove the header from the RDD

# Remove Header from RDD
>>> textDataRDD = textDataRDD.filter(lambda x:x != header)

The above filter will remove the header from the RDD records. Check the RDD records:

# Display RDD value
>>> textDataRDD.take(5)

Step 5: Convert RDD to Data Frame

The requirement is to load the data into a hive table. Here, the data frame comes into the picture. This provides the facility to interact with the hive through spark. The Hive Context will be used here.

# Convert RDD to Dataframe
>>> textDataDF = textDataRDD.toDF()

Check the type:

# Verify the Dataframe
>>> type(textDataDF)

Read the records from the data frame:

# Display dataframe
>>> textDataDF.show(5)

Here it is representing column name as _1, _2, etc. Let’s set a name for all the columns.

Step 6: Map function in Data Frame

In step 5, displaying data without any proper column name. Let’s defines the column name.

# Map in Data Frame
>>> from pyspark.sql import Row
>>> textDataDF = textDataDF.map(lambda x: Row(id = x[0], first_name = x[1], last_name = x[2], email = x[3], phone = x[4], city = x[5], country = x[6])).toDF()

Here, importing Row lib to point a row of the data frame and defining a name for each column.

Let’s check the data after defining the column name:

# Display dataframe
>>> textDataDF.show()

Step 7: Load Data into Hive table

This step is loading data frame into the hive table. Below command is used to achieve this:

# Store data frame into hive table
>>> textDataDF.write.format("ORC").saveAsTable("db_bdp.textData")

Here, we are using write format function which defines the storage format of the data in hive table and saveAsTable function which stores the data frame into a provided hive table.

Step 8: Read data from Hive Table using Spark

Lastly, we can verify the data of hive table. Below command is used to get data from hive table:

>>> result = sqlContext.sql("FROM db_bdp.textData SELECT *")

Wrapping Up

In this requirement, we have worked on both RDD and Data Frame. We have loaded the data directly to the RDD from the local path using Spark Context. Then converted RDD to Data Frame. The Hive Context has been used to interact with the hive from Spark.

Sharing is caring!

Subscribe to our newsletter
Loading

Leave a Reply