How to add new column in Spark Dataframe

Requirement

When we ingest data from source to Hadoop data lake, we used to add some additional columns with the existing data source. These columns basically help to validate and analyze the data. So, in this post, we will walk through how we can add some additional columns with the source data.

In addition to this, we will also check how to drop an existing column and rename the column in the spark data frame.

Sample Data

 [
  {
    "empno":"7369",
    "ename":"SMITH",
    "designation":"CLERK",
    "manager":"7902",
    "hire_date":"12/17/1980",
    "sal":"800",
    "deptno":"20"  
  },
  {
    "empno":"7499",
    "ename":"ALLEN",
    "designation":"SALESMAN",
    "manager":"7698",
    "hire_date":"2/20/1981",
    "sal":"1600",
    "deptno":"30"  
  },
  {
    "empno":"7521",
    "ename":"WARD",
    "designation":"SALESMAN",
    "manager":"7698",
    "hire_date":"2/22/1981",
    "sal":"1250",
    "deptno":"30"  
  },
  {
    "empno":"7566",
    "ename":"TURNER",
    "designation":"MANAGER",
    "manager":"7839",
    "hire_date":"4/2/1981",
    "sal":"2975",
    "deptno":"20"  
  },
  {
    "empno":"7654",
    "ename":"MARTIN",
    "designation":"SALESMAN",
    "manager":"7698",
    "hire_date":"9/28/1981",
    "sal":"1250",
    "deptno":"30"  
  },
  {
    "empno":"7698",
    "ename":"MILLER",
    "designation":"MANAGER",
    "manager":"7839",
    "hire_date":"5/1/1981",
    "sal":"2850",
    "deptno":"30"  
  },
  {
    "empno":"7782",
    "ename":"CLARK",
    "designation":"MANAGER",
    "manager":"7839",
    "hire_date":"6/9/1981",
    "sal":"2450",
    "deptno":"10"  
  },
  {
    "empno":"7788",
    "ename":"SCOTT",
    "designation":"ANALYST",
    "manager":"7566",
    "hire_date":"12/9/1982",
    "sal":"3000",
    "deptno":"20"  
  },
  {
    "empno":"7839",
    "ename":"KING",
    "designation":"PRESIDENT",
    "manager":"NULL",
    "hire_date":"11/17/1981",
    "sal":"5000",
    "deptno":"10"  
  }
]

Solution

Step 1: Set Up

We will use the given sample data in the code. You can download the data and keep at any location. In my case, I have kept the file at ‘/home/bdp/data/employees_multiLine.json’.

Step 2: Write Code

First load the data into a data frame.

 // Load json data:
scala> val data = sqlContext.read.json(sc.wholeTextFiles("file:///home/bdp/data/employees_multiLine.json").values)         
data.printSchema
scala> data.show

Add New Column in dataframe:

 scala> val ingestedDate = java.time.LocalDate.now
scala> val jsonDfWithDate = data.withColumn("inegstedDate", lit(ingestedDate.toString()))

lit: Used to cast into literal value

Here, we have added a new column in data frame with a value.

Drop Column in DataFrame

  scala> val jsonDfWithoutManager = jsonDfWithDate.drop("manager")
scala> jsonDfWithoutManager.printSchema
scala> jsonDfWithoutManager.show

Rename column in DataFrame

 scala> val jsonWithRename = jsonDfWithoutManager.withColumnRenamed("ingestedDate", "ingestedDateTime")
scala> jsonWithRename.printSchema
scala> jsonWithRename.show

You can download the code from the below link:

Wrapping Up

In this post, we have learned to add, drop and rename an existing column in the spark data frame. This is quite a common task we do whenever process the data using spark data frame.

Sharing is caring!

Subscribe to our newsletter
Loading

Leave a Reply