Delta Lake Features

1. สร้างตาราง students

%sql
CREATE TABLE students (
  id INT, name STRING, value DOUBLE);

ดูคำสั่งสร้างตารางนี้ด้วย SHOW CREATE TABLE

%sql
SHOW CREATE TABLE students
%sql
CREATE TABLE spark_catalog.default.students (
  id INT,
  name STRING,
  value DOUBLE)
USING delta
TBLPROPERTIES (
  'delta.minReaderVersion' = '1',
  'delta.minWriterVersion' = '2')

Using DESCRIBE EXTENDED allows us to see important metadata about our table.

%sql
DESCRIBE EXTENDED students
%python
df1 = spark.sql('DESCRIBE EXTENDED students')
df1.show()
+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                  id|                 int|   null|
|                name|              string|   null|
|               value|              double|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|             default|       |
|               Table|            students|       |
|                Type|             MANAGED|       |
|            Location|dbfs:/user/hive/w...|       |
|            Provider|               delta|       |
|               Owner|                root|       |
| Is_managed_location|                true|       |
|    Table Properties|[delta.minReaderV...|       |
+--------------------+--------------------+-------+

DESCRIBE DETAIL is another command that allows us to explore table metadata.

%sql
DESCRIBE DETAIL students
%python
df2 = spark.sql('DESCRIBE DETAIL students')
df2.show(vertical=True)
-RECORD 0--------------------------------
 format           | delta                
 id               | 46681f33-7201-4c6... 
 name             | spark_catalog.def... 
 description      | null                 
 location         | dbfs:/user/hive/w... 
 createdAt        | 2023-04-17 16:24:... 
 lastModified     | 2023-04-17 16:24:32  
 partitionColumns | []                   
 numFiles         | 0                    
 sizeInBytes      | 0                    
 properties       | {}                   
 minReaderVersion | 1                    
 minWriterVersion | 2        

ดู Delta Lake Files

%python
display(dbutils.fs.ls('dbfs:/user/hive/warehouse/students'))
%python
li_file = dbutils.fs.ls('dbfs:/user/hive/warehouse/students')
df3 = sqlContext.createDataFrame(li_file)
df3.show()
+--------------------+--------------------+----+----------------+
|                path|                name|size|modificationTime|
+--------------------+--------------------+----+----------------+
|dbfs:/user/hive/w...|         _delta_log/|   0|   1681750558289|
+--------------------+--------------------+----+----------------+

Reviewing Delta Lake Transactions

%sql
DESCRIBE HISTORY students
%python
df4 = spark.sql('DESCRIBE HISTORY students')
df4.show(vertical=True)
-RECORD 0-----------------------------------
 version             | 0                    
 timestamp           | 2023-04-17 16:24:32  
 userId              | 8501686721698164     
 userName            | odl_user_915759@d... 
 operation           | CREATE TABLE         
 operationParameters | {isManaged -> tru... 
 job                 | null                 
 notebook            | {1477724271071511}   
 clusterId           | 0415-162149-6ai590aw 
 readVersion         | null                 
 isolationLevel      | WriteSerializable    
 isBlindAppend       | true                 
 operationMetrics    | {}                   
 userMetadata        | null                 
 engineInfo          | Databricks-Runtim... 

2. เพิ่มข้อมูล 3 ครั้ง

%sql
INSERT INTO students VALUES (1, "Yve", 1.0);
INSERT INTO students VALUES (2, "Omar", 2.5);
INSERT INTO students VALUES (3, "Elia", 3.3);
%python
df2 = spark.sql('DESCRIBE DETAIL students')
df2.show(vertical=True)
RECORD 0--------------------------------
 format           | delta                
 id               | 46681f33-7201-4c6... 
 name             | spark_catalog.def... 
 description      | null                 
 location         | dbfs:/user/hive/w... 
 createdAt        | 2023-04-17 16:24:... 
 lastModified     | 2023-04-17 16:51:01  
 partitionColumns | []                   
 numFiles         | 3                    
 sizeInBytes      | 2613                 
 properties       | {}                   
 minReaderVersion | 1                    
 minWriterVersion | 2  
%python
li_file = dbutils.fs.ls('dbfs:/user/hive/warehouse/students')
df3 = sqlContext.createDataFrame(li_file)
df3.show()
+--------------------+--------------------+----+----------------+
|                path|                name|size|modificationTime|
+--------------------+--------------------+----+----------------+
|dbfs:/user/hive/w...|         _delta_log/|   0|   1681750558289|
|dbfs:/user/hive/w...|part-00000-1d6df3...| 868|   1681750256000|
|dbfs:/user/hive/w...|part-00000-57bc12...| 872|   1681750261000|
|dbfs:/user/hive/w...|part-00000-ec8db3...| 873|   1681750259000|
+--------------------+--------------------+----+----------------+
%python
display(spark.sql(f"SELECT * FROM json.`dbfs:/user/hive/warehouse/students/_delta_log/00000000000000000001.json`"))

รันคำสั่งนี้จะ error

%sql
SELECT * FROM parquet.`dbfs:/user/hive/warehouse/students/part-00000-1d6df344-3187-42fd-8591-df4ed47b403f.c000.snappy.parquet`
AnalysisException: Incompatible format detected.

A transaction log for Delta was found at `dbfs:/user/hive/warehouse/students/_delta_log`,
but you are trying to read from `dbfs:/user/hive/warehouse/students/part-00000-57bc1277-813b-41c0-990d-4ba8fd05c9b1.c000.snappy.parquet` using format("parquet"). You must use
'format("delta")' when reading and writing to a delta table.

To disable this check, SET spark.databricks.delta.formatCheck.enabled=false
To learn more about Delta, see https://docs.databricks.com/delta/index.html; line 1 pos 14

ให้ SET ค่านี้ก่อน

%sql
SET spark.databricks.delta.formatCheck.enabled=false

รันใหม่จะได้ละ

%sql
SELECT * FROM parquet.`dbfs:/user/hive/warehouse/students/part-00000-1d6df344-3187-42fd-8591-df4ed47b403f.c000.snappy.parquet`

คำสั่ง CREATE VIEW

Constructs a virtual table that has no physical data based on the result-set of a SQL query. ALTER VIEW and DROP VIEW only change metadata.

Syntax

CREATE [ OR REPLACE ] [ TEMPORARY ] VIEW [ IF NOT EXISTS ] view_name
    [ column_list ]
    [ COMMENT view_comment ]
    [ TBLPROPERTIES clause ]
    AS query

column_list
   ( { column_alias [ COMMENT column_comment ] } [, ...] )

ตัวอย่าง

%sql
CREATE OR REPLACE TEMPORARY VIEW demo_tmp2(name, value) AS
VALUES
  ("Yi", 1),
  ("Ali", 2),
  ("Selina", 3)

หรือใช้คำว่า TEMP แทน TEMPORARY ก็ได้

%sql
CREATE OR REPLACE TEMP VIEW demo_tmp1(name, value) AS
VALUES
  ("Yi", 1),
  ("Ali", 2),
  ("Selina", 3)

PySpark: display a spark data frame in a table format

สร้าง PySpark DataFrame ชื่อ df

%python
df = sqlContext.createDataFrame([("foo", 1), ("bar", 2), ("baz", 3)], ("k", "v"))
%python
print(type(df))

# <class 'pyspark.sql.dataframe.DataFrame'>

แสดงตาราง

%python
df.show()

# +---+---+
# |  k|  v|
# +---+---+
# |foo|  1|
# |bar|  2|
# |baz|  3|
# +---+---+

แสดงตาราง โดยกำหนด n = 2

%python
df.show(n=2)

# +---+---+
# |  k|  v|
# +---+---+
# |foo|  1|
# |bar|  2|
# +---+---+
# only showing top 2 rows
%python
df.show(2, True)

# +---+---+
# |  k|  v|
# +---+---+
# |foo|  1|
# |bar|  2|
# +---+---+
# only showing top 2 rows

doc

%python
df.show?
Signature:
df.show(
    n: int = 20,
    truncate: Union[bool, int] = True,
    vertical: bool = False,
) -> None
Docstring:
Prints the first ``n`` rows to the console.

.. versionadded:: 1.3.0

Parameters
----------
n : int, optional
    Number of rows to show.
truncate : bool or int, optional
    If set to ``True``, truncate strings longer than 20 chars by default.
    If set to a number greater than one, truncates long strings to length ``truncate``
    and align cells right.
vertical : bool, optional
    If set to ``True``, print output rows vertically (one line
    per column value).

Examples
--------
>>> df
DataFrame[age: int, name: string]
>>> df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
>>> df.show(truncate=3)
+---+----+
|age|name|
+---+----+
|  2| Ali|
|  5| Bob|
+---+----+
>>> df.show(vertical=True)
-RECORD 0-----
 age  | 2
 name | Alice
-RECORD 1-----
 age  | 5
 name | Bob
File:      /databricks/spark/python/pyspark/sql/dataframe.py

Write Excel with PySpark

ที่ Cluster ติดตั้ง com.crealytics:spark-excel-2.12.17-3.0.1_2.12:3.0.1_0.18.1

สร้าง pyspark dataframe

%python
data = [('A', "1"),
        ('B', "2"),
        ('C', "3"),
        ('D', "4")
        ]
print(type(data))  # <class 'list'>
df = spark.createDataFrame(data)
print(type(df))    # <class 'pyspark.sql.dataframe.DataFrame'>
display(df)

เขียนไฟล์ excel

%python
path = '/mnt/xxx/tmp/'
filename = f'{path}output1.xlsx'
print(f'filename = {filename}')
df.write.format("com.crealytics.spark.excel")\
  .option("header", "true")\
  .mode("overwrite")\
  .save(filename)

ลอง %fs ls ‘/mnt/xxx/tmp/‘ จะเห็นไฟล์ dbfs:/mnt/xxx/tmp/output1.xlsx ละ

สร้าง dataframe อีกอัน

%python
columns = ['Identifier', 'Value', 'Extra Discount']
vals = [(1, 150, 0), (2, 160, 12)]
df2 = spark.createDataFrame(vals, columns)
df2.show()

# +----------+-----+--------------+
# |Identifier|Value|Extra Discount|
# +----------+-----+--------------+
# |         1|  150|             0|
# |         2|  160|            12|
# +----------+-----+--------------+

เขียนแบบ append โดยข้อมูลเริ่มต้นที่ cell B3 ถึง C35 คอลัมน์ Extra Discount เลยหายไป

%python
df2.write.format("com.crealytics.spark.excel") \
  .option("dataAddress", "'My Sheet'!B3:C35") \
  .option("header", "true") \
  .mode("append") \
  .save(filename)

Write Excel with Pandas

สร้าง pandas DataFrame

%python
import pandas as pd
import openpyxl

df = pd.DataFrame([[11, 21, 31], [12, 22, 32], [31, 32, 33]],
                  index=['one', 'two', 'three'], columns=['a', 'b', 'c'])

print(df)

#         a   b   c
# one    11  21  31
# two    12  22  32
# three  31  32  33

print(type(df))
# <class 'pandas.core.frame.DataFrame'>

เขียน pandas DataFrame ลงไฟล์

%python
path = '/tmp/'
filename = f'{path}output1.xlsx'
print(filename)
with pd.ExcelWriter(filename) as writer:  
     df.to_excel(writer, sheet_name='Sheet_name_1')

SQL Declare Variable

คำสั่ง SET

เป็นการให้ค่าจาก SQL ด้วยคำสั่ง SET แล้วก็อ่านค่าด้วย SQL

%sql
SET value = 2;
%sql
SELECT ${hiveconf:value} 
%sql
SELECT ${hiveconf:value} AS value
%sql
SET LastChangeDate = current_date()
%sql
Select ${hiveconf:LastChangeDate}

คำสั่ง spark.conf.set()

เป็นการให้ค่าจาก Python ด้วยคำสั่ง spark.conf.set() แล้วอ่านค่าด้วย SQL

%python 
spark.conf.set("ab.name", "jack") 
spark.conf.set("AB.name", "jack") 

จะสังเกตุเห็นได้ว่าที่ Python ให้ค่าทั้ง ab และ AB เพื่อให้ SQL อ่านค่าได้ทั้ง ab และ AB

%sql
SELECT '${AB.name}' AS name

Databricks extension for Visual Studio Code

Before you begin

Before you can use the Databricks extension for Visual Studio Code, your Databricks workspace and your local development machine must meet the following requirements. You must also have an access token to authenticate with Databricks.

  • Workspace requirements
  • Access token
  • Local development machine requirements

Workspace requirements

enable Files in Repos

คอนฟิกไฟล์ %USERPROFILE%\.databrickscfg เช่น C:\Users\jack\.databrickscfg

[DEFAULT]
host = https://dbc-xxx2345c-yyyy.cloud.databricks.com
token = dapi1234567xxx123456yyyy123456789012

[DEV]
host = https://dbc-xxx3456d-yyyy.cloud.databricks.com
token = dapi2345678xxx234567yyyy234567890123

Access token

You must have a Databricks personal access token. If you do not have one available, you can generate a personal access token now.

Local development machine requirements

Visual Studio Code version 1.69.1 or higher.

ติดตั้ง Extension

ทดสอบ

from pyspark.sql import SparkSession

spark: SparkSession = spark
print("Hello from Databricks")

Basic SQL

  1. แสดงรายชื่อดาต้าเบส และตาราง
  2. ใช้ SQL อ่านไฟล์
  3. อ่านข้อมูลในไฟล์ CSV มาใส่ใน Delta Table
  4. อ่านข้อมูลในไฟล์ CSV มาสร้าง Temp View
  5. สร้าง external table โดยชี้ไปที่ไฟล์ CSV
  6. คำสั่ง CREATE VIEW
  7. คำสั่ง CREATE TABLE

แสดงรายชื่อดาต้าเบส และตาราง

แสดงรายชื่อดาต้าเบส

%sql
SHOW DATABASES;

แสดงรายชื่อตารางในดาต้าเบส

%sql
USE default;
SHOW TABLES;

ใช้ SQL อ่านไฟล์

%sql
SELECT * FROM delta.`${DA.paths.datasets}/nyctaxi-with-zipcodes/data`
SELECT * FROM text.`dbfs:/databricks-datasets/Rdatasets/data-001/datasets.csv`
SELECT * FROM csv.`dbfs:/databricks-datasets/Rdatasets/data-001/datasets.csv`

ใช้ SQL อ่านไฟล์แบบ text

ใช้ backtick ` ครอบ

%sql
SELECT * FROM text.`dbfs:/databricks-datasets/Rdatasets/data-001/datasets.csv`

ใช้ SQL อ่านไฟล์แบบ CSV

%sql
SELECT * FROM csv.`dbfs:/databricks-datasets/Rdatasets/data-001/datasets.csv`

อ่านข้อมูลในไฟล์ CSV มาใส่ใน Delta Table

สร้างตารางชื่อ table1 โดยดูชื่อคอลัมน์จากคิวรีด้านบน

%sql
/*Table creation with schema*/
CREATE OR REPLACE TABLE table1 (
  Package string,
  Item string,
  Title string,
  csv string,
  doc string
);

SHOW TABLE อีกทีจะเห็นตาราง table1

ลอง SHOW CREATE TABLE

%sql
SHOW CREATE TABLE table1;

copy ข้อมูลจากไฟล์ csv ลงตาราง

%sql
/*Copying dbfs csv data into table*/
COPY INTO table1
  FROM "dbfs:/databricks-datasets/Rdatasets/data-001/datasets.csv"
  FILEFORMAT = csv
  FORMAT_OPTIONS('header'='true','inferSchema'='True');

คิวรีดูข้อมูล

%sql
SELECT * FROM table1

อ่านข้อมูลในไฟล์ CSV มาสร้าง Temp View

%sql
CREATE TEMPORARY VIEW view1 USING CSV OPTIONS (
  path "/databricks-datasets/Rdatasets/data-001/datasets.csv",
  header "true"
)
%sql
CREATE TEMPORARY VIEW diamonds USING CSV OPTIONS (
  path "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
  header "true"
)

ลอง SHOW TABLES จะเห็นว่าเป็น Temporary

สร้าง external table โดยชี้ไปที่ไฟล์ CSV

%sql
CREATE TABLE table2 USING CSV 
OPTIONS ('header' = 'true')
LOCATION '/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv'

พาทไฟล์สามารถ กำหนดที่ LOCATION หรือใน OPTIONS (path) ก็ได้

%sql
DROP TABLE IF EXISTS diamonds;

CREATE TABLE diamonds USING CSV OPTIONS (
  path "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
  header "true"
)

ใน OPTIONS เครื่องหมาย = จะมีหรือไม่มีก็ได้

%sql
DROP TABLE IF EXISTS diamonds;

CREATE TABLE diamonds USING CSV OPTIONS (
  path = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
  header = "true"
)

คำสั่ง CREATE VIEW

CREATE OR REPLACE TEMP VIEW demo_tmp_vw(name, value) AS VALUES
  ("Yi", 1),
  ("Ali", 2),
  ("Selina", 3);

CREATE TEMPORARY VIEW diamonds USING CSV OPTIONS (
  path "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
  header "true"
);

คำสั่ง CREATE TABLE

CREATE OR REPLACE TABLE table1 (
  Package string,
  Item string,
  Title string,
  csv string,
  doc string
);

CREATE TABLE diamonds USING CSV OPTIONS (
  path = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
  header = "true"
);

CREATE TABLE diamonds USING CSV 
OPTIONS ('header' = 'true')
LOCATION '/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv';

Python subprocess

subprocess.call() – เก่าแล้ว Older high-level API

ใช้ subprocess.check_call() , subprocess.run() แทน

ตัวอย่าง 1

echo คำว่า test1 test2 ลงไปที่ไฟล์ /tmp/out.txt

%python
import subprocess
e = 'echo "test1 test2" > /tmp/out.txt'
subprocess.call(e, shell=True)

ตัวอย่าง 2

ตั้งชื่อไฟล์ของ log เป็นวันเวลาปัจจุบัน ไว้ใต้ /tmp แล้วใส่ข้อความ Hello World! ลงไป ด้วยการ echo

%python
import datetime
import subprocess
 
run_time = datetime.datetime.now() + datetime.timedelta(hours=7)
tmp_file = '/tmp/' + run_time.strftime("%Y-%m-%dT%H:%M:%S") + '.log'
print(tmp_file) # /tmp/2023-02-21T13:23:52.log

e = f'echo "Hello World!" >> {tmp_file}'
subprocess.call(e, shell=True)

ตรวจสอบว่าได้สร้างไฟล์ขึ้นมามั๊ย

%sh ls -l '/tmp/'

ดูเนื้อหาในไฟล์

%sh cat '/tmp/2023-02-21T13:23:52.log'

ใช้ Pandas อ่าน Text file

%python
import pandas as pd

# read text file
df = pd.read_fwf('/tmp/2023-02-21T14:55:00.log')
print(type(df))
print(df)
print(df.count())

OPTIMIZE

Optimizes the layout of Delta Lake data. Optionally optimize a subset of data or colocate data by column. If you do not specify colocation, bin-packing optimization is performed.

Syntax

OPTIMIZE table_name [WHERE predicate]
  [ZORDER BY (col_name1 [, ...] ) ]

Parameters

  • table_name – Identifies an existing Delta table. The name must not include a temporal specification.
  • WHERE – Optimize the subset of rows matching the given partition predicate. Only filters involving partition key attributes are supported.
  • ZORDER BY – Colocate column information in the same set of files. Co-locality is used by Delta Lake data-skipping algorithms to dramatically reduce the amount of data that needs to be read. You can specify multiple columns for ZORDER BY as a comma-separated list. However, the effectiveness of the locality drops with each additional column.

Examples

OPTIMIZE delta.`/data/events`

OPTIMIZE events

OPTIMIZE events WHERE date >= '2022-11-18'

OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)