Spark DataFrame

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext 

df  = sqlContext.sql("""
  SELECT *
  FROM   some_schema.some_table
  WHERE  eff_date >= '2022-05-01'
  AND    eff_date < '2022-06-01'
""")
print(type(df))
<class 'pyspark.sql.dataframe.DataFrame'>
data = [('A', "1"),
        ('B', "2"),
        ('C', "3"),
        ('D', "4"),
        ("Cate", "ID")
        ]
print(type(data))  # <class 'list'>
df = spark.createDataFrame(data)
print(type(df))    # <class 'pyspark.sql.dataframe.DataFrame'>
data = [{"Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
        {"Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
        {"Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
        {"Category": 'D', "ID": 4, "Value": 33.87, "Truth": True}
        ]
print(type(data))  # <class 'list'>
df = spark.createDataFrame(data)
print(type(df))    # <class 'pyspark.sql.dataframe.DataFrame'>
li = df.collect()
print(type(li))    # <class 'list'>
print(li[0])
# Row(Category='A', ID=1, Truth=True, Value=121.44)

กำหนดค่าเป็น null ได้ด้วยการให้ค่า None หรือ ไม่ให้ค่าตัวแปรนั้น

data = [{"Category": 'A', "ID": 1, "Value": 121.44, "Truth": True, "date": "20220927"},
        {"Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
        {"Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
        {"Category": 'D', "ID": 4, "Value": 33.87, "Truth": True}
        ]
df = spark.createDataFrame(data)
display(df)

Using Spark to Write Data to a Single CSV File

Apache Spark is a system designed to work with very large datasets. Its default behavior reflects the assumption that you will be working with a large dataset that is split across many nodes in a cluster.

When you use Apache Spark to write a dataframe to disk, you will notice that it writes the data into multiple files. Let’s look at an example and see this in action.

# First, we just read in some sample data so we have a Spark dataframe
df = spark.read.option("header", "true").csv("dbfs:/databricks-datasets/atlas_higgs/atlas_higgs.csv")

# Now, let's write this data out in CSV format so we can see how Spark writes the files
df.write.format("csv").mode("overwrite").save("/my-output/default-csv")

Now let’s take a look at the CSV files that Spark wrote…

dbutils.fs.ls("/my-output/default-csv")
Out[22]: [FileInfo(path='dbfs:/my-output/default-csv/_SUCCESS', name='_SUCCESS', size=0),
 FileInfo(path='dbfs:/my-output/default-csv/_committed_3363429043923895909', name='_committed_3363429043923895909', size=1256),
 FileInfo(path='dbfs:/my-output/default-csv/_started_3363429043923895909', name='_started_3363429043923895909', size=0),
 FileInfo(path='dbfs:/my-output/default-csv/part-00000-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-31-1-c000.csv', name='part-00000-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-31-1-c000.csv', size=4193821),
 FileInfo(path='dbfs:/my-output/default-csv/part-00001-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-32-1-c000.csv', name='part-00001-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-32-1-c000.csv', size=4194469),
 FileInfo(path='dbfs:/my-output/default-csv/part-00002-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-33-1-c000.csv', name='part-00002-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-33-1-c000.csv', size=4194236),
 FileInfo(path='dbfs:/my-output/default-csv/part-00003-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-34-1-c000.csv', name='part-00003-tid-3363429043923895909-2caa68d8-1164-41ee-9c37-2bce95052501-34-1-c000.csv', size=4194352),
...

You will notice that our dataset was not written to one, single CSV file in a nice, tidy format. Instead, the rows are spread out across a bunch of different CSV files. Spark can work easily with these multiple files. However, if you want to share this data with other systems, having multiple files can be cumbersome.

Before we look at how to change Spark’s behavior, we need to understand why Spark writes the data this way.

The key thing to always remember about Spark is that the data is always spread out across multiple computers. The data doesn’t reside in the memory of just one computer. It has been divided into multiple partitions, and those partitions are distributed across many computers.

When you tell Spark to write your data, it completes this operation in parallel. The driver tells all of the nodes to start writing their data at the same time. So each node in the cluster starts writing all of the partitions that it has at the same time all of the other nodes are writing all of their partitions. Therefore, Spark can’t write the data to just one file because all of the nodes would be tripping over each other. They would each try to write to the same file and end up overwriting the data that other nodes had written.

To solve this problem, Spark saves the data from each partition to its own file. Therefore, the number of files that get written is equal to the number of partitions that Spark created for your data.

Changing Spark’s Behavior

While Spark is designed to work with large, mult-terabyte datasets that could never fit into the memory of just one computer, we sometimes use it to work with smaller datasets. And sometime this dataset is relatively small… just a couple of gigabytes or even a few hundred megabytes. If you find yourself working with a small dataset like this, you can get Spark to write the data to just one file.

That last point is very important and bears repeating. To make this work, all of the data must be loaded into the memory of just one computer. Therefore, this technique only works on small datasets. If the nodes in your cluster each have 16GB of RAM, then you can probably make this work with 10GB of data or less. If you have a dataset that is bigger than the amount of RAM on each node, you cannot use this technique because you will risk crashing your cluster.

Fortunately, our sample dataset above is less than 100MB. So, keeping in mind the important limitation described above, this dataset should easily fit in the memory of just one PC. So let’s proceed with writing out our dataset to just one CSV file. There are a couple of ways to achieve this, and we will look at both of them.

Option 1: Use the coalesce Feature

The Spark Dataframe API has a method called coalesce that tells Spark to shuffle your data into the specified number of partitions. Since our dataset is small, we use this to tell Spark to rearrange our data into a single partition before writing out the data.

Note, though, that there is a performance penalty for this. Before writing the data, Spark must shuffle the data from all of the nodes to a single partition on a single node. This takes time and puts traffic on the cluster’s network. For a ver small dataset (like the one here in our example), this is a small penalty, but it will increase as the size of your data increases.

df\
.coalesce(1)\
.write\
.format("csv")\
.mode("overwrite")\
.save("/my-output/coalesce-csv")

Let’s take a look at the files created by Spark after using the coalesce method.

dbutils.fs.ls("/my-output/coalesce-csv")
Out[27]: [FileInfo(path='dbfs:/my-output/coalesce-csv/_SUCCESS', name='_SUCCESS', size=0),
 FileInfo(path='dbfs:/my-output/coalesce-csv/_committed_8239842462067349322', name='_committed_8239842462067349322', size=112),
 FileInfo(path='dbfs:/my-output/coalesce-csv/_started_8239842462067349322', name='_started_8239842462067349322', size=0),
 FileInfo(path='dbfs:/my-output/coalesce-csv/part-00000-tid-8239842462067349322-52e5d421-3f6b-4768-a979-71ac9a0c9ee2-45-1-c000.csv', name='part-00000-tid-8239842462067349322-52e5d421-3f6b-4768-a979-71ac9a0c9ee2-45-1-c000.csv', size=55253165)]

You will notice that Spark still wrote the data into a directory, and that directory has multiple files. There are the Spark control files (e.g. the “SUCCESS” file, the “started” file, and the “committed” file). But there is only Cone SV file containing our data. Unfortunately, this file does not have a friendly name. If we want to share this file, we may want to rename it to something shorter. We can Python to clean up the control files and rename the data file.

data_location = "/my-output/coalesce-csv/"

files = dbutils.fs.ls(data_location)
csv_file = [x.path for x in files if x.path.endswith(".csv")][0]
dbutils.fs.mv(csv_file, data_location.rstrip('/') + ".csv")
dbutils.fs.rm(data_location, recurse = True)
Out[44]: True

Now let’s take one more look at our files to see that we have just one CSV file with a nice, friendly name.

dbutils.fs.ls("/my-output")
Out[45]: [FileInfo(path='dbfs:/my-output/coalesce-csv.csv', name='coalesce-csv.csv', size=55253165),
 FileInfo(path='dbfs:/my-output/default-csv/', name='default-csv/', size=0)]

Option 2: Use collect and Pandas

If you’ve used Python for data science work, you may be familiar with the pandas package. This popular tool allows you to create in-memory dataframes on a single computer. If your Spark dataframe is small enough to fit into the RAM of your cluster’s driver node, then you can simply convert your Spark dataframe to a pandas dataframe. Then you can use the standard pandas functionality to save your pandas dataframe to a single CSV file.

pd = df.toPandas()
pd.to_csv("/dbfs/my-output/pandas.csv")

And now if we look at our output directory, we will see our new CSV file.

dbutils.fs.ls("/my-output")
Out[52]: [FileInfo(path='dbfs:/my-output/coalesce-csv.csv', name='coalesce-csv.csv', size=55253165),
 FileInfo(path='dbfs:/my-output/default-csv/', name='default-csv/', size=0),
 FileInfo(path='dbfs:/my-output/pandas.csv', name='pandas.csv', size=56892564)]

That was super easy! But you must be very careful with this approach. It will only work with small datasets. If you try to convert a large dataframe to a pandas dataframe, you could crash the driver node of your cluster. Make sure your driver node has enough RAM to hold the entire dataset.

One other note on this approach. You will notice that throughout this notebook we have written data to the DBFS. We’ve done this using paths relative to the root of the DBFS, like: /my-output/coalesce-csv. In Databricks, Spark and the dbutils tool are all “DBFS-aware”. Whenever you supply a filepath to these tools, it assumes that you want to use the DBFS. Non-Spark tools (like the pandas tool) are not “DBFS-aware”. Whenever you give them a filepath, they assume you want to use the filesystem of the driver node. Therefore, you must add /dbfs/ to the beginning of your filepath so these tools will look in the right place. For example, when we used the to_csv method from the pandas package, we had to use /dbfs/my-output/pandas.csv as our location.