Databricks SQL – CTEs

Common Table Expressions (CTEs)

Here is an example of a CTE. We query the high_sales view, but keep only the rows where total_price is below 20000. We name that result sales_below_20000 and then immediately select the DISTINCT customer_name values from the CTE.

USE demo;
WITH sales_below_20000 AS
    (SELECT *
     FROM high_sales
     WHERE total_price < 20000)
SELECT DISTINCT customer_name FROM sales_below_20000;
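
To try the same CTE pattern without a Databricks workspace, here is a minimal Python sketch. It assumes SQLite (through the standard sqlite3 module) as a stand-in engine, and the rows loaded into high_sales are made-up sample data rather than the course dataset.

```python
import sqlite3

# In-memory SQLite database standing in for the demo schema (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE high_sales (customer_name TEXT, total_price INTEGER)")
conn.executemany(
    "INSERT INTO high_sales VALUES (?, ?)",
    [("Acme", 15000), ("Acme", 12000), ("Globex", 25000), ("Initech", 18000)],
)

# Same shape as the query above: define the CTE, then select from it.
rows = conn.execute(
    """
    WITH sales_below_20000 AS
        (SELECT * FROM high_sales WHERE total_price < 20000)
    SELECT DISTINCT customer_name FROM sales_below_20000
    ORDER BY customer_name
    """
).fetchall()

customers = [name for (name,) in rows]
print(customers)
```

Globex is filtered out because its only sale is above the threshold, and DISTINCT collapses the two Acme rows into one.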

Common table expression (CTE) (Databricks SQL)

Defines a temporary result set that you can reference possibly multiple times within the scope of a SQL statement. A CTE is used mainly in a SELECT statement.

WITH common_table_expression [, ...]

common_table_expression
  view_identifier [ ( column_identifier [, ...] ) ] [ AS ] ( query )

Examples

-- CTE with multiple column aliases
> WITH t(x, y) AS (SELECT 1, 2)
  SELECT * FROM t WHERE x = 1 AND y = 2;
   1   2

-- CTE in CTE definition
> WITH t AS (
    WITH t2 AS (SELECT 1)
    SELECT * FROM t2)
  SELECT * FROM t;
   1

-- CTE in subquery
> SELECT max(c) FROM (
    WITH t(c) AS (SELECT 1)
    SELECT * FROM t);
      1

-- CTE in subquery expression
> SELECT (WITH t AS (SELECT 1)
          SELECT * FROM t);
                1

-- CTE in CREATE VIEW statement
> CREATE VIEW v AS
    WITH t(a, b, c, d) AS (SELECT 1, 2, 3, 4)
    SELECT * FROM t;
> SELECT * FROM v;
   1   2   3   4

-- CTE names are scoped
> WITH t  AS (SELECT 1),
       t2 AS (
        WITH t AS (SELECT 2)
        SELECT * FROM t)
SELECT * FROM t2;
   2
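
-- CTE that builds yyyyMMdd strings for the current day and the three previous days, shifted by +7 hours
WITH column_date AS (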
  SELECT date_format(TIMESTAMPADD(HOUR, +7, current_timestamp()),'yyyyMMdd') AS T0
    ,    date_format(TIMESTAMPADD(HOUR, +7-24, current_timestamp()),'yyyyMMdd') AS T1
    ,    date_format(TIMESTAMPADD(HOUR, +7-48, current_timestamp()),'yyyyMMdd') AS T2
    ,    date_format(TIMESTAMPADD(HOUR, +7-72, current_timestamp()),'yyyyMMdd') AS T3
)
SELECT *
FROM   column_date;

---------------------------------------------
|   T0     |    T1    |   T2     |   T3     |
---------------------------------------------
| 20230310 | 20230309 | 20230308 | 20230307 |
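
The date arithmetic in the column_date CTE can be checked outside SQL with a short Python sketch. It assumes current_timestamp() is in UTC and that the +7 hours shift moves it to a UTC+7 local time; date_columns is a hypothetical helper name, and the fixed instant is chosen only to make the result reproducible.

```python
from datetime import datetime, timedelta, timezone

def date_columns(now_utc, offset_hours=7, days_back=3):
    """Return {'T0': 'yyyyMMdd', ...} mirroring the column_date CTE (hypothetical helper)."""
    shifted = now_utc + timedelta(hours=offset_hours)
    return {
        f"T{i}": (shifted - timedelta(hours=24 * i)).strftime("%Y%m%d")
        for i in range(days_back + 1)
    }

# 2023-03-10 01:00 UTC is 08:00 on the same day after the +7 hour shift.
cols = date_columns(datetime(2023, 3, 10, 1, 0, tzinfo=timezone.utc))
print(cols)
```

Each Tn column is the shifted timestamp minus n times 24 hours, which is what the +7-24, +7-48 and +7-72 offsets express in the SQL.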

Databricks SQL – Views

USE demo;
CREATE OR REPLACE VIEW high_sales AS
    SELECT * FROM delta.`wasbs://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/sales` 
        WHERE total_price > 10000;
SELECT * FROM high_sales;

CREATE VIEW (Databricks SQL)

CREATE [ OR REPLACE ] [ TEMPORARY ] VIEW [ IF NOT EXISTS ] view_name
    [ column_list ]
    [ COMMENT view_comment ]
    [ TBLPROPERTIES clause ]
    AS query

column_list
   ( { column_alias [ COMMENT column_comment ] } [, ...] )

Parameters

  • OR REPLACE – If a view of the same name already exists, it is replaced. To replace an existing view you must be its owner.
  • TEMPORARY – TEMPORARY views are visible only to the session that created them and are dropped when the session ends.
  • IF NOT EXISTS – Creates the view only if it does not exist. If a view by this name already exists the CREATE VIEW statement is ignored. You may specify at most one of IF NOT EXISTS or OR REPLACE.
  • view_name – The name of the newly created view. A temporary view’s name must not be qualified. The fully qualified view name must be unique.
  • column_list – Optionally labels the columns in the query result of the view. If you provide a column list the number of column aliases must match the number of expressions in the query. In case no column list is specified aliases are derived from the body of the view.
    • column_alias – The column aliases must be unique.
    • column_comment – An optional STRING literal describing the column alias.
  • view_comment – An optional STRING literal providing a view-level comment.
  • TBLPROPERTIES – Optionally sets one or more user defined properties.
  • AS query – A query that constructs the view from base tables or other views.

Examples

-- Create or replace view for `experienced_employee` with comments.
> CREATE OR REPLACE VIEW experienced_employee
    (id COMMENT 'Unique identification number', Name)
    COMMENT 'View for experienced employees'
    AS SELECT id, name
         FROM all_employee
        WHERE working_years > 5;

-- Create a temporary view `subscribed_movies`.
> CREATE TEMPORARY VIEW subscribed_movies
    AS SELECT mo.member_id, mb.full_name, mo.movie_title
         FROM movies AS mo
         INNER JOIN members AS mb
            ON mo.member_id = mb.id;

Databricks SQL – Tables

Note that I have “USING DELTA” at the end of the CREATE statement. This is optional because Delta is the default table type.

-- Create a managed Delta table, insert one row, and read it back
USE demo;
CREATE OR REPLACE TABLE managed_table (width INT, length INT, height INT) USING DELTA;
INSERT INTO managed_table VALUES (3, 2, 1);
SELECT * FROM managed_table;

-- Inspect the table metadata
USE demo;
DESCRIBE EXTENDED managed_table;
DESCRIBE TABLE EXTENDED managed_table;

USE demo;
SHOW CREATE TABLE managed_table;

-- List the table's versions and timestamps
USE demo;
DESCRIBE HISTORY managed_table;

-- Time travel: query the table as of version 0
USE demo;
SELECT *
FROM managed_table
VERSION AS OF 0;

-- Restore examples (these use a different table named employee)
RESTORE TABLE employee TO TIMESTAMP AS OF '2022-08-02 00:00:00';
RESTORE TABLE employee TO VERSION AS OF 1;

-- Clean up
USE demo;
DROP TABLE IF EXISTS managed_table;

CREATE TABLE [USING] (Databricks SQL)

Defines a managed or external table, optionally using a data source.

{ { [CREATE OR] REPLACE TABLE | CREATE TABLE [ IF NOT EXISTS ] }
  table_name
  [ column_specification ] [ USING data_source ]
  [ table_clauses ]
  [ AS query ] }

column_specification
  ( { column_identifier column_type [ NOT NULL ]
      [ GENERATED ALWAYS AS ( expr ) |
        GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( [ START WITH start ] [ INCREMENT BY step ] ) ] ]
      [ COMMENT column_comment ] } [, ...] )

table_clauses
  { OPTIONS clause |
    PARTITIONED BY clause |
    clustered_by_clause |
    LOCATION path [ WITH ( CREDENTIAL credential_name ) ] |
    COMMENT table_comment |
    TBLPROPERTIES clause } [...]

clustered_by_clause
  { CLUSTERED BY ( cluster_column [, ...] )
    [ SORTED BY ( { sort_column [ ASC | DESC ] } [, ...] ) ]
    INTO num_buckets BUCKETS }

Examples

-- Creates a Delta table
> CREATE TABLE student (id INT, name STRING, age INT);

-- Use data from another table
> CREATE TABLE student_copy AS SELECT * FROM student;

-- Creates a CSV table from an external directory
> CREATE TABLE student USING CSV LOCATION '/mnt/csv_files';

-- Specify table comment and properties
> CREATE TABLE student (id INT, name STRING, age INT)
    COMMENT 'this is a comment'
    TBLPROPERTIES ('foo'='bar');

-- Specify table comment and properties with different clauses order
> CREATE TABLE student (id INT, name STRING, age INT)
    TBLPROPERTIES ('foo'='bar')
    COMMENT 'this is a comment';

-- Create partitioned table
> CREATE TABLE student (id INT, name STRING, age INT)
    PARTITIONED BY (age);

-- Create a table with a generated column
> CREATE TABLE rectangles(a INT, b INT,
                          area INT GENERATED ALWAYS AS (a * b));

DESCRIBE TABLE (Databricks SQL)

Returns the basic metadata information of a table. The metadata information includes column name, column type and column comment. Optionally you can specify a partition spec or column name to return the metadata pertaining to a partition or column respectively.

{ DESC | DESCRIBE } [ TABLE ] [ EXTENDED | FORMATTED ] table_name { [ PARTITION clause ] | [ column_name ] }

SHOW CREATE TABLE (Databricks SQL)

Returns the CREATE TABLE statement or CREATE VIEW statement that was used to create a given table or view.

SHOW CREATE TABLE { table_name | view_name }

Examples

> CREATE TABLE test (c INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE
    TBLPROPERTIES ('prop1' = 'value1', 'prop2' = 'value2');

> SHOW CREATE TABLE test;
                                       createtab_stmt
 ----------------------------------------------------
 CREATE TABLE `default`.`test` (`c` INT)
 USING text
 TBLPROPERTIES (
   'transient_lastDdlTime' = '1586269021',
   'prop1' = 'value1',
   'prop2' = 'value2')

DESCRIBE HISTORY (Databricks SQL)

You can retrieve information on the operations, user, timestamp, and so on for each write to a Delta table by running the history command. The operations are returned in reverse chronological order. By default table history is retained for 30 days.

View the versions and timestamps of a Delta table.

DESCRIBE HISTORY table_name

Examples

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

# Python equivalent using the Delta Lake API
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, pathToTable)

fullHistoryDF = deltaTable.history()    # get the full history of the table

lastOperationDF = deltaTable.history(1) # get the last operation

RESTORE (Delta Lake on Databricks)

Restores a Delta table to an earlier state. Restoring to an earlier version number or a timestamp is supported.

RESTORE [ TABLE ] table_name [ TO ] time_travel_version

time_travel_version
 { TIMESTAMP AS OF timestamp_expression |
   VERSION AS OF version }
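
As a small illustration of the two time_travel_version forms, the sketch below builds a RESTORE statement as a string from either a version number or a timestamp. build_restore_sql is a hypothetical helper, not part of any Databricks library; it does no escaping of the table name or timestamp, so it is only suitable for trusted input.

```python
def build_restore_sql(table_name, version=None, timestamp=None):
    """Build a RESTORE statement from exactly one of version or timestamp (hypothetical helper)."""
    if (version is None) == (timestamp is None):
        raise ValueError("Specify exactly one of version or timestamp")
    if version is not None:
        return f"RESTORE TABLE {table_name} TO VERSION AS OF {int(version)}"
    return f"RESTORE TABLE {table_name} TO TIMESTAMP AS OF '{timestamp}'"

by_version = build_restore_sql("employee", version=1)
by_timestamp = build_restore_sql("employee", timestamp="2022-08-02 00:00:00")
print(by_version)
print(by_timestamp)
```

The resulting strings could then be passed to spark.sql(...) in a notebook, which is outside the scope of this sketch.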

DROP TABLE (Databricks SQL)

DROP TABLE [ IF EXISTS ] table_name

Examples

-- Assumes a table named `employeetable` exists.
> DROP TABLE employeetable;

-- Assumes a table named `employeetable` exists in the `userdb` schema
> DROP TABLE userdb.employeetable;

-- Assumes a table named `employeetable` does not exist.
-- Throws exception
> DROP TABLE employeetable;
  Error: Table or view not found: employeetable;

-- Assumes a table named `employeetable` does not exist. Try with IF EXISTS;
-- this time it will not throw an exception.
> DROP TABLE IF EXISTS employeetable;

Dropping External Tables Does Not Delete Data

Even though we dropped the table, we are still able to query the data directly in the filesystem because it still exists in the object store.

Now, this is one of the coolest features of Databricks SQL. We’ve talked about how the use of schemas and tables is just an organizational construct. The data files located in this location can be queried directly, even though they are not part of a table or schema. We use tables and schemas simply to organize data in a way familiar to you.

SELECT * FROM delta.`wasbs://courseware@dbacademy.blob.core.windows.net/data-analysis-with-databricks/v01/sales`;

Databricks SQL – Schema

The code runs three statements. The first drops the schema just in case we are running this twice. The second creates the schema.

Note that there is no CATALOG provided. Databricks is set up to use a default catalog, and this is set up by your Databricks Administrator.

The third statement runs a DESCRIBE SCHEMA EXTENDED, which gives us information about the schema, including the location where managed table data will be stored.

DROP SCHEMA IF EXISTS demo_schema CASCADE;
CREATE SCHEMA IF NOT EXISTS demo_schema;
DESCRIBE SCHEMA EXTENDED demo_schema;

DROP SCHEMA (Databricks SQL)

Drops a schema and deletes the directory associated with the schema from the file system. An exception is thrown if the schema does not exist in the system.

While usage of SCHEMA and DATABASE is interchangeable, SCHEMA is preferred.

DROP SCHEMA [ IF EXISTS ] schema_name [ RESTRICT | CASCADE ]

Parameters

  • IF EXISTS – If specified, no exception is thrown when the schema does not exist.
  • schema_name – The name of an existing schema in the system. If the name does not exist, an exception is thrown.
  • RESTRICT – If specified, will restrict dropping a non-empty schema and is enabled by default.
  • CASCADE – If specified, will drop all the associated tables and functions.

CREATE SCHEMA (Databricks SQL)

Creates a schema with the specified name. If a schema with the same name already exists, an exception is thrown.

CREATE SCHEMA [ IF NOT EXISTS ] schema_name
    [ COMMENT schema_comment ]
    [ LOCATION schema_directory ]
    [ WITH DBPROPERTIES ( property_name = property_value [ , ... ] ) ]

Parameters

  • schema_name – The name of the schema to be created.
  • IF NOT EXISTS – Creates a schema with the given name if it does not exist. If a schema with the same name already exists, nothing will happen.
  • schema_directory – Path of the file system in which the specified schema is to be created. If the specified path does not exist in the underlying file system, creates a directory with the path. If the location is not specified, the schema is created in the default warehouse directory, whose path is configured by the static configuration spark.sql.warehouse.dir.
  • schema_comment – The description for the schema.
  • WITH DBPROPERTIES ( property_name = property_value [ , … ] ) – The properties for the schema in key-value pairs.

DESCRIBE SCHEMA (Databricks SQL)

Returns the metadata of an existing schema. The metadata information includes the schema’s name, comment, and location on the filesystem. If the optional EXTENDED option is specified, schema properties are also returned.

While usage of SCHEMA and DATABASE is interchangeable, SCHEMA is preferred.

{ DESC | DESCRIBE } SCHEMA [ EXTENDED ] schema_name

Parameters

  • schema_name – The name of an existing schema in the system. If the name does not exist, an exception is thrown.

Databricks Data Analyst

Databricks Certified Data Analyst Associate – Databricks

  1. Databricks Academy
  2. Databricks SQL guide | Databricks on AWS
  3. GitHub Notebook v1.1.5 , v1.2.2

Resources

Databricks Connect

Databricks recommends that you use dbx by Databricks Labs for local development instead of Databricks Connect. Databricks plans no new feature development for Databricks Connect at this time. Also, be aware of the limitations of Databricks Connect.

Requirements

| Databricks Runtime version | Python version |
| 10.4 LTS ML, 10.4 LTS      | 3.8            |
| 9.1 LTS ML, 9.1 LTS        | 3.8            |
| 7.3 LTS ML, 7.3 LTS        | 3.7            |
| 6.4 ML, 6.4                | 3.7            |

The Databricks Connect major and minor package version must always match your Databricks Runtime version.
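
The version-matching rule can be expressed as a quick check. This is a minimal sketch: major_minor and versions_match are hypothetical helper names, and the version strings are examples in the formats used on this page (a package version such as 10.4.12 and a runtime label such as 10.4 LTS).

```python
def major_minor(version):
    """Return (major, minor) as ints from a string such as '10.4.12' or '10.4 LTS'."""
    parts = version.split()[0].split(".")
    return int(parts[0]), int(parts[1])

def versions_match(client_version, runtime_version):
    """True when the client and runtime share the same major and minor version."""
    return major_minor(client_version) == major_minor(runtime_version)

ok = versions_match("10.4.12", "10.4 LTS")
mismatch = versions_match("9.1.5", "10.4 LTS")
print(ok, mismatch)
```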

Java Runtime Environment (JRE) 8. The client has been tested with the OpenJDK 8 JRE. The client does not support Java 11.

Set up the client

Step 1: Install the client

Uninstall PySpark. This is required because the databricks-connect package conflicts with PySpark. For details, see Conflicting PySpark installations.

pip uninstall pyspark

Install the Databricks Connect client.

pip install -U "databricks-connect==10.4.*"  # or X.Y.* to match your cluster version.

Step 2: Configure connection properties

databricks-connect configure
Do you accept the above agreement? [y/N] y
Set new config values (leave input empty to accept default):
Databricks Host [no current value, must start with https://]: <databricks-url>
Databricks Token [no current value]: <databricks-token>
Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
Port [15001]: <port>

| Parameter        | SQL config key                     | Environment variable name |
| Databricks Host  | spark.databricks.service.address   | DATABRICKS_ADDRESS        |
| Databricks Token | spark.databricks.service.token     | DATABRICKS_API_TOKEN      |
| Cluster ID       | spark.databricks.service.clusterId | DATABRICKS_CLUSTER_ID     |
| Org ID           | spark.databricks.service.orgId     | DATABRICKS_ORG_ID         |
| Port             | spark.databricks.service.port      | DATABRICKS_PORT           |
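
If the connection properties are supplied through the environment variables listed above, a script can check which ones are set before it tries to connect. The sketch below is only an illustration: read_connection_settings and missing_settings are hypothetical helpers, the choice of host, token and cluster ID as required is an assumption, and the sample values are placeholders.

```python
import os

# Environment variable names taken from the table above.
ENV_VARS = {
    "host": "DATABRICKS_ADDRESS",
    "token": "DATABRICKS_API_TOKEN",
    "cluster_id": "DATABRICKS_CLUSTER_ID",
    "org_id": "DATABRICKS_ORG_ID",
    "port": "DATABRICKS_PORT",
}

def read_connection_settings(environ=os.environ):
    """Collect whichever settings are present in the environment (hypothetical helper)."""
    return {name: environ[var] for name, var in ENV_VARS.items() if var in environ}

def missing_settings(settings, required=("host", "token", "cluster_id")):
    """List required settings that were not found (the required set is an assumption)."""
    return [name for name in required if name not in settings]

# Placeholder environment used instead of the real one, so the example is repeatable.
sample_env = {"DATABRICKS_ADDRESS": "https://example.invalid", "DATABRICKS_PORT": "15001"}
settings = read_connection_settings(sample_env)
missing = missing_settings(settings)
print(settings, missing)
```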

Test connectivity to Databricks.

databricks-connect test

If the cluster you configured is not running, the test starts the cluster which will remain running until its configured autotermination time. The output should be something like:

* PySpark is installed at /.../3.5.6/lib/python3.5/site-packages/pyspark
* Checking java version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
* Testing scala command
18/12/10 16:38:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:38:50 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.range(100).reduce(_ + _)
Spark context Web UI available at https://10.8.5.214:4040
Spark context available as 'sc' (master = local[*], app id = local-1544488730553).
Spark session available as 'spark'.
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
res0: Long = 4950

scala> :quit

* Testing python command
18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi

Run examples

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
    StructField('AirportCode', StringType(), False),
    StructField('Date', DateType(), False),
    StructField('TempHighF', IntegerType(), False),
    StructField('TempLowF', IntegerType(), False)
])

data = [
    [ 'BLI', date(2021, 4, 3), 52, 43],
    [ 'BLI', date(2021, 4, 2), 50, 38],
    [ 'BLI', date(2021, 4, 1), 52, 41],
    [ 'PDX', date(2021, 4, 3), 64, 45],
    [ 'PDX', date(2021, 4, 2), 61, 41],
    [ 'PDX', date(2021, 4, 1), 66, 39],
    [ 'SEA', date(2021, 4, 3), 57, 43],
    [ 'SEA', date(2021, 4, 2), 54, 39],
    [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS demo_temps_table')
temps.write.saveAsTable('demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM demo_temps_table " \
    "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
    "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
    "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE demo_temps_table')

Work with dependencies

from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())

# Contents of lib.py (the module imported above)
class Foo(object):
  def __init__(self, x):
    self.x = x

Access DBUtils

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()

dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())

When using Databricks Runtime 7.3 LTS or above, to access the DBUtils module in a way that works both locally and in Databricks clusters, use the following get_dbutils():

def get_dbutils(spark):
  from pyspark.dbutils import DBUtils
  return DBUtils(spark)

Otherwise, use the following get_dbutils():

def get_dbutils(spark):
  if spark.conf.get("spark.databricks.service.client.enabled") == "true":
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
  else:
    import IPython
    return IPython.get_ipython().user_ns["dbutils"]

Copying files between local and remote filesystems

from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)

dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')

Enable dbutils.secrets.get

Because of security restrictions, the ability to call dbutils.secrets.get is disabled by default. Contact Databricks support to enable this feature for your workspace.

Databricks job

Run with different parameters

To re-run the job and filter baby names for a different year:

  1. Click the down caret next to Run Now and select Run Now with Different Parameters, or click Run Now with Different Parameters in the Active Runs table.
  2. In the Value field, enter 2015.
  3. Click Run.

# Notebook run by the job: load the CSV and filter it by the "year" widget value
babynames = (spark.read.format("csv")
             .option("header", "true")
             .option("inferSchema", "true")
             .load("dbfs:/FileStore/babynames.csv"))
babynames.createOrReplaceTempView("babynames_table")
years = spark.sql("select distinct(Year) from babynames_table").rdd.map(lambda row : row[0]).collect()
years.sort()
dbutils.widgets.dropdown("year", "2014", [str(x) for x in years])
display(babynames.filter(babynames.Year == dbutils.widgets.get("year")))

Passing data between tasks

Sending task (the task that runs first)

dbutils.jobs.taskValues.set(key   = "my-key", \
                            value = 5)

dbutils.jobs.taskValues.set(key   = "my-other-key", \
                            value = "my other value")

Receiving task (the task that runs later)

dbutils.jobs.taskValues.get(taskKey    = "my-task", \
                            key        = "my-key", \
                            default    = 7, \
                            debugValue = 42)

Here, taskKey is the name of the upstream task that sent the data.
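
The difference between default and debugValue is easy to mix up, so here is a toy model in plain Python. It is not the real dbutils implementation: ToyTaskValues is a hypothetical class, and it encodes my understanding that default is returned when the key cannot be found and debugValue is returned when the notebook runs outside a job. Treat both points as assumptions and confirm them against the dbutils reference.

```python
class ToyTaskValues:
    """Toy stand-in for task values; not the real dbutils implementation."""

    def __init__(self, running_in_job):
        self.running_in_job = running_in_job
        self._store = {}  # (task name, key) -> value

    def set(self, task_key, key, value):
        # The real API infers the task name; the toy takes it explicitly.
        self._store[(task_key, key)] = value

    def get(self, taskKey, key, default=None, debugValue=None):
        if not self.running_in_job:
            # Assumed behaviour: outside a job there is no upstream task,
            # so the debug value is handed back instead.
            return debugValue
        # Assumed behaviour: inside a job, fall back to default for a missing key.
        return self._store.get((taskKey, key), default)

job = ToyTaskValues(running_in_job=True)
job.set("my-task", "my-key", 5)
found = job.get(taskKey="my-task", key="my-key", default=7, debugValue=42)
fallback = job.get(taskKey="my-task", key="absent-key", default=7, debugValue=42)

interactive = ToyTaskValues(running_in_job=False)
debug = interactive.get(taskKey="my-task", key="my-key", default=7, debugValue=42)
print(found, fallback, debug)
```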

Databricks widgets

Input widgets allow you to add parameters to your notebooks and dashboards. The widget API consists of calls to create various types of input widgets, remove them, and get bound values.

In Databricks Runtime 11.0 and above, you can also use ipywidgets in Databricks notebooks.

Databricks widgets are best for:

  • Building a notebook or dashboard that is re-executed with different parameters
  • Quickly exploring results of a single query with different parameters

View the documentation for the widget API in Scala, Python, and R with the following command:

dbutils.widgets.help()
dbutils.widgets.help("text")
dbutils.widgets.help("dropdown")

Databricks widget types

There are 4 types of widgets:

  • text: Input a value in a text box.
  • dropdown: Select a value from a list of provided values.
  • combobox: Combination of text and dropdown. Select a value from a provided list or input one in the text box.
  • multiselect: Select one or more values from a list of provided values.

Databricks widget example

Create a simple text widget.

dbutils.widgets.text("param1", "", "Input param")
param1 = dbutils.widgets.get("param1")
print(param1)

Create a simple dropdown widget.

dbutils.widgets.dropdown("X", "1", [str(x) for x in range(1, 10)])

You can access the current value of the widget with the call:

dbutils.widgets.get("X")

Finally, you can remove a widget or all widgets in a notebook:

dbutils.widgets.remove("X")

dbutils.widgets.removeAll()

Use Databricks widgets with %run

%run /path/to/notebook $X="10" $Y="1"
%run ./test_job $param1="bar"

Databricks CLI

CLI

Install the CLI

pip install databricks-cli

Update the CLI

pip install databricks-cli --upgrade

Set up authentication

databricks configure --token

After you complete the prompts, your access credentials are stored in the file ~/.databrickscfg on Unix, Linux, or macOS, or %USERPROFILE%\.databrickscfg on Windows. The file contains a default profile entry:

[DEFAULT]
host = <workspace-URL>
token = <personal-access-token>
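
Because the profile file uses INI syntax, it can be read with Python's standard configparser module. The sketch below parses an in-memory sample instead of the real ~/.databrickscfg; load_profile is a hypothetical helper, and the host and token values (including the extra test profile) are placeholders.

```python
import configparser

# Placeholder file contents; a real file would hold your workspace URL and token.
SAMPLE_CFG = """\
[DEFAULT]
host = https://example.invalid
token = not-a-real-token

[test]
host = https://test.example.invalid
token = another-fake-token
"""

def load_profile(cfg_text, profile="DEFAULT"):
    """Return the host and token for one profile (hypothetical helper)."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser[profile]  # raises KeyError for an unknown profile
    return {"host": section["host"], "token": section["token"]}

default_profile = load_profile(SAMPLE_CFG)
test_profile = load_profile(SAMPLE_CFG, "test")
print(default_profile["host"], test_profile["host"])
```

One configparser detail to keep in mind: keys missing from a named section fall back to the values in [DEFAULT], which may or may not match how the CLI itself resolves profiles.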

Use the CLI

$ databricks --help
Usage: databricks [OPTIONS] COMMAND [ARGS]...

Options:
  -v, --version   x.xx.x
  --debug         Debug Mode. Shows full stack trace on error.
  --profile TEXT  CLI connection profile to use.
                  The default profile is "DEFAULT".

  -h, --help      Show this message and exit.

Commands:
  clusters        Utility to interact with Databricks clusters.
  configure       Configures host and authentication info for the CLI.
  fs              Utility to interact with DBFS.
  groups          Utility to interact with Databricks groups.
  instance-pools  Utility to interact with Databricks instance pools.
  jobs            Utility to interact with jobs.
  libraries       Utility to interact with libraries.
  pipelines       Utility to interact with the Databricks Delta Pipelines.
  runs            Utility to interact with the jobs runs.
  secrets         Utility to interact with Databricks secret API.
  stack           [Beta] Utility to deploy and download Databricks resource
                  stacks.

  workspace       Utility to interact with the Databricks workspace.

databricks jobs list --profile test
databricks jobs list --profile test --output JSON | jq '.jobs[] | select(.job_id == 123) | .settings'
databricks clusters list --profile test
databricks clusters list --profile test --output JSON | jq '[ .clusters[] | { name: .cluster_name, id: .cluster_id } ]'
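
When jq is not available, the same projection can be done in Python. This is a minimal sketch: the JSON below is made-up sample data in the shape implied by the jq filter (a top-level clusters array with cluster_name and cluster_id fields), and cluster_names_and_ids is a hypothetical helper.

```python
import json

# Made-up sample in the shape implied by the jq filter above.
sample_output = """
{"clusters": [
  {"cluster_id": "1234-567890-abcd123", "cluster_name": "etl", "state": "RUNNING"},
  {"cluster_id": "2345-678901-bcde234", "cluster_name": "adhoc", "state": "TERMINATED"}
]}
"""

def cluster_names_and_ids(raw_json):
    """Keep only the name and id of each cluster, like the jq expression."""
    data = json.loads(raw_json)
    return [
        {"name": c["cluster_name"], "id": c["cluster_id"]}
        for c in data.get("clusters", [])
    ]

summary = cluster_names_and_ids(sample_output)
print(summary)
```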

JSON string parameters

databricks jobs run-now --job-id 9 --jar-params '["20180505", "alantest"]'
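
Hand-quoting a JSON array inside a shell command is error-prone, so one option is to let Python build the string. The sketch below uses json.dumps for the array and shlex.join for POSIX shell quoting; run_now_command is a hypothetical helper, and the job ID and parameters are the ones from the example above.

```python
import json
import shlex

def run_now_command(job_id, jar_params):
    """Build the run-now command line with the JSON array safely quoted (hypothetical helper)."""
    args = [
        "databricks", "jobs", "run-now",
        "--job-id", str(job_id),
        "--jar-params", json.dumps(jar_params),
    ]
    return shlex.join(args)

command = run_now_command(9, ["20180505", "alantest"])
print(command)
```

shlex.join targets POSIX shells; quoting rules differ in the Windows command prompt, so the output should not be pasted there unchanged.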

Runs CLI

Requirements to call the Jobs REST API 2.0

Update the CLI to version 0.16.0 or above

Run the command 

databricks jobs configure --version=2.0

This adds the setting jobs-api-version = 2.0 to the file ~/.databrickscfg on Unix, Linux, or macOS, or %USERPROFILE%\.databrickscfg on Windows. All job runs CLI (and jobs CLI) subcommands will call the Jobs REST API 2.0 by default.

Subcommands and general usage

$ databricks runs --help
Usage: databricks runs [OPTIONS] COMMAND [ARGS]...

  Utility to interact with jobs runs.

Options:
  -v, --version   0.11.0
  --debug         Debug Mode. Shows full stack trace on error.
  --profile TEXT  CLI connection profile to use. The default profile is
                  "DEFAULT".

  -h, --help      Show this message and exit.

Commands:
  cancel      Cancels the run specified.
  get         Gets the metadata about a run in json form.
  get-output  Gets the output of a run The output schema is documented...
  list        Lists job runs.
  submit      Submits a one-time run.

Get the output of a run

databricks runs get-output --run-id 119
{
  "metadata": {
    "job_id": 239,
    "run_id": 119,
    "number_in_job": 1,
    "original_attempt_run_id": 119,
    "state": {
      "life_cycle_state": "TERMINATED",
      "result_state": "SUCCESS",
      "state_message": ""
    },
    "task": {
      "notebook_task": {
        "notebook_path": "/Users/someone@example.com/notebooks/my-notebook.ipynb"
      }
    },
    "cluster_spec": {
      "new_cluster": {
        "spark_version": "8.1.x-scala2.12",
        "aws_attributes": {
          "zone_id": "us-west-2c",
          "availability": "SPOT_WITH_FALLBACK"
        },
        "node_type_id": "m5d.large",
        "enable_elastic_disk": false,
        "num_workers": 1
      }
    },
    "cluster_instance": {
      "cluster_id": "1234-567890-abcd123",
      "spark_context_id": "1234567890123456789"
    },
    "start_time": 1618510327335,
    "setup_duration": 191000,
    "execution_duration": 41000,
    "cleanup_duration": 2000,
    "end_time": 1618510561615,
    "trigger": "ONE_TIME",
    "creator_user_name": "someone@example.com",
    "run_name": "my-notebook-run",
    "run_page_url": "https://dbc-a1b2345c-d6e7.cloud.databricks.com/?o=1234567890123456#job/239/run/1",
    "run_type": "JOB_RUN",
    "attempt_number": 0
  },
  "notebook_output": {}
}

The notebook_output value comes from the value passed to dbutils.notebook.exit() in the notebook. See Jobs API 2.0 | Databricks on AWS.
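
A script that calls get-output usually needs only a few fields from the response. The sketch below works on a trimmed copy of the JSON shown above; summarize_run is a hypothetical helper, and reading the exit value from a result key inside notebook_output is an assumption about the response shape (the sample above has an empty notebook_output).

```python
import json

# Trimmed copy of the get-output response shown above.
run_output = json.loads("""
{
  "metadata": {
    "run_id": 119,
    "state": {"life_cycle_state": "TERMINATED", "result_state": "SUCCESS"},
    "start_time": 1618510327335,
    "setup_duration": 191000,
    "execution_duration": 41000,
    "cleanup_duration": 2000,
    "end_time": 1618510561615
  },
  "notebook_output": {}
}
""")

def summarize_run(output):
    """Pull the state and timing fields out of a run's output (hypothetical helper)."""
    meta = output["metadata"]
    return {
        "run_id": meta["run_id"],
        "result_state": meta["state"].get("result_state"),
        # start_time and end_time are epoch timestamps in milliseconds.
        "wall_clock_ms": meta["end_time"] - meta["start_time"],
        "phases_ms": meta["setup_duration"] + meta["execution_duration"] + meta["cleanup_duration"],
        # Assumed key: None here because the notebook did not return a value.
        "notebook_result": output.get("notebook_output", {}).get("result"),
    }

summary = summarize_run(run_output)
print(summary)
```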

MERGE INTO

Merges a set of updates, insertions, and deletions based on a source table into a target Delta table.

Syntax

MERGE INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   [ WHEN MATCHED [ AND condition ] THEN matched_action ] [...]
   [ WHEN NOT MATCHED [ AND condition ]  THEN not_matched_action ] [...]

matched_action
 { DELETE |
   UPDATE SET * |
   UPDATE SET { column1 = value1 } [, ...] }

not_matched_action
 { INSERT * |
   INSERT (column1 [, ...] ) VALUES (value1 [, ...]) }

See update table data syntax documentation with code – Quickstart — Delta Lake Documentation
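
To make the matched and not-matched branches concrete, here is a toy model of an upsert in plain Python. It works on lists of dictionaries rather than Delta tables, covers only UPDATE SET * and INSERT * (no DELETE and no extra conditions), and merge_into is a hypothetical function name.

```python
def merge_into(target, source, key):
    """Apply source rows to target: update on key match, insert otherwise (toy model)."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        if row[key] in merged:
            merged[row[key]].update(row)   # WHEN MATCHED THEN UPDATE SET *
        else:
            merged[row[key]] = dict(row)   # WHEN NOT MATCHED THEN INSERT *
    return list(merged.values())

target_rows = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}]
source_rows = [{"id": 2, "name": "Robert"}, {"id": 3, "name": "Cy"}]
result = merge_into(target_rows, source_rows, key="id")
print(result)
```

Row 1 is untouched because nothing in the source matches it, row 2 is updated, and row 3 is inserted. The key argument plays the role of the ON merge_condition.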