Work with user-defined functions

  1. Explore the data
  2. User Defined Functions
  3. Decorator Syntax [Python Only]
  4. Vectorized UDF
  5. Examples

If you can use the built-in functions in sql.functions, you certainly should, as they are less likely to have bugs and can be optimized by Catalyst.

However, the built-in functions don't cover everything you need, and sometimes you have to write custom functions, known as User Defined Functions (UDFs).
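
For instance, here is a minimal sketch of the built-in route, assuming a hypothetical DataFrame df with a string column name:

from pyspark.sql.functions import col, lower, substring

# Built-in column functions compile to Catalyst expressions, so Spark can
# optimize them and no custom code has to be serialized to the executors.
# `df` is a hypothetical DataFrame with a string column "name".
display(df.select(lower(col("name")).alias("lcName"),
                  substring(col("name"), 1, 1).alias("initial")))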

1. Explore the data

We'll use the sample CSV file from Read data in CSV format.

csvFile = "dbfs:/mnt/training/sample.tsv"

rawDF = (spark.read
    .option("sep", "\t")
    .csv(csvFile, header=True, inferSchema=True, multiLine=True, escape='"')
)
display(rawDF)

Let’s select the columns id and name, as well as filter out all the rows that contain nulls.

tempDF = (rawDF.select("id", "name")
    .dropna()
)
display(tempDF)

2. User Defined Functions

We've seen many built-in functions (e.g. avg, lit, col, etc.). However, sometimes you might need a specific function that is not provided, so let's look at how to define your own User Defined Function.

For example, let's say we want to pull a couple of characters out of the name field. Let's start by writing that function in local Python/Scala.

def firstInitialFunction(name):
  return name[2:4]

firstInitialFunction("Jack")
Out: 'ck'

Now we have to define a UDF that wraps the function. This will serialize our function and send it to the executors so that we can use it in our DataFrame.

firstInitialUDF = udf(firstInitialFunction)

Or you can specify the return type explicitly, like this:

from pyspark.sql.types import *

firstInitialUDF = udf(firstInitialFunction, StringType())

Or:

firstInitialUDF = udf(firstInitialFunction, "string")

from pyspark.sql.functions import col
display(tempDF.select(firstInitialUDF(col("name"))))

We can also create a UDF using spark.udf.register, which will create the UDF in the SQL namespace.

tempDF.createOrReplaceTempView("tempDF")

spark.udf.register("sql_udf", firstInitialFunction)
%sql
select sql_udf(name) as initial from tempDF
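
spark.udf.register also accepts an optional return type, in case you want the SQL-facing UDF to declare it explicitly; a minimal sketch:

spark.udf.register("sql_udf", firstInitialFunction, "string")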

3. Decorator Syntax [Python Only]

Alternatively, you can define a UDF using decorator syntax in Python with the dataType the function will return.

However, you cannot call the local Python function anymore (e.g. decoratorUDF("Jack") will not work); a workaround is sketched after the code below.

%python
# Our input/output is a string
@udf("string")
def decoratorUDF(name):
  return name[2:4]
display(tempDF.select(decoratorUDF(col("name"))))
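
If you still need to unit-test the logic locally, one workaround (the same pattern as in section 2) is to keep a plain function and wrap it separately instead of decorating it; a sketch with hypothetical names:

# Plain function stays callable locally (hypothetical names)
def initialsFunction(name):
  return name[2:4]

initialsUDF = udf(initialsFunction, "string")

initialsFunction("Jack")                           # still works locally -> 'ck'
display(tempDF.select(initialsUDF(col("name"))))   # and on the cluster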

UDF Drawbacks:

  • UDFs cannot be optimized by the Catalyst Optimizer (for comparison, a built-in alternative is sketched after this list)
  • The function has to be serialized and sent out to the executors
  • In the case of Python, there is even more overhead: we have to spin up a Python interpreter on every executor to run the UDF, so Python UDFs are much slower than Scala UDFs
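
For comparison, the same two-character slice can be expressed with built-in column functions, which Catalyst can optimize and which avoid the serialization overhead entirely; a sketch using substring (positions are 1-based):

from pyspark.sql.functions import col, substring

# substring(str, pos, len) with pos=3, len=2 mirrors name[2:4] in Python
display(tempDF.select(substring(col("name"), 3, 2).alias("initial")))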

4. Vectorized UDF

Vectorized UDFs utilize Apache Arrow to speed up computation. Let’s see how that helps improve our processing time.

Apache Arrow is an in-memory columnar data format used in Spark to efficiently transfer data between JVM and Python processes. See the Apache Arrow documentation for more details.

%python
from pyspark.sql.functions import pandas_udf

# We have a string input/output
@pandas_udf("string")
def vectorizedUDF(name):
  return name.str[2:4]
display(tempDF.select(vectorizedUDF(col("name"))))
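
Note that a pandas UDF receives and returns whole pandas Series rather than single values, which is why the body uses the .str accessor; a quick local sketch of the same logic on a plain Series:

%python
import pandas as pd

# The vectorized logic works on an entire column (pandas Series) at once
names = pd.Series(["Jack", "Jill"])
names.str[2:4]   # returns a Series: ['ck', 'll']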

We can also register these Vectorized UDFs to the SQL namespace.

%python
spark.udf.register("sql_vectorized_udf", vectorizedUDF)
%sql
select sql_vectorized_udf(name) as initial from tempDF

5. Examples

# UDF
spark.conf.set("spark.sql.shuffle.partitions", 8)

@udf("String")
def lowercase(string):
  return string.lower()

@udf("String")
def removeHyphen(string):
  return string.replace("-", "")


dedupedDF = (df
  .select("*", 
          lowercase("firstname").alias("lcFirstName"),
          lowercase("middleName").alias("lcMiddleName"),
          lowercase("lastName").alias("lcLastName"),
          removeHyphen("ssn").alias("ssnNums")
         )
  .dropDuplicates(["lcFirstName", "lcMiddleName", "lcLastName", "ssnNums", "gender", "birthDate", "salary"])
  .drop("lcFirstName", "lcMiddleName", "lcLastName", "ssnNums")
)

# Pandas UDF approach - also concatenating fields
from pyspark.sql.functions import pandas_udf

spark.conf.set("spark.sql.shuffle.partitions", 8)

@pandas_udf("string")
def to_lower(first_name, middle_name, last_name):
  return (first_name + middle_name + last_name).str.lower()

@pandas_udf("string")
def remove_hyphen(ssn):
  return ssn.str.replace("-", "")

dedupedDF = (df
  .select("*", 
          to_lower("firstname", "middleName", "lastName").alias("lcFirstMiddleLast"), 
          removeHyphen("ssn").alias("ssnNums")
         )
  .dropDuplicates(["lcFirstMiddleLast", "ssnNums", "gender", "birthDate", "salary"])
  .drop("lcFirstMiddleLast", "ssnNums")
)

# write file
(dedupedDF.write
   .mode("overwrite")
   .parquet(destFile)
)
dedupedDF = spark.read.parquet(destFile)
print(f"Total Records: {dedupedDF.count()}")