Testing with the assert statement

Use the sample JSON file from Read data in JSON format.

# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *

jsonSchema = StructType([
  StructField("id", LongType(), True),
  StructField("father", StringType(), True),
  StructField("mother", StringType(), True),
  StructField("children", ArrayType(StringType()), True)
])
jsonFile = "/mnt/training/sample.json"

testDF = (spark.read    # The DataFrameReader
  .schema(jsonSchema)   # Use the specified schema
  .json(jsonFile)       # Creates a DataFrame from JSON after reading in the file
)

View the schema and column types

testDF.printSchema()

columns = testDF.dtypes
print(columns)
root
 |-- id: long (nullable = true)
 |-- father: string (nullable = true)
 |-- mother: string (nullable = true)
 |-- children: array (nullable = true)
 |    |-- element: string (containsNull = true)

[('id', 'bigint'), ('father', 'string'), ('mother', 'string'), ('children', 'array<string>')]

Test with assert

assert len(columns) == 4, "Expected 4 columns but found " + str(len(columns))

assert columns[0][0] == "id",            "Expected column 0 to be \"id\" but found \"" + columns[0][0] + "\"."
assert columns[0][1] == "bigint",        "Expected column 0 to be of type \"bigint\" but found \"" + columns[0][1] + "\"."

assert columns[1][0] == "father",        "Expected column 1 to be \"father\" but found \"" + columns[1][0] + "\"."
assert columns[1][1] == "string",        "Expected column 1 to be of type \"string\" but found \"" + columns[1][1] + "\"."

assert columns[2][0] == "mother",        "Expected column 2 to be \"mother\" but found \"" + columns[2][0] + "\"."
assert columns[2][1] == "string",        "Expected column 2 to be of type \"string\" but found \"" + columns[2][1] + "\"."

assert columns[3][0] == "children",      "Expected column 3 to be \"children\" but found \"" + columns[3][0] + "\"."
assert columns[3][1] == "array<string>", "Expected column 3 to be of type \"array<string>\" but found \"" + columns[3][1] + "\"."

print("Congratulations, all tests passed... that is if no jobs were triggered :-)\n")
Congratulations, all tests passed... that is if no jobs were triggered :-)
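
The same check can also be written against the DataFrame's schema object instead of comparing the dtypes strings. The following is a minimal sketch (not part of the original notebook), assuming the testDF and jsonSchema defined above:

# Compare column names and data types against the expected StructType (ignoring nullability)
expectedFields = [(f.name, f.dataType) for f in jsonSchema.fields]
actualFields   = [(f.name, f.dataType) for f in testDF.schema.fields]

assert actualFields == expectedFields, "Schema mismatch: " + str(actualFields)
print("Schema matches the expected StructType.")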

Write data

Just as there are many ways to read data, we have just as many ways to write data.

  1. Writing Data

1. Writing Data

  • Writing data to Parquet files

Use the sample JSON file from Read data in JSON format.

# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *

jsonSchema = StructType([
  StructField("id", LongType(), True),
  StructField("father", StringType(), True),
  StructField("mother", StringType(), True),
  StructField("children", StructType([
    StructField("first", StringType(), True),
    StructField("second", StringType(), True),
    StructField("third", StringType(), True)
  ]), True),
])
jsonFile = "/mnt/training/sample2.json"

jsonDF = (spark.read
    .schema(jsonSchema)
    .json(jsonFile)
)
display(jsonDF)

Now that we have a DataFrame, we can write it back out as Parquet files or in various other formats.

parquetFile = "/mnt/training/family.parquet"

print("Output location: " + parquetFile)

(jsonDF.write                      # Our DataFrameWriter
  .option("compression", "snappy") # One of none, snappy, gzip, and lzo
  .mode("overwrite")               # Replace existing files
  .parquet(parquetFile)            # Write DataFrame to Parquet files
)

Now that the file has been written out, we can see it in the DBFS:

%fs ls /mnt/training/family.parquet
display(dbutils.fs.ls(parquetFile))

And lastly we can read that same parquet file back in and display the results:

display(spark.read.parquet(parquetFile))

Writing to CSV

In this example, writing the DataFrame out as CSV fails with the error: CSV data source does not support struct<first:string,second:string,third:string> data type.

csvFile = "/mnt/training/family.csv"

print("Output location: " + csvFile)

(jsonDF.write                      # Our DataFrameWriter
  .mode("overwrite")               # Replace existing files
  .csv(csvFile)                    # Write DataFrame to CSV files
)

With the following JSON schema, writing to CSV fails as well, because the CSV data source does not support the array<string> data type.

# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *

jsonSchema = StructType([
  StructField("id", LongType(), True),
  StructField("father", StringType(), True),
  StructField("mother", StringType(), True),
  StructField("children", ArrayType(StringType()), True)
])
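
One possible workaround (a sketch, not from the original notebook) is to serialize the complex column to a string before writing it out, for example with to_json(); this assumes a DataFrame such as jsonDF above whose children column is a struct or array:

from pyspark.sql.functions import to_json

# Convert the complex children column to a JSON string so the CSV data source can handle it
(jsonDF
  .withColumn("children", to_json("children"))
  .write
  .mode("overwrite")
  .csv(csvFile)
)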

Read data stored in tables and views

  1. Databases and tables
  2. Views
  3. Reading from a Table/View
  4. Temporary Views

1. Databases and tables

An Azure Databricks database (schema) is a collection of tables. An Azure Databricks table is a collection of structured data. You can cache, filter, and perform any operations supported by Apache Spark DataFrames on Azure Databricks tables. You can query tables with Spark APIs and Spark SQL.
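
As a minimal sketch (the database and table names below are hypothetical, and the jsonDF DataFrame from the Write data section is reused), a DataFrame can be saved as a table and queried back:

# Create a database and save a DataFrame as a managed table (hypothetical names)
spark.sql("CREATE DATABASE IF NOT EXISTS demo_db")

(jsonDF.write
  .mode("overwrite")
  .saveAsTable("demo_db.family_table")
)

display(spark.read.table("demo_db.family_table"))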

2. Views

A view constructs a virtual table that has no physical data, based on the result set of a SQL query. ALTER VIEW and DROP VIEW only change metadata.
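
For example (a sketch assuming the hypothetical demo_db.family_table above), a view can be defined on top of a table without copying any data:

# Define a view over an existing table; only metadata is stored
spark.sql("""
  CREATE OR REPLACE VIEW demo_db.family_view AS
  SELECT * FROM demo_db.family_table
""")

display(spark.sql("SELECT * FROM demo_db.family_view"))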

3. Reading from a Table/View

tempDF = spark.read.table("jack_db.jack_table1")
tempDF.printSchema()
display(tempDF)
%sql
SELECT * 
FROM jack_db.jack_table1
LIMIT(5)

Review: Reading from Tables

  • No job is executed – the schema is stored in the table definition on Databricks.
  • The file is stored on the DBFS. If we used JDBC, it would open the connection to the database and read it in. If we used an object store (like what is backing the DBFS), it would read the data from source.

4. Temporary Views

We can create a [temporary] view with createOrReplaceTempView()

# create a temporary view from the resulting DataFrame
tempDF.createOrReplaceTempView("jack_view1")
%sql
SELECT *
FROM jack_view1
ORDER BY user_id DESC
LIMIT 5

** Note #1: ** The method createOrReplaceTempView(..) is bound to the SparkSession, meaning it will be discarded once the session ends.

** Note #2: ** On the other hand, the method createOrReplaceGlobalTempView(..) is bound to the Spark application.

Or to put that another way, I can use createOrReplaceTempView(..) in this notebook only. However, I can call createOrReplaceGlobalTempView(..) in this notebook and then access it from another.
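
A minimal sketch of the difference (the view name is hypothetical): a global temporary view is registered in the special global_temp database and is visible from other notebooks attached to the same cluster.

# Global temporary views live in the global_temp database
tempDF.createOrReplaceGlobalTempView("jack_global_view")

display(spark.sql("SELECT * FROM global_temp.jack_global_view"))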

Read data in Parquet format

  1. Inspect the data files
  2. Read in the Parquet Files
  3. Read in the Parquet Files – Schema

1. Inspect the data files

The sample Parquet files come from kylo/samples/sample-data/parquet at master · Teradata/kylo · GitHub; delete the README.txt file from the directory before using it for the test.

%fs ls /mnt/training/sample_parquet/

2. Read in the Parquet Files

To read in these files, we will specify the location of the parquet directory.

parquetFile = "/mnt/training/sample_parquet/"

tempDF = (spark.read
    .parquet(parquetFile)
)
tempDF.printSchema()
root
 |-- registration_dttm: timestamp (nullable = true)
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- country: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- title: string (nullable = true)
 |-- comments: string (nullable = true)
display(tempDF)

Review: Reading from Parquet Files

  • We do not need to specify the schema – the column names and data types are stored in the parquet files.
  • Only one job is required to read that schema from the parquet file’s metadata.
  • The parquet reader can “read” the schema very quickly because it’s reading that schema from the metadata.

3. Read in the Parquet Files – Schema

If we want to avoid the extra job entirely, we can, again, specify the schema even for Parquet files:

** WARNING ** Providing a schema may avoid this one-time hit to determine the DataFrame's schema. However, if you specify the wrong schema it will conflict with the true schema and will result in an analysis exception at runtime.

# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *

parquetSchema = StructType(
  [
    StructField("registration_dttm", TimestampType(), False),
    StructField("id", IntegerType(), False),
    StructField("first_name", StringType(), False),
    StructField("last_name", StringType(), False),
    StructField("email", StringType(), False),
    StructField("gender", StringType(), False),
    StructField("ip_address", StringType(), False),
    StructField("cc", StringType(), False),
    StructField("country", StringType(), False),
    StructField("birthdate", StringType(), False),
    StructField("salary", DoubleType(), False),
    StructField("title", StringType(), False),
    StructField("comments", StringType(), False)
  ]
)
(spark.read               # The DataFrameReader
  .schema(parquetSchema)  # Use the specified schema
  .parquet(parquetFile)   # Creates a DataFrame from Parquet after reading in the file
  .printSchema()          # Print the DataFrame's schema
)
root
 |-- registration_dttm: timestamp (nullable = true)
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- cc: string (nullable = true)
 |-- country: string (nullable = true)
 |-- birthdate: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- title: string (nullable = true)
 |-- comments: string (nullable = true)

In most cases, people do not provide a schema for Parquet files because reading the schema from the file metadata is such a cheap process.

Read data in JSON format

  1. Inspect the data files
  2. Reading from JSON – InferSchema
  3. Reading from JSON – User-Defined Schema

1. Inspect the data files

We can use %fs ls … to view the file on the DBFS.

%fs ls dbfs:/mnt/training/sample.json

We can use %fs head … to peek at the first couple thousand characters of the file.

%fs head /mnt/training/sample.json
{"id":1,"father":"Mark","mother":"Charlotte","children":["Tom"]}
{"id":2,"father":"John","mother":"Ann","children":["Jessika","Antony","Jack"]}
{"id":3,"father":"Bob","mother":"Monika","children":["Jerry","Karol"]}

and

%fs head /mnt/training/sample2.json
{"id":1,"father":"Mark","mother":"Charlotte","children":{"first":"Tom1"}}
{"id":2,"father":"John","mother":"Ann","children":{"first":"Jack1","second":"Jack2"}}
{"id":3,"father":"Bob","mother":"Monika","children":{"first":"Karol1","second":"Karol2","third":"Karol3"}}
CREATE OR REPLACE TEMP VIEW tmp_json 
AS 
SELECT * 
FROM   JSON.`path/to/file.json` 

2. Reading from JSON – InferSchema

JSON Lines

The JSON reader assumes:

  • That there is one JSON object per line and…
  • That each object is delineated by a new-line.

This format is referred to as JSON Lines or newline-delimited JSON. More information about this format can be found at http://jsonlines.org.

** Note: ** Spark 2.2 was released on July 11th, 2017. With it come file IO improvements for CSV & JSON and, more importantly, support for parsing multi-line JSON and CSV files. You can read more about that (and other features in Spark 2.2) in the Databricks Blog.
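
As a sketch (the file name is hypothetical and assumes a single pretty-printed JSON document rather than JSON Lines), multi-line JSON can be read by enabling the multiLine option:

multiLineDF = (spark.read
   .option("multiLine", "true")              # Parse JSON records that span multiple lines
   .json("/mnt/training/pretty_sample.json") # Hypothetical multi-line JSON file
)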

Read the JSON File – Example 1

The command to read in JSON looks very similar to that of CSV.

jsonFile = "/mnt/training/sample.json"
tempDF = (spark.read           # The DataFrameReader
   .option("inferSchema", "true")   # Automatically infer data types & column names
   .json(jsonFile)              # Creates a DataFrame from JSON after reading in the file
)
tempDF.printSchema()
root
 |-- children: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- father: string (nullable = true)
 |-- id: long (nullable = true)
 |-- mother: string (nullable = true)

With our DataFrame created, we can now take a peek at the data. But to demonstrate a unique aspect of JSON data (or any data with embedded fields), we will first create a temporary view and then view the data via SQL:

# create a view called temp_view
tempDF.createOrReplaceTempView("temp_view")

And now we can take a peek at the data with a simple SQL SELECT statement:

%sql
SELECT * FROM temp_view
%sql
SELECT id, father, mother, children
FROM temp_view

Read the JSON File – Example 2

jsonFile2 = "/mnt/training/sample2.json"

tempDF2 = (spark.read           # The DataFrameReader
   .option("inferSchema", "true")   # Automatically infer data types & column names
   .json(jsonFile2)              # Creates a DataFrame from JSON after reading in the file
)
tempDF2.printSchema()
root
 |-- children: struct (nullable = true)
 |    |-- first: string (nullable = true)
 |    |-- second: string (nullable = true)
 |    |-- third: string (nullable = true)
 |-- father: string (nullable = true)
 |-- id: long (nullable = true)
 |-- mother: string (nullable = true)
# create a view called temp_view2
tempDF2.createOrReplaceTempView("temp_view2")
%sql
SELECT * FROM temp_view2
%sql
SELECT id, children.first, children.second, children.third 
FROM temp_view2 
WHERE children.second IS NOT NULL

Review: Reading from JSON – InferSchema

  • We only need one job even when inferring the schema.
  • There is no header, which is why there isn’t a second job in this case – the column names are extracted from the JSON object’s attributes.

3. Reading from JSON – User-Defined Schema

To avoid the extra job, we can (just like we did with CSV) specify the schema for the DataFrame.

Example 1, Step #1 – Create the Schema

Note that we can support complex data types as seen in the field children.

# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *

jsonSchema = StructType([
  StructField("id", LongType(), True),
  StructField("father", StringType(), True),
  StructField("mother", StringType(), True),
  StructField("children", ArrayType(StringType()), True)
])

For a small file, manually creating the schema may not be worth the effort. For a large file, however, the time spent writing the schema by hand can be a worthwhile trade-off against a long infer-schema pass.

Example 1, Step #2 – Read in the JSON

Next, we will read in the JSON file and print its schema.

(spark.read             # The DataFrameReader
  .schema(jsonSchema)   # Use the specified schema
  .json(jsonFile)       # Creates a DataFrame from JSON after reading in the file
  .printSchema()
)
root
 |-- id: long (nullable = true)
 |-- father: string (nullable = true)
 |-- mother: string (nullable = true)
 |-- children: array (nullable = true)
 |    |-- element: string (containsNull = true)

Example 2, Step #1 – Create the Schema

# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *

jsonSchema2 = StructType([
  StructField("id", LongType(), True),
  StructField("father", StringType(), True),
  StructField("mother", StringType(), True),
  StructField("children", StructType([
    StructField("first", StringType(), True),
    StructField("second", StringType(), True),
    StructField("third", StringType(), True)
  ]), True),
])

Example 2, Step #2 – Read in the JSON

(spark.read             # The DataFrameReader
  .schema(jsonSchema2)  # Use the specified schema
  .json(jsonFile2)      # Creates a DataFrame from JSON after reading in the file
  .printSchema()
)
root
 |-- id: long (nullable = true)
 |-- father: string (nullable = true)
 |-- mother: string (nullable = true)
 |-- children: struct (nullable = true)
 |    |-- first: string (nullable = true)
 |    |-- second: string (nullable = true)
 |    |-- third: string (nullable = true)

Review: Reading from JSON – User-Defined Schema

  • Just like CSV, providing the schema avoids the extra jobs.
  • The schema allows us to rename columns and specify alternate data types, as sketched below.
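
A minimal sketch of an alternate data type (not from the original notebook, reusing the imports and jsonFile above): the same sample.json read with id declared as an IntegerType instead of a LongType.

alternateSchema = StructType([
  StructField("id", IntegerType(), True),                # id as int instead of bigint
  StructField("father", StringType(), True),
  StructField("mother", StringType(), True),
  StructField("children", ArrayType(StringType()), True)
])

(spark.read
  .schema(alternateSchema)
  .json(jsonFile)
  .printSchema()
)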

The display() command

tempDF2 = (spark.read
  .schema(jsonSchema2)
  .json(jsonFile2)
)
display(tempDF2)

Read data in CSV format

  1. Inspect the data files
  2. Reading from CSV – InferSchema
  3. Reading from CSV – User-Defined Schema
  4. Reading from multiple CSV

1. Inspect the data files

We can use %fs ls … to view the file on the DBFS.

%fs ls /mnt/training/

We can use %fs head … to peek at the first couple thousand characters of the file.

%fs head /mnt/training/sample.tsv
ID	Name	Weight
1	Man1	61
2	Man2	62
3	Man3	63
4	Man4	64
5	Man5	65
6	Man6	66
7	Man7	67
8	Man8	68
9	Man9	69

2. Reading from CSV – InferSchema

Step #1 – Read The CSV File

Let’s start with the bare minimum by specifying the tab character as the delimiter and the location of the file:

# A reference to our tab-separated-file
csvFile = "/mnt/training/sample.tsv"

tempDF = (spark.read           # The DataFrameReader
   .option("sep", "\t")        # Use tab delimiter (default is comma-separator)
   .csv(csvFile)               # Creates a DataFrame from CSV after reading in the file
)

This is guaranteed to trigger one job. A Job is triggered anytime we are “physically” required to touch the data. In some cases, one action may create multiple jobs (multiple reasons to touch the data). In this case, the reader has to “peek” at the first line of the file to determine how many columns of data we have.

We can see the structure of the DataFrame by executing the command printSchema()

tempDF.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)

The column names _c0, _c1, and _c2 are automatically generated.

display(tempDF)
The first row is treated as data instead of being used as the header.

Step #2 – Use the File’s Header

tempDF2 = (spark.read          # The DataFrameReader
   .option("sep", "\t")        # Use tab delimiter (default is comma-separator)
   .option("header", "true")   # Use first line of all files as header
   .csv(csvFile)               # Creates a DataFrame from CSV after reading in the file
)
tempDF2.printSchema()
root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Weight: string (nullable = true)
display(tempDF2)
Now the first row is correctly used as the header.

Step #3 – Infer the Schema

Lastly, we can add an option that tells the reader to infer each column’s data type (aka the schema)

tempDF3 = (spark.read              # The DataFrameReader
   .option("header", "true")       # Use first line of all files as header
   .option("sep", "\t")            # Use tab delimiter (default is comma-separator)
   .option("inferSchema", "true")  # Automatically infer data types
   .csv(csvFile)                   # Creates a DataFrame from CSV after reading in the file
)
tempDF3.printSchema()
root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Weight: integer (nullable = true)

Two Spark jobs were executed (not one as in the previous example)

3. Reading from CSV – User-Defined Schema

The difference here is that we are going to define the schema beforehand and hopefully avoid the execution of any extra jobs.

Step #1

Declare the schema. This is just a list of field names and data types.

# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *

csvSchema = StructType([
  StructField("ID", IntegerType(), False),
  StructField("Name", StringType(), False),
  StructField("Weight", IntegerType(), False)
])

Step #2

Read in our data (and print the schema). We can specify the schema, or rather the StructType, with the schema(..) command:

tempDF4 = (spark.read         # The DataFrameReader
  .option('header', 'true')   # Ignore line #1 - it's a header
  .option('sep', "\t")        # Use tab delimiter (default is comma-separator)
  .schema(csvSchema)          # Use the specified schema
  .csv(csvFile)               # Creates a DataFrame from CSV after reading in the file
)
tempDF4.printSchema()
root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Weight: integer (nullable = true)

Zero Spark jobs were executed

4. Reading from multiple CSV

tempDF = (spark.read
   .option("header", "true")
   .csv("/mnt/training/*.csv")
)
tempDF = (spark.read
   .format("csv")
   .option("header", "true")
   .load("/mnt/training/*.csv")
)
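
Both snippets above use a glob pattern to pick up every CSV file under /mnt/training/. As a sketch (the file names are hypothetical), an explicit list of paths can also be passed:

tempDF = (spark.read
   .option("header", "true")
   .csv(["/mnt/training/part1.csv", "/mnt/training/part2.csv"])  # Hypothetical file names
)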

Databricks

Databricks on AWS

DROP DATABASE

While usage of SCHEMA and DATABASE is interchangeable, SCHEMA is preferred.
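
As a sketch (the database name is hypothetical), a database can be dropped from a notebook like this:

# CASCADE also drops the tables contained in the database
spark.sql("DROP DATABASE IF EXISTS demo_db CASCADE")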

DROP VIEW (Databricks SQL)

DROP VIEW [ IF EXISTS ] view_name

Other

Check the Databricks Runtime version

%scala
dbutils.notebook.getContext.tags("sparkVersion")
// res1: String = 10.4.x-scala2.12
%python
spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion")
# '10.4.x-scala2.12'

Magic Commands

Magic Command: %sh

%sh runs shell commands.

%sh ps
%sh ps | grep 'java'

Paths referenced with %sh are local shell paths.

%sh ls -l /

This is unlike %fs, where paths refer to the DBFS.

%fs ls /

Databricks File System – DBFS

  • DBFS is a layer over a cloud-based object store
  • Files in DBFS are persisted to the object store
  • The lifetime of files in the DBFS are NOT tied to the lifetime of our cluster

Magic Command: Other Languages

%python

print("Hello Python!")
%scala

println("Hello Scala!")
%sql

select "Hello SQL!"
%r

print("Hello R!", quote=FALSE)

Magic Command: %md

%md
%md
### Magic Command: &percnt;md

Our favorite Magic Command **&percnt;md** allows us to render Markdown in a cell:
* Double click this cell to begin editing it
* Then hit `Esc` to stop editing

# Title One
## Title Two
### Title Three

This is a test of the emergency broadcast system. This is only a test.

This is text with a **bold** word in it.

This is text with an *italicized* word in it.

This is an ordered list
0. once
0. two
0. three

This is an unordered list
* apples
* peaches
* bananas

Links/Embedded HTML: <a href="http://bfy.tw/19zq" target="_blank">What is Markdown?</a>

Images:
![Spark Engines](https://files.training.databricks.com/images/Apache-Spark-Logo_TM_200px.png)

And of course, tables:

| Name  | Age | Sex    |
|-------|-----|--------|
| Tom   | 32  | Male   |
| Mary  | 29  | Female |
| Dick  | 73  | Male   |
| Sally | 55  | Female |

Magic Command: %run

  • You can run a notebook from another notebook by using the Magic Command %run
  • All variables & functions defined in that other notebook will become available in your current notebook
%run "./Includes/Classroom-Setup"

Magic Command: %fs

%fs is a Magic Command that wraps dbutils.fs.

The following call is equivalent to display(dbutils.fs.ls("/mnt/training")) – there is no real difference between the two.

%fs ls /mnt/training
%fs head /mnt/training/pageviews_by_second.tsv

Databricks Utilities (dbutils)

  1. List available utilities
  2. Data utility (dbutils.data)
  3. File system utility (dbutils.fs)
  4. Library utility (dbutils.library)
  5. Notebook utility (dbutils.notebook)
  6. Secrets utility (dbutils.secrets)
  7. Widgets utility (dbutils.widgets)
  8. Databricks Utilities API library
  9. Limitations

Databricks Utilities – dbutils

  • You can access the DBFS through the Databricks Utilities class (and other file IO routines).
  • An instance of DBUtils is already declared for us as dbutils.
  • For in-notebook documentation on DBUtils you can execute the command dbutils.help().

1. List available utilities

dbutils.help()

List available commands for a utility

dbutils.help?
dbutils.fs.help()

Display help for a command

dbutils.fs.help('cp')

2. Data utility (dbutils.data)

Commands: summarize

dbutils.data.help()

summarize command (dbutils.data.summarize)

Calculates and displays summary statistics of an Apache Spark DataFrame or pandas DataFrame. This command is available for Python, Scala and R.

df = spark.read.format('csv').load(
  '/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv',
  header=True,
  inferSchema=True
)
dbutils.data.summarize(df)

Note that the visualization uses SI notation to concisely render numerical values smaller than 0.01 or larger than 10000.

3. File system utility (dbutils.fs)

Commands: cp, head, ls, mkdirs, mount, mounts, mv, put, refreshMounts, rm, unmount, updateMount

cp command (dbutils.fs.cp)

Copies a file or directory, possibly across filesystems.

dbutils.fs.cp("/FileStore/old_file.txt", "/tmp/new/new_file.txt")

head command (dbutils.fs.head)

Returns up to the specified maximum number of bytes of the given file. The bytes are returned as a UTF-8 encoded string.

dbutils.fs.head("/tmp/my_file.txt", 25)

ls command (dbutils.fs.ls)

Lists the contents of a directory.

dbutils.fs.ls("/tmp")
files = dbutils.fs.ls("/mnt/training/")

for fileInfo in files:
  print(fileInfo.path)

print("-"*80)

display(..)

Besides printing each item returned from dbutils.fs.ls(..), we can also pass that collection to another Databricks-specific command called display(..).

files = dbutils.fs.ls("/mnt/training/")

display(files)

mkdirs command (dbutils.fs.mkdirs)

Creates the given directory if it does not exist. Also creates any necessary parent directories.

dbutils.fs.mkdirs("/tmp/parent/child/grandchild")

mount command (dbutils.fs.mount)

Mounts the specified source directory into DBFS at the specified mount point.

dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"<conf-key>":dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")})

mounts command (dbutils.fs.mounts)

Displays information about what is currently mounted within DBFS.

dbutils.fs.mounts()
mounts = dbutils.fs.mounts()

for mount in mounts:
  print(mount.mountPoint + " >> " + mount.source)

print("-"*80)

mv command (dbutils.fs.mv)

Moves a file or directory, possibly across filesystems. A move is a copy followed by a delete, even for moves within filesystems.

dbutils.fs.mv("/FileStore/my_file.txt", "/tmp/parent/child/grandchild")

put command (dbutils.fs.put)

Writes the specified string to a file. The string is UTF-8 encoded.

dbutils.fs.put("/tmp/hello_db.txt", "Hello, Databricks!", True)

refreshMounts command (dbutils.fs.refreshMounts)

Forces all machines in the cluster to refresh their mount cache, ensuring they receive the most recent information.

dbutils.fs.refreshMounts()

rm command (dbutils.fs.rm)

Removes a file or directory.

dbutils.fs.rm("/tmp/hello_db.txt")

unmount command (dbutils.fs.unmount)

Deletes a DBFS mount point.

dbutils.fs.unmount("/mnt/<mount-name>")

updateMount command (dbutils.fs.updateMount)

Similar to the dbutils.fs.mount command, but updates an existing mount point instead of creating a new one. Returns an error if the mount point is not present.

dbutils.fs.updateMount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"<conf-key>":dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")})

4. Library utility (dbutils.library)

The library utility is deprecated.

Utilities for session isolated libraries

Commands: install, installPyPI, list, restartPython, updateCondaEnv

5. Notebook utility (dbutils.notebook)

Commands: exit, run

The notebook utility allows you to chain together notebooks and act on their results. See Notebook workflows.

dbutils.notebook.help()
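
A minimal sketch (the notebook path and parameter are hypothetical): run another notebook with a timeout, passing an argument and reading back the value it returns with dbutils.notebook.exit:

# Run a notebook in the same folder with a 60-second timeout and one parameter
result = dbutils.notebook.run("./Other-Notebook", 60, {"run_date": "2022-05-05"})
print(result)

# Inside the called notebook, return a value with:
# dbutils.notebook.exit("finished")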

6. Secrets utility (dbutils.secrets)

Commands: get, getBytes, list, listScopes

The secrets utility allows you to store and access sensitive credential information without making them visible in notebooks. See Secret management and Use the secrets in a notebook. To list the available commands, run dbutils.secrets.help().

dbutils.secrets.help()
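
A minimal sketch (the scope and key names are hypothetical):

# List the available scopes and the keys inside one scope, then read a secret value
print(dbutils.secrets.listScopes())
print(dbutils.secrets.list("my-scope"))

password = dbutils.secrets.get(scope="my-scope", key="db-password")  # Redacted if printed in a notebook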

7. Widgets utility (dbutils.widgets)

Commands: combobox, dropdown, get, getArgument, multiselect, remove, removeAll, text

The widgets utility allows you to parameterize notebooks. See Widgets. To list the available commands, run dbutils.widgets.help().

combobox command (dbutils.widgets.combobox)

Creates and displays a combobox widget with the specified programmatic name, default value, choices, and optional label.

dbutils.widgets.combobox(
  name='fruits_combobox',
  defaultValue='banana',
  choices=['apple', 'banana', 'coconut', 'dragon fruit'],
  label='Fruits'
)

print(dbutils.widgets.get("fruits_combobox"))

# banana

dropdown command (dbutils.widgets.dropdown)

Creates and displays a dropdown widget with the specified programmatic name, default value, choices, and optional label.

dbutils.widgets.dropdown(
  name='toys_dropdown',
  defaultValue='basketball',
  choices=['alphabet blocks', 'basketball', 'cape', 'doll'],
  label='Toys'
)

print(dbutils.widgets.get("toys_dropdown"))

# basketball

get command (dbutils.widgets.get)

Gets the current value of the widget with the specified programmatic name. This programmatic name can be either the name of a widget created in the notebook or the name of a parameter passed to the notebook.

dbutils.widgets.get('fruits_combobox')

# banana

getArgument command (dbutils.widgets.getArgument)

Gets the current value of the widget with the specified programmatic name. If the widget does not exist, an optional message can be returned.

This command is deprecated. Use dbutils.widgets.get instead.

dbutils.widgets.getArgument('fruits_combobox', 'Error: Cannot find fruits combobox')

# Deprecation warning: Use dbutils.widgets.text() or dbutils.widgets.dropdown() to create a widget and dbutils.widgets.get() to get its bound value.
# Out[3]: 'banana'

multiselect command (dbutils.widgets.multiselect)

Creates and displays a multiselect widget with the specified programmatic name, default value, choices, and optional label.

dbutils.widgets.multiselect(
  name='days_multiselect',
  defaultValue='Tuesday',
  choices=['Monday', 'Tuesday', 'Wednesday', 'Thursday',
    'Friday', 'Saturday', 'Sunday'],
  label='Days of the Week'
)

print(dbutils.widgets.get("days_multiselect"))

# Tuesday

remove command (dbutils.widgets.remove)

Removes the widget with the specified programmatic name.

If you add a command to remove a widget, you cannot add a subsequent command to create a widget in the same cell. You must create the widget in another cell.

dbutils.widgets.remove('fruits_combobox')

removeAll command (dbutils.widgets.removeAll)

Removes all widgets from the notebook.

If you add a command to remove all widgets, you cannot add a subsequent command to create any widgets in the same cell. You must create the widgets in another cell.

dbutils.widgets.removeAll()

text command (dbutils.widgets.text)

Creates and displays a text widget with the specified programmatic name, default value, and optional label.

dbutils.widgets.text(
  name='your_name_text',
  defaultValue='Enter your name',
  label='Your name'
)

print(dbutils.widgets.get("your_name_text"))

# Enter your name

8. Databricks Utilities API library

To accelerate application development, it can be helpful to compile, build, and test applications before you deploy them as production jobs. To enable you to compile against Databricks Utilities, Databricks provides the dbutils-api library. 

9. Limitations

Calling dbutils inside of executors can produce unexpected results or potentially result in errors.

If you need to run file system operations on executors using dbutils, there are several faster and more scalable alternatives available.

For information about executors, see Cluster Mode Overview on the Apache Spark website.

Installing Azkaban 3.84.14 on Ubuntu 18.04.5 LTS


This installs Azkaban in solo server mode.

In solo server mode, the DB is embedded H2 and both web server and executor server run in the same process.

The multiple executor mode is for most serious production environments. Its DB should be backed by MySQL instances with a master-slave setup. The web server and executor servers should ideally run on different hosts.

Azkaban 3.84.14 installs fine on Ubuntu 18.04.5 LTS.

Installing Azkaban 3.90.0 on Ubuntu 20.04.6 LTS also works.

Installing Azkaban 4.0.0 on Ubuntu 20.04.6 LTS does not work: bin/start-solo.sh fails with the error No implementation for azkaban.imagemgmt.services.ImageRampupService was bound.

Update packages

sudo apt update

If Java 11 is already installed, remove it first (see Uninstall OpenJDK in Ubuntu. Though the Ubuntu environments comes… | by Hirosh Tharaka (BSc.Hons, CTFL) | Medium).

$ sudo apt-get purge --auto-remove openjdk*

Install Java 8

sudo apt install openjdk-8-jdk

Install Hadoop

If you use Hadoop, install Hadoop in Stand-Alone Mode on Ubuntu 18.04.

Install Azkaban

Download Azkaban 3.84.14

git clone https://github.com/phaisarnsut/azkaban3.84.14.git
or
git clone https://github.com/phaisarnsut/azkaban3.90.0.git
$ mv azkaban3.84.14/ azkaban
$ cd azkaban/
$ chmod +x gradlew

Build

Edit the file azkaban-web-server/build.gradle, changing

distBaseUrl = 'https://nodejs.org/dist'

to

distBaseUrl = 'https://direct.nodejs.org/dist/'
$ ./gradlew distZip
Downloading https://services.gradle.org/distributions/gradle-4.6-all.zip
...

unzip Azkaban solo server

sudo apt install unzip
$ unzip azkaban-solo-server/build/distributions/azkaban-solo-server-0.1.0-SNAPSHOT.zip

Create the config files

Or copy the samples from azkaban-solo-server/src/main/resources/conf/ or from azkaban/azkaban-solo-server/src/main/resources/conf at master · azkaban/azkaban · GitHub.

$ cd azkaban-solo-server-0.1.0-SNAPSHOT/
$ mkdir conf
$ cd conf

Create the file azkaban.properties

# Azkaban Personalization Settings
azkaban.name=Test
azkaban.label=My Local Azkaban
azkaban.color=#FF3601
azkaban.default.servlet.path=/index
web.resource.dir=web/
default.timezone.id=America/Los_Angeles
# Azkaban UserManager class
user.manager.class=azkaban.user.XmlUserManager
user.manager.xml.file=conf/azkaban-users.xml
# Loader for projects
executor.global.properties=conf/global.properties
azkaban.project.dir=projects
database.type=h2
# h2.path=./h2
h2.path=.
# h2.create.tables=true
# Velocity dev mode
velocity.dev.mode=false
# Azkaban Jetty server properties.
jetty.use.ssl=false
jetty.maxThreads=25
jetty.port=8081
# Azkaban Executor settings
executor.port=12321
# mail settings
mail.sender=
mail.host=
# User facing web server configurations used to construct the user facing server URLs. They are useful when there is a reverse proxy between Azkaban web servers and users.
# enduser -> myazkabanhost:443 -> proxy -> localhost:8081
# when this parameters set then these parameters are used to generate email links.
# if these parameters are not set then jetty.hostname, and jetty.port(if ssl configured jetty.ssl.port) are used.
# azkaban.webserver.external_hostname=myazkabanhost.com
# azkaban.webserver.external_ssl_port=443
# azkaban.webserver.external_port=8081
job.failure.email=
job.success.email=
lockdown.create.projects=false
cache.directory=cache
# JMX stats
jetty.connector.stats=true
executor.connector.stats=true
# Azkaban plugin settings
azkaban.jobtype.plugin.dir=plugins/jobtypes
# Number of executions to be displayed
azkaban.display.execution_page_size=16
azkaban.use.multiple.executors=true
azkaban.executor.runtimeProps.override.eager=false
# Azkaban Ramp Feature Configuration
#Ramp Feature Related
azkaban.ramp.enabled=true
azkaban.ramp.status.polling.enabled=true
azkaban.ramp.status.polling.interval.min=30
azkaban.ramp.status.push.interval.threshold=15
azkaban.ramp.status.pull.interval.threshold=100

Create the file azkaban-users.xml

<azkaban-users>
  <user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
  <user password="metrics" roles="metrics" username="metrics"/>

  <role name="admin" permissions="ADMIN"/>
  <role name="metrics" permissions="METRICS"/>
</azkaban-users>

Create an empty global.properties file

touch global.properties
cd ..

Start the server

$ cd azkaban-solo-server-0.1.0-SNAPSHOT/
$ pwd
/home/jack/Downloads/azkaban/azkaban-solo-server-0.1.0-SNAPSHOT

Start the solo server:

$ bin/start-solo.sh

Check whether the server started:

$ cat soloServerLog_*
for example
$ cat soloServerLog__2022-05-05+14:52:27.out

Stop server

$ bin/shutdown-solo.sh

Then try opening http://localhost:8081/ in a browser.

A simple Flow example

Create the file flow20.project

azkaban-flow-version: 2.0

Create the file basic.flow

nodes:
  - name: jobA
    type: command
    config:
      command: echo "This is an echoed text."

A Flow example with dependencies

Edit basic.flow

nodes:
  - name: jobC
    type: noop
    # jobC depends on jobA and jobB
    dependsOn:
      - jobA
      - jobB

  - name: jobA
    type: command
    config:
      command: echo "This is an echoed text."

  - name: jobB
    type: command
    config:
      command: pwd

type: noop is a job that takes no parameters and is essentially a null operation, used for organizing your graph (Azkaban 3.0 Documentation #builtin-jobtypes).

A Flow example with an embedded flow as a dependency

nodes:
  - name: jobC
    type: noop
    dependsOn:
      - embedded_flow

  - name: embedded_flow
    type: flow
    config:
      flow.num.job.threads: 3
    nodes:
      - name: job01
        type: command
        config:
          command: sleep 10

      - name: job02
        type: command
        config:
          command: sleep 10

      - name: job03
        type: command
        config:
          command: sleep 10

      - name: job04
        type: command
        config:
          command: sleep 10

      - name: job05
        type: command
        config:
          command: sleep 10

      - name: job06
        type: command
        config:
          command: sleep 10

The number of job threads is set with flow.num.job.threads.

Running more than one command in a job

Putting more than one command directly in the command field fails, for example:

command: cd /home/jack/; ./run_some_job

Instead, call a shell script, for example:

command: /home/jack/run_some_shell.sh

and put the commands in run_some_shell.sh instead:

cd /home/jack/
./run_some_job