Transpose Data in Spark DataFrame using PySpark

Requirement

Let’s take a scenario where we have already loaded data into an RDD/DataFrame, but the row data has ended up in the columns and the column data in the rows. The requirement is to transpose the data, i.e. turn the rows into columns and the columns into rows.
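
As a quick plain-Python illustration of what a transpose does (a minimal sketch, separate from the Spark solution below):

 # Transpose a small 2x3 matrix: rows become columns and columns become rows
 data = [[1, 2, 3],
         [4, 5, 6]]
 transposed = [list(row) for row in zip(*data)]
 print(transposed)  # [[1, 4], [2, 5], [3, 6]]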

Sample Data

We will use the sample data below (shown here in its logical, row-oriented form). You can also download the file attached below; note that the file stores the data in transposed form, with each line holding a column name followed by that column’s values, as we will see once we load it:

 empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,9902,2010-12-17,800.00,20
7499,ALLEN,SALESMAN,9698,2011-02-20,1600.00,30
7521,WARD,SALESMAN,9698,2011-02-22,1250.00,30
9566,JONES,MANAGER,9839,2011-04-02,2975.00,20
7654,MARTIN,SALESMAN,9698,2011-09-28,1250.00,30
9698,BLAKE,MANAGER,9839,2011-05-01,2850.00,30
9782,CLARK,MANAGER,9839,2011-06-09,2450.00,10
9788,SCOTT,ANALYST,9566,2012-12-09,3000.00,20
9839,KING,PRESIDENT,NULL,2011-11-17,5000.00,10
7844,TURNER,SALESMAN,9698,2011-09-08,1500.00,30
7876,ADAMS,CLERK,9788,2012-01-12,1100.00,20
7900,JAMES,CLERK,9698,2011-12-03,950.00,30
9902,FORD,ANALYST,9566,2011-12-03,3000.00,20
7934,MILLER,CLERK,9782,2012-01-23,1300.00,10

Components Involved

  • PySpark
  • DataFrame

Solution

We have the data in text format. Let’s first load it into an RDD:

Step 1: Load data

First, open the PySpark shell to load the data into an RDD.

 empRDD = sc.textFile("file:///root/bdp/spark/data/emp_data.txt")

Here, the source file is located on the local filesystem under /root/bdp/spark/data, and sc is the SparkContext that was already created when the PySpark shell started.
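
If you start from a SparkSession instead (Spark 2.x and later), the same context is available as spark.sparkContext. A minimal sketch (the application name is just a hypothetical example):

 from pyspark.sql import SparkSession

 # Build (or reuse) a session and get its SparkContext
 spark = SparkSession.builder.appName("transpose-demo").getOrCreate()
 sc = spark.sparkContext
 empRDD = sc.textFile("file:///root/bdp/spark/data/emp_data.txt")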

Create a header list with the column names:

 headerList = ['empno', 'ename', 'designation', 'manager', 'hire_date', 'sal', 'deptno']

Let’s check the RDD contents using the command below:

 empRDD.take(10)

 [u'empno:7369,7499,7521,9566,7654,9698,9782,9788,9839,7844,7876,7900,9902,7934', u'ename:SMITH,ALLEN,WARD,JONES,MARTIN,BLAKE,CLARK,SCOTT,KING,TURNER,ADAMS,JAMES,FORD,MILLER', u'designation:CLEARK,SALESMAN,SALESMAN,MANAGER,SALESMAN,MANAGER,MANAGER,ANALYST,PRESIDENT,SALESMAN,CLEARK,CLEARK,ANALYST,CLEARK', u'manager:9902,9698,9698,9839,9698,9839,9839,9566,NULL,9698,9788,9698,9566,9782', u'hire_date:2010-12-17,2011-02-20,2011-02-20,2011-04-02,2011-09-28,2011-05-01,2011-06-09,2012-12-09,2011-11-17,2011-09-08,2012-01-12,2011-12-03,2011-12-03,2012-01-23', u'sal:800.00,1600.00,1250.00,2975.00,1250.00,2850.00,2450.00,3000.00,5000.00,1500.00,1100.00,950.00,3000.00,1300.00', u'deptno:20,30,30,20,30,30,10,20,10,30,20,30,20,10']

Step 2: Transpose

Split each line of empRDD into a (column name, list of values) pair:

 # Split each line on ':' into (column name, list of comma-separated values)
 empMapRDD = empRDD.map(lambda line: (line.split(':')[0], line.split(':')[1].split(',')))

 empMapRDD.take(10)

 [(u'empno', [u'7369', u'7499', u'7521', u'9566', u'7654', u'9698', u'9782', u'9788', u'9839', u'7844', u'7876', u'7900', u'9902', u'7934']), (u'ename', [u'SMITH', u'ALLEN', u'WARD', u'JONES', u'MARTIN', u'BLAKE', u'CLARK', u'SCOTT', u'KING', u'TURNER', u'ADAMS', u'JAMES', u'FORD', u'MILLER']), (u'designation', [u'CLEARK', u'SALESMAN', u'SALESMAN', u'MANAGER', u'SALESMAN', u'MANAGER', u'MANAGER', u'ANALYST', u'PRESIDENT', u'SALESMAN', u'CLEARK', u'CLEARK', u'ANALYST', u'CLEARK']), (u'manager', [u'9902', u'9698', u'9698', u'9839', u'9698', u'9839', u'9839', u'9566', u'NULL', u'9698', u'9788', u'9698', u'9566', u'9782']), (u'hire_date', [u'2010-12-17', u'2011-02-20', u'2011-02-20', u'2011-04-02', u'2011-09-28', u'2011-05-01', u'2011-06-09', u'2012-12-09', u'2011-11-17', u'2011-09-08', u'2012-01-12', u'2011-12-03', u'2011-12-03', u'2012-01-23']), (u'sal', [u'800.00', u'1600.00', u'1250.00', u'2975.00', u'1250.00', u'2850.00', u'2450.00', u'3000.00', u'5000.00', u'1500.00', u'1100.00', u'950.00', u'3000.00', u'1300.00']), (u'deptno', [u'20', u'30', u'30', u'20', u'30', u'30', u'10', u'20', u'10', u'30', u'20', u'30', u'20', u'10'])]

Let’s view these values in a structured way:

 empMapRDD.toDF().show()

 # pair = (list_of_values, column_index); emit (column_index, position, value) triples
 empMapRDDT1 = empMapRDD.values() \
                 .zipWithIndex() \
                 .flatMap(lambda pair: [(pair[1], j, e) for j, e in enumerate(pair[0])])

In this step, every value is converted into a triple of (column index, position within the column, value).

Commands used:

values(): takes only the value of each key/value pair, i.e. it takes [u'7369', u'7499', u'7521', u'9566', u'7654', u'9698', u'9782', u'9788', u'9839', u'7844', u'7876', u'7900', u'9902', u'7934'] from (u'empno', [u'7369', u'7499', u'7521', u'9566', u'7654', u'9698', u'9782', u'9788', u'9839', u'7844', u'7876', u'7900', u'9902', u'7934'])

zipWithIndex(): pairs each element of the RDD with its index

flatMap(): transforms each element into zero or more output records and flattens the result, as the sketch below illustrates
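
To make these three operations concrete, here is a minimal sketch on a toy pair RDD (hypothetical data, assuming a running PySpark shell with sc available):

 toy = sc.parallelize([('a', [10, 11]), ('b', [20, 21])])
 toy.values().collect()                  # [[10, 11], [20, 21]]
 toy.values().zipWithIndex().collect()   # [([10, 11], 0), ([20, 21], 1)]
 toy.values().zipWithIndex() \
    .flatMap(lambda pair: [(pair[1], j, e) for j, e in enumerate(pair[0])]) \
    .collect()                           # [(0, 0, 10), (0, 1, 11), (1, 0, 20), (1, 1, 21)]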

 empMapRDDT1.toDF().show()

 # Key each triple by its position j so that values of the same record group together
 empMapRDDT2 = empMapRDDT1.map(lambda t: (t[1], (t[0], t[2]))).groupByKey()

 # Sort each group by column index i so the fields follow the header order
 empMapRDDT3 = empMapRDDT2.map(lambda kv: sorted(kv[1], key=lambda p: p[0]))

 # Keep only the values, dropping the column indices
 empMapRDDT4 = empMapRDDT3.map(lambda row: [e for _, e in row])

These three steps group the values by record position, sort each group by column index, and keep only the values, so that each record’s fields line up with the header order.
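
Continuing the toy example from above, the same three steps look like this (a sketch; the row order coming out of groupByKey is not guaranteed):

 triples = sc.parallelize([(0, 0, 10), (0, 1, 11), (1, 0, 20), (1, 1, 21)])
 grouped = triples.map(lambda t: (t[1], (t[0], t[2]))).groupByKey()
 rows = grouped.map(lambda kv: [e for _, e in sorted(kv[1], key=lambda p: p[0])])
 rows.collect()  # [[10, 20], [11, 21]] -- each inner list is one transposed row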

Finally, let’s convert the resulting RDD into a DataFrame with the header:

 empMapRDDT4.toDF(headerList).show()
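
Since everything was read from a text file, every column in this DataFrame is a string. If you need typed columns, you can cast them afterwards; a minimal sketch (empDF is just a hypothetical name for the DataFrame built above):

 from pyspark.sql.functions import col

 # Cast a couple of columns from string to numeric types
 empDF = empMapRDDT4.toDF(headerList)
 empDF = empDF.withColumn('sal', col('sal').cast('double')) \
              .withColumn('deptno', col('deptno').cast('int'))
 empDF.printSchema()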

Wrapping Up

In this post, we have seen how to transpose data in a DataFrame. Sometimes data arrives in a shape where we need to transpose it after loading it into a DataFrame, and the approach above handles that scenario. Along the way we have also used functions such as map, flatMap, values, zipWithIndex, and sorted.

1 Comment

  1. rodrigo soto

    Very good post!! But I ran into an error…
    I am able to read the file and display the RDD, but when I run take(10) it does not show the same output as in the post. Is there a way to get the scripts? I think I am missing a step.
    Regards
