Requirement
A UDF is a user-defined function. As the name indicates, it lets a user create a custom function and use it wherever required. We create a UDF when the existing built-in functions are not available or cannot fulfill the requirement.
Sample Data
empno | ename | designation | manager | hire_date | sal | deptno | location |
9369 | SMITH | CLERK | 7902 | 12/17/1980 | 800 | 20 | BANGALORE |
9499 | ALLEN | SALESMAN | 7698 | 2/20/1981 | 1600 | 30 | HYDERABAD |
9521 | WARD | SALESMAN | 7698 | 2/22/1981 | 1250 | 30 | PUNE |
9566 | TURNER | MANAGER | 7839 | 4/2/1981 | 2975 | 20 | MUMBAI |
9654 | MARTIN | SALESMAN | 7698 | 9/28/1981 | 1250 | 30 | CHENNAI |
9369 | SMITH | CLERK | 7902 | 12/17/1980 | 800 | 20 | KOLKATA |
Solution
We will create a UDF that converts a string value into an MD5 hash. We will look at two approaches: with and without registering the UDF.
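Before wiring anything into Spark, the core computation can be sketched in plain Scala. The hex-encoding helper below (`md5Hex`) is our own illustrative addition — the original UDF returns the raw digest bytes, not a hex string:

```scala
import java.security.MessageDigest

// Compute the MD5 digest of a string and render it as hex.
// md5Hex is an illustrative helper, not part of the original code.
def md5Hex(input: String): String = {
  MessageDigest.getInstance("MD5")
    .digest(input.getBytes("UTF-8"))
    .map(b => f"$b%02x") // each byte as two hex digits
    .mkString
}

println(md5Hex("SMITH")) // a 32-character hex string
```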
Approach 1: With UDF Register
Step 1: Create a UDF
import java.security.MessageDigest

val md5Value = (input: String) => {
  MessageDigest.getInstance("MD5").digest(input.getBytes)
}
Step 2: Register UDF as a Function
spark.udf.register("getMd5Value",md5Value)
Step 3: Use created UDF function
val empDf = spark.read.option("header", "true").csv("./src/main/resources/emp_data1.csv")
empDf.createOrReplaceTempView("empTbl")
val udfCall = spark.sql("select ename, getMd5Value(ename) as md5 from empTbl")
udfCall.show
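A registered UDF is not limited to SQL strings — it can also be invoked from the DataFrame API. A minimal sketch, assuming `empDf` and the `getMd5Value` registration from the steps above:

```scala
import org.apache.spark.sql.functions.{callUDF, col}

// Invoke the registered UDF by name from the DataFrame API.
val viaCallUDF = empDf.select(col("ename"), callUDF("getMd5Value", col("ename")).as("md5"))
viaCallUDF.show(false)

// Equivalent: selectExpr accepts SQL expression syntax directly.
empDf.selectExpr("ename", "getMd5Value(ename) as md5").show(false)
```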
Approach 2: Without UDF Register
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{ArrayType, ByteType}

val md5ValueFunc = udf((input: String) => {
  MessageDigest.getInstance("MD5").digest(input.getBytes)
}, ArrayType(ByteType))
val udfWithoutReg = empDf.select(col("ename"), md5ValueFunc(col("ename")))
udfWithoutReg.show(false)
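Worth noting as a sanity check: Spark ships a built-in `md5` function in `org.apache.spark.sql.functions`, which returns the digest as a 32-character hex string rather than raw bytes. For this particular transformation the UDF is mainly for illustration; a sketch using the built-in, assuming the same `empDf`:

```scala
import org.apache.spark.sql.functions.{col, md5}

// Spark's built-in md5 operates on a binary column, so cast the string first.
val builtIn = empDf.select(col("ename"), md5(col("ename").cast("binary")).as("md5_hex"))
builtIn.show(false)
```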
Full Code
import java.security.MessageDigest

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object UDFInSpark {
  def main(args: Array[String]): Unit = {
    // To handle ERROR SparkContext: Error initializing SparkContext.
    val config = new SparkConf().setAppName("UDF in Spark")
    config.set("spark.driver.allowMultipleContexts", "true")
    Logger.getLogger("org").setLevel(Level.OFF)

    val spark = SparkSession.builder().appName("UDF in Spark")
      .config("spark.master", "local")
      .getOrCreate()

    // Create a UDF
    val md5Value = (input: String) => {
      MessageDigest.getInstance("MD5").digest(input.getBytes)
    }

    // Register the UDF so it can be called from SQL
    spark.udf.register("getMd5Value", md5Value)

    // The same UDF wrapped for the DataFrame API, without registration
    val md5ValueFunc = udf((input: String) => {
      MessageDigest.getInstance("MD5").digest(input.getBytes)
    }, ArrayType(ByteType))

    try {
      val empDf = spark.read.option("header", "true").csv("./src/main/resources/emp_data1.csv")
      empDf.createOrReplaceTempView("empTbl")

      // Approach 1: call the registered UDF in a SQL query
      val udfInQuery = spark.sql("select ename, getMd5Value(ename) as md5 from empTbl")
      udfInQuery.show(false)

      // Approach 2: use the UDF without registering it
      val udfWithoutReg = empDf.select(col("ename"), md5ValueFunc(col("ename")))
      udfWithoutReg.show(false)
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}
Wrapping Up
In this post, we have learned how to create a UDF in Spark and use it. It requires a few additional steps: write the function, register it, and then call it. We have also seen two different approaches to using a UDF in Spark.