Read data in CSV format

  1. Inspect the data file
  2. Reading from CSV – InferSchema
  3. Reading from CSV – User-Defined Schema
  4. Reading from multiple CSV files

1. Inspect the data file

We can use %fs ls … to list the files on DBFS.

%fs ls /mnt/training/

We can use %fs head … to peek at the first couple thousand characters of the file.

%fs head /mnt/training/sample.tsv
ID	Name	Weight
1	Man1	61
2	Man2	62
3	Man3	63
4	Man4	64
5	Man5	65
6	Man6	66
7	Man7	67
8	Man8	68
9	Man9	69
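The %fs magics are shorthand for Databricks' dbutils.fs utilities, so the same inspection can be done from Python. A minimal sketch, assuming we are in a Databricks notebook where dbutils is available:

# Programmatic equivalents of the %fs magics above
files = dbutils.fs.ls("/mnt/training/")             # List the directory contents
print(dbutils.fs.head("/mnt/training/sample.tsv"))  # Peek at the start of the file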

2. Reading from CSV – InferSchema

Step #1 – Read The CSV File

Let’s start with the bare minimum by specifying the tab character as the delimiter and the location of the file:

# A reference to our tab-separated-file
csvFile = "/mnt/training/sample.tsv"

tempDF = (spark.read           # The DataFrameReader
   .option("sep", "\t")        # Use tab delimiter (default is comma-separator)
   .csv(csvFile)               # Creates a DataFrame from CSV after reading in the file
)

This is guaranteed to trigger one job. A Job is triggered anytime we are “physically” required to touch the data. In some cases, one action may create multiple jobs (multiple reasons to touch the data). In this case, the reader has to “peek” at the first line of the file to determine how many columns of data we have.
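For example, any action is a point where we are physically required to touch the data, so it will also trigger at least one job:

tempDF.count()   # An action: Spark must read the data, triggering a job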

We can see the structure of the DataFrame by executing printSchema():

tempDF.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)

Note that the column names _c0, _c1, and _c2 were generated automatically.

display(tempDF)
The first row ends up being read as data instead of being used as the header.
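If a file had no header at all, one option is to rename the auto-generated columns ourselves. A minimal sketch using toDF(..):

# Rename _c0, _c1, _c2 to meaningful names; the values are still strings
renamedDF = tempDF.toDF("ID", "Name", "Weight")
renamedDF.printSchema()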

Step #2 – Use the File’s Header

tempDF2 = (spark.read          # The DataFrameReader
   .option("sep", "\t")        # Use tab delimiter (default is comma-separator)
   .option("header", "true")   # Use first line of all files as header
   .csv(csvFile)               # Creates a DataFrame from CSV after reading in the file
)
tempDF2.printSchema()
root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Weight: string (nullable = true)
display(tempDF2)
The first row is now correctly used as the header.
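Note that every column is still typed as a string. Before reaching for inferSchema, one alternative is to cast the columns ourselves; a sketch:

from pyspark.sql.functions import col

castDF = (tempDF2
  .withColumn("ID", col("ID").cast("integer"))          # string -> integer
  .withColumn("Weight", col("Weight").cast("integer"))  # string -> integer
)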

Step #3 – Infer the Schema

Lastly, we can add an option that tells the reader to infer each column's data type (i.e., the schema):

tempDF3 = (spark.read              # The DataFrameReader
   .option("header", "true")       # Use first line of all files as header
   .option("sep", "\t")            # Use tab delimiter (default is comma-separator)
   .option("inferSchema", "true")  # Automatically infer data types
   .csv(csvFile)                   # Creates a DataFrame from CSV after reading in the file
)
tempDF3.printSchema()
root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Weight: integer (nullable = true)

Two Spark jobs were executed this time (not one as in the previous example): one to peek at the header, and a second to read through the data and infer each column's type.
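Schema inference can be expensive on large files because of that second full read. As a hedged aside, newer Spark versions (2.4+) let the CSV reader infer from just a sample of the rows:

# A sketch, assuming Spark 2.4+ (samplingRatio support for the CSV source)
sampledDF = (spark.read
   .option("header", "true")
   .option("sep", "\t")
   .option("inferSchema", "true")
   .option("samplingRatio", "0.1")  # Infer types from roughly 10% of the rows
   .csv(csvFile)
)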

3. Reading from CSV – User-Defined Schema

The difference here is that we are going to define the schema beforehand and hopefully avoid the execution of any extra jobs.

Step #1

Declare the schema. This is just a list of field names and data types.

# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *

csvSchema = StructType([
  StructField("ID", IntegerType(), False),
  StructField("Name", StringType(), False),
  StructField("Weight", IntegerType(), False)
])
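As an aside, recent Spark versions also accept a DDL-formatted string wherever a StructType is expected:

# Equivalent DDL-string form of the schema above (a sketch; DDL fields are nullable)
csvSchemaDDL = "ID INT, Name STRING, Weight INT"

Either form can be passed to schema(..) below. Note that even though our StructType marks every field as non-nullable, file-based sources such as CSV report their columns as nullable when read back, which is why the printed schema still shows nullable = true.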

Step #2

Read in our data (and print the schema). We can specify the schema, or rather the StructType, with the schema(..) command:

tempDF4 = (spark.read         # The DataFrameReader
  .option('header', 'true')   # Ignore line #1 - it's a header
  .option('sep', "\t")        # Use tab delimiter (default is comma-separator)
  .schema(csvSchema)          # Use the specified schema
  .csv(csvFile)               # Creates a DataFrame from CSV after reading in the file
)
tempDF4.printSchema()
root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Weight: integer (nullable = true)

Zero Spark jobs were executed: with the schema supplied up front, Spark never has to touch the data just to determine the column names or types.
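To get the best of both worlds, a common pattern (a sketch using the public pyspark.sql.types API) is to infer the schema once, serialize it, and reuse it on later reads so the inference jobs are paid only one time:

import json
from pyspark.sql.types import StructType

# Capture the schema inferred earlier, then rebuild it for future reads
schemaJson = tempDF3.schema.json()
reusedSchema = StructType.fromJson(json.loads(schemaJson))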

4. Reading from multiple CSV files

We can read every file matching a glob pattern in a single call. The two forms below are equivalent; the second uses the generic format(..)/load(..) API:

tempDF = (spark.read
   .option("header", "true")      # Use first line of each file as header
   .csv("/mnt/training/*.csv")    # Read all matching CSV files into one DataFrame
)

tempDF = (spark.read
   .format("csv")                 # Explicitly specify the data source format
   .option("header", "true")      # Use first line of each file as header
   .load("/mnt/training/*.csv")   # Read all matching CSV files into one DataFrame
)
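When reading many files at once, it can be useful to record which file each row came from. A sketch using input_file_name() from pyspark.sql.functions:

from pyspark.sql.functions import input_file_name

multiDF = (spark.read
   .option("header", "true")
   .csv("/mnt/training/*.csv")
   .withColumn("source_file", input_file_name())  # The originating file for each row
)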