- Explore the Data
- User Defined Functions
- Decorator Syntax [Python Only]
- Vectorized UDF
- Examples
If you can use the built-in functions in sql.functions, you certainly should use them, as they are less likely to have bugs and can be optimized by Catalyst. However, the built-in functions don’t cover everything you need, and sometimes you have to write custom functions, known as User Defined Functions (UDFs).
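For instance, the two-character slice used throughout this post (name[2:4]) can also be expressed with the built-in substring function, which Catalyst can optimize. A minimal sketch, assuming a DataFrame df with a string column name (both are illustrative placeholders, not defined in this post yet):

from pyspark.sql.functions import substring, col

# substring() is 1-based, so positions 3-4 correspond to Python's name[2:4]
# df and the "name" column are illustrative placeholders
display(df.select(substring(col("name"), 3, 2).alias("initial")))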
1. Explore the Data
We will use the sample CSV file from Read data in CSV format.
csvFile = "dbfs:/mnt/training/sample.tsv"

rawDF = (spark.read
  .option("sep", "\t")
  .csv(csvFile, header=True, inferSchema=True, multiLine=True, escape='"')
)

display(rawDF)
Let’s select the columns id and name, as well as filter out all the rows that contain nulls.
tempDF = (rawDF
  .select("id", "name")
  .dropna()
)

display(tempDF)
2. User Defined Functions
We’ve seen many built-in functions (e.g. avg, lit, col, etc.). However, sometimes you might need a specific function that is not provided, so let’s look at how to define your own User Defined Function.
For example, let’s say we want to pull a couple of characters out of our name field. Let’s start by writing that function in local Python/Scala.
def firstInitialFunction(name):
    return name[2:4]

firstInitialFunction("Jack")
Out: 'ck'
Now we have to define a UDF that wraps the function. This will serialize our function and send it to the executors so that we can use it in our DataFrame.
from pyspark.sql.functions import udf

firstInitialUDF = udf(firstInitialFunction)
Or you can specify the return type, like this:
from pyspark.sql.types import StringType

firstInitialUDF = udf(firstInitialFunction, StringType())
Or:
firstInitialUDF = udf(firstInitialFunction, "string")
from pyspark.sql.functions import col

display(tempDF.select(firstInitialUDF(col("name"))))
We can also create a UDF using spark.udf.register, which will register the UDF in the SQL namespace.
tempDF.createOrReplaceTempView("tempDF")

spark.udf.register("sql_udf", firstInitialFunction)
%sql
select sql_udf(name) as initial from tempDF
3. Decorator Syntax [Python Only]
Alternatively, in Python you can define a UDF using decorator syntax, passing the dataType the function will return. However, you can no longer call the local Python function directly (e.g. decoratorUDF("Jack") will not work).
%python
# Our input/output is a string
@udf("string")
def decoratorUDF(name):
    return name[2:4]
display(tempDF.select(decoratorUDF(col("name"))))
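If you still need to call the logic locally, a common workaround (sketched here with hypothetical names, not part of the original tutorial) is to keep the plain function and wrap it with udf() separately:

# firstTwoChars / firstTwoCharsUDF are hypothetical names for illustration
def firstTwoChars(name):
    return name[2:4]

firstTwoChars("Jack")  # still callable locally: 'ck'

# Wrap it separately for DataFrame use
firstTwoCharsUDF = udf(firstTwoChars, "string")
display(tempDF.select(firstTwoCharsUDF(col("name"))))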
UDF Drawbacks:
- UDFs cannot be optimized by the Catalyst Optimizer
- The function has to be serialized and sent out to the executors
- In the case of Python, there is even more overhead: a Python interpreter has to be spun up on every executor to run the UDF, so Python UDFs are much slower than Scala UDFs (a rough timing sketch follows below)
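One rough way to observe this overhead yourself is to time a built-in function against the Python UDF defined earlier. This is only a sketch: the count() action and wall-clock timing are illustrative, and results will vary with cluster size and data volume.

import time
from pyspark.sql.functions import substring, col

# Built-in function: optimized by Catalyst, stays in the JVM
start = time.time()
tempDF.select(substring(col("name"), 3, 2)).count()
print(f"built-in substring: {time.time() - start:.2f}s")

# Python UDF: rows are serialized out to a Python worker on each executor
start = time.time()
tempDF.select(firstInitialUDF(col("name"))).count()
print(f"Python UDF:         {time.time() - start:.2f}s")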
4. Vectorized UDF
Vectorized UDFs utilize Apache Arrow to speed up computation. Let’s see how that helps improve our processing time.
Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes. See more here.
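As a side note (a sketch, not from the original post): Arrow is also what speeds up conversions between Spark and pandas DataFrames, and that optimization can be toggled with a config flag. Pandas UDFs use Arrow regardless of this setting.

%python
# Spark 3.x config name; older versions used spark.sql.execution.arrow.enabled
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Data is now transferred to pandas as Arrow columnar batches
pdf = tempDF.toPandas()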
%python
from pyspark.sql.functions import pandas_udf

# We have a string input/output
@pandas_udf("string")
def vectorizedUDF(name):
    return name.str[2:4]
display(tempDF.select(vectorizedUDF(col("name"))))
We can also register these Vectorized UDFs to the SQL namespace.
%python
spark.udf.register("sql_vectorized_udf", vectorizedUDF)
%sql
select sql_vectorized_udf(name) as initial from tempDF
5. Examples
# UDF approach
from pyspark.sql.functions import udf

spark.conf.set("spark.sql.shuffle.partitions", 8)

@udf("string")
def lowercase(string):
    return string.lower()

@udf("string")
def removeHyphen(string):
    return string.replace("-", "")

dedupedDF = (df
  .select("*",
          lowercase("firstname").alias("lcFirstName"),
          lowercase("middleName").alias("lcMiddleName"),
          lowercase("lastName").alias("lcLastName"),
          removeHyphen("ssn").alias("ssnNums"))
  .dropDuplicates(["lcFirstName", "lcMiddleName", "lcLastName", "ssnNums", "gender", "birthDate", "salary"])
  .drop("lcFirstName", "lcMiddleName", "lcLastName", "ssnNums")
)
# Pandas UDF approach - also concatenating fields
from pyspark.sql.functions import pandas_udf

spark.conf.set("spark.sql.shuffle.partitions", 8)

@pandas_udf("string")
def to_lower(first_name, middle_name, last_name):
    return (first_name + middle_name + last_name).str.lower()

@pandas_udf("string")
def remove_hyphen(ssn):
    return ssn.str.replace("-", "")

dedupedDF = (df
  .select("*",
          to_lower("firstname", "middleName", "lastName").alias("lcFirstMiddleLast"),
          remove_hyphen("ssn").alias("ssnNums"))
  .dropDuplicates(["lcFirstMiddleLast", "ssnNums", "gender", "birthDate", "salary"])
  .drop("lcFirstMiddleLast", "ssnNums")
)
# write file
(dedupedDF.write
  .mode("overwrite")
  .parquet(destFile)
)

dedupedDF = spark.read.parquet(destFile)
print(f"Total Records: {dedupedDF.count()}")