Spark DataFrame

from pyspark.sql import SQLContext
# NOTE(review): SQLContext is the legacy Spark SQL entry point and requires an
# already-running SparkContext named `sc` (provided by the notebook/shell
# environment — not defined in this file). Newer Spark code typically uses
# SparkSession instead; confirm the target Spark version before modernizing.
sqlContext = SQLContext(sc)
sqlContext 

# Run a SQL query against the catalog and capture the result as a DataFrame.
# Naming the query first keeps the sql() call readable.
query = """
  SELECT *
  FROM   some_schema.some_table
  WHERE  eff_date >= '2022-05-01'
  AND    eff_date < '2022-06-01'
"""
df = sqlContext.sql(query)
print(type(df))
<class 'pyspark.sql.dataframe.DataFrame'>
# Build the row tuples by zipping two parallel lists; without an explicit
# schema, Spark infers generic column names (_1, _2).
categories = ['A', 'B', 'C', 'D', "Cate"]
ids = ["1", "2", "3", "4", "ID"]
data = list(zip(categories, ids))
print(type(data))  # <class 'list'>
df = spark.createDataFrame(data)
print(type(df))    # <class 'pyspark.sql.dataframe.DataFrame'>
# Each dict becomes one Row; dict keys supply the column names. The Row echoed
# below shows the columns in alphabetical order.
data = [
    dict(Category='A', ID=1, Value=121.44, Truth=True),
    dict(Category='B', ID=2, Value=300.01, Truth=False),
    dict(Category='C', ID=3, Value=10.99, Truth=None),
    dict(Category='D', ID=4, Value=33.87, Truth=True),
]
print(type(data))  # <class 'list'>
df = spark.createDataFrame(data)
print(type(df))    # <class 'pyspark.sql.dataframe.DataFrame'>
li = df.collect()  # materialize all rows on the driver as a list of Row objects
print(type(li))    # <class 'list'>
print(li[0])
# Row(Category='A', ID=1, Truth=True, Value=121.44)

You can set a value to null either by assigning None or by omitting that key from the row's dict.

# A column is null for a row when its key is omitted from that row's dict
# (only the first row carries "date") or when its value is explicitly None.
data = [
    {"Category": "A", "ID": 1, "Value": 121.44, "Truth": True, "date": "20220927"},
    {"Category": "B", "ID": 2, "Value": 300.01, "Truth": False},
    {"Category": "C", "ID": 3, "Value": 10.99, "Truth": None},
    {"Category": "D", "ID": 4, "Value": 33.87, "Truth": True},
]
print(type(data))     # <class 'list'>
print(type(data[0]))  # <class 'dict'>
df = spark.createDataFrame(data)
display(df)

PySpark: add a new row to a DataFrame

create 1st dataframe

%python
# First DataFrame: explicit column names, rows as plain tuples.
columns = ['Identifier', 'Value', 'Extra Discount']
vals = [
    (1, 150, 0),
    (2, 160, 12),
]
df = spark.createDataFrame(vals, columns)
df.show()
+----------+-----+--------------+
|Identifier|Value|Extra Discount|
+----------+-----+--------------+
|         1|  150|             0|
|         2|  160|            12|
+----------+-----+--------------+

create 2nd dataframe

%python
# Second DataFrame: the single row to append, built with the same column list
# so both frames share one schema.
newRow = spark.createDataFrame(data=[(3, 205, 7)], schema=columns)
newRow.show()
+----------+-----+--------------+
|Identifier|Value|Extra Discount|
+----------+-----+--------------+
|         3|  205|             7|
+----------+-----+--------------+

Union the two DataFrames

%python
# union() appends the rows of `newRow` to `df` by column position (like SQL
# UNION ALL — duplicates are kept, columns are matched by order, not by name).
new_df = df.union(newRow)
new_df.show()
+----------+-----+--------------+
|Identifier|Value|Extra Discount|
+----------+-----+--------------+
|         1|  150|             0|
|         2|  160|            12|
|         3|  205|             7|
+----------+-----+--------------+

Merge pandas DataFrames

create dataframe

%python
import pyspark.pandas as ps

# Left-hand DataFrame for the merge example; note the duplicate 'foo' keys.
left_data = {'lkey': ['foo', 'bar', 'baz', 'foo'],
             'value': [1, 2, 3, 5]}
df1 = ps.DataFrame(left_data, columns=['lkey', 'value'])
display(df1)
#   lkey  value
# 0  foo      1
# 1  bar      2
# 2  baz      3
# 3  foo      5
# df2

# Right-hand DataFrame: same key values under a different column name ('rkey'),
# with its own 'value' column.
right_data = {'rkey': ['foo', 'bar', 'baz', 'foo'],
              'value': [5, 6, 7, 8]}
df2 = ps.DataFrame(right_data, columns=['rkey', 'value'])
display(df2)
#   rkey  value
# 0  foo      5
# 1  bar      6
# 2  baz      7
# 3  foo      8

merge

%python
# Inner join ('inner' is merge's default `how`) on lkey == rkey. The 'value'
# column exists on both sides, so it is suffixed _x (left) / _y (right).
merged = df1.merge(right=df2, how='inner', left_on='lkey', right_on='rkey')
display(merged)
#   lkey  value_x rkey  value_y
#    bar        2  bar        6
#    baz        3  baz        7
#    foo        1  foo        5
#    foo        1  foo        8
#    foo        5  foo        5
#    foo        5  foo        8