Transpose Data in Spark DataFrame using PySpark


Requirement

Let’s take a scenario where data has already been loaded into an RDD/DataFrame, but with the rows and columns swapped: the row data appears as columns and the column data appears as rows. The requirement is to transpose the data, i.e., turn the rows back into columns and the columns back into rows.

Sample Data

We will use the sample employee data below. In tabular form, it looks like this:

empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,9902,2010-12-17,800.00,20
7499,ALLEN,SALESMAN,9698,2011-02-20,1600.00,30
7521,WARD,SALESMAN,9698,2011-02-22,1250.00,30
9566,JONES,MANAGER,9839,2011-04-02,2975.00,20
7654,MARTIN,SALESMAN,9698,2011-09-28,1250.00,30
9698,BLAKE,MANAGER,9839,2011-05-01,2850.00,30
9782,CLARK,MANAGER,9839,2011-06-09,2450.00,10
9788,SCOTT,ANALYST,9566,2012-12-09,3000.00,20
9839,KING,PRESIDENT,NULL,2011-11-17,5000.00,10
7844,TURNER,SALESMAN,9698,2011-09-08,1500.00,30
7876,ADAMS,CLERK,9788,2012-01-12,1100.00,20
7900,JAMES,CLERK,9698,2011-12-03,950.00,30
9902,FORD,ANALYST,9566,2011-12-03,3000.00,20
7934,MILLER,CLERK,9782,2012-01-23,1300.00,10
The input file, emp_data.txt, stores this data transposed, with one line per column in the form column_name:value1,value2,... (as the RDD output in Step 1 confirms).

Components Involved

  • PySpark
  • DataFrame

Solution

The data is in text format, so let’s first load it into an RDD:

Step 1: Load data

First, open the PySpark shell and load the data into an RDD:

empRDD = sc.textFile("file:///root/bdp/spark/data/emp_data.txt")

Here, the source file is located on the local file system under /root/bdp/spark/data, and sc is the SparkContext, which is created automatically when the PySpark shell starts.
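If you are running Spark 2.x or later (an assumption; the shell output below suggests an older Python 2 based setup), the shell also predefines a SparkSession named spark, and the same load can go through it:

# Spark 2.x+ equivalent: the shell predefines `spark` (a SparkSession)
empRDD = spark.sparkContext.textFile("file:///root/bdp/spark/data/emp_data.txt")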

Next, create a header list holding the column names (note that this is a plain Python list, not an RDD):

headerList = ['empno', 'ename', 'designation', 'manager', 'hire_date', 'sal', 'deptno']

Let’s check the RDD contents using the command below:

empRDD.take(10)

[u'empno:7369,7499,7521,9566,7654,9698,9782,9788,9839,7844,7876,7900,9902,7934', u'ename:SMITH,ALLEN,WARD,JONES,MARTIN,BLAKE,CLARK,SCOTT,KING,TURNER,ADAMS,JAMES,FORD,MILLER', u'designation:CLEARK,SALESMAN,SALESMAN,MANAGER,SALESMAN,MANAGER,MANAGER,ANALYST,PRESIDENT,SALESMAN,CLEARK,CLEARK,ANALYST,CLEARK', u'manager:9902,9698,9698,9839,9698,9839,9839,9566,NULL,9698,9788,9698,9566,9782', u'hire_date:2010-12-17,2011-02-20,2011-02-20,2011-04-02,2011-09-28,2011-05-01,2011-06-09,2012-12-09,2011-11-17,2011-09-08,2012-01-12,2011-12-03,2011-12-03,2012-01-23', u'sal:800.00,1600.00,1250.00,2975.00,1250.00,2850.00,2450.00,3000.00,5000.00,1500.00,1100.00,950.00,3000.00,1300.00', u'deptno:20,30,30,20,30,30,10,20,10,30,20,30,20,10']

Step 2: Transpose

Split each line of empRDD into a pair of (column name, list of values):

empMapRDD = empRDD.map(lambda line: (line.split(':')[0], line.split(':')[1].split(',')))

empMapRDD.take(10)

 
 
[(u'empno', [u'7369', u'7499', u'7521', u'9566', u'7654', u'9698', u'9782', u'9788', u'9839', u'7844', u'7876', u'7900', u'9902', u'7934']), (u'ename', [u'SMITH', u'ALLEN', u'WARD', u'JONES', u'MARTIN', u'BLAKE', u'CLARK', u'SCOTT', u'KING', u'TURNER', u'ADAMS', u'JAMES', u'FORD', u'MILLER']), (u'designation', [u'CLEARK', u'SALESMAN', u'SALESMAN', u'MANAGER', u'SALESMAN', u'MANAGER', u'MANAGER', u'ANALYST', u'PRESIDENT', u'SALESMAN', u'CLEARK', u'CLEARK', u'ANALYST', u'CLEARK']), (u'manager', [u'9902', u'9698', u'9698', u'9839', u'9698', u'9839', u'9839', u'9566', u'NULL', u'9698', u'9788', u'9698', u'9566', u'9782']), (u'hire_date', [u'2010-12-17', u'2011-02-20', u'2011-02-20', u'2011-04-02', u'2011-09-28', u'2011-05-01', u'2011-06-09', u'2012-12-09', u'2011-11-17', u'2011-09-08', u'2012-01-12', u'2011-12-03', u'2011-12-03', u'2012-01-23']), (u'sal', [u'800.00', u'1600.00', u'1250.00', u'2975.00', u'1250.00', u'2850.00', u'2450.00', u'3000.00', u'5000.00', u'1500.00', u'1100.00', u'950.00', u'3000.00', u'1300.00']), (u'deptno', [u'20', u'30', u'30', u'20', u'30', u'30', u'10', u'20', u'10', u'30', u'20', u'30', u'20', u'10'])]

Let’s view these values in a structured way:

 
 
empMapRDD.toDF().show()
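Since the pairs carry no field names, toDF() assigns the default column names _1 and _2. The output should look roughly like this (illustrative only; the exact rendering depends on the Spark version):

# +-----------+--------------------+
# |         _1|                  _2|
# +-----------+--------------------+
# |      empno|[7369, 7499, 7521...|
# |      ename|[SMITH, ALLEN, WA...|
# |        ...|                 ...|
# +-----------+--------------------+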

 
 
# values(): keep only the lists of column values
# zipWithIndex(): pair each list with its column index i
# flatMap(): emit one (column_index, row_index, value) triple per element
empMapRDDT1 = empMapRDD.values() \
                 .zipWithIndex() \
                 .flatMap(lambda xi: [(xi[1], j, e) for j, e in enumerate(xi[0])])

In this step, each value becomes a triple of (column index, row index, value): the column index comes from zipWithIndex, and the row index is the value’s position within its list. For example, u'7369' becomes (0, 0, u'7369').

Functions used (a toy example follows below):

values(): takes only the value part of each pair, e.g. [u'7369', u'7499', ...] from (u'empno', [u'7369', u'7499', ...])

zipWithIndex(): pairs each element of the RDD with its index

flatMap(): maps each element to zero or more output records and flattens the result
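
Here is a toy illustration of these three operations on hypothetical two-column data (the names pairs and triples are introduced here for illustration only):

# Toy data: two columns with two values each (hypothetical, not the real dataset)
pairs = sc.parallelize([('empno', ['7369', '7499']),
                        ('ename', ['SMITH', 'ALLEN'])])
triples = pairs.values() \
               .zipWithIndex() \
               .flatMap(lambda xi: [(xi[1], j, e) for j, e in enumerate(xi[0])])
# triples.collect() should return:
# [(0, 0, '7369'), (0, 1, '7499'), (1, 0, 'SMITH'), (1, 1, 'ALLEN')]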

 
 
empMapRDDT1.toDF().show()

 
 
# Key each triple by its row index j, keeping (column_index, value)
empMapRDDT2 = empMapRDDT1.map(lambda t: (t[1], (t[0], t[2]))).groupByKey()
# Within each row, sort the (column_index, value) pairs by column index
empMapRDDT3 = empMapRDDT2.map(lambda ix: sorted(ix[1], key=lambda ie: ie[0]))
# Drop the column indexes, keeping only the ordered values
empMapRDDT4 = empMapRDDT3.map(lambda x: [e for i, e in x])

These three steps group the triples by row index, sort each group by column index, and keep only the values, so that each output row has its values in the correct column order.
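
The same three steps can also be collapsed into a single chain with mapValues; this is just an equivalent sketch (the name empRows is introduced here, not from the original code):

# Equivalent one-chain version of the three steps above (a sketch)
empRows = empMapRDDT1.map(lambda t: (t[1], (t[0], t[2]))) \
                     .groupByKey() \
                     .mapValues(lambda vals: [e for i, e in sorted(vals, key=lambda ie: ie[0])]) \
                     .values()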

Finally, let’s convert the last RDD to a DataFrame, with the header list supplying the column names:

 
 
empMapRDDT4.toDF(headerList).show()
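
One caveat: groupByKey does not guarantee that the grouped rows come back in their original order, so the transposed rows may appear shuffled. If the original row order matters, a sortByKey() on the row index before dropping it should restore it; for example (a sketch, assuming the RDDs defined above):

# Sort by row index j before building the rows, preserving the original order
empMapRDDT4 = empMapRDDT2.sortByKey() \
                         .map(lambda ix: [e for i, e in sorted(ix[1], key=lambda ie: ie[0])])
empMapRDDT4.toDF(headerList).show()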

Wrapping Up

In this post, we have seen how to transpose data in a DataFrame. Sometimes data arrives in a layout that needs to be transposed after loading, and this approach handles that scenario. Along the way, we have also used functions such as map, flatMap, values, zipWithIndex, and sorted.
