Databricks recommends that you use dbx by Databricks Labs for local development instead of Databricks Connect. Databricks plans no new feature development for Databricks Connect at this time. Also, be aware of the limitations of Databricks Connect.
Requirements
| Databricks Runtime version | Python version |
|---|---|
| 10.4 LTS ML, 10.4 LTS | 3.8 |
| 9.1 LTS ML, 9.1 LTS | 3.8 |
| 7.3 LTS ML, 7.3 LTS | 3.7 |
| 6.4 ML, 6.4 | 3.7 |
The Databricks Connect major and minor package version must always match your Databricks Runtime version.
Java Runtime Environment (JRE) 8. The client has been tested with the OpenJDK 8 JRE. The client does not support Java 11.
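The major.minor matching rule above can be checked mechanically. The helper below is a hypothetical sketch (not part of the databricks-connect package) that compares an installed client version string against a Databricks Runtime version string:

```python
# Hypothetical helper (not part of databricks-connect): check that the
# client package's major.minor version matches the cluster's runtime.
def runtime_matches(client_version: str, runtime_version: str) -> bool:
    """client_version is e.g. '10.4.12'; runtime_version is e.g. '10.4 LTS'."""
    client_mm = client_version.split(".")[:2]
    # Drop suffixes such as 'LTS' or 'LTS ML' before splitting the number.
    runtime_mm = runtime_version.split()[0].split(".")[:2]
    return client_mm == runtime_mm

print(runtime_matches("10.4.12", "10.4 LTS"))  # True: 10.4 matches 10.4
print(runtime_matches("9.1.7", "10.4 LTS"))    # False: 9.1 does not match 10.4
```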
Set up the client
Step 1: Install the client
Uninstall PySpark. This is required because the databricks-connect package conflicts with PySpark. For details, see Conflicting PySpark installations.

```bash
pip uninstall pyspark
```
Install the Databricks Connect client.

```bash
pip install -U "databricks-connect==10.4.*"  # or X.Y.* to match your cluster version
```
Step 2: Configure connection properties
```bash
databricks-connect configure
```

```
Do you accept the above agreement? [y/N] y
Set new config values (leave input empty to accept default):
Databricks Host [no current value, must start with https://]: <databricks-url>
Databricks Token [no current value]: <databricks-token>
Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
Port [15001]: <port>
```
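For automated setups, the interactive prompts can be avoided by writing the configuration file up front. This is a sketch that assumes the client stores its settings as JSON in `~/.databricks-connect` with key names mirroring the prompts; verify the exact file format written by `databricks-connect configure` on your machine before relying on it:

```python
# Sketch: pre-seed the Databricks Connect configuration instead of
# answering the interactive prompts. The JSON key names below are an
# assumption -- check the file produced by `databricks-connect configure`
# before relying on them. The <...> values are placeholders.
import json
from pathlib import Path

config = {
    "host": "https://<databricks-url>",
    "token": "<databricks-token>",
    "cluster_id": "<cluster-id>",
    "org_id": "<org-id>",
    "port": "15001",
}

# The real client reads ~/.databricks-connect; a local path is used
# here so the sketch is self-contained.
path = Path("databricks-connect.json")
path.write_text(json.dumps(config, indent=2))
print(json.loads(path.read_text())["port"])  # prints 15001
```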
| Parameter | SQL config key | Environment variable name |
|---|---|---|
| Databricks Host | spark.databricks.service.address | DATABRICKS_ADDRESS |
| Databricks Token | spark.databricks.service.token | DATABRICKS_API_TOKEN |
| Cluster ID | spark.databricks.service.clusterId | DATABRICKS_CLUSTER_ID |
| Org ID | spark.databricks.service.orgId | DATABRICKS_ORG_ID |
| Port | spark.databricks.service.port | DATABRICKS_PORT |
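The environment-variable names in the table can also be set from a wrapper script before the Spark session starts. A minimal sketch, using the same `<...>` placeholder tokens as above:

```python
# Sketch: supply the connection properties through the environment
# variables listed in the table instead of the interactive prompts.
# The <...> values are placeholders for your workspace's details.
import os

os.environ["DATABRICKS_ADDRESS"] = "https://<databricks-url>"
os.environ["DATABRICKS_API_TOKEN"] = "<databricks-token>"
os.environ["DATABRICKS_CLUSTER_ID"] = "<cluster-id>"
os.environ["DATABRICKS_ORG_ID"] = "<org-id>"
os.environ["DATABRICKS_PORT"] = "15001"

print(os.environ["DATABRICKS_PORT"])  # prints 15001
```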
Test connectivity to Databricks.
```bash
databricks-connect test
```
If the cluster you configured is not running, the test starts the cluster, which remains running until its configured autotermination time. The output should look similar to the following:
```
* PySpark is installed at /.../3.5.6/lib/python3.5/site-packages/pyspark
* Checking java version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
* Testing scala command
18/12/10 16:38:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:38:50 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.range(100).reduce(_ + _)
Spark context Web UI available at https://10.8.5.214:4040
Spark context available as 'sc' (master = local[*], app id = local-1544488730553).
Spark session available as 'spark'.
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
res0: Long = 4950

scala> :quit

* Testing python command
18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
```
Run examples
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Create a Spark DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
    StructField('AirportCode', StringType(), False),
    StructField('Date', DateType(), False),
    StructField('TempHighF', IntegerType(), False),
    StructField('TempLowF', IntegerType(), False)
])

data = [
    ['BLI', date(2021, 4, 3), 52, 43],
    ['BLI', date(2021, 4, 2), 50, 38],
    ['BLI', date(2021, 4, 1), 52, 41],
    ['PDX', date(2021, 4, 3), 64, 45],
    ['PDX', date(2021, 4, 2), 61, 41],
    ['PDX', date(2021, 4, 1), 66, 39],
    ['SEA', date(2021, 4, 3), 57, 43],
    ['SEA', date(2021, 4, 2), 54, 39],
    ['SEA', date(2021, 4, 1), 56, 41]
]

temps = spark.createDataFrame(data, schema)

# Create a table on the Databricks cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS demo_temps_table')
temps.write.saveAsTable('demo_temps_table')

# Query the table on the Databricks cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM demo_temps_table " \
    "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
    "GROUP BY AirportCode, Date, TempHighF, TempLowF " \
    "ORDER BY TempHighF DESC")
df_temps.show()

# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode|      Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# |        PDX|2021-04-03|       64|      45|
# |        PDX|2021-04-02|       61|      41|
# |        SEA|2021-04-03|       57|      43|
# |        SEA|2021-04-02|       54|      39|
# +-----------+----------+---------+--------+

# Clean up by deleting the table from the Databricks cluster.
spark.sql('DROP TABLE demo_temps_table')
```
Work with dependencies
```python
from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())
```

Here `lib.py` is a local module that ships to the cluster via `addPyFile` and contains the `Foo` class:

```python
class Foo(object):
    def __init__(self, x):
        self.x = x
```
Access DBUtils
```python
from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())
```
When using Databricks Runtime 7.3 LTS or above, to access the DBUtils module in a way that works both locally and in Databricks clusters, use the following `get_dbutils()`:

```python
def get_dbutils(spark):
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
```
Otherwise, use the following `get_dbutils()`:

```python
def get_dbutils(spark):
    if spark.conf.get("spark.databricks.service.client.enabled") == "true":
        from pyspark.dbutils import DBUtils
        return DBUtils(spark)
    else:
        import IPython
        return IPython.get_ipython().user_ns["dbutils"]
```
Copying files between local and remote filesystems
```python
from pyspark.dbutils import DBUtils

dbutils = DBUtils(spark)

dbutils.fs.cp('file:/home/user/data.csv', 'dbfs:/uploads')
dbutils.fs.cp('dbfs:/output/results.csv', 'file:/home/user/downloads/')
```
Enable dbutils.secrets.get
Because of security restrictions, the ability to call `dbutils.secrets.get` is disabled by default. Contact Databricks support to enable this feature for your workspace.