Requirement

A UDF is a user-defined function. As the name indicates, a user can create a custom function and use it wherever required. We create a UDF when the existing built-in functions are not available or cannot fulfill the requirement.

Sample Data

empno | ename  | designation | manager | hire_date  | sal  | deptno | location
9369  | SMITH  | CLERK       | 7902    | 12/17/1980 | 800  | 20     | BANGALORE
9499  | ALLEN  | SALESMAN    | 7698    | 2/20/1981  | 1600 | 30     | HYDERABAD
9521  | WARD   | SALESMAN    | 7698    | 2/22/1981  | 1250 | 30     | PUNE
9566  | TURNER | MANAGER     | 7839    | 4/2/1981   | 2975 | 20     | MUMBAI
9654  | MARTIN | SALESMAN    | 7698    | 9/28/1981  | 1250 | 30     | CHENNAI
9369  | SMITH  | CLERK       | 7902    | 12/17/1980 | 800  | 20     | KOLKATA

Solution

We will create a UDF that converts a string value into an MD5 hash value, and look at two approaches: with and without registering the UDF.

Approach 1: With UDF Register

Step 1: Create a UDF

import java.security.MessageDigest

val md5Value = (input: String) => {
  MessageDigest.getInstance("MD5").digest(input.getBytes)
}

Step 2: Register UDF as a Function

spark.udf.register("getMd5Value", md5Value)

Step 3: Use the created UDF

val empDf = spark.read.option("header", "true").csv("./src/main/resources/emp_data1.csv")
empDf.createOrReplaceTempView("empTbl")
val udfCall = spark.sql("select ename, getMd5Value(ename) as md5 from empTbl")
udfCall.show

Approach 2: Without UDF Register

val md5ValueFunc = udf((input: String) => {
  MessageDigest.getInstance("MD5").digest(input.getBytes)
}, ArrayType(ByteType))
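A quick standalone check (plain Scala, no Spark needed) of why ArrayType(ByteType) is the right return type here: MessageDigest.digest yields an Array[Byte], and an MD5 digest is always 16 bytes long.

```scala
import java.security.MessageDigest

// MD5 always produces a 128-bit digest, i.e. an Array[Byte] of length 16,
// which is why the UDF's return type is declared as ArrayType(ByteType).
val digest = MessageDigest.getInstance("MD5").digest("SMITH".getBytes("UTF-8"))
println(digest.length) // 16
```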

val udfWithoutReg = empDf.select(col("ename"), md5ValueFunc(col("ename")))
udfWithoutReg.show(false)
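Because the UDF returns raw bytes, show(false) prints the column as an opaque byte array. A common tweak (a sketch, not part of the original code) is to hex-encode the digest inside the function so the column displays as a readable 32-character string; the hypothetical md5Hex below can be wrapped with udf the same way as above.

```scala
import java.security.MessageDigest

// Sketch: same MD5 logic, but hex-encoded so DataFrame output is readable.
// Wrap it with udf(md5Hex) before applying it to a column.
val md5Hex = (input: String) =>
  MessageDigest.getInstance("MD5")
    .digest(input.getBytes("UTF-8"))
    .map("%02x".format(_))
    .mkString

println(md5Hex("abc")) // "900150983cd24fb0d6963f7d28e17f72" (well-known MD5 test vector)
```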

Full Code

import java.security.MessageDigest
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object UDFInSpark {
  def main(args: Array[String]): Unit = {
    // To handle ERROR SparkContext: Error initializing SparkContext.
    val config = new SparkConf().setAppName("UDF in Spark")
    config.set("spark.driver.allowMultipleContexts", "true")

    Logger.getLogger("org").setLevel(Level.OFF)
    val spark = SparkSession.builder()
      .config(config)
      .config("spark.master", "local")
      .getOrCreate()

    // Create a UDF
    val md5Value = (input: String) => {
      MessageDigest.getInstance("MD5").digest(input.getBytes)
    }

    // Register UDF to use
    spark.udf.register("getMd5Value", md5Value)

    val md5ValueFunc = udf((input: String) => {
      MessageDigest.getInstance("MD5").digest(input.getBytes)
    }, ArrayType(ByteType))

    try {
      val empDf = spark.read.option("header", "true").csv("./src/main/resources/emp_data1.csv")
      empDf.createOrReplaceTempView("empTbl")
      
      val udfInQuery = spark.sql("select ename, getMd5Value(ename) as md5 from empTbl")
      udfInQuery.show(false)

      // Using UDF without registering
      val udfWithoutReg = empDf.select(col("ename"), md5ValueFunc(col("ename")))
      udfWithoutReg.show(false)
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

Wrapping Up

In this post, we have learned how to create a UDF in Spark and use it. Doing so requires a few extra steps: write the function, register it, and then call it. We have also seen two different approaches to using a UDF in Spark.
