%sql
DROP TABLE IF EXISTS jack_db.sample_csv;
CREATE TABLE jack_db.sample_csv
(
id string
, name string
, weight string
)
USING csv
OPTIONS(path "/mnt/training/sample.csv", header "true");
Create a table from a folder containing .csv files:
%sql
DROP TABLE IF EXISTS jack_db.sample_csv;
CREATE TABLE jack_db.sample_csv
(
id string
, name string
, weight string
)
USING csv
OPTIONS(path "/mnt/training/sample_csv/", header "true");
The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks clusters and Databricks SQL endpoints.
Requirements
A development machine running Python 3.7 or higher.
Gather the following information for the cluster or SQL endpoint that you want to use:
Cluster – The server hostname of the cluster. You can get this from the Server Hostname value in the Advanced Options > JDBC/ODBC tab for your cluster.
SQL endpoint – The server hostname of the SQL endpoint. You can get this from the Server Hostname value in the Connection Details tab for your SQL endpoint.
The number of personal access tokens per user is limited to 600 per workspace.
Click Settings in the lower left corner of your Databricks workspace.
Click User Settings.
Go to the Access Tokens tab.
Click the Generate New Token button.
Optionally enter a description (comment) and expiration period.
Install the Databricks SQL Connector for Python library
Install the Databricks SQL Connector for Python library on your development machine by running pip install databricks-sql-connector.
$ pip install databricks-sql-connector
Query data
The following code example demonstrates how to call the Databricks SQL Connector for Python to run a basic SQL command on a cluster or SQL endpoint.
from databricks import sql

with sql.connect(server_hostname="<server-hostname>",
                 http_path="<http-path>",
                 access_token="<access-token>") as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM <database-name>.<table-name> LIMIT 2")
        result = cursor.fetchall()

        for row in result:
            print(row)
Insert data
from databricks import sql

with sql.connect(server_hostname="...", http_path="...", access_token="...") as connection:
    with connection.cursor() as cursor:
        cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")

        squares = [(i, i * i) for i in range(100)]
        values = ",".join([f"({x}, {y})" for (x, y) in squares])

        cursor.execute(f"INSERT INTO squares VALUES {values}")
Query metadata
from databricks import sql

with sql.connect(server_hostname="...", http_path="...", access_token="...") as connection:
    with connection.cursor() as cursor:
        cursor.columns(schema_name="default", table_name="squares")
        print(cursor.fetchall())
Cursor and connection management
from databricks import sql
connection = sql.connect(server_hostname="...", http_path="...", access_token="...")
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())
cursor.close()
connection.close()
Configure logging
import logging
from databricks import sql
logging.getLogger('databricks.sql').setLevel(logging.DEBUG)
sql.connect(...)
While we show creating a table in the next section, Spark SQL queries can run directly on a directory of data. For Delta, use the following syntax: SELECT * FROM delta.`/path/to/delta_directory`
display(spark.sql("SELECT * FROM delta.`{}` LIMIT 5".format(DataPath)))
display(spark.sql("SELECT * FROM delta.`/mnt/training/delta/sample_csv/` LIMIT 5"))
Create a Table Using Delta Lake
Create a table called sample_data using DELTA out of the above data.
The notation is:
CREATE TABLE <table-name>
USING DELTA
LOCATION <path-to-data>
Tables created with a specified LOCATION are considered unmanaged by the metastore. Unlike a managed table, where no path is specified, an unmanaged table’s files are not deleted when you DROP the table. However, changes to either the registered table or the files will be reflected in both locations.
Managed tables require that the data for your table be stored in DBFS. Unmanaged tables only store metadata in DBFS.
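For contrast, here is a minimal sketch of the two flavors; the table names are hypothetical, and the LOCATION clause is what makes the second table unmanaged:
# Managed table: no LOCATION, so the metastore manages the data files
# and DROP TABLE deletes them.
spark.sql("CREATE TABLE managed_sample (id STRING, name STRING, weight STRING) USING DELTA")

# Unmanaged table: LOCATION points at existing files, so DROP TABLE
# removes only the metastore entry, not the underlying files.
spark.sql("""
CREATE TABLE unmanaged_sample
USING DELTA
LOCATION '/mnt/training/delta/sample_csv/'
""")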
Since Delta Lake stores schema (and partition) info in the _delta_log directory, we do not have to specify partition columns!
spark.sql("""
DROP TABLE IF EXISTS sample_data
""")
spark.sql("""
CREATE TABLE sample_data
USING DELTA
LOCATION '{}'
""".format(DataPath))
%sql
SELECT count(*) FROM sample_data
Metadata
Since we already have data backing sample_data in place, the table in the Hive metastore automatically inherits the schema, partitioning, and table properties of the existing data.
Note that we only store the table name, path, and database info in the Hive metastore; the actual schema is stored in the _delta_log directory as shown below.
display(dbutils.fs.ls(DataPath + "/_delta_log"))
Metadata is displayed through DESCRIBE DETAIL <tableName>.
As long as we have some data in place already for a Delta Lake table, we can infer schema.
%sql
DESCRIBE DETAIL sample_data
Key Takeaways
Saving to Delta Lake is as easy as saving to Parquet, but creates an additional log file.
Using Delta Lake to create tables is straightforward and you do not need to specify schemas.
2. Delta Lake Batch Operations – Append
In this section, we’ll load a small amount of new data and show how easy it is to append this to our existing Delta table.
With Delta Lake, you can easily append new data without schema-on-read issues.
Changes to Delta Lake files will immediately be reflected in registered Delta tables.
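The append itself isn't shown in this excerpt; here is a minimal sketch, assuming a small DataFrame newDataDF with the same schema as the existing data and the same DataPath used above:
# Append the new records to the existing Delta files;
# the registered sample_data table reflects them immediately.
(newDataDF
  .write
  .format("delta")
  .mode("append")
  .save(DataPath)
)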
3. Delta Lake Batch Operations – Upsert
To UPSERT means to “UPdate” and “inSERT”. In other words, UPSERT is literally TWO operations. It is not supported in traditional data lakes, as running an UPDATE could invalidate data that is accessed by the subsequent INSERT operation.
Using Delta Lake, however, we can do UPSERTS. Delta Lake combines these operations into a single atomic transaction: if a matching row already exists it is updated, otherwise the row is inserted.
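The creation of upsertDF isn't shown in this excerpt; a minimal sketch, assuming the same id/name/weight columns as the sample data (the values are made up):
# Hypothetical new and changed records: an existing id to be updated
# and a new id to be inserted.
upsertDF = spark.createDataFrame(
    [("3", "Charlie", "70"), ("6", "Dana", "55")],
    ["id", "name", "weight"]
)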
We’ll register this as a temporary view so that this table doesn’t persist in DBFS (but we can still use SQL to query it).
upsertDF.createOrReplaceTempView("upsert_data")
%sql
SELECT * FROM upsert_data
We can use UPSERT to simultaneously INSERT our new data and UPDATE our previous records.
%sql
MERGE INTO sample_data -- Delta table
USING upsert_data -- another table
ON sample_data.ID = upsert_data.ID
-- AND
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Notice how this data is seamlessly incorporated into sample_data.
%sql
SELECT * FROM sample_data ORDER BY ID
Upsert syntax
Upserting, or merging, in Delta Lake provides fine-grained updates of your data. The following syntax shows how to perform an Upsert:
MERGE INTO customers -- Delta table
USING updates
ON customers.customerId = updates.customerId
WHEN MATCHED THEN
UPDATE SET address = updates.address
WHEN NOT MATCHED
THEN INSERT (customerId, address) VALUES (updates.customerId, updates.address)
ACID Transactions: Data lakes typically have multiple data pipelines reading and writing data concurrently, and data engineers have to go through a tedious process to ensure data integrity, due to the lack of transactions. Delta Lake brings ACID transactions to your data lakes. It provides serializability, the strongest isolation level.
Scalable Metadata Handling: In big data, even the metadata itself can be “big data”. Delta Lake treats metadata just like data, leveraging Spark’s distributed processing power to handle all its metadata. As a result, Delta Lake can handle petabyte-scale tables with billions of partitions and files with ease.
Time Travel (data versioning): Delta Lake provides snapshots of data enabling developers to access and revert to earlier versions of data for audits, rollbacks or to reproduce experiments (see the sketch after this list).
Open Format: All data in Delta Lake is stored in Apache Parquet format enabling Delta Lake to leverage the efficient compression and encoding schemes that are native to Parquet.
Unified Batch and Streaming Source and Sink: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.
Schema Enforcement: Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption.
Schema Evolution: Big data is continuously changing. Delta Lake enables you to make changes to a table schema that can be applied automatically, without the need for cumbersome DDL.
100% Compatible with Apache Spark API: Developers can use Delta Lake with their existing data pipelines with minimal change as it is fully compatible with Spark, the commonly used big data processing engine.
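As an illustration of time travel, the queries below read the commit history of the sample_data table and then an earlier snapshot; the version number is hypothetical, so check DESCRIBE HISTORY for the versions that actually exist:
# List the table's commit history, then read the table as of version 0.
display(spark.sql("DESCRIBE HISTORY sample_data"))
display(spark.sql("SELECT * FROM sample_data VERSION AS OF 0"))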
2. Key Concepts: Delta Lake Architecture
Throughout our Delta Lake discussions, we’ll often refer to the concept of Bronze/Silver/Gold tables. These levels refer to the state of data refinement as data flows through a processing pipeline.
These levels are conceptual guidelines, and implemented architectures may have any number of layers with various levels of enrichment. Below are some general ideas about the state of data in each level.
Bronze tables
Raw data (or very little processing)
Data will be stored in the Delta format (can encode raw bytes as a column)
Silver tables
Data that is directly queryable and ready for insights
Bad records have been handled, types have been enforced
Gold tables
Highly refined views of the data
Aggregate tables for BI
Feature tables for data scientists
For different workflows, things like schema enforcement and deduplication may happen in different places.
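A minimal sketch of this flow using the Python Delta APIs; the paths, column names, and cleaning logic are hypothetical and only meant to show one possible bronze-to-gold progression:
from pyspark.sql import functions as F

# Bronze: land the raw data in Delta with little or no processing.
rawDF = spark.read.json("/mnt/raw/events/")
rawDF.write.format("delta").mode("append").save("/mnt/delta/bronze/events")

# Silver: enforce types and handle bad records.
bronzeDF = spark.read.format("delta").load("/mnt/delta/bronze/events")
silverDF = (bronzeDF
  .filter(F.col("id").isNotNull())
  .withColumn("weight", F.col("weight").cast("double"))
)
silverDF.write.format("delta").mode("overwrite").save("/mnt/delta/silver/events")

# Gold: build an aggregate table for BI.
goldDF = silverDF.groupBy("name").agg(F.avg("weight").alias("avg_weight"))
goldDF.write.format("delta").mode("overwrite").save("/mnt/delta/gold/avg_weight")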
If you can use the built-in functions in sql.functions, you certainly should use them, as they are less likely to have bugs and can be optimized by Catalyst.
However, the built-in functions don’t contain everything you need, and sometimes you have to write custom functions, known as User Defined Functions.
We’ve seen many built-in functions (e.g. avg, lit, col, etc.). However, sometimes you might need a specific function that is not provided, so let’s look at how to define your own User Defined Function.
For example, let’s say we want to get the first initial from our name field. Let’s start by writing that function in local Python/Scala.
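The local function isn't shown in this excerpt; a minimal Python sketch of what firstInitialFunction might look like:
def firstInitialFunction(name):
    # Return the first character of the name, e.g. "Jack" -> "J"
    return name[0]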
Now we have to define a UDF that wraps the function. This will serialize our function and send it to the executors so that we can use it in our DataFrame.
firstInitialUDF = udf(firstInitialFunction)
Or you can specify the return type explicitly, like this:
from pyspark.sql.types import *
firstInitialUDF = udf(firstInitialFunction, StringType())
UDFs cannot be optimized by the Catalyst Optimizer
The function has to be serialized and sent out to the executors
In the case of Python, there is even more overhead – we have to spin up a Python interpreter on every Executor to run the UDF (e.g. Python UDFs are much slower than Scala UDFs)
Vectorized UDFs utilize Apache Arrow to speed up computation. Let’s see how that helps improve our processing time.
Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and Python processes. See the Apache Arrow documentation for more details.
%python
from pyspark.sql.functions import pandas_udf
# We have a string input/output
@pandas_udf("string")
def vectorizedUDF(name):
    return name.str[2:4]
tempDF = (spark          # Our SparkSession & Entry Point
  .read                  # Our DataFrameReader
  .parquet(parquetDir)   # Returns an instance of DataFrame
)
print(tempDF) # Python hack to see the data type
If you look at the API docs, count() is described like this: “Returns the number of rows in the Dataset.” count() will trigger a job to process the request and return a value.
We can now count all records in our DataFrame like this:
total = tempDF.count()
print("Record Count: {0:,}".format( total ))
Record Count: 5,000
3. cache() & persist()
The ability to cache data is one technique for achieving better performance with Apache Spark.
This is because every action requires Spark to read the data from its source (Azure Blob, Amazon S3, HDFS, etc.) but caching moves that data into the memory of the local executor for “instant” access.
cache() is just an alias for persist().
(tempDF
  .cache()   # Mark the DataFrame as cached
  .count()   # Materialize the cache
)
If you re-run that command, it should take significantly less time.
total = tempDF.count()
print("Record Count: {0:,}".format( total ))
Performance considerations of Caching Data
When caching data, you are placing it on the workers of the cluster.
Caching takes resources; before moving a notebook into production, please check and verify that you are using cache appropriately.
And as a quick side note, you can remove a cache by calling the DataFrame’s unpersist() method, but it is not necessary.
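For completeness, a quick sketch of releasing the cache when it is no longer needed:
# Remove tempDF's cached blocks from the executors.
tempDF.unpersist()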
4. show(..)
show() is a function that will allow us to print the data to the console. In the case of Python, we have one method with two optional parameters.
show(..) method effectively has two optional parameters:
n: The number of records to print to the console, the default being 20.
truncate: If true, columns wider than 20 characters will be truncated, where the default is true.
Notice how “nothing” happened – that is, no job was triggered.
This is because we are simply defining the second step in our transformations.
Read in the parquet file (represented by tempDF).
Limit those records to just the first 5 (represented by limitedDF).
It’s not until we induce an action that a job is triggered and the data is processed.
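The limit step isn't shown in this excerpt; a minimal sketch of the two transformations followed by an action (which also demonstrates show's n and truncate parameters):
# Transformations only: nothing is executed yet.
limitedDF = tempDF.limit(5)

# Action: this triggers a job that reads the data and prints 5 rows.
limitedDF.show(5, truncate=False)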
7. select(..)
The select(..) command is one of the most powerful and most commonly used transformations.
select(..) is described like this: “Returns a new Dataset by computing the given Column expression for each element.”
Just like limit(..), select(..)
does not trigger a job
returns a new DataFrame
simply defines the next transformation in a sequence of transformations.
# Transform the data by selecting only three columns
onlyThreeDF = (tempDF
  .select("id", "first_name", "email")
)
# Now let's take a look at what the schema looks like
onlyThreeDF.printSchema()
Instead of selecting everything we wanted, drop(..) allows us to specify the columns we don’t want.
drop(..) is described like this: “Returns a new Dataset with a column dropped.”
# Transform the data by dropping a column
droppedDF = (tempDF
  .drop("registration_dttm")
)
# Now let's take a look at what the schema looks like
droppedDF.printSchema()
Again, drop(..) is just one more transformation – that is, no job is triggered.
9. distinct() & dropDuplicates()
These two transformations do the same thing. In fact, they are aliases for one another.
You can see this by looking at the source code for these two methods.
distinct(..) and dropDuplicates(..) are described like this: “Returns a new Dataset that contains only the unique rows from this Dataset….”
distinctDF = (tempDF       # Our original DataFrame
  .select("gender")        # Drop all columns except the "gender" column
  .distinct()              # Reduce the set of all records to just the distinct values of the column
)