Table

1. เตรียมตารางด้วย SQL

สร้างตารางตัวอย่างชื่อ table_name (เป็น managed table)

%sql 
CREATE OR REPLACE TABLE table_name (
  id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
  name STRING,
  age INT,
  flag BOOLEAN,
  timestamp TIMESTAMP
)

คำสั่งต่างๆที่เกี่ยวข้องกับตาราง

%sql
SHOW CREATE TABLE table_name
%sql
DESCRIBE TABLE table_name
%sql
DESCRIBE HISTORY table_name

เพิ่มข้อมูลลงตาราง table_name (ที่ timestamp บวกเวลาไป 7 ชั่วโมง)

%sql
INSERT INTO
  table_name (name, age, flag, timestamp)
VALUES(
    'jack',
    '18',
    false,
    TIMESTAMPADD(HOUR, + 7, CURRENT_TIMESTAMP())
  )

ดูข้อมูลตาราง table_name

%sql
SELECT
  *
FROM
  table_name

หรือดูด้วย Delta Time Travel

%sql
SELECT
  *
FROM
  table_name VERSION AS OF 1
+----+------+-----+-------+------------------------------+
| id | name | age | flag  | timestamp                    |
+----+------+-----+-------+------------------------------+
| 1  | jack | 18  | false | 2022-09-07T16:29:05.071+0000 |
| 2  | tip  | 14  | false | 2022-09-07T17:17:00.285+0000 |
+----+------+-----+-------+------------------------------+

ลบตาราง table_name

%sql
DROP TABLE table_name

2. Spark SQL (Python)

เตรียมคิวรี

%python
query = """
SELECT * 
FROM   table_name
"""

รันคิวรี

%python
df = spark.sql(query)

ดู schema

%python
df.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- flag: boolean (nullable = true)
 |-- timestamp: timestamp (nullable = true)
%python
print(df)
DataFrame[id: bigint, name: string, age: int, flag: boolean, timestamp: timestamp]

ดูข้อมูล

%python
display(df)
+----+------+-----+-------+------------------------------+
| id | name | age | flag  | timestamp                    |
+----+------+-----+-------+------------------------------+
| 1  | jack | 18  | false | 2022-09-07T16:29:05.071+0000 |
| 2  | tip  | 14  | false | 2022-09-07T17:17:00.285+0000 |
+----+------+-----+-------+------------------------------+
%python
df.show()
+---+----+---+-----+--------------------+
| id|name|age| flag|           timestamp|
+---+----+---+-----+--------------------+
|  1|jack| 18|false|2022-09-07 17:10:...|
|  2| tip| 14|false|2022-09-07 17:17:...|
+---+----+---+-----+--------------------+
%python
# Select only the "name" column
df.select("name").show()
+----+
|name|
+----+
|jack|
| tip|
+----+
%python
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
+----+---------+
|name|(age + 1)|
+----+---------+
|jack|       19|
| tip|       15|
+----+---------+
%python
# Select people older than 15
df.filter(df['age'] > 15).show()
+---+----+---+-----+--------------------+
| id|name|age| flag|           timestamp|
+---+----+---+-----+--------------------+
|  1|jack| 18|false|2022-09-07 17:10:...|
+---+----+---+-----+--------------------+
%python
# Count people by age
df.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 18|    1|
| 14|    1|
+---+-----+
%python
print(type(df.collect()))
df.collect()
[Row(id=1, name='jack', age=18, flag=False, timestamp=datetime.datetime(2022, 9, 7, 17, 10, 34, 193000)),
 Row(id=2, name='tip', age=14, flag=False, timestamp=datetime.datetime(2022, 9, 7, 17, 17, 0, 285000))]
%python
for row in df.collect():
  print(row['name'])
jack
tip

Using Spark to Write Data to a Single CSV File

Apache Spark is a system designed to work with very large datasets. Its default behavior reflects the assumption that you will be working with a large dataset that is split across many nodes in a cluster.

When you use Apache Spark to write a dataframe to disk, you will notice that it writes the data into multiple files. Let’s look at an example and see this in action.

# First, we just read in some sample data so we have a Spark dataframe
df = spark.read.option("header", "true").csv("dbfs:/databricks-datasets/atlas_higgs/atlas_higgs.csv")

# Now, let's write this data out in CSV format so we can see how Spark writes the files
df.write.format("csv").mode("overwrite").save("/my-output/default-csv")

Now let’s take a look at the CSV files that Spark wrote…

dbutils.fs.ls("/my-output/default-csv")
Out[22]: [FileInfo(path='dbfs:/my-output/default-csv/_SUCCESS', name='_SUCCESS', size=0),
 FileInfo(path='dbfs:/my-output/default-csv/_committed_3363429043923895909', name='_committed_3363429043923895909', size=1256),
 FileInfo(path='dbfs:/my-output/default-csv/_started_3363429043923895909', name='_started_3363429043923895909', size=0),
 FileInfo(path='dbfs:/my-output/default-csv/part-00000-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-31-1-c000.csv', name='part-00000-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-31-1-c000.csv', size=4193821),
 FileInfo(path='dbfs:/my-output/default-csv/part-00001-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-32-1-c000.csv', name='part-00001-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-32-1-c000.csv', size=4194469),
 FileInfo(path='dbfs:/my-output/default-csv/part-00002-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-33-1-c000.csv', name='part-00002-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-33-1-c000.csv', size=4194236),
 FileInfo(path='dbfs:/my-output/default-csv/part-00003-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-34-1-c000.csv', name='part-00003-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-34-1-c000.csv', size=4194352),
...

You will notice that our dataset was not written to one, single CSV file in a nice, tidy format. Instead, the rows are spread out across a bunch of different CSV files. Spark can work easily with these multiple files. However, if you want to share this data with other systems, having multiple files can be cumbersome.

Before we look at how to change Spark’s behavior, we need to understand why Spark writes the data this way.

The key thing to always remember about Spark is that the data is always spread out across multiple computers. The data doesn’t reside in the memory of just one computer. It has been divided into multiple partitions, and those partitions are distributed across many computers.

When you tell Spark to write your data, it completes this operation in parallel. The driver tells all of the nodes to start writing their data at the same time. So each node in the cluster starts writing all of the partitions that it has at the same time all of the other nodes are writing all of their partitions. Therefore, Spark can’t write the data to just one file because all of the nodes would be tripping over each other. They would each try to write to the same file and end up overwriting the data that other nodes had written.

To solve this problem, Spark saves the data from each partition to its own file. Therefore, the number of files that get written is equal to the number of partitions that Spark created for your data.

Changing Spark’s Behavior

While Spark is designed to work with large, mult-terabyte datasets that could never fit into the memory of just one computer, we sometimes use it to work with smaller datasets. And sometime this dataset is relatively small… just a couple of gigabytes or even a few hundred megabytes. If you find yourself working with a small dataset like this, you can get Spark to write the data to just one file.

That last point is very important and bears repeating. To make this work, all of the data must be loaded into the memory of just one computer. Therefore, this technique only works on small datasets. If the nodes in your cluster each have 16GB of RAM, then you can probably make this work with 10GB of data or less. If you have a dataset that is bigger than the amount of RAM on each node, you cannot use this technique because you will risk crashing your cluster.

Fortunately, our sample dataset above is less than 100MB. So, keeping in mind the important limitation described above, this dataset should easily fit in the memory of just one PC. So let’s proceed with writing out our dataset to just one CSV file. There are a couple of ways to achieve this, and we will look at both of them.

Option 1: Use the coalesce Feature

The Spark Dataframe API has a method called coalesce that tells Spark to shuffle your data into the specified number of partitions. Since our dataset is small, we use this to tell Spark to rearrange our data into a single partition before writing out the data.

Note, though, that there is a performance penalty for this. Before writing the data, Spark must shuffle the data from all of the nodes to a single partition on a single node. This takes time and puts traffic on the cluster’s network. For a ver small dataset (like the one here in our example), this is a small penalty, but it will increase as the size of your data increases.

df\
.coalesce(1)\
.write\
.format("csv")\
.mode("overwrite")\
.save("/my-output/coalesce-csv")

Let’s take a look at the files created by Spark after using the coalesce method.

dbutils.fs.ls("/my-output/coalesce-csv")
Out[27]: [FileInfo(path='dbfs:/my-output/coalesce-csv/_SUCCESS', name='_SUCCESS', size=0),
 FileInfo(path='dbfs:/my-output/coalesce-csv/_committed_8239842462067349322', name='_committed_8239842462067349322', size=112),
 FileInfo(path='dbfs:/my-output/coalesce-csv/_started_8239842462067349322', name='_started_8239842462067349322', size=0),
 FileInfo(path='dbfs:/my-output/coalesce-csv/part-00000-tid-8239842462067349322-52e5d421-3f6b-4768-a979-71ac9a0c9ee2-45-1-c000.csv', name='part-00000-tid-8239842462067349322-52e5d421-3f6b-4768-a979-71ac9a0c9ee2-45-1-c000.csv', size=55253165)]

You will notice that Spark still wrote the data into a directory, and that directory has multiple files. There are the Spark control files (e.g. the “SUCCESS” file, the “started” file, and the “committed” file). But there is only Cone SV file containing our data. Unfortunately, this file does not have a friendly name. If we want to share this file, we may want to rename it to something shorter. We can Python to clean up the control files and rename the data file.

data_location = "/my-output/coalesce-csv/"

files = dbutils.fs.ls(data_location)
csv_file = [x.path for x in files if x.path.endswith(".csv")][0]
dbutils.fs.mv(csv_file, data_location.rstrip('/') + ".csv")
dbutils.fs.rm(data_location, recurse = True)
Out[44]: True

Now let’s take one more look at our files to see that we have just one CSV file with a nice, friendly name.

dbutils.fs.ls("/my-output")
Out[45]: [FileInfo(path='dbfs:/my-output/coalesce-csv.csv', name='coalesce-csv.csv', size=55253165),
 FileInfo(path='dbfs:/my-output/default-csv/', name='default-csv/', size=0)]

Option 2: Use collect and Pandas

If you’ve used Python for data science work, you may be familiar with the pandas package. This popular tool allows you to create in-memory dataframes on a single computer. If your Spark dataframe is small enough to fit into the RAM of your cluster’s driver node, then you can simply convert your Spark dataframe to a pandas dataframe. Then you can use the standard pandas functionality to save your pandas dataframe to a single CSV file.

pd = df.toPandas()
pd.to_csv("/dbfs/my-output/pandas.csv")

And now if we look at our output directory, we will see our new CSV file.

dbutils.fs.ls("/my-output")
Out[52]: [FileInfo(path='dbfs:/my-output/coalesce-csv.csv', name='coalesce-csv.csv', size=55253165),
 FileInfo(path='dbfs:/my-output/default-csv/', name='default-csv/', size=0),
 FileInfo(path='dbfs:/my-output/pandas.csv', name='pandas.csv', size=56892564)]

That was super easy! But you must be very careful with this approach. It will only work with small datasets. If you try to convert a large dataframe to a pandas dataframe, you could crash the driver node of your cluster. Make sure your driver node has enough RAM to hold the entire dataset.

One other note on this approach. You will notice that throughout this notebook we have written data to the DBFS. We’ve done this using paths relative to the root of the DBFS, like: /my-output/coalesce-csv. In Databricks, Spark and the dbutils tool are all “DBFS-aware”. Whenever you supply a filepath to these tools, it assumes that you want to use the DBFS. Non-Spark tools (like the pandas tool) are not “DBFS-aware”. Whenever you give them a filepath, they assume you want to use the filesystem of the driver node. Therefore, you must add /dbfs/ to the beginning of your filepath so these tools will look in the right place. For example, when we used the to_csv method from the pandas package, we had to use /dbfs/my-output/pandas.csv as our location.

Retail Revenue & Supply Chain – Databricks SQL

Analyze key retail and supply chain performance indicators for a fictional enterprise.

  1. Counter – Overall Customer Count
  2. Counter – TPCH – Number Suppliers
  3. Map – National Revenue Map
  4. Bar- National Revenue Trends
  5. Table – Customer Value
  6. Line – Order Revenue

1. Counter – Overall Customer Count

SELECT
  COUNT(distinct(c_custkey))
FROM
  `samples`.`tpch`.`customer`

2. Counter – TPCH – Number Suppliers

SELECT
  COUNT(distinct(s_suppkey)) AS num_suppliers
FROM
  `samples`.`tpch`.`supplier`

3. Map – National Revenue Map

SELECT
    initcap(n_name) AS `Nation`, 
    SUM(l_extendedprice * (1 - l_discount) * (length(n_name)/100)) AS revenue
FROM
    `samples`.`tpch`.`customer`,
    `samples`.`tpch`.`orders`,
    `samples`.`tpch`.`lineitem`,
    `samples`.`tpch`.`supplier`,
    `samples`.`tpch`.`nation`,
    `samples`.`tpch`.`region`
WHERE
    c_custkey = o_custkey
    AND l_orderkey = o_orderkey
    AND l_suppkey = s_suppkey
    AND c_nationkey = s_nationkey
    AND s_nationkey = n_nationkey
    AND n_regionkey = r_regionkey
GROUP BY
    INITCAP(n_name)
ORDER BY
    revenue DESC;

4. Bar- National Revenue Trends

SELECT
    year(o_orderdate) AS year,
    n_name AS nation,
    sum(l_extendedprice * (1 - l_discount) * (((length(n_name))/100) + (year(o_orderdate)-1993)/100)) AS revenue
FROM
    `samples`.`tpch`.`customer`,
    `samples`.`tpch`.`orders`,
    `samples`.`tpch`.`lineitem`,
    `samples`.`tpch`.`supplier`,
    `samples`.`tpch`.`nation`,
    `samples`.`tpch`.`region`
WHERE
    c_custkey = o_custkey
    AND l_orderkey = o_orderkey
    AND l_suppkey = s_suppkey
    AND c_nationkey = s_nationkey
    AND s_nationkey = n_nationkey
    AND n_regionkey = r_regionkey
    AND n_name in ('ARGENTINA', 'UNITED KINGDOM', 'FRANCE','BRAZIL', 'CHINA', 'UNITED STATES', 'JAPAN', 'JORDAN')
    AND o_orderdate >= DATE '1994-01-01'
GROUP BY
    1,2
ORDER BY
    nation ASC LIMIT 1000;

5. Table – Customer Value

SELECT
  customer_id AS `Customer ID #`,
  concat(
    '<div class="bg-',
    CASE
      WHEN total_revenue BETWEEN 0
      AND 1500000 THEN 'success'
      WHEN total_revenue BETWEEN 1500001
      AND 3000000 THEN 'warning'
      WHEN total_revenue BETWEEN 3000001
      AND 5000000 THEN 'danger'
      ELSE 'danger'
    END,
    '  text-center"> $',
    format_number(total_revenue, 0),
    '</div>'
  ) AS `Total Customer Revenue`
FROM
  (
    SELECT
      o_custkey AS customer_id,
      sum(o_totalprice) as total_revenue
    FROM
      `samples`.`tpch`.`orders`
    GROUP BY
      1
    HAVING
      total_revenue > 0
  )
ORDER BY
  1
LIMIT
  400

6. Line – Order Revenue

SELECT
  o_orderdate AS Date,
  o_orderpriority AS Priority,
  sum(o_totalprice) AS `Total Price`
FROM
  `samples`.`tpch`.`orders`
WHERE
  o_orderdate > '1994-01-01'
  AND o_orderdate < '1994-01-31'
GROUP BY
  1,
  2
ORDER BY
  1,
  2

NYC Taxi Trip Analysis – Databricks SQL

Explore NYC taxi rides over a one month time frame.

  1. ตาราง nyctaxi.trips
  2. Counter – Total Trips
  3. Table – Route Revenues
  4. Chart – Pickup Hour Distribution
  5. Scatter – Daily Fare to Distance Analysis

1. ตาราง nyctaxi.trips

SHOW CREATE TABLE samples.nyctaxi.trips
CREATE TABLE samples.nyctaxi.trips (
  tpep_pickup_datetime TIMESTAMP,
  tpep_dropoff_datetime TIMESTAMP,
  trip_distance DOUBLE,
  fare_amount DOUBLE,
  pickup_zip INT,
  dropoff_zip INT
) USING delta LOCATION 'dbfs:/databricks-datasets/nyctaxi-with-zipcodes/subsampled'
SELECT * 
FROM samples.nyctaxi.trips 
LIMIT 5
#tpep_pickup_datetimetpep_dropoff_datetimetrip_distancefare_amountpickup_zipdropoff_zip
12016-02-14 16:52:13.0002016-02-14 17:16:04.0004.9419.001028210171
22016-02-04 18:44:19.0002016-02-04 18:46:00.0000.283.501011010110
32016-02-17 17:13:57.0002016-02-17 17:17:55.0000.705.001010310023
42016-02-18 10:36:07.0002016-02-18 10:41:45.0000.806.001002210017
52016-02-22 14:14:41.0002016-02-22 14:31:52.0004.5117.001011010282

2. Counter – Total Trips

USE CATALOG SAMPLES;
SELECT
  count(*) as total_trips
FROM
  `samples`.`nyctaxi`.`trips`
WHERE
  tpep_pickup_datetime BETWEEN TIMESTAMP '{{ pickup_date.start }}'
  AND TIMESTAMP '{{ pickup_date.end }}'
  AND pickup_zip IN ({{ pickupzip }})

Counter

3. Table – Route Revenues

USE CATALOG SAMPLES;
SELECT
  T.route as `Route`,
  T.frequency as `Route Frequency`,
  T.total_fare as `Total Fares`
FROM
  (
    SELECT
      concat(pickup_zip, '-', dropoff_zip) AS route,
      count(*) as frequency,
      SUM(fare_amount) as total_fare
    FROM
      `samples`.`nyctaxi`.`trips`
    WHERE
      tpep_pickup_datetime BETWEEN TIMESTAMP '{{ pickup_date.start }}'
      AND TIMESTAMP '{{ pickup_date.end }}'
      AND pickup_zip IN ({{ pickupzip }})
    GROUP BY
      1
  ) T
ORDER BY
  1 ASC
LIMIT
  200

Table

4. Chart – Pickup Hour Distribution

USE CATALOG SAMPLES;
SELECT
  CASE
    WHEN T.pickup_hour = 0 THEN '00:00'
    WHEN T.pickup_hour = 1 THEN '01:00'
    WHEN T.pickup_hour = 2 THEN '02:00'
    WHEN T.pickup_hour = 3 THEN '03:00'
    WHEN T.pickup_hour = 4 THEN '04:00'
    WHEN T.pickup_hour = 5 THEN '05:00'
    WHEN T.pickup_hour = 6 THEN '06:00'
    WHEN T.pickup_hour = 7 THEN '07:00'
    WHEN T.pickup_hour = 8 THEN '08:00'
    WHEN T.pickup_hour = 9 THEN '09:00'
    WHEN T.pickup_hour = 10 THEN '10:00'
    WHEN T.pickup_hour = 11 THEN '11:00'
    WHEN T.pickup_hour = 12 THEN '12:00'
    WHEN T.pickup_hour = 13 THEN '13:00'
    WHEN T.pickup_hour = 14 THEN '14:00'
    WHEN T.pickup_hour = 15 THEN '15:00'
    WHEN T.pickup_hour = 16 THEN '16:00'
    WHEN T.pickup_hour = 17 THEN '17:00'
    WHEN T.pickup_hour = 18 THEN '18:00'
    WHEN T.pickup_hour = 19 THEN '19:00'
    WHEN T.pickup_hour = 20 THEN '20:00'
    WHEN T.pickup_hour = 21 THEN '21:00'
    WHEN T.pickup_hour = 22 THEN '22:00'
    WHEN T.pickup_hour = 23 THEN '23:00'
    ELSE 'N/A'
  END AS `Pickup Hour`,
  T.num AS `Number of Rides`
FROM
  (
    SELECT
      hour(tpep_pickup_datetime) AS pickup_hour,
      COUNT(*) AS num
    FROM
      `samples`.`nyctaxi`.`trips`
    WHERE
      tpep_pickup_datetime BETWEEN TIMESTAMP '{{ pickup_date.start }}'
      AND TIMESTAMP '{{ pickup_date.end }}'
      AND pickup_zip IN ({{ pickupzip }})
    GROUP BY
      1
  ) T

Chart

5. Scatter – Daily Fare to Distance Analysis

USE CATALOG SAMPLES;
SELECT
  T.weekday,
  CASE
    WHEN T.weekday = 1 THEN 'Sunday'
    WHEN T.weekday = 2 THEN 'Monday'
    WHEN T.weekday = 3 THEN 'Tuesday'
    WHEN T.weekday = 4 THEN 'Wednesday'
    WHEN T.weekday = 5 THEN 'Thursday'
    WHEN T.weekday = 6 THEN 'Friday'
    WHEN T.weekday = 7 THEN 'Saturday'
    ELSE 'N/A'
  END AS day_of_week,
  T.fare_amount,
  T.trip_distance
FROM
  (
    SELECT
      dayofweek(tpep_pickup_datetime) as weekday,
      *
    FROM
      `samples`.`nyctaxi`.`trips`
    WHERE
      (
        pickup_zip in ({{ pickupzip }})
        OR pickup_zip in (10018)
      )
      AND tpep_pickup_datetime BETWEEN TIMESTAMP '{{ pickup_date.start }}'
      AND TIMESTAMP '{{ pickup_date.end }}'
      AND trip_distance < 10
  ) T
ORDER BY
  T.weekday

Scatter

Databricks – Date and Time

current_date function (Databricks SQL)

Returns the current date at the start of query evaluation.

current_date()

Arguments

This function takes no arguments.

Returns

A DATE.

The braces are optional.

Examples

> SELECT current_date()
 2022-08-23
> SELECT current_date;
 2022-08-23

now function (Databricks SQL)

Returns the current timestamp at the start of query evaluation.

now()

Arguments

This function takes no arguments.

Returns

A TIMESTAMP.

Examples

> SELECT now()
 2022-08-23T04:57:51.871+0000
> SELECT current_timestamp()
> SELECT current_timestamp

to_date function (Databricks SQL)

Returns expr cast to a date using an optional formatting.

to_date(expr [, fmt] )

Returns

A DATE.

> SELECT to_date('2022-08-24 07:00:00');
 2022-08-24
> SELECT to_date('2022-08-24', 'yyyy-MM-dd');
 2022-08-24

from_unixtime function (Databricks SQL)

Returns unixTime in fmt.

from_unixtime(unixTime [, fmt])

Arguments

  • unixTime: A BIGINT expression representing seconds elapsed since 1969-12-31 at 16:00:00 (แต่เหมือนจะเป็น 1970-01-01 at 00:00:00).
  • fmt: An optional STRING expression with a valid format.

Returns

A STRING.

See Datetime patterns (Databricks SQL) for valid formats. The ‘yyyy-MM-dd HH:mm:ss’ pattern is used if omitted.

Examples

> SELECT from_unixtime(0);
 1970-01-01 00:00:00
> SELECT from_unixtime(0, 'yyyy-MM-dd HH:mm:ss');
 1970-01-01 00:00:00

to_unix_timestamp function (Databricks SQL)

Returns the timestamp in expr as a UNIX timestamp.

to_unix_timestamp(expr [, fmt] )

Arguments

  • expr: A STRING expression representing a timestamp.
  • fmt: An optional format STRING expression.

Returns

A BIGINT.

If fmt is supplied, it must conform with Datetime patterns (Databricks SQL).

If fmt is not supplied, the function is a synonym for cast(expr AS TIMESTAMP).

If fmt is malformed or its application does not result in a well formed timestamp, the function raises an error.

Examples

> SELECT to_unix_timestamp(current_date())
 1661299200
> SELECT to_unix_timestamp('2022-08-24', 'yyyy-MM-dd')
 1661299200
> SELECT to_unix_timestamp(current_timestamp())
 1661328640
> SELECT  to_unix_timestamp('2022-08-24 08:11:00', 'yyyy-MM-dd HH:mm:ss')
 1661328660
> SELECT to_unix_timestamp(current_timestamp()) - to_unix_timestamp(current_date())
 29230

Databricks SQL – Basic SQL

  1. Retrieving Data
  2. Column Expressions
  3. Updating Data
  4. Subqueries
  5. Joins
  6. Aggregations

1. Retrieving Data

SELECT

This simple command retrieves data from a table. The “*” represents “Select All,” so the command is selecting all data from the table

However, note that only 1,000 rows were retrieved. Databricks SQL defaults to only retrieving 1,000 rows from a table. If you wish to retrieve more, deselect the checkbox “LIMIT 1000”.

USE demo;
SELECT * FROM customers;

SELECT … AS

By adding the AS keyword, we can change the name of the column in the results.

Note that the column customer_name has been renamed Customer

USE demo;
SELECT customer_name AS Customer
FROM customers;

DISTINCT

If we add the DISTINCT keyword, we can ensure that we do not repeat data in the table.

There are more than 1,000 records that have a state in the state field. But, we only see 51 results because there are only 51 distinct state names.

USE demo;
SELECT DISTINCT state FROM customers;

WHERE

The WHERE keyword allows us to filter the data.

We are selecting from the customers table, but we are limiting the results to those customers who have a loyalty_segment of 3.

USE demo;
SELECT * FROM customers WHERE loyalty_segment = 3;

GROUP BY

We can run a simple COUNT aggregation by adding count() and GROUP BY to our query.

GROUP BY requires an aggregating function. We will discuss more aggregations later on.

USE demo;
SELECT loyalty_segment, count(loyalty_segment)
FROM customers
GROUP BY loyalty_segment;

ORDER BY

By adding ORDER BY to the query we just ran, we can place the results in a specific order.

ORDER BY defaults to ordering in ascending order. We can change the order to descending by adding DESC after the ORDER BY clause.

USE demo;
SELECT loyalty_segment, count(loyalty_segment)
FROM customers
GROUP BY loyalty_segment
ORDER BY loyalty_segment;

2. Column Expressions

Mathematical Expressions of Two Columns

In our queries, we can run calculations on the data in our tables. This can range from simple mathematical calculations to more complex computations involving built-in functions.

The results show that the Calculated Discount, the one we generated using Column Expressions, matches the Discounted Price.

USE demo;
SELECT sales_price - sales_price * promo_disc AS Calculated_Discount,
discounted_price AS Discounted_Price 
FROM promo_prices;

Built-In Functions — String Column Manipulation

There are many, many Built-In Functions. We are going to talk about just a handful, so you can get a feel for how they work.

We are going to use a built-in function called lower(). This function takes a string expression and returns the same expression with all characters changed to lowercase. Let’s have a look.

USE demo;
SELECT lower(city) AS City 
FROM customers;

Although the letters are now all lowercase, they are not the way the need to be. We want to have the first letter of each word capitalized.

USE demo;
SELECT initcap(city) AS City 
FROM customers;

Date Functions

We want to use a function to make the date more human-readable. Let’s use from_unixtime().

The date looks better, but let’s adjust the formatting. Formatting options for many of the date and time functions are available here.

USE demo;
SELECT from_unixtime(promo_began, 'd MMM, y') AS Beginning_Date 
FROM promo_prices;

Date Calculations

In this code, we are using the function current_date() to get today’s date. We are then nesting from_unixtime() inside to_date in order to convert promo_began to a date object. We can then run the calculation.

USE demo;
SELECT current_date() - to_date(from_unixtime(promo_began)) FROM promo_prices;

CASE … WHEN

Often, it is important for us to use conditional logic in our queries. CASE … WHEN provides us this ability.

This statement allows us to change numeric values that represent loyalty segments into human-readable strings. It is certainly true that this association would more-likely occur using a join on two tables, but we can still see the logic behind CASE … WHEN

USE demo;
SELECT customer_name, loyalty_segment,
    CASE 
        WHEN loyalty_segment = 0 THEN 'Rare'
        WHEN loyalty_segment = 1 THEN 'Occasional'
        WHEN loyalty_segment = 2 THEN 'Frequent'
        WHEN loyalty_segment = 3 THEN 'Daily'
    END AS Loyalty 
FROM customers;

3. Updating Data

UPDATE

Let’s make those changes.

The UPDATE does exactly what it sounds like: It updates the table based on the criteria specified.

USE demo;
UPDATE customers SET city = initcap(lower(city));

SELECT city FROM customers;

INSERT INTO

In addition to updating data, we can insert new data into the table.

INSERT INTO is a command for inserting data into a table.

USE demo;
INSERT INTO loyalty_segments 
    (loyalty_segment_id, loyalty_segment_description, unit_threshold, valid_from, valid_to)
VALUES
    (4, 'level_4', 100, current_date(), Null);

SELECT * FROM loyalty_segments;

INSERT TABLE

INSERT TABLE is a command for inserting entire tables into other tables. There are two tables suppliers and source_suppliers that currently have the exact same data.

After selecting from the table again, we note that the number of rows has doubled. This is because INSERT TABLE inserts all data in the source table, whether or not there are duplicates.

USE demo;
INSERT INTO suppliers TABLE source_suppliers;

SELECT * FROM suppliers;

INSERT OVERWRITE

If we want to completely replace the contents of a table, we can use INSERT OVERWRITE.

After running INSERT OVERWRITE and then retrieving a count(*) from the table, we see that we are back to the original count of rows in the table. INSERT OVERWRITE has replaced all the rows.

USE demo;
INSERT OVERWRITE suppliers TABLE source_suppliers;
SELECT * FROM suppliers;

4. Subqueries

Let’s create two new tables.

These two command use subqueries to SELECT from the customers table using specific criteria. The results are then fed into CREATE OR REPLACE TABLE and CREATE OR REPLACE TABLE statements. Incidentally, this type of statement is often called a CTAS statement for CREATE OR REPLACE TABLE … AS.

USE demo;
CREATE OR REPLACE TABLE high_loyalty_customers AS
    SELECT * FROM customers WHERE loyalty_segment = 3;
CREATE OR REPLACE TABLE low_loyalty_customers AS
    SELECT * FROM customers WHERE loyalty_segment = 1;

5. Joins

We are now going to run a couple of JOIN queries. The first is the most common JOIN, an INNER JOIN. Since INNER JOIN is the default, we can just write JOIN.

In this statement, we are joining the customers table and the loyalty_segments tables. When the loyalty_segment from the customers table matches the loyalty_segment_id from the loyalty_segments table, the rows are combined. We are then able to view the customer_name, loyalty_segment_description, and unit_threshold from both tables.

USE demo;
SELECT
    customer_name,
    loyalty_segment_description,
    unit_threshold
FROM customers
INNER JOIN loyalty_segments
ON customers.loyalty_segment = loyalty_segments.loyalty_segment_id;

CROSS JOIN

Even though the CROSS JOIN isn’t used very often, I wanted to demonstrate it.

First of all, note the use of UNION ALL. All this does is combine the results of all three queries, so we can view them all in one results set. The Customers row shows the count of rows in the customers table. Likewise, the Sales row shows the count of the sales table. Crossed shows the number of rows after performing the CROSS JOIN.

USE demo;
SELECT "Sales", count(*) FROM sales
UNION ALL
SELECT "Customers", count(*) FROM customers
UNION ALL
SELECT "Crossed", count(*) FROM customers
  CROSS JOIN sales;

6. Aggregations

Now, let’s move into aggregations. There are many aggregating functions you can use in your queries. Here are just a handful.

Again, we are viewing the results of a handful of queries using a UNION ALL.

USE demo;
SELECT "Sum" Function_Name, sum(units_purchased) AS Value
FROM customers 
WHERE state = 'CA'
UNION ALL
SELECT "Min", min(discounted_price) AS Lowest_Discounted_Price 
FROM promo_prices
UNION ALL
SELECT "Max", max(discounted_price) AS Highest_Discounted_Price 
FROM promo_prices
UNION ALL
SELECT "Avg", avg(total_price) AS Mean_Total_Price 
FROM sales
UNION ALL
SELECT "Standard Deviation", std(total_price) AS SD_Total_Price 
FROM sales
UNION ALL
SELECT "Variance", variance(total_price) AS Variance_Total_Price
FROM sales;

Databricks SQL – Delta Commands

SELECT on Delta Tables

So far, the SQL commands we have used are generic to most flavors of SQL. In the next few queries, we are going to look at commands that are specific to using SELECT on Delta tables.

Delta tables keep a log of changes that we can view by running the command below.

After running DESCRIBE HISTORY, we can see that we are on version number 0 and we can see a timestamp of when this change was made.

USE demo;
DESCRIBE HISTORY customers;

SELECT on Delta Tables — Updating the Table

We are going to make a change to the table.

The code uses an UPDATE statement to make a change to the table. We will be discussing UPDATE later on. For now, we just need to understand that a change was made to the table. We also reran our DESCRIBE HISTORY command, and note that we have a new version in the log, with a new timestamp.

USE demo;
UPDATE customers SET loyalty_segment = 10 WHERE loyalty_segment = 0;
DESCRIBE HISTORY customers;

SELECT on Delta Tables — VERSION AS OF

We can now use a special predicate for use with Delta tables: VERSION AS OF

By using VERSION AS OF, we can SELECT from specific versions of the table. This feature of Delta tables is called “Time Travel,” and is very powerful.

We can also use TIMESTAMP AS OF to SELECT based on a table’s state on a specific date, and you can find more information in the documentation.

USE demo;
SELECT loyalty_segment FROM customers VERSION AS OF 1;

MERGE INTO

Certainly, there are times when we want to insert new data but ensure we don’t re-insert matched data. This is where we use MERGE INTO. MERGE INTO will merge two tables together, but you specify in which column to look for matched data and what to do when a match is found. Let’s run the code and examine the command in more detail.

USE demo;
MERGE INTO suppliers
    USING source_suppliers
    ON suppliers.SUPPLIER_ID = source_suppliers.SUPPLIER_ID
    WHEN NOT MATCHED THEN INSERT *;
SELECT count(*) FROM suppliers;

SQL warehouses

What is a Databricks SQL warehouse?

This article introduces SQL warehouses (formerly SQL endpoints) and describes how to work with them using the Databricks SQL UI. A SQL warehouse is a compute resource that lets you run SQL commands on data objects within Databricks SQL. Compute resources are infrastructure resources that provide processing capabilities in the cloud.

SQL endpoints name changed to SQL warehouses

Databricks changed the name from SQL endpoint to SQL warehouse because it is more than just an API entry point for running SQL commands. A SQL warehouse is a compute resource for all your data warehousing needs, an integral part of the Lakehouse Platform.

Other compute resource types include Azure Databricks clusters. To work with SQL warehouses using the API, see SQL Warehouses APIs 2.0.

Cluster size

The table in this section maps SQL warehouse cluster sizes to Azure Databricks cluster driver size and worker counts. The driver size only applies to Classic SQL warehouses.

Cluster sizeInstance type for driver (applies only to Classic warehouses)Worker count
2X-SmallStandard_E8ds_v41
X-SmallStandard_E8ds_v42
SmallStandard_E16ds_v44
MediumStandard_E32ds_v48
LargeStandard_E32ds_v416
X-LargeStandard_E64ds_v432
2X-LargeStandard_E64ds_v464
3X-LargeStandard_E64ds_v4128
4X-LargeStandard_E64ds_v4256

The instance size of all workers is Standard_E8ds_v4.

Each driver and worker has eight 128 GB Standard LRS managed disks attached. Attached disks are charged hourly.

Databricks SQL – Ingest Data

Ingest Data Using Databricks SQL

We can ingest data into Databricks SQL in a variety of ways. In this first example, we are going to start with a .csv file that exists in an object store and finish with a Delta table that contains all the data from that .csv file.

There are several things to note in these commands:

  • Since the file we are ingesting is a .csv file, we state USING csv
  • We are setting three options:
    • path – the path to the object store
    • header – whether or not the .csv file contains a header row
    • inferSchema – whether or not Databricks should infer the schema from the contents of the file
  • We are creating a second table, with default settings from the contents of the first table, using a CTAS statement (CREATE TABLE AS SELECT)).

The reason we are creating a second table that is a copy of the first table is because we want the resulting table to be in Delta format, which gives us the most options. Because the second table uses default options, it will be a Delta table.

USE demo;
DROP TABLE IF EXISTS web_events_csv;
CREATE TABLE web_events_csv 
    USING csv 
    OPTIONS (
        path='wasb://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/web_events/web-events.csv',
        header="true",
        inferSchema="true"
    );
DROP TABLE IF EXISTS web_events;
CREATE OR REPLACE TABLE web_events AS
    SELECT * FROM web_events_csv;

The LOCATION Keyword

We can ingest data in-place using the LOCATION keyword to create an external/unmanaged table. There is a dataset, called Sales, that is currently in an object store. Running the command below creates an external table that is associated with this dataset.

The table’s data is stored in the external location, but the table itself is registered in the metastore. We can query the data just like any other table in the schema.

USE demo;
DROP TABLE IF EXISTS external_table;
CREATE TABLE external_table
    LOCATION 'wasbs://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/sales';
SELECT * FROM external_table;

COPY INTO

We use COPY INTO to load data from a file location into a Delta table. COPY INTO is a re-triable and idempotent operation, so files in the source location that have already been loaded are skipped.

The first command creates an empty Delta table. Note that you must specify a schema when creating an empty Delta table.

The second command copies data from an object store location into the web-events table. Note that the file type for the files in the object store location is specified as “JSON”. The last part of the COPY INTO command is a file name, a list of file names, or a directory of files.

USE demo;
DROP TABLE IF EXISTS gym_logs;
CREATE TABLE gym_logs (first_timestamp DOUBLE, gym Long, last_timestamp DOUBLE, mac STRING);
COPY INTO gym_logs 
    FROM 'wasbs://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/gym-logs'
    FILEFORMAT = JSON
    FILES = ('20191201_2.json');

We can run the same COPY INTO command as above and add a second one. The first one will be skipped because the file has already been loaded.

USE demo;
COPY INTO gym_logs 
    FROM 'wasbs://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/gym-logs'
    FILEFORMAT = JSON
    FILES = ('20191201_2.json');
COPY INTO gym_logs
    FROM 'wasbs://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/gym-logs'
    FILEFORMAT = JSON
    FILES = ('20191201_3.json');

Do a count on the table you just populated using COPY INTO. Then, rerun the COPY INTO code above, and do another count. Notice that ‘COPY INTO’ will not input data from files that have already been ingested using COPY INTO.

Next, let’s add another COPY INTO command, but this time, we will use the PATTERN option. This allows us to load any file that fits a specific pattern.

USE demo;
COPY INTO gym_logs 
    FROM 'wasbs://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/gym-logs'
    FILEFORMAT = JSON
    FILES = ('20191201_2.json');
COPY INTO gym_logs
    FROM 'wasbs://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/gym-logs'
    FILEFORMAT = JSON
    FILES = ('20191201_3.json');
COPY INTO gym_logs 
    FROM 'wasbs://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/gym-logs'
    FILEFORMAT = JSON
    PATTERN = '20191201_[0-9].json';

Privileges

Data object owners and Databricks administrators can grant and revoke a variety of privileges on securable objects. These objects include functions, files, tables, views, and more. These privileges can be granted using SQL or using the Data Explorer.

This statement returns the privileges granted on this schema.

SHOW GRANT ON SCHEMA demo;

GRANT

If we wish to grant privileges to a user, we can run GRANT commands for the specific privileges we wish to grant. The privileges available include USAGESELECTCREATEREAD FILES, and more.

We need to start by granting USAGE to a user. We can then grant other privileges. If we want to grant all privileges, we can use GRANT ALL PRIVILEGES.

GRANT USAGE ON SCHEMA demo TO `phaisarn`;
GRANT SELECT ON SCHEMA demo TO `phaisarn`;

Examples

> GRANT CREATE ON SCHEMA <schema-name> TO `alf@melmak.et`;

> GRANT ALL PRIVILEGES ON TABLE forecasts TO finance;

> GRANT SELECT ON TABLE sample_data TO USERS;

REVOKE

We can revoke privileges on securable objects in the exact same way.

We don’t want to actually run the command because we don’t want to try to revoke our own privileges, but here is the command:

REVOKE ALL PRIVILEGES ON SCHEMA `schema_name` from `user_name`;

Examples

> REVOKE ALL PRIVILEGES ON SCHEMA default FROM `alf@melmak.et`;

> REVOKE SELECT ON TABLE t FROM aliens;

Data Explorer

While we can certainly grant and revoke privileges using SQL, we can also use the Data Explorer.

  1. Click “Data” in the sidebar menu.
  2. If needed, select your schema in the dropdown
  3. Select the table, “gym_logs” from the list
  4. Click “Permissions”
  5. Use the “Grant” and “Revoke” buttons to change permission settings.

Databricks SQL – CTEs

Common Table Expressions (CTEs)

Here is an example of a CTE. We are going to query the view we just created, but we are going to filter based on the total_price being above 20000. We are calling that sales_below_20000 and then immediately querying the DISTINCT customer_name from that CTE.

USE demo;
WITH sales_below_20000 AS
    (SELECT *
     FROM high_sales
     WHERE total_price < 20000)
SELECT DISTINCT customer_name FROM sales_below_20000;

Common table expression (CTE) (Databricks SQL)

Defines a temporary result set that you can reference possibly multiple times within the scope of a SQL statement. A CTE is used mainly in a SELECT statement.

WITH common_table_expression [, ...]

common_table_expression
  view_identifier [ ( column_identifier [, ...] ) ] [ AS ] ( query )

Examples

-- CTE with multiple column aliases
> WITH t(x, y) AS (SELECT 1, 2)
  SELECT * FROM t WHERE x = 1 AND y = 2;
   1   2

-- CTE in CTE definition
> WITH t AS (
    WITH t2 AS (SELECT 1)
    SELECT * FROM t2)
  SELECT * FROM t;
   1

-- CTE in subquery
> SELECT max(c) FROM (
    WITH t(c) AS (SELECT 1)
    SELECT * FROM t);
      1

-- CTE in subquery expression
> SELECT (WITH t AS (SELECT 1)
          SELECT * FROM t);
                1

-- CTE in CREATE VIEW statement
> CREATE VIEW v AS
    WITH t(a, b, c, d) AS (SELECT 1, 2, 3, 4)
    SELECT * FROM t;
> SELECT * FROM v;
   1   2   3   4

-- CTE names are scoped
> WITH t  AS (SELECT 1),
       t2 AS (
        WITH t AS (SELECT 2)
        SELECT * FROM t)
SELECT * FROM t2;
   2