SQL reference for Databricks SQL

Auto increment id

Databricks provides IDENTITY columns for this purpose. The column clause syntax is:

[ GENERATED ALWAYS AS ( expr ) |
  GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY 
  [ ( [ START WITH start ] [ INCREMENT BY step ] ) ] 
]

This works on Delta tables. Example:

create table gen1 (
     id long GENERATED ALWAYS AS IDENTITY
   , t string
)

Requires Databricks Runtime 10.4 or above.
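
A quick check, as a sketch assuming the gen1 table above was created on a Delta-enabled cluster: insert rows without supplying id and the values are generated automatically.

# Omit the id column; Databricks generates the identity values.
spark.sql("INSERT INTO gen1 (t) VALUES ('row1'), ('row2')")
display(spark.sql("SELECT id, t FROM gen1"))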


Databricks – Create Table

Create a table from a .csv file

%sql
DROP TABLE IF EXISTS jack_db.sample_csv;

CREATE TABLE jack_db.sample_csv
(
  id       string
  , name   string
  , weight string
)
USING csv
OPTIONS(path "/mnt/training/sample.csv", header "true");

Create a table from a folder containing .csv files

%sql
DROP TABLE IF EXISTS jack_db.sample_csv;

CREATE TABLE jack_db.sample_csv
(
  id       string
  , name   string
  , weight string
)
USING csv
OPTIONS(path "/mnt/training/sample_csv/", header "true");

If you want to keep subfolders under sample_csv/, such as ptn=2021/ and ptn=2022/, you can do so without adding PARTITIONED BY (column_name) to the CREATE TABLE query. Databricks works out on its own that ptn is a column, and every record in the files under the ptn=2021/ folder gets the value ptn=2021 (even if the files themselves contain a different value for ptn).

It appears that Databricks reads the directory structure at CREATE TABLE time, so any files or directories added later must follow the same layout.
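
If new partition directories (e.g. ptn=2023/) are added after the table already exists, the metastore may not pick them up automatically. A minimal sketch of one way to refresh the partition list; the repair step is my assumption about the workflow, not something tested above:

# Re-scan the table location and register partition directories
# that were added after CREATE TABLE (e.g. ptn=2023/).
spark.sql("MSCK REPAIR TABLE jack_db.sample_csv")
display(spark.sql("SHOW PARTITIONS jack_db.sample_csv"))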

Databricks SQL Connector for Python

The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks clusters and Databricks SQL endpoints.

Get started

Gather the following information for the cluster or SQL endpoint that you want to use: the server hostname, the HTTP path, and a personal access token.

Generate a personal access token

The number of personal access tokens per user is limited to 600 per workspace.

  1. Click the Settings icon in the lower left corner of your Databricks workspace.
  2. Click User Settings.
  3. Go to the Access Tokens tab.
  4. Click the Generate New Token button.
  5. Optionally enter a description (comment) and expiration period.

Install the Databricks SQL Connector for Python library

Install the Databricks SQL Connector for Python library on your development machine by running pip install databricks-sql-connector.

$ pip install databricks-sql-connector

Query data

The following code example demonstrates how to call the Databricks SQL Connector for Python to run a basic SQL command on a cluster or SQL endpoint.

from databricks import sql

with sql.connect(server_hostname="<server-hostname>",
                 http_path="<http-path>",
                 access_token="<access-token>") as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM <database-name>.<table-name> LIMIT 2")
        result = cursor.fetchall()

        for row in result:
            print(row)

Insert data

from databricks import sql

with sql.connect(server_hostname="...", http_path="...", access_token="...") as connection:
  with connection.cursor() as cursor:
    cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")
    squares = [(i, i * i) for i in range(100)]
    values = ",".join([f"({x}, {y})" for (x, y) in squares])

    cursor.execute(f"INSERT INTO squares VALUES {values}")

Query metadata

from databricks import sql

with sql.connect(server_hostname="...", http_path="...", access_token="...") as connection:
  with connection.cursor() as cursor:
    cursor.columns(schema_name="default", table_name="squares")
    print(cursor.fetchall())

Cursor and connection management

from databricks import sql

connection = sql.connect(server_hostname="...", http_path="...", access_token="...")
cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())

cursor.close()
connection.close()

Configure logging

import logging

from databricks import sql

logging.getLogger('databricks.sql').setLevel(logging.DEBUG)
sql.connect(...)
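
A slightly fuller sketch that also writes the connector's debug output to a file, using only the standard logging module (the file name and format are just examples):

import logging

from databricks import sql

logger = logging.getLogger("databricks.sql")
logger.setLevel(logging.DEBUG)

# Also send the connector's debug output to a file.
file_handler = logging.FileHandler("databricks_sql.log")  # example file name
file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logger.addHandler(file_handler)

# connection = sql.connect(server_hostname="...", http_path="...", access_token="...")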

Convert a JSON string to a dict

Example1

Create a str variable, then convert it to a dict with json.loads()

import json

json_string = """
{
    "id": "abc123",
    "name": "Bob"
}
"""
json_object = json.loads(json_string)

print(type(json_object))  # <class 'dict'>
print(json_object["id"])  # abc123

data = json.dumps(json_object)
print(type(data))         # <class 'str'>
print(data)               # {"id": "abc123", "name": "Bob"}

Example2

import json

jsonString = '{"a":54, "b": {"c":87}}'
aDict = json.loads(jsonString)

print(aDict)
print(aDict['a'])
print(aDict['b'])
print(aDict['b']['c'])
{'a': 54, 'b': {'c': 87}}
54
{'c': 87}
87

Example3

import json

name = "Jack"

json_string = """
{
    "id": "abc123",
    "name": ""
}
"""

json_object = json.loads(json_string)  # parse the JSON string into a dict
json_object["name"] = name

print(type(json_object))  # <class 'dict'>
print(json_object)        # {'id': 'abc123', 'name': 'Jack'}

data = json.dumps(json_object)
print(type(data))         # <class 'str'>
print(data)               # {"id": "abc123", "name": "Jack"}

Example4

Create a dict variable, then convert it to a str with json.dumps()

import json

json_dict = {}
json_dict['id'] = '456'
json_dict['name'] = 'Jack'

print(type(json_dict))    # <class 'dict'>
print(json_dict)          # {'id': '456', 'name': 'Jack'}

data = json.dumps(json_dict)
print(type(data))         # <class 'str'>
print(data)               # {"id": "456", "name": "Jack"}

Python and REST APIs

  1. Install requests
  2. Try a GET request
  3. Try a GET request that needs to trust a self-signed SSL certificate
  4. Try a POST request

1. Install requests

python -m pip install requests

2. Try a GET request

import requests
api_url = "https://jsonplaceholder.typicode.com/todos/1"
response = requests.get(api_url)

print(response.status_code)
print(response.headers["Content-Type"])
print(response.json())
200
application/json; charset=utf-8
{'userId': 1, 'id': 1, 'title': 'delectus aut autem', 'completed': False}

3. Try a GET request that needs to trust a self-signed SSL certificate

For example, if you create a webapi project with .NET 6 and run it on localhost, it serves a self-signed SSL certificate.

import requests
api_url = "https://localhost:7237/WeatherForecast"
response = requests.get(api_url)

print(response.status_code)
print(response.headers["Content-Type"])
print(response.json())

You will get this error:

requests.exceptions.SSLError: HTTPSConnectionPool(host='localhost', port=7237): Max retries exceeded with url: /WeatherForecast (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate (_ssl.c:1129)')))

How to fix it

First, export the certificate file from Chrome: go to More tools | Developer tools.

Go to Security | Overview and click View certificate.

On the Details tab, choose Copy to File…

Then choose Base64 encoded X.509 (.CER).

Update the Python code to reference the .cer file:

import requests
api_url = "https://localhost:7237/WeatherForecast"
response = requests.get(api_url, verify='./webapiCS.cer')

print(response.status_code)
print(response.headers["Content-Type"])
print(response.json())
200
application/json; charset=utf-8
[{'date': '2022-05-18T15:16:09.823195+07:00', 'temperatureC': -3, 'temperatureF': 27, 'summary': 'Mild'}, {'date': '2022-05-19T15:16:09.823412+07:00', 'temperatureC': -7, 'temperatureF': 20, 'summary': 'Cool'}, {'date': '2022-05-20T15:16:09.823414+07:00', 'temperatureC': -17, 'temperatureF': 2, 'summary': 'Warm'}, {'date': '2022-05-21T15:16:09.8234141+07:00', 'temperatureC': 7, 'temperatureF': 44, 'summary': 'Warm'}, {'date': '2022-05-22T15:16:09.8234142+07:00', 'temperatureC': -4, 'temperatureF': 25, 'summary': 'Hot'}]

Or work around it by skipping verification entirely:

r = requests.get(url, verify=False)
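
Skipping verification makes requests emit an InsecureRequestWarning on every call. A small sketch that silences the warning (only do this for local testing):

import requests
import urllib3

# Suppress the InsecureRequestWarning that verify=False triggers.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

response = requests.get("https://localhost:7237/WeatherForecast", verify=False)
print(response.status_code)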

4. Try a POST request

import requests
api_url = "https://jsonplaceholder.typicode.com/todos"
todo = {"userId": 1, "title": "Buy milk", "completed": False}
response = requests.post(api_url, json=todo)
response.json()

print(response.status_code)
print(response.headers["Content-Type"])
print(response.json())
201
application/json; charset=utf-8
{'userId': 1, 'title': 'Buy milk', 'completed': False, 'id': 201}

Or:

import requests
import json
api_url = "https://jsonplaceholder.typicode.com/todos"
todo = {"userId": 1, "title": "Buy milk", "completed": False}
headers =  {"Content-Type":"application/json"}
response = requests.post(api_url, data=json.dumps(todo), headers=headers)
response.json()

print(response.status_code)
print(response.headers["Content-Type"])
print(response.json())

Jupyter Notebook Shortcuts

Function                                        Keyboard Shortcut                   Menu Tools
Save notebook                                   Esc + s                             File → Save and Checkpoint
Create new cell                                 Esc + a (above), Esc + b (below)    Insert → Insert Cell Above / Below
Run cell                                        Ctrl + Enter                        Cell → Run Cells
Copy cell                                       c                                   Copy Key
Paste cell                                      v                                   Paste Key
Cut cell                                        x
Undo                                            z
Interrupt kernel                                Esc + i i                           Kernel → Interrupt
Restart kernel                                  Esc + 0 0                           Kernel → Restart
Find and replace in code (but not in outputs)   Esc + f                             N/A
Merge multiple cells                            Shift + M                           N/A
Information about a function from its docs      ? placed before the function        N/A

Delta Lake Batch Operations

  1. Delta Lake Batch Operations – Create
  2. Delta Lake Batch Operations – Append
  3. Delta Lake Batch Operations – Upsert
  4. Merge with Python

You will notice that throughout this course, there is a lot of context switching between PySpark/Scala and SQL. This is because:

  • read and write operations are performed on DataFrames using PySpark or Scala
  • table creates and queries are performed directly off Delta Lake tables using SQL

1. Delta Lake Batch Operations – Create

Creating Delta Lakes is as easy as changing the file type while performing a write. In this section, we’ll read from a CSV and write to Delta.

%fs ls /mnt/training/delta/sample_csv/
inputPath = "/mnt/training/sample.tsv"
DataPath =  "/mnt/training/delta/sample_csv/"

#remove directory if it exists
dbutils.fs.rm(DataPath, True)

  • Read the data into a DataFrame. We supply the schema.
  • Use overwrite mode so that there will not be an issue in rewriting the data in case you end up running the cell again.
  • Partition on Country (name) because there are only a few unique countries and because we will use Country as a predicate in a WHERE clause.

Then write the data to Delta Lake.

# pip install pyspark   (only needed when running outside Databricks)
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType

inputSchema = StructType([
  StructField("ID", IntegerType(), True),
  StructField("Name", StringType(), True),
  StructField("Weight", IntegerType(), True)
])

rawDataDF = (spark.read
  .option("header", "true")
  .option("sep", "\t")
  .schema(inputSchema)
  .csv(inputPath)
)

# write to Delta Lake
rawDataDF.write.mode("overwrite").format("delta").partitionBy("Name").save(DataPath)
rawDataDF.printSchema()
display(rawDataDF)
%fs ls /mnt/training/delta/sample_csv/

The data is stored in separate directories according to the partition column we specified (Name).

While we show creating a table in the next section, Spark SQL queries can also run directly on a directory of data; for Delta, use the following syntax:

SELECT * FROM delta.`/path/to/delta_directory`

display(spark.sql("SELECT * FROM delta.`{}` LIMIT 5".format(DataPath)))
display(spark.sql("SELECT * FROM delta.`/mnt/training/delta/sample_csv/` LIMIT 5"))

CREATE A Table Using Delta Lake

Create a table called sample_data using DELTA out of the above data.

The notation is:

CREATE TABLE <table-name>
USING DELTA
LOCATION <path-to-data>

Tables created with a specified LOCATION are considered unmanaged by the metastore. Unlike a managed table, where no path is specified, an unmanaged table’s files are not deleted when you DROP the table. However, changes to either the registered table or the files will be reflected in both locations.

Managed tables require that the data for your table be stored in DBFS. Unmanaged tables only store metadata in DBFS.

Since Delta Lake stores schema (and partition) info in the _delta_log directory, we do not have to specify partition columns!

spark.sql("""
  DROP TABLE IF EXISTS sample_data
""")
spark.sql("""
  CREATE TABLE sample_data
  USING DELTA
  LOCATION '{}'
""".format(DataPath))
%sql
SELECT count(*) FROM sample_data

Metadata

Since we already have data backing sample_data in place, the table in the Hive metastore automatically inherits the schema, partitioning, and table properties of the existing data.

Note that we only store table name, path, database info in the Hive metastore, the actual schema is stored in the _delta_log directory as shown below.

display(dbutils.fs.ls(DataPath + "/_delta_log"))

Metadata is displayed through DESCRIBE DETAIL <tableName>.

As long as we have some data in place already for a Delta Lake table, we can infer schema.

%sql
DESCRIBE DETAIL sample_data

Key Takeaways

Saving to Delta Lake is as easy as saving to Parquet, but creates an additional log file.

Using Delta Lake to create tables is straightforward and you do not need to specify schemas.

2. Delta Lake Batch Operations – Append

In this section, we’ll load a small amount of new data and show how easy it is to append this to our existing Delta table.

inputPath2 = "/mnt/training/sample2.tsv"

newDataDF = (spark
  .read
  .option("header", "true")
  .option("sep", "\t")
  .schema(inputSchema)
  .csv(inputPath2)
)

Do a simple count of number of new items to be added to production data.

newDataDF.count()
display(newDataDF)

APPEND Using Delta Lake

Adding to our existing Delta Lake is as easy as modifying our write statement and specifying the append mode.

(newDataDF
  .write
  .format("delta")
  .partitionBy("Name")
  .mode("append")
  .save(DataPath)
)

Perform a simple count query to verify the number of records and notice it is correct.

The changes to our files have been immediately reflected in the table that we’ve registered.

%sql
SELECT count(*) FROM sample_data

Note: when the create and the append run in different sessions, sometimes the previously written data goes missing!?!
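
One way to investigate is the Delta transaction log. A short sketch (assuming the sample_data table above) that shows which operations touched the table and with what write mode:

# Show every commit on the table: operation (WRITE, MERGE, ...), timestamp, user.
display(spark.sql("DESCRIBE HISTORY sample_data"))

# The operationParameters column shows whether a write ran with mode = Overwrite,
# which would explain previously appended data "disappearing".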

Key Takeaways

With Delta Lake, you can easily append new data without schema-on-read issues.

Changes to Delta Lake files will immediately be reflected in registered Delta tables.

3. Delta Lake Batch Operations – Upsert

To UPSERT means to “UPdate” and “inSERT”. In other words, UPSERT is literally TWO operations. It is not supported in traditional data lakes, as running an UPDATE could invalidate data that is accessed by the subsequent INSERT operation.

Using Delta Lake, however, we can do UPSERTS. Delta Lake combines these operations to guarantee atomicity to

  • INSERT a row
  • if the row already exists, UPDATE the row.

inputPath3 = "/mnt/training/sample3.tsv"
upsertDF = (spark
  .read
  .option("header", "true")
  .option("sep", "\t")
  .schema(inputSchema)
  .csv(inputPath3)
)
display(upsertDF)

We’ll register this as a temporary view so that this table doesn’t persist in DBFS (but we can still use SQL to query it).

upsertDF.createOrReplaceTempView("upsert_data")
%sql
SELECT * FROM upsert_data

We can use UPSERT to simultaneously INSERT our new data and UPDATE our previous records.

%sql
MERGE INTO sample_data   -- Delta table
USING upsert_data        -- another table
ON sample_data.ID = upsert_data.ID
  -- AND 
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

Notice how this data is seamlessly incorporated into sample_data

%sql
SELECT * FROM sample_data ORDER BY ID

Upsert syntax

Upserting, or merging, in Delta Lake provides fine-grained updates of your data. The following syntax shows how to perform an Upsert:

MERGE INTO customers -- Delta table
USING updates
ON customers.customerId = updates.customerId
WHEN MATCHED THEN
    UPDATE SET address = updates.address
WHEN NOT MATCHED
    THEN INSERT (customerId, address) VALUES (updates.customerId, updates.address)

See update table data syntax documentation.


4. Merge with Python

Previously, we read the table as a DataFrame:

tempDF = spark.read.table("sample_data")

This time, read it as a Delta table.

  • If you get NameError: name ‘DeltaTable’ is not defined, install delta-spark first:

pip install delta-spark
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/mnt/training/delta/sample_csv/")

Prepare the new data to merge:

inputPath4 = "/mnt/training/sample4.tsv"
newData = (spark
  .read
  .option("header", "true")
  .option("sep", "\t")
  .schema(inputSchema)
  .csv(inputPath4)
)
newData.createOrReplaceTempView("new_data")

Run the merge, updating and inserting only the name column:

from pyspark.sql.functions import col

deltaTable.alias("sample_data") \
  .merge(
    newData.alias("new_data"),
    "sample_data.id = new_data.id") \
  .whenMatchedUpdate(set = { "name": col("new_data.name") }) \
  .whenNotMatchedInsert(values = { "name": col("new_data.name") }) \
  .execute()

deltaTable.toDF().show()

The Weight column is not updated, and for inserted rows it ends up null.

%sql
SELECT * FROM sample_data ORDER BY ID DESC

But with this version, the merge updates and inserts every column:

from pyspark.sql.functions import col

deltaTable.alias("sample_data") \
  .merge(
    newData.alias("new_data"),
    "sample_data.id = new_data.id") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

deltaTable.toDF().show()

Open Source Delta Lake

Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.

  1. Key Features
  2. Key Concepts: Delta Lake Architecture

1. Key Features

Quick start intro to Delta Lake.

ACID Transactions: Data lakes typically have multiple data pipelines reading and writing data concurrently, and data engineers have to go through a tedious process to ensure data integrity, due to the lack of transactions. Delta Lake brings ACID transactions to your data lakes. It provides serializability, the strongest isolation level.

Scalable Metadata Handling: In big data, even the metadata itself can be “big data”. Delta Lake treats metadata just like data, leveraging Spark’s distributed processing power to handle all its metadata. As a result, Delta Lake can handle petabyte-scale tables with billions of partitions and files with ease.

Time Travel (data versioning): Delta Lake provides snapshots of data enabling developers to access and revert to earlier versions of data for audits, rollbacks or to reproduce experiments.
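
A minimal sketch of time travel with the DataFrame reader, reusing the Delta path from the batch-create section above (the version number and timestamp are just examples):

# Read an older snapshot of a Delta table by version number ...
old_df = (spark.read
  .format("delta")
  .option("versionAsOf", 0)                # first version of the table
  .load("/mnt/training/delta/sample_csv/")
)

# ... or by timestamp.
df_asof = (spark.read
  .format("delta")
  .option("timestampAsOf", "2022-05-01")   # example timestamp
  .load("/mnt/training/delta/sample_csv/")
)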

Open Format: All data in Delta Lake is stored in Apache Parquet format enabling Delta Lake to leverage the efficient compression and encoding schemes that are native to Parquet.

Unified Batch and Streaming Source and Sink: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.

Schema Enforcement: Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption.

Schema Evolution: Big data is continuously changing. Delta Lake enables you to make changes to a table schema that can be applied automatically, without the need for cumbersome DDL.
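
A small sketch of automatic schema evolution on write, reusing rawDataDF and the Delta path from the batch-create section above (the extra column is just an example):

from pyspark.sql.functions import lit

# The incoming data has an extra column that the target table does not have yet.
evolvedDF = rawDataDF.withColumn("Source", lit("batch2"))

(evolvedDF.write
  .format("delta")
  .mode("append")
  .option("mergeSchema", "true")   # add the new column to the table schema
  .save("/mnt/training/delta/sample_csv/")
)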

100% Compatible with Apache Spark API: Developers can use Delta Lake with their existing data pipelines with minimal change as it is fully compatible with Spark, the commonly used big data processing engine.

2. Key Concepts: Delta Lake Architecture

Throughout our Delta Lake discussions, we’ll often refer to the concept of Bronze/Silver/Gold tables. These levels refer to the state of data refinement as data flows through a processing pipeline.

These levels are conceptual guidelines, and implemented architectures may have any number of layers with various levels of enrichment. Below are some general ideas about the state of data in each level.

  • Bronze tables
    • Raw data (or very little processing)
    • Data will be stored in the Delta format (can encode raw bytes as a column)
  • Silver tables
    • Data that is directly queryable and ready for insights
    • Bad records have been handled, types have been enforced
  • Gold tables
    • Highly refined views of the data
    • Aggregate tables for BI
    • Feature tables for data scientists

For different workflows, things like schema enforcement and deduplication may happen in different places.
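
A compact sketch of what Bronze/Silver/Gold can look like in PySpark (all paths and column names here are hypothetical):

from pyspark.sql.functions import col, avg

# Bronze: land the raw data as-is in Delta (hypothetical paths).
raw_df = spark.read.option("header", "true").csv("/mnt/raw/events/")
raw_df.write.format("delta").mode("append").save("/mnt/bronze/events/")

# Silver: enforce types and drop bad records.
silver_df = (spark.read.format("delta").load("/mnt/bronze/events/")
  .withColumn("amount", col("amount").cast("double"))
  .dropna(subset=["id", "amount"])
)
silver_df.write.format("delta").mode("overwrite").save("/mnt/silver/events/")

# Gold: aggregate for BI.
gold_df = (spark.read.format("delta").load("/mnt/silver/events/")
  .groupBy("country")
  .agg(avg("amount").alias("avg_amount"))
)
gold_df.write.format("delta").mode("overwrite").save("/mnt/gold/avg_amount_by_country/")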

Work with user-defined functions

  1. Explore the data
  2. User Defined Functions
  3. Decorator Syntax [Python Only]
  4. Vectorized UDF
  5. Examples

If you can use the built-in functions in sql.functions, you certainly should use them, as they are less likely to have bugs and can be optimized by Catalyst.

However, the built-in functions don’t contain everything you need, and sometimes you have to write custom functions, known as User Defined Functions.

1. Explore the data

We use the sample csv file from Read data in CSV format.

csvFile = "dbfs:/mnt/training/sample.tsv"

rawDF = (spark.read
    .option("sep", "\t")
    .csv(csvFile, header=True, inferSchema=True, multiLine=True, escape='"')
)
display(rawDF)

Let’s select the columns id and name, as well as filter out all the rows that contain nulls.

tempDF = (rawDF.select("id", "name")
    .dropna()
)
display(tempDF)

2. User Defined Functions

We’ve seen many built-in functions (e.g. avg, lit, col, etc.). However, sometimes you might need a specific function that is not provided, so let’s look at how to define your own User Defined Function.

For example, let’s say we want to pull a couple of characters out of our name field. Let’s start by writing that function in local Python/Scala.

def firstInitialFunction(name):
  return name[2:4]

firstInitialFunction("Jack")
Out: 'ck'

Now we have to define a UDF that wraps the function. This will serialize our function and send it to the executors so that we can use it in our DataFrame.

from pyspark.sql.functions import udf

firstInitialUDF = udf(firstInitialFunction)

Or you can specify the return type, like this:

from pyspark.sql.types import *

firstInitialUDF = udf(firstInitialFunction, StringType())

Or:

firstInitialUDF = udf(firstInitialFunction, "string")

from pyspark.sql.functions import col
display(tempDF.select(firstInitialUDF(col("name"))))

We can also create a UDF using spark.udf.register, which will create the UDF in the SQL namespace.

tempDF.createOrReplaceTempView("tempDF")

spark.udf.register("sql_udf", firstInitialFunction)
%sql
select sql_udf(name) as initial from tempDF

3. Decorator Syntax [Python Only]

Alternatively, you can define a UDF using decorator syntax in Python with the dataType the function will return.

However, you cannot call the local Python function anymore (e.g. decoratorUDF("Jack") will not work)

%python
# Our input/output is a string
@udf("string")
def decoratorUDF(name):
  return name[2:4]

display(tempDF.select(decoratorUDF(col("name"))))

UDF Drawbacks:

  • UDFs cannot be optimized by the Catalyst Optimizer
  • The function has to be serialized and sent out to the executors
  • In the case of Python, there is even more overhead – we have to spin up a Python interpreter on every executor to run the UDF (e.g. Python UDFs are much slower than Scala UDFs)

4. Vectorized UDF


Vectorized UDFs utilize Apache Arrow to speed up computation. Let’s see how that helps improve our processing time.

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes.

%python
from pyspark.sql.functions import pandas_udf

# We have a string input/output
@pandas_udf("string")
def vectorizedUDF(name):
  return name.str[2:4]

display(tempDF.select(vectorizedUDF(col("name"))))

We can also register these Vectorized UDFs to the SQL namespace.

%python
spark.udf.register("sql_vectorized_udf", vectorizedUDF)
%sql
select sql_vectorized_udf(name) as initial from tempDF

5. Examples

# UDF
spark.conf.set("spark.sql.shuffle.partitions", 8)

@udf("String")
def lowercase(string):
  return string.lower()

@udf("String")
def removeHyphen(string):
  return string.replace("-", "")


dedupedDF = (df
  .select("*", 
          lowercase("firstname").alias("lcFirstName"),
          lowercase("middleName").alias("lcMiddleName"),
          lowercase("lastName").alias("lcLastName"),
          removeHyphen("ssn").alias("ssnNums")
         )
  .dropDuplicates(["lcFirstName", "lcMiddleName", "lcLastName", "ssnNums", "gender", "birthDate", "salary"])
  .drop("lcFirstName", "lcMiddleName", "lcLastName", "ssnNums")
)
# Pandas UDF approach - also concatenating fields
from pyspark.sql.functions import pandas_udf

spark.conf.set("spark.sql.shuffle.partitions", 8)

@pandas_udf("string")
def to_lower(first_name, middle_name, last_name):
  return (first_name + middle_name + last_name).str.lower()

@pandas_udf("string")
def remove_hyphen(ssn):
  return ssn.str.replace("-", "")

dedupedDF = (df
  .select("*", 
          to_lower("firstname", "middleName", "lastName").alias("lcFirstMiddleLast"), 
          removeHyphen("ssn").alias("ssnNums")
         )
  .dropDuplicates(["lcFirstMiddleLast", "ssnNums", "gender", "birthDate", "salary"])
  .drop("lcFirstMiddleLast", "ssnNums")
)
# write file

(dedupedDF.write
   .mode("overwrite")
   .parquet(destFile)
)
dedupedDF = spark.read.parquet(destFile)
print(f"Total Records: {dedupedDF.count()}")

Work with DataFrames

  1. Create a DataFrame
  2. count()
  3. cache() & persist()
  4. show(..)
  5. display(..)
  6. limit(..)
  7. select(..)
  8. drop(..)
  9. distinct() & dropDuplicates()
  10. dropDuplicates(columns…)
  11. filter()
  12. DataFrames vs SQL & Temporary Views

1. Create a DataFrame

  • We can read the Parquet files into a DataFrame.
  • We’ll start with the object spark, an instance of SparkSession and the entry point to Spark 2.0 applications.
  • From there we can access the read object which gives us an instance of DataFrameReader.

ใช้ตัวอย่างไฟล์ parquet จาก Read data in Parquet format

parquetDir = "/mnt/training/sample_parquet/"
tempDF = (spark             # Our SparkSession & Entry Point
  .read                     # Our DataFrameReader
  .parquet(parquetDir)      # Returns an instance of DataFrame
)
print(tempDF)               # Python hack to see the data type
DataFrame[registration_dttm: timestamp, id: int, first_name: string, last_name: string, email: string, gender: string, ip_address: string, cc: string, country: string, birthdate: string, salary: double, title: string, comments: string]

2. count()

If you look at the API docs, count() is described like this: “Returns the number of rows in the Dataset.” Calling count() will trigger a job to process the request and return a value.

We can now count all records in our DataFrame like this:

total = tempDF.count()

print("Record Count: {0:,}".format( total ))
Record Count: 5,000

3. cache() & persist()

The ability to cache data is one technique for achieving better performance with Apache Spark.

This is because every action requires Spark to read the data from its source (Azure Blob, Amazon S3, HDFS, etc.) but caching moves that data into the memory of the local executor for “instant” access.

cache() is just an alias for persist().

(tempDF
  .cache()         # Mark the DataFrame as cached
  .count()         # Materialize the cache
) 

If you re-run that command, it should take significantly less time.

total = tempDF.count()

print("Record Count: {0:,}".format( total ))

Performance considerations of Caching Data

When Caching Data you are placing it on the workers of the cluster.

Caching takes resources, before moving a notebook into production please check and verify that you are appropriately using cache.

And as a quick side note, you can remove a cache by calling the DataFrame’s unpersist() method, but it is not necessary.
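
For completeness, a short sketch of the equivalent persist() call with an explicit storage level, and of removing the cache when you are done (MEMORY_AND_DISK is, as far as I know, the default level that cache() uses for DataFrames):

from pyspark import StorageLevel

tempDF.persist(StorageLevel.MEMORY_AND_DISK)   # explicit storage level
tempDF.count()                                 # materialize the cache

tempDF.unpersist()                             # release the cached data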

4. show(..)

show() is a function that will allow us to print the data to the console. In the case of Python, we have one method with two optional parameters.

show(..) method effectively has two optional parameters:

  • n: The number of records to print to the console, the default being 20.
  • truncate: If true, columns wider than 20 characters will be truncated. The default is true.

tempDF.show()
tempDF.show(2, True)
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|  registration_dttm| id|first_name|last_name|           email|gender|    ip_address|              cc|  country|birthdate|   salary|           title|comments|
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
|2016-02-03 07:55:29|  1|    Amanda|   Jordan|ajordan0@com.com|Female|   1.197.201.2|6759521864920116|Indonesia| 3/8/1971| 49756.53|Internal Auditor|   1E+02|
|2016-02-03 17:04:03|  2|    Albert|  Freeman| afreeman1@is.gd|  Male|218.111.175.34|                |   Canada|1/16/1968|150280.17|   Accountant IV|        |
+-------------------+---+----------+---------+----------------+------+--------------+----------------+---------+---------+---------+----------------+--------+
only showing top 2 rows

Note: The function show(..) is an action which triggers a job.

5. display(..)

Instead of calling show(..) on an existing DataFrame we can instead pass our DataFrame to the display(..) command:

display(tempDF)

show(..) vs display(..)

  • show(..) is part of core spark – display(..) is specific to our notebooks.
  • show(..) is ugly – display(..) is pretty.
  • show(..) has parameters for truncating both columns and rows – display(..) does not.
  • show(..) is a function of the DataFrame/Dataset class – display(..) works with a number of different objects.
  • display(..) is more powerful – with it, you can…
    • Download the results as CSV
    • Render line charts, bar chart & other graphs, maps and more.
    • See up to 1000 records at a time.

display(..) is an action which triggers a job.

6. limit(..)

limit(..) is described like this: “Returns a new Dataset by taking the first n rows…”

show(..), like many actions, does not return anything. On the other hand, transformations like limit(..) return a new DataFrame:

limitedDF = tempDF.limit(5) # "limit" the number of records to the first 5

limitedDF # Python hack to force printing of the data type
Out[26]: DataFrame[registration_dttm: timestamp, id: int, first_name: string, last_name: string, email: string, gender: string, ip_address: string, cc: string, country: string, birthdate: string, salary: double, title: string, comments: string]

Nothing Happened

  • Notice how “nothing” happened – that is no job was triggered.
  • This is because we are simply defining the second step in our transformations.
    1. Read in the parquet file (represented by tempDF).
    2. Limit those records to just the first 5 (represented by limitedDF).
  • It’s not until we induce an action that a job is triggered and the data is processed (see the short sketch below).
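
A tiny sketch of that behaviour, reusing limitedDF from above:

limitedDF = tempDF.limit(5)   # transformation only - no job yet
total = limitedDF.count()     # action - this is what triggers the job
print(total)                  # 5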

7. select(..)

The select(..) command is one of the most powerful and most commonly used transformations.

select(..) is described like this: “Returns a new Dataset by computing the given Column expression for each element.”

Just like limit(..), select(..)

  • does not trigger a job
  • returns a new DataFrame
  • simply defines the next transformation in a sequence of transformations.

# Transform the data by selecting only three columns
onlyThreeDF = (tempDF
  .select("id", "first_name", "email")
)
# Now let's take a look at what the schema looks like
onlyThreeDF.printSchema()
root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- email: string (nullable = true)
onlyThreeDF.show(5, False)
+---+----------+------------------------+
|id |first_name|email                   |
+---+----------+------------------------+
|1  |Amanda    |ajordan0@com.com        |
|2  |Albert    |afreeman1@is.gd         |
|3  |Evelyn    |emorgan2@altervista.org |
|4  |Denise    |driley3@gmpg.org        |
|5  |Carlos    |cburns4@miitbeian.gov.cn|
+---+----------+------------------------+
only showing top 5 rows

8. drop(..)

Instead of selecting the columns we want, drop(..) allows us to specify the columns we don’t want.

drop(..) is described like this: “Returns a new Dataset with a column dropped.”

# Transform the data by dropping a column
droppedDF = (tempDF
  .drop("registration_dttm")
)
# Now let's take a look at what the schema looks like
droppedDF.printSchema()

Again, drop(..) is just one more transformation – that is no job is triggered.

9. distinct() & dropDuplicates()

These two transformations do the same thing. In fact, they are aliases for one another.

  • You can see this by looking at the source code for these two methods
  • def distinct(): Dataset[T] = dropDuplicates()
  • See Dataset.scala

distinct(..) and dropDuplicates(..) are described like this: “Returns a new Dataset that contains only the unique rows from this Dataset…”

distinctDF = (tempDF                # Our original DataFrame
  .select("gender")                 # Drop all columns except the "gender" column
  .distinct()                       # Reduce the set of all records to just the distinct values
)
display(distinctDF)

total = distinctDF.count()
print("Distinct genders: {0:,}".format( total ))
Distinct genders: 3

10. dropDuplicates(columns…)

The method dropDuplicates(..) has a second variant that accepts one or more columns.

  • Unlike distinct() or dropDuplicates() with no arguments, the comparison is not performed across the entire record.
  • The comparison is based only on the specified columns.
  • This allows us to keep all the original columns in our DataFrame, as shown in the sketch below.
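
A minimal sketch using tempDF from above (the column choice is just an example):

# Keep one row per (gender, first_name) pair, but retain every column.
dedupDF = tempDF.dropDuplicates(["gender", "first_name"])
display(dedupDF)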

11. filter()

In this case, we drop the rows where the Name column has the value product4.

df2 = df.filter(df.Name != 'product4')
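
The same filter can also be written with col(), which makes it easy to combine conditions (a sketch; the Weight condition is just an example):

from pyspark.sql.functions import col

df2 = df.filter(col("Name") != "product4")
df3 = df.filter((col("Name") != "product4") & (col("Weight") > 10))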

12. DataFrames vs SQL & Temporary Views

The DataFrames API is built upon an SQL engine.

As such we can “convert” a DataFrame into a temporary view (or table) and then use it in “standard” SQL.

Let’s start by creating a temporary view from a previous DataFrame.

tempDF.createOrReplaceTempView("family")

Now that we have a temporary view (or table) we can start expressing our queries and transformations in SQL:

%sql

SELECT *
FROM family

And we can just as easily express in SQL the distinct list of genders, and just because we can, we’ll sort that list:

%sql

SELECT DISTINCT gender
FROM family
ORDER BY gender

And converting from SQL back to a DataFrame is just as easy:

tableDF = spark.sql("SELECT DISTINCT gender FROM family ORDER BY gender") 
display(tableDF)