Databricks Connect

Databricks recommends that you use dbx by Databricks Labs for local development instead of Databricks Connect. Databricks plans no new feature development for Databricks Connect at this time. Also, be aware of the limitations of Databricks Connect.

Requirements

Databricks Runtime version    Python version
10.4 LTS ML, 10.4 LTS         3.8
9.1 LTS ML, 9.1 LTS           3.8
7.3 LTS ML, 7.3 LTS           3.7
6.4 ML, 6.4                   3.7

The Databricks Connect major and minor package version must always match your Databricks Runtime version.

Java Runtime Environment (JRE) 8. The client has been tested with the OpenJDK 8 JRE. The client does not support Java 11.
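
To confirm which Java the client will pick up (assuming java is on your PATH), check the version:

java -version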

Set up the client

Step 1: Install the client

Uninstall PySpark. This is required because the databricks-connect package conflicts with PySpark. For details, see Conflicting PySpark installations.

pip uninstall pyspark

Install the Databricks Connect client.

pip install -U "databricks-connect==10.4.*"  # or X.Y.* to match your cluster version.

Step 2: Configure connection properties

databricks-connect configure
Do you accept the above agreement? [y/N] y
Set new config values (leave input empty to accept default):
Databricks Host [no current value, must start with https://]: <databricks-url>
Databricks Token [no current value]: <databricks-token>
Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
Port [15001]: <port>
Parameter          SQL config key                        Environment variable name
Databricks Host    spark.databricks.service.address      DATABRICKS_ADDRESS
Databricks Token   spark.databricks.service.token        DATABRICKS_API_TOKEN
Cluster ID         spark.databricks.service.clusterId    DATABRICKS_CLUSTER_ID
Org ID             spark.databricks.service.orgId        DATABRICKS_ORG_ID
Port               spark.databricks.service.port         DATABRICKS_PORT

Test connectivity to Databricks.

databricks-connect test

If the cluster you configured is not running, the test starts the cluster, which remains running until its configured autotermination time. The output should look something like this:

* PySpark is installed at /.../3.5.6/lib/python3.5/site-packages/pyspark
* Checking java version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
* Testing scala command
18/12/10 16:38:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:38:50 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.range(100).reduce(_ + _)
Spark context Web UI available at https://10.8.5.214:4040
Spark context available as 'sc' (master = local[*], app id = local-1544488730553).
Spark session available as 'spark'.
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
res0: Long = 4950

scala> :quit

* Testing python command
18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi

Run examples

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
    StructField('AirportCode', StringType(), False),
    StructField('Date', DateType(), False),
    StructField('TempHighF', IntegerType(), False),
    StructField('TempLowF', IntegerType(), False)
])

data = [
    [ 'BLI', date(2021, 4, 3), 52, 43],
    [ 'BLI', date(2021, 4, 2), 50, 38],
    [ 'BLI', date(2021, 4, 1), 52, 41],
    [ 'PDX', date(2021, 4, 3), 64, 45],
    [ 'PDX', date(2021, 4, 2), 61, 41],
    [ 'PDX', date(2021, 4, 1), 66, 39],
    [ 'SEA', date(2021, 4, 3), 57, 43],
    [ 'SEA', date(2021, 4, 2), 54, 39],
    [ 'SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS demo_temps_table')
temps.write.saveAsTable('demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM demo_temps_table " \
    "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
    "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
    "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE demo_temps_table')

Work with dependencies

from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())

# Contents of lib.py (the module imported above):
class Foo(object):
  def __init__(self, x):
    self.x = x

Access DBUtils

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()

dbutils = DBUtils(spark)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())

When using Databricks Runtime 7.3 LTS or above, to access the DBUtils module in a way that works both locally and in Databricks clusters, use the following get_dbutils():

def get_dbutils(spark):
  from pyspark.dbutils import DBUtils
  return DBUtils(spark)

Otherwise, use the following get_dbutils():

def get_dbutils(spark):
  if spark.conf.get("spark.databricks.service.client.enabled") == "true":
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
  else:
    import IPython
    return IPython.get_ipython().user_ns["dbutils"]

Copying files between local and remote filesystems

from pyspark.dbutils import DBUtils
dbutils = DBUtils(spark)

dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')

Enable dbutils.secrets.get

Because of security restrictions, the ability to call dbutils.secrets.get is disabled by default. Contact Databricks support to enable this feature for your workspace.

Databricks job

Run with different parameters

To re-run the job and filter baby names for a different year:

  1. Click Blue Down Caret next to Run Now and select Run Now with Different Parameters or click Run Now with Different Parameters in the Active Runs table.
  2. In the Value field, enter 2015.
  3. Click Run.

# Load the baby names CSV from DBFS and register it as a temp view.
babynames = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("dbfs:/FileStore/babynames.csv")
babynames.createOrReplaceTempView("babynames_table")

# Build the list of distinct years to populate the "year" widget.
years = spark.sql("select distinct(Year) from babynames_table").rdd.map(lambda row: row[0]).collect()
years.sort()
dbutils.widgets.dropdown("year", "2014", [str(x) for x in years])

# Show only the rows for the selected year.
display(babynames.filter(babynames.Year == dbutils.widgets.get("year")))

Passing data between tasks

The sending task (the task that runs first)

dbutils.jobs.taskValues.set(key   = "my-key", \
                            value = 5)

dbutils.jobs.taskValues.set(key   = "my-other-key", \
                            value = "my other value")

The receiving task (the task that runs later)

dbutils.jobs.taskValues.get(taskKey    = "my-task", \
                            key        = "my-key", \
                            default    = 7, \
                            debugValue = 42)

Here, taskKey is the name of the upstream task that set the values.

Databricks widgets

Input widgets allow you to add parameters to your notebooks and dashboards. The widget API consists of calls to create various types of input widgets, remove them, and get bound values.

In Databricks Runtime 11.0 and above, you can also use ipywidgets in Databricks notebooks.

Databricks widgets are best for:

  • Building a notebook or dashboard that is re-executed with different parameters
  • Quickly exploring results of a single query with different parameters

View the documentation for the widget API in Scala, Python, and R with the following command:

dbutils.widgets.help()
dbutils.widgets.help("text")
dbutils.widgets.help("dropdown")

Databricks widget types

There are 4 types of widgets:

  • text: Input a value in a text box.
  • dropdown: Select a value from a list of provided values.
  • combobox: Combination of text and dropdown. Select a value from a provided list or input one in the text box.
  • multiselect: Select one or more values from a list of provided values.

Databricks widget example

Create a simple text widget.

dbutils.widgets.text("param1", "", "Input param")
param1 = dbutils.widgets.get("param1")
print(param1)

Create a simple dropdown widget.

dbutils.widgets.dropdown("X", "1", [str(x) for x in range(1, 10)])

You can access the current value of the widget with the call:

dbutils.widgets.get("X")

Finally, you can remove a widget or all widgets in a notebook:

dbutils.widgets.remove("X")

dbutils.widgets.removeAll()

Use Databricks widgets with %run

%run /path/to/notebook $X="10" $Y="1"
%run ./test_job $param1="bar"

Databricks CLI


Install the CLI

pip install databricks-cli

Update the CLI

pip install databricks-cli --upgrade

Set up authentication

databricks configure --token

After you complete the prompts, your access credentials are stored in the file ~/.databrickscfg on Unix, Linux, or macOS, or %USERPROFILE%\.databrickscfg on Windows. The file contains a default profile entry:

[DEFAULT]
host = <workspace-URL>
token = <personal-access-token>

Use the CLI

$ databricks --help
Usage: databricks [OPTIONS] COMMAND [ARGS]...

Options:
  -v, --version   x.xx.x
  --debug         Debug Mode. Shows full stack trace on error.
  --profile TEXT  CLI connection profile to use.
                  The default profile is "DEFAULT".

  -h, --help      Show this message and exit.

Commands:
  clusters        Utility to interact with Databricks clusters.
  configure       Configures host and authentication info for the CLI.
  fs              Utility to interact with DBFS.
  groups          Utility to interact with Databricks groups.
  instance-pools  Utility to interact with Databricks instance pools.
  jobs            Utility to interact with jobs.
  libraries       Utility to interact with libraries.
  pipelines       Utility to interact with the Databricks Delta Pipelines.
  runs            Utility to interact with the jobs runs.
  secrets         Utility to interact with Databricks secret API.
  stack           [Beta] Utility to deploy and download Databricks resource
                  stacks.

  workspace       Utility to interact with the Databricks workspace.

databricks jobs list --profile test
databricks jobs list --profile test --output JSON | jq '.jobs[] | select(.job_id == 123) | .settings'
databricks clusters list --profile test
databricks clusters list --profile test --output JSON | jq '[ .clusters[] | { name: .cluster_name, id: .cluster_id } ]'

JSON string parameters

databricks jobs run-now --job-id 9 --jar-params '["20180505", "alantest"]'
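
For a notebook task, the equivalent is --notebook-params, which takes a JSON object (the job ID and parameter key here are illustrative):

databricks jobs run-now --job-id 10 --notebook-params '{"year": "2015"}'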

Runs CLI

Requirements to call the Jobs REST API 2.0

  1. Update the CLI to version 0.16.0 or above.
  2. Run the command:

databricks jobs configure --version=2.0

This adds the setting jobs-api-version = 2.0 to the file ~/.databrickscfg on Unix, Linux, or macOS, or %USERPROFILE%\.databrickscfg on Windows. All job runs CLI (and jobs CLI) subcommands will call the Jobs REST API 2.0 by default.
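
The profile entry then looks something like this:

[DEFAULT]
host = <workspace-URL>
token = <personal-access-token>
jobs-api-version = 2.0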

Subcommands and general usage

$ databricks runs --help
Usage: databricks runs [OPTIONS] COMMAND [ARGS]...

  Utility to interact with jobs runs.

Options:
  -v, --version   0.11.0
  --debug         Debug Mode. Shows full stack trace on error.
  --profile TEXT  CLI connection profile to use. The default profile is
                  "DEFAULT".

  -h, --help      Show this message and exit.

Commands:
  cancel      Cancels the run specified.
  get         Gets the metadata about a run in json form.
  get-output  Gets the output of a run The output schema is documented...
  list        Lists job runs.
  submit      Submits a one-time run.

Get the output of a run

databricks runs get-output --run-id 119
{
  "metadata": {
    "job_id": 239,
    "run_id": 119,
    "number_in_job": 1,
    "original_attempt_run_id": 119,
    "state": {
      "life_cycle_state": "TERMINATED",
      "result_state": "SUCCESS",
      "state_message": ""
    },
    "task": {
      "notebook_task": {
        "notebook_path": "/Users/someone@example.com/notebooks/my-notebook.ipynb"
      }
    },
    "cluster_spec": {
      "new_cluster": {
        "spark_version": "8.1.x-scala2.12",
        "aws_attributes": {
          "zone_id": "us-west-2c",
          "availability": "SPOT_WITH_FALLBACK"
        },
        "node_type_id": "m5d.large",
        "enable_elastic_disk": false,
        "num_workers": 1
      }
    },
    "cluster_instance": {
      "cluster_id": "1234-567890-abcd123",
      "spark_context_id": "1234567890123456789"
    },
    "start_time": 1618510327335,
    "setup_duration": 191000,
    "execution_duration": 41000,
    "cleanup_duration": 2000,
    "end_time": 1618510561615,
    "trigger": "ONE_TIME",
    "creator_user_name": "someone@example.com",
    "run_name": "my-notebook-run",
    "run_page_url": "https://dbc-a1b2345c-d6e7.cloud.databricks.com/?o=1234567890123456#job/239/run/1",
    "run_type": "JOB_RUN",
    "attempt_number": 0
  },
  "notebook_output": {}
}

The notebook_output field gets its value from dbutils.notebook.exit() in the notebook; see Jobs API 2.0 | Databricks on AWS.
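
For example, the last cell of the notebook can end the run and return a value (the returned string here is arbitrary):

# The string passed to exit() appears as notebook_output.result in the
# `databricks runs get-output` response.
dbutils.notebook.exit("my returned value")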

MERGE INTO

Merges a set of updates, insertions, and deletions based on a source table into a target Delta table.

Syntax

MERGE INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   [ WHEN MATCHED [ AND condition ] THEN matched_action ] [...]
   [ WHEN NOT MATCHED [ AND condition ]  THEN not_matched_action ] [...]

matched_action
 { DELETE |
   UPDATE SET * |
   UPDATE SET { column1 = value1 } [, ...] }

not_matched_action
 { INSERT * |
   INSERT (column1 [, ...] ) VALUES (value1 [, ...]) }

See the table update syntax and code examples in Quickstart — Delta Lake Documentation.
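
A minimal example, assuming a target table people and an updates table people_updates keyed on id (all names are illustrative):

MERGE INTO people AS target
USING people_updates AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *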

SQL reference

Time Travel

DESCRIBE HISTORY customer_data_delta

SELECT COUNT(*)
FROM customer_data_delta
VERSION AS OF 1

SELECT SUM(total_orders) - (
  SELECT SUM(total_orders)
  FROM customer_counts
  VERSION AS OF 0
  WHERE Country='Sweden') AS new_entries
FROM customer_counts
WHERE Country='Sweden'

ZORDER

OPTIMIZE customer_counts
ZORDER BY (CustomerID)

SELECT CustomerID, SUM(total_orders) AS total
FROM customer_counts
GROUP BY CustomerID
ORDER BY total DESC

SELECT CustomerID, COUNT(Country) AS num_countries
FROM customer_counts
GROUP BY CustomerID
SORT BY num_countries DESC

VACUUM

VACUUM customer_data_delta RETAIN 0 HOURS;

uuid

Returns an universally unique identifier (UUID) string.

> SELECT uuid();
 46707d92-02f4-4817-8116-a4c3b23e6266

The uuid() function can be used in a SELECT.

If you call uuid() directly as an INSERT value inside a MERGE, it fails with an error along the lines of:

AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate, Window, or Generate

Instead, call uuid() in the SELECT of the USING clause and reference that column as the INSERT value; this avoids the error.
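
A sketch of that workaround, with hypothetical target and staging tables (all table and column names are illustrative):

-- uuid() runs in the source SELECT (a Project), where nondeterministic
-- expressions are allowed; the INSERT just references the result.
MERGE INTO customers AS t
USING (
  SELECT uuid() AS new_id, s.*
  FROM staging_customers AS s
) AS src
ON t.customer_key = src.customer_key
WHEN MATCHED THEN
  UPDATE SET t.name = src.name
WHEN NOT MATCHED THEN
  INSERT (id, customer_key, name) VALUES (src.new_id, src.customer_key, src.name)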

cast

Casts the value expr to the target data type type.

> SELECT cast(NULL AS INT);
  NULL

> SELECT cast(5.6 AS INT);
  5

> SELECT cast(5.6 AS DECIMAL(2, 0));
  6

> SELECT cast(-5.6 AS INT);
  -5

> SELECT cast(-5.6 AS DECIMAL(2, 0));
  -6

> SELECT cast(128 AS TINYINT);
  Overflow

> SELECT cast(128 AS DECIMAL(2, 0));
  Overflow

> SELECT cast('123' AS INT);
  123

> SELECT cast('123.0' AS INT);
  Invalid format

> SELECT cast(TIMESTAMP'1970-01-01 00:00:01' AS LONG);
  1

> SELECT cast(TIMESTAMP'1970-01-01 00:00:00.000001' AS DOUBLE);
  1.0E-6

> SELECT cast(TIMESTAMP'2022-02-01 00:00:00' AS SMALLINT);
  error: overflow

> SELECT cast(true AS BOOLEAN);
  true

Date and time functions

current_timestamp และ now

Returns the current timestamp at the start of query evaluation.

> SELECT current_timestamp();
 2022-06-08T10:22:43.561+0000
> SELECT current_timestamp;
 2022-06-08T10:23:00.961+0000
> SELECT now();
 2022-06-08T10:23:44.145+0000

current_timezone

Returns the current session local timezone.

> SELECT current_timezone();
 Etc/UTC
> SELECT current_timezone();
 Asia/Shanghai

from_utc_timestamp

Returns a timestamp in expr specified in UTC in the timezone timeZone.

> SELECT from_utc_timestamp(current_timestamp, 'Asia/Bangkok');
 2022-06-08T17:30:43.343+0000
> SELECT from_utc_timestamp('2022-06-08', 'Asia/Bangkok');
 2022-06-08T07:00:00.000+0000
> SELECT from_utc_timestamp('2022-06-08 00:30:00.0', 'GMT+7');
 2022-06-08T07:30:00.000+0000

to_utc_timestamp

Returns the timestamp in expr in a different timezone as UTC.

> SELECT to_utc_timestamp('2022-06-01', 'Asia/Bangkok');
 2022-05-31T17:00:00.000+0000
> SELECT to_utc_timestamp('2022-06-01', 'GMT+7');
 2022-05-31T17:00:00.000+0000
> SELECT to_utc_timestamp('2022-06-01 00:00:00 UTC+07:00', 'GMT+7');
 2022-05-31T10:00:00.000+0000
> SELECT to_utc_timestamp('2022-06-01 00:00:00 UTC+00:00', 'GMT+7');
 2022-05-31T17:00:00.000+0000
> SELECT to_utc_timestamp('2022-06-01 00:00:00 UTC-07:00', 'GMT+7');
 2022-06-01T00:00:00.000+0000

Prefer fixed offsets like 'GMT+7' over region names like 'Asia/Bangkok', because some time zones observe daylight saving time (DST). See Dates and timestamps | Databricks on AWS.

As you can see from the preceding examples, the mapping of time zone names to offsets is ambiguous, and is not one to one. In the cases when it is possible, when constructing timestamps we recommend specifying exact time zone offsets, for example 2019-11-03 01:30:00 UTC-07:00.

SELECT CURRENT_TIMESTAMP AS UTC_time
,      from_utc_timestamp(CURRENT_TIMESTAMP, 'GMT+7') AS local_time
,      to_utc_timestamp(from_utc_timestamp(CURRENT_TIMESTAMP, 'GMT+7'), 'GMT+7') AS UTC_time_again;

timestampadd

Adds value units to a timestamp expr.

timestampadd(unit, value, expr)

unit
 { MICROSECOND |
   MILLISECOND |
   SECOND |
   MINUTE |
   HOUR |
   DAY | DAYOFYEAR |
   WEEK |
   MONTH |
   QUARTER |
   YEAR }
> SELECT timestampadd(HOUR, 7, TIMESTAMP'2022-02-28 00:00:00');
 2022-02-28T07:00:00.000+0000
> SELECT timestampadd(MONTH, -1, TIMESTAMP'2022-03-31 00:00:00');
 2022-02-28T00:00:00.000+0000
> SELECT TIMESTAMPADD(HOUR, +7, CURRENT_TIMESTAMP());
 2022-06-09T09:41:24.763+0000

to_date

Returns expr cast to a date using an optional formatting.

to_date(expr [, fmt] )
> SELECT to_date('2009-07-30 04:17:52');
 2009-07-30
> SELECT to_date('2016-12-31', 'yyyy-MM-dd');
 2016-12-31

to_timestamp

Returns expr cast to a timestamp using an optional formatting.

> SELECT to_timestamp('2016-12-31 00:12:00');
 2016-12-31T00:12:00.000+0000

> SELECT to_timestamp('2016-12-31', 'yyyy-MM-dd');
 2016-12-31T00:00:00.000+0000

> SELECT to_timestamp('2022-06-09 10:41:48' , 'yyyy-MM-dd HH:mm:ss');
 2022-06-09T10:41:48.000+0000

String functions

substr และ substring

Returns the substring of expr that starts at pos and is of length len.

substr(expr, pos [, len] )
substr(expr FROM pos [ FOR len ] )

substring(expr, pos [, len] )
substring(expr FROM pos [ FOR len ] )
> SELECT substring('Spark SQL', 5);
 k SQL
> SELECT substring('Spark SQL', -3);
 SQL
> SELECT substring('Spark SQL', 5, 1);
 k
> SELECT substring('Spark SQL' FROM 5);
 k SQL
> SELECT substring('Spark SQL' FROM -3);
 SQL
> SELECT substring('Spark SQL' FROM 5 FOR 1);
 k
> SELECT substring('Spark SQL' FROM -10 FOR 5);
 Spar
> SELECT current_timestamp;
 2022-06-16T15:58:43.875+0000
> SELECT cast(current_timestamp() AS STRING);
 2022-06-16 15:58:43.875
> SELECT substring(cast(current_timestamp() AS STRING) FROM -23 FOR 19);
 2022-06-16 15:58:43

md5

Returns an MD5 128-bit checksum of expr as a hex string.

> SELECT md5('Spark');
 8cde774d6f7333752ed72cacddb05126

Concatenation

concat

Returns the concatenation of the arguments.

> SELECT concat('Spark', 'SQL');
 SparkSQL
> SELECT concat(array(1, 2, 3), array(4, 5), array(6));
 [1,2,3,4,5,6]

concat_ws

Returns the concatenation of the strings separated by sep.

> SELECT concat_ws(' ', 'Spark', 'SQL');
  Spark SQL
> SELECT concat_ws('s');
  ''
> SELECT concat_ws(',', 'Spark', array('S', 'Q', NULL, 'L'), NULL);
  Spark,S,Q,L

array_join

Concatenates the elements of array.

> SELECT array_join(array('hello', 'world'), ' ');
 hello world
> SELECT array_join(array('hello', NULL ,'world'), ' ');
 hello world
> SELECT array_join(array('hello', NULL ,'world'), ' ', ',');
 hello , world

collect_list

Returns an array consisting of all values in expr within the group.

collect_list ( [ALL | DISTINCT] expr ) [FILTER ( WHERE cond ) ]
> SELECT collect_list(col) FROM VALUES (1), (2), (NULL), (1) AS tab(col);
 [1,2,1]
> SELECT collect_list(DISTINCT col) FROM VALUES (1), (2), (NULL), (1) AS tab(col);
 [1,2]

Combine the array from collect_list into a single string with array_join:

> SELECT array_join(collect_list(col), ',') 
  FROM VALUES (1), (2), (NULL), (1) AS tab(col);
 1,2,1

Numeric functions

abs

Returns the absolute value of the numeric value in expr.

> SELECT abs(-1);
 1

> SELECT abs(cast(-32768 AS SMALLINT));
 Error: ARITHMETIC_OVERFLOW

mod

Returns the remainder after dividend / divisor.

> SELECT MOD(2, 1.8);
 0.2

> SELECT MOD(2, 0);
 Error: DIVIDE_BY_ZERO

hash

Returns a hash value of the arguments.

> SELECT hash('Spark', array(123), 2);
 -1321691492
> SELECT hash('Spark', array(123));
 1262264271
> SELECT hash('Spark');
 228093765

crc32

Returns a cyclic redundancy check value of expr.

> SELECT crc32('Spark');
 1557323817