Find max value in Spark RDD using Scala

Requirement

Suppose we have a source file that contains basic information about employees, such as employee number, employee name, designation, salary, etc. The requirement is to find the max value in a Spark RDD using Scala. With this requirement, we will find out the maximum salary and the second maximum salary of an employee.

Components Involved

  • Spark RDD
  • Scala
  • IntelliJ
  • SBT

Sample Data

Below is the sample data for this requirement. The source file is in text format and comma (‘,’) separated.

empno,ename,designation,manager,hire_date,sal,deptno
7369,SMITH,CLERK,7902,1980-12-17,800.00,20
7499,ALLEN,SALESMAN,7698,1981-02-20,1600.00,30
7521,WARD,SALESMAN,7698,1981-02-22,1250.00,30
7566,JONES,MANAGER,7839,1981-04-02,2975.00,20
7654,MARTIN,SALESMAN,7698,1981-09-28,1250.00,30
7698,BLAKE,MANAGER,7839,1981-05-01,2850.00,30
7782,CLARK,MANAGER,7839,1981-06-09,2450.00,10
7788,SCOTT,ANALYST,7566,1982-12-09,3000.00,20
7839,KING,PRESIDENT,NULL,1981-11-17,5000.00,10
7844,TURNER,SALESMAN,7698,1981-09-08,1500.00,30
7876,ADAMS,CLERK,7788,1983-01-12,1100.00,20
7900,JAMES,CLERK,7698,1981-12-03,950.00,30
7902,FORD,ANALYST,7566,1981-12-03,3000.00,20
7934,MILLER,CLERK,7782,1982-01-23,1300.00,10

Download the data from here:

emp_data

Solution

We will create a Spark application named MaxValueInSpark using IntelliJ and SBT. If you want to set up IntelliJ on your system, you can check this post.

Step 1: Create Spark Application

First of all, open IntelliJ. Once it has opened, go to File -> New -> Project and choose SBT.

Click Next and provide the details such as the project name, and choose the Scala version. In my case, I have given the project name MaxValueInSpark and selected 2.10.4 as the Scala version.

Step 2: Resolve Dependency

Add the below dependencies to the build.sbt file:

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.0",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.0"
)

It will download all the required packages.
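For reference, the complete build.sbt might look roughly like the sketch below. The project name matches Step 1; the version value is just an assumption.

name := "MaxValueInSpark"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.6.0",
  "org.apache.spark" % "spark-sql_2.10" % "1.6.0"
)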

Step 3: Write Code

In this step, we will write the code to get the maximum salary, minimum salary, second maximum salary, second minimum salary, and the salary with the employee name. Let’s start with loading the dataset into an RDD.

val emp_data = sc.textFile("src\\main\\resources\\emp_data.txt")

In this line of code, we are loading the employee data which is available at the local path src\\main\\resources. It returns an RDD.
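As a side note, textFile also accepts an optional second argument that sets the minimum number of partitions of the resulting RDD; a small sketch (the name emp_data_4p and the value 4 are only for illustration):

// Ask Spark for at least 4 partitions while reading the file
val emp_data_4p = sc.textFile("src\\main\\resources\\emp_data.txt", 4)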

Once the data has been loaded, read the first record of the file using the first() function.

val emp_header = emp_data.first()
println(emp_header)

 

first() – Returns the first record of the RDD.

The first record might be the header of the file, so let’s remove it from our RDD.

val emp_data_without_header = emp_data.filter(line => !line.equals(emp_header))

Now, we have another RDD which is without the header. Let’s check how many partitions have been created.

 println("No.ofpartition="+emp_data_without_header.partitions.size)

 

partitions – Returns the number of partitions of an RDD.

Here, I have used the predefined partitions function, which returns the number of partitions of the RDD.
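As a side note, the partition count of an existing RDD can also be changed with repartition or coalesce; a minimal sketch (the value 4 and the val names are only for illustration):

// Increase the partition count (this triggers a shuffle)
val repartitioned = emp_data_without_header.repartition(4)
// Reduce the partition count, avoiding a full shuffle
val single_partition = emp_data_without_header.coalesce(1)
println("Partitions after coalesce = " + single_partition.partitions.size)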
In order to find the max salary, we are going to use two different approaches. The first approach uses the max function, which gives us the max value directly.

val emp_salary_list = emp_data_without_header.map{x => x.split(',')}.map{x => x(5).toDouble}
println("Highest salary: " + emp_salary_list.max())

Here, I have first split each record and taken only the salary from the RDD. On this list of salary data, I have used the max function, which returns the max value among them.
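For completeness, the built-in min function works the same way and returns the smallest value:

println("Lowest salary: " + emp_salary_list.min())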
The second approach uses the sortBy function.

val max_salary = emp_salary_list.distinct.sortBy(x => x.toDouble, false, 1)
max_salary.take(1).foreach(println)

 

sortBy – This function takes three arguments (the key function, ascending = true, and the number of partitions). By default, the sort happens in ascending order.

Here, I have first taken only the distinct values from the salaries. After that, I sorted the records in descending order and took the 1st record, which is the max value. Since our RDD has 2 partitions, if we don’t pass the number of partitions as 1, the sort happens within each partition separately.
Similarly, if we pass true instead of false for the ascending argument, it returns the minimum salary.

val min_salary = emp_salary_list.distinct.sortBy(x => x.toDouble, true, 1)
min_salary.take(1).foreach(println)

The below code snippet gives the second highest salary.

val second_highest_salary = max_salary.zipWithIndex().filter(index => index._2 == 1).map(_._1)
println("Second highest salary = " + second_highest_salary.first())

 

zipWithIndex() – It zips the RDD with its element indices, providing an index for every record.

Here, we are using the zipWithIndex() function, which zips our RDD with an index, and then we take the second record using a filter.
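As an alternative sketch, takeOrdered can fetch the top two distinct salaries in a single action, avoiding the explicit sort and zipWithIndex steps:

// Take the two highest distinct salaries; index 1 holds the second highest
val topTwo = emp_salary_list.distinct.takeOrdered(2)(Ordering[Double].reverse)
if (topTwo.length > 1) println("Second highest salary = " + topTwo(1))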

Next, we will find the employee who has the highest salary.

val salaryWithEmployeeName = emp_data_without_header.map{x => x.split(',')}.map{x => (x(5).toDouble, x(1))}
val maxSalaryEmployee = salaryWithEmployeeName.groupByKey.takeOrdered(1)(Ordering[Double].reverse.on(_._1))
maxSalaryEmployee.foreach(println)

 

groupByKey – Returns a collection of values for each key.

takeOrdered – Returns the first N records on the basis of the specified ordering.

Here, we have first created key-value pairs of salary and employee name, and then taken the first record by applying a descending ordering on the salary.
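As an alternative sketch (not the approach used above), max can be called directly on the (salary, name) pairs with an ordering on the salary field, which avoids the shuffle that groupByKey performs:

// Pick the pair with the highest salary; if salaries tie, a single pair is returned
val topEarner = salaryWithEmployeeName.max()(Ordering.by[(Double, String), Double](_._1))
println(topEarner)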
Once all the code is done, let’s bind it together; the complete code looks like this:

import org.apache.spark.{SparkConf, SparkContext}

object MaxSalary {
  def main(args: Array[String]): Unit = {
    // Setup configuration and create spark context
    val conf = new SparkConf().setAppName("Emp Dept Assignment").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Load the source data file into RDD
    val emp_data = sc.textFile("src\\main\\resources\\emp_data.txt")
    emp_data.foreach(println)

    // Find first record of the file
    val emp_header = emp_data.first()
    println(emp_header)

    // Remove header from RDD
    val emp_data_without_header = emp_data.filter(line => !line.equals(emp_header))
    emp_data_without_header.foreach(println)

    // Get no. of partition of an RDD
    println("No. of partition = " + emp_data_without_header.partitions.size)

    // Find max salary of an employee: two approaches
    // Using max function on RDD
    val emp_salary_list = emp_data_without_header.map{x => x.split(',')}.map{x => (x(5).toDouble)}
    println("Highest salaty:"+ emp_salary_list.max())

    // Using sortBy function
    val max_salary = emp_salary_list.distinct.sortBy(x => x.toDouble, false, 1)
    max_salary.take(1).foreach(println)

    // Find minimum salary
    val min_salary = emp_salary_list.distinct.sortBy(x => x.toDouble, true, 1)
    min_salary.take(1).foreach(println)

    // Find Second highest salary
    val second_highest_salary = max_salary.zipWithIndex().filter(index => index._2 == 1).map(_._1)
    second_highest_salary.foreach(println)

    // Find second min salary
    val second_min_salary = min_salary.zipWithIndex().filter(index => index._2 == 1).map(_._1)
    second_min_salary.foreach(println)

    // Employee who have max salary
    val salaryWithEmployeeName = emp_data_without_header.map{x => x.split(',')}.map{x => (x(5).toDouble, x(1))}
    val maxSalaryEmployee = salaryWithEmployeeName.groupByKey.takeOrdered(1)(Ordering[Double].reverse.on(_._1))
    maxSalaryEmployee.foreach(println)
  }
}

Step 4: Execute the application

Once the code part is done, let’s execute the application. In order to execute the application, right-click on the script and choose Run.

Step 5: Output

You will be able to see the output of the application in the IntelliJ console.
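Based on the sample data, the key lines of the output should look roughly like this (max salary, partition count, max via sortBy, min salary, second highest, second minimum, and the top earner; the exact print order of RDD records may vary):

Highest salary: 5000.0
No. of partition = 2
5000.0
800.0
3000.0
950.0
(5000.0,CompactBuffer(KING))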

Wrapping Up

In this post, we have created a Spark application with Scala to find the maximum salary, second maximum salary, minimum salary, and second minimum salary from an employee data set. We have seen multiple ways to find the max and min salary. We have executed the application in local mode and validated the output. If you want to download the application, you can download it from here. If you don’t have the IntelliJ IDE on your local system, you can try the same code in the spark-shell.
