Requirement
When we ingest data from a source into the Hadoop data lake, we usually add a few extra columns to the incoming data. These columns help us validate and analyze the data later. In this post, we will walk through how to add such columns to the source data.
In addition, we will also see how to drop an existing column and rename a column in a Spark DataFrame.
Sample Data
[ { "empno":"7369", "ename":"SMITH", "designation":"CLERK", "manager":"7902", "hire_date":"12/17/1980", "sal":"800", "deptno":"20" }, { "empno":"7499", "ename":"ALLEN", "designation":"SALESMAN", "manager":"7698", "hire_date":"2/20/1981", "sal":"1600", "deptno":"30" }, { "empno":"7521", "ename":"WARD", "designation":"SALESMAN", "manager":"7698", "hire_date":"2/22/1981", "sal":"1250", "deptno":"30" }, { "empno":"7566", "ename":"TURNER", "designation":"MANAGER", "manager":"7839", "hire_date":"4/2/1981", "sal":"2975", "deptno":"20" }, { "empno":"7654", "ename":"MARTIN", "designation":"SALESMAN", "manager":"7698", "hire_date":"9/28/1981", "sal":"1250", "deptno":"30" }, { "empno":"7698", "ename":"MILLER", "designation":"MANAGER", "manager":"7839", "hire_date":"5/1/1981", "sal":"2850", "deptno":"30" }, { "empno":"7782", "ename":"CLARK", "designation":"MANAGER", "manager":"7839", "hire_date":"6/9/1981", "sal":"2450", "deptno":"10" }, { "empno":"7788", "ename":"SCOTT", "designation":"ANALYST", "manager":"7566", "hire_date":"12/9/1982", "sal":"3000", "deptno":"20" }, { "empno":"7839", "ename":"KING", "designation":"PRESIDENT", "manager":"NULL", "hire_date":"11/17/1981", "sal":"5000", "deptno":"10" } ]
Solution
Step 1: Set Up
We will use the sample data shown above in the code. Save the data as a JSON file at any location; in my case, I have kept the file at ‘/home/bdp/data/employees_multiLine.json’.
Step 2: Write Code
First, load the JSON data into a DataFrame.
// Load the JSON data:
scala> val data = sqlContext.read.json(sc.wholeTextFiles("file:///home/bdp/data/employees_multiLine.json").values)
scala> data.printSchema
scala> data.show
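If you are on Spark 2.2 or later, the same multi-line JSON file can also be read directly through the SparkSession reader with its multiLine option. This is only an alternative sketch, assuming the predefined spark session available in spark-shell and the same file path:

scala> val dataAlt = spark.read.option("multiLine", true).json("file:///home/bdp/data/employees_multiLine.json")
scala> dataAlt.printSchema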
Add New Column in DataFrame:
scala> import org.apache.spark.sql.functions.lit
scala> val ingestedDate = java.time.LocalDate.now
scala> val jsonDfWithDate = data.withColumn("ingestedDate", lit(ingestedDate.toString))
lit: wraps a literal value in a Column so it can be used inside withColumn.
Here, we have added a new column named ingestedDate to the DataFrame, populated with a constant value.
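The same withColumn pattern works for any other ingestion metadata you may want to capture. As a hedged sketch, the snippet below adds a hypothetical ingestedTimestamp column using Spark's built-in current_timestamp function; the column name is only illustrative and not part of the original example:

scala> import org.apache.spark.sql.functions.current_timestamp
scala> val jsonDfWithMeta = jsonDfWithDate.withColumn("ingestedTimestamp", current_timestamp())
scala> jsonDfWithMeta.printSchema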
Drop Column in DataFrame
scala> val jsonDfWithoutManager = jsonDfWithDate.drop("manager")
scala> jsonDfWithoutManager.printSchema
scala> jsonDfWithoutManager.show
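In Spark 2.0 and later, drop also accepts several column names in a single call. A minimal sketch, where the chosen columns are only for illustration:

scala> val jsonDfTrimmed = jsonDfWithDate.drop("manager", "deptno")
scala> jsonDfTrimmed.printSchema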
Rename column in DataFrame
scala> val jsonWithRename = jsonDfWithoutManager.withColumnRenamed("ingestedDate", "ingestedDateTime")
scala> jsonWithRename.printSchema
scala> jsonWithRename.show
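withColumnRenamed renames one column per call, so renaming several columns is usually done by folding over a name mapping. A minimal sketch, where the mapping below is purely illustrative:

scala> val renames = Map("sal" -> "salary", "ename" -> "employee_name")
scala> val jsonMultiRename = renames.foldLeft(jsonWithRename) { case (df, (from, to)) => df.withColumnRenamed(from, to) }
scala> jsonMultiRename.printSchema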
Wrapping Up
In this post, we have learned how to add, drop, and rename columns in a Spark DataFrame. These are common tasks whenever we process data with Spark DataFrames.