Transpose Data in Spark DataFrame using PySpark

Requirement

Let's take a scenario where we have already loaded data into an RDD/DataFrame, but the data arrived with rows stored as columns and columns stored as rows. The requirement is to transpose the data, i.e. change rows into columns and columns into rows.

Sample Data

We will use the sample data below. It is also available as the attached file emp_data.

    empno,ename,designation,manager,hire_date,sal,deptno
    7369,SMITH,CLERK,9902,2010-12-17,800.00,20
    7499,ALLEN,SALESMAN,9698,2011-02-20,1600.00,30
    7521,WARD,SALESMAN,9698,2011-02-22,1250.00,30
    9566,JONES,MANAGER,9839,2011-04-02,2975.00,20
    7654,MARTIN,SALESMAN,9698,2011-09-28,1250.00,30
    9698,BLAKE,MANAGER,9839,2011-05-01,2850.00,30
    9782,CLARK,MANAGER,9839,2011-06-09,2450.00,10
    9788,SCOTT,ANALYST,9566,2012-12-09,3000.00,20
    9839,KING,PRESIDENT,NULL,2011-11-17,5000.00,10
    7844,TURNER,SALESMAN,9698,2011-09-08,1500.00,30
    7876,ADAMS,CLERK,9788,2012-01-12,1100.00,20
    7900,JAMES,CLERK,9698,2011-12-03,950.00,30
    9902,FORD,ANALYST,9566,2011-12-03,3000.00,20
    7934,MILLER,CLERK,9782,2012-01-23,1300.00,10

Components Involved

  • PySpark
  • DataFrame

Solution

The data is in a text file. Let's first load it into an RDD:

Step 1: Load data

First, open the PySpark shell and load the data into an RDD.

    empRDD = sc.textFile("file:////root/bdp/spark/data/emp_data.txt")

Here, the source file is on the local file system under /root/bdp/spark/data, and sc is the SparkContext that is created automatically when the PySpark shell starts.

Create a header list (a plain Python list, not an RDD) with the column names below:

    headerList = ['empno', 'ename', 'designation', 'manager', 'hire_date', 'sal', 'deptno']

Let's check the RDD values using the command below:

    empRDD.take(10)

    [u'empno:7369,7499,7521,9566,7654,9698,9782,9788,9839,7844,7876,7900,9902,7934',
     u'ename:SMITH,ALLEN,WARD,JONES,MARTIN,BLAKE,CLARK,SCOTT,KING,TURNER,ADAMS,JAMES,FORD,MILLER',
     u'designation:CLEARK,SALESMAN,SALESMAN,MANAGER,SALESMAN,MANAGER,MANAGER,ANALYST,PRESIDENT,SALESMAN,CLEARK,CLEARK,ANALYST,CLEARK',
     u'manager:9902,9698,9698,9839,9698,9839,9839,9566,NULL,9698,9788,9698,9566,9782',
     u'hire_date:2010-12-17,2011-02-20,2011-02-20,2011-04-02,2011-09-28,2011-05-01,2011-06-09,2012-12-09,2011-11-17,2011-09-08,2012-01-12,2011-12-03,2011-12-03,2012-01-23',
     u'sal:800.00,1600.00,1250.00,2975.00,1250.00,2850.00,2450.00,3000.00,5000.00,1500.00,1100.00,950.00,3000.00,1300.00',
     u'deptno:20,30,30,20,30,30,10,20,10,30,20,30,20,10']

Note that each line of the loaded file holds one column: the column name, a colon, and then the comma-separated values of that column.
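If you only have the row-oriented sample, the take(10) output will not look like the one in this post. The following is a minimal sketch in plain Python, assuming the input is header-first CSV text, of how lines in the name:value1,value2,... layout could be produced. The function name rows_to_column_lines is hypothetical and not part of the original post.

```python
def rows_to_column_lines(csv_text):
    """Turn row-oriented CSV text (header first) into 'name:v1,v2,...' lines."""
    rows = [line.split(",") for line in csv_text.strip().splitlines()]
    header, records = rows[0], rows[1:]
    # zip(*records) regroups the values column by column.
    columns = zip(*records)
    return [name + ":" + ",".join(values) for name, values in zip(header, columns)]


# Tiny illustrative subset of the sample data.
sample = """empno,ename,deptno
7369,SMITH,20
7499,ALLEN,30"""

for line in rows_to_column_lines(sample):
    print(line)
```

Writing the returned lines to a text file gives an input that sc.textFile can load in the shape used by the following steps.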

Step 2: Transpose

Split each line of empRDD into the column name and the list of its values:

    empMapRDD = empRDD.map(lambda line : (line.split(':')[0], line.split(':')[1].split(',')))

    empMapRDD.take(10)

    [(u'empno', [u'7369', u'7499', u'7521', u'9566', u'7654', u'9698', u'9782', u'9788', u'9839', u'7844', u'7876', u'7900', u'9902', u'7934']),
     (u'ename', [u'SMITH', u'ALLEN', u'WARD', u'JONES', u'MARTIN', u'BLAKE', u'CLARK', u'SCOTT', u'KING', u'TURNER', u'ADAMS', u'JAMES', u'FORD', u'MILLER']),
     (u'designation', [u'CLEARK', u'SALESMAN', u'SALESMAN', u'MANAGER', u'SALESMAN', u'MANAGER', u'MANAGER', u'ANALYST', u'PRESIDENT', u'SALESMAN', u'CLEARK', u'CLEARK', u'ANALYST', u'CLEARK']),
     (u'manager', [u'9902', u'9698', u'9698', u'9839', u'9698', u'9839', u'9839', u'9566', u'NULL', u'9698', u'9788', u'9698', u'9566', u'9782']),
     (u'hire_date', [u'2010-12-17', u'2011-02-20', u'2011-02-20', u'2011-04-02', u'2011-09-28', u'2011-05-01', u'2011-06-09', u'2012-12-09', u'2011-11-17', u'2011-09-08', u'2012-01-12', u'2011-12-03', u'2011-12-03', u'2012-01-23']),
     (u'sal', [u'800.00', u'1600.00', u'1250.00', u'2975.00', u'1250.00', u'2850.00', u'2450.00', u'3000.00', u'5000.00', u'1500.00', u'1100.00', u'950.00', u'3000.00', u'1300.00']),
     (u'deptno', [u'20', u'30', u'30', u'20', u'30', u'30', u'10', u'20', u'10', u'30', u'20', u'30', u'20', u'10'])]
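To see what the split does to a single line, here is a small plain-Python sketch that runs without Spark. The helper name parse_column_line is hypothetical. It splits on the first colon only, which is an assumption that differs slightly from the lambda above: a colon inside the values would be kept rather than cut off.

```python
def parse_column_line(line):
    """Split 'name:v1,v2,...' into (name, [v1, v2, ...])."""
    # Split on the first colon only, so the separator is looked up once.
    name, values = line.split(":", 1)
    return name, values.split(",")


print(parse_column_line("deptno:20,30,30"))
```

A named function like this could be passed to empRDD.map in place of the lambda if you prefer to avoid calling split(':') twice per line.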

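Let's view these values in tabular form:

    empMapRDD.toDF().show()

Next, tag every value with the index of its source line and its position within that line:

    # Index-based access works on Python 2 and 3; "lambda (x, i): ..." is Python 2 only.
    empMapRDDT1 = empMapRDD.values() \
                     .zipWithIndex() \
                     .flatMap(lambda xi: [(xi[1], j, e) for j, e in enumerate(xi[0])])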

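In this step, each value is turned into a triple (i, j, e): i is the index of the line the value came from (its column in the transposed table), j is the position of the value within that line (its row in the transposed table), and e is the value itself.

Commands used:

values(): takes the value of each key-value pair, i.e. it takes [u'7369', u'7499', u'7521', u'9566', u'7654', u'9698', u'9782', u'9788', u'9839', u'7844', u'7876', u'7900', u'9902', u'7934'] from (u'empno', [u'7369', u'7499', u'7521', u'9566', u'7654', u'9698', u'9782', u'9788', u'9839', u'7844', u'7876', u'7900', u'9902', u'7934'])

zipWithIndex(): pairs each element of the RDD with its index, giving (element, index)

flatMap(): applies a function that returns a list for each element and flattens all the lists into a single RDD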
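The three calls can be imitated on a local list to make the intermediate shapes visible. This is only a sketch in plain Python with a two-column subset of the data; the function name to_triples is hypothetical, and list comprehensions stand in for the RDD methods.

```python
pairs = [
    ("empno", ["7369", "7499"]),
    ("ename", ["SMITH", "ALLEN"]),
]


def to_triples(pairs):
    """Imitate values() -> zipWithIndex() -> flatMap() on a local list."""
    values = [v for _, v in pairs]                    # like values()
    indexed = [(v, i) for i, v in enumerate(values)]  # like zipWithIndex()
    # Like flatMap(): one (line index, position in line, value) triple per value.
    return [(i, j, e) for v, i in indexed for j, e in enumerate(v)]


triples = to_triples(pairs)
print(triples)
# [(0, 0, '7369'), (0, 1, '7499'), (1, 0, 'SMITH'), (1, 1, 'ALLEN')]
```

Every value now carries both coordinates, so it can later be regrouped by j (the target row) and ordered by i (the target column).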

 
 
    empMapRDDT1.toDF().show()

 
 
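    # Key each value by its position j within the source line, then group.
    empMapRDDT2 = empMapRDDT1.map(lambda t: (t[1], (t[0], t[2]))).groupByKey()
    # Sort each group by the source-line index i (sorted's cmp= argument is Python 2 only).
    empMapRDDT3 = empMapRDDT2.map(lambda kv: sorted(kv[1], key=lambda ie: ie[0]))
    # Drop the index and keep only the values, as a list.
    empMapRDDT4 = empMapRDDT3.map(lambda row: [e for (i, e) in row])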

These three steps group the values by their row index, sort each group by column index, and then strip the indexes so that every value sits at its correct position in the row.
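The grouping, sorting and stripping can also be traced on a local list. The sketch below is plain Python, with a dictionary standing in for groupByKey; the function name triples_to_rows is hypothetical. One deliberate difference, stated as an assumption: the sketch returns rows sorted by row index, whereas groupByKey in Spark does not guarantee the order of the groups.

```python
from collections import defaultdict


def triples_to_rows(triples):
    """Regroup (column index, row index, value) triples into rows of values."""
    groups = defaultdict(list)
    for i, j, e in triples:
        groups[j].append((i, e))  # key by row index, like map + groupByKey
    rows = []
    for j in sorted(groups):  # fixed row order (an addition of this sketch)
        ordered = sorted(groups[j], key=lambda ie: ie[0])  # order by column index
        rows.append([e for _, e in ordered])  # keep only the values
    return rows


# Deliberately shuffled input to show that the order is restored.
shuffled = [(1, 0, "SMITH"), (0, 1, "7499"), (0, 0, "7369"), (1, 1, "ALLEN")]
print(triples_to_rows(shuffled))
# [['7369', 'SMITH'], ['7499', 'ALLEN']]
```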

Finally, convert the last RDD to a DataFrame with the header and display it:

    empMapRDDT4.toDF(headerList).show()
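For data small enough to fit in the driver's memory, the same transposition can be sketched without any index bookkeeping, because Python's zip regroups lists position by position. This is an alternative under that size assumption, not the method used in this post, and the function name transpose_local is hypothetical.

```python
def transpose_local(pairs):
    """Transpose [(name, [values...]), ...] into (header, rows) in local memory."""
    header = [name for name, _ in pairs]
    # zip(*columns) yields one tuple per row, taking one value from each column.
    rows = [list(r) for r in zip(*(values for _, values in pairs))]
    return header, rows


header, rows = transpose_local([
    ("empno", ["7369", "7499"]),
    ("ename", ["SMITH", "ALLEN"]),
    ("deptno", ["20", "30"]),
])
print(header)
print(rows)
```

In a PySpark session, the input could come from empMapRDD.collect(), and the derived header could replace the hand-written headerList. The RDD-based steps above remain the better fit when the data is too large to collect.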

Wrapping Up

In this post, we have seen how to transpose data in a DataFrame. Sometimes data arrives in a layout that we would like to transpose after loading it into a DataFrame; in that scenario, this approach can be used. Along the way, we also used functions such as map, flatMap, values, zipWithIndex and sorted.

1 Comment

  1. Rodrigo Soto

    Very good post!! But I ran into an error…
    I am able to read the file and show the RDD, but take(10) does not display it the same way as in the post. Is there some way to get the scripts? I think I missed a step.
    Regards
