PySpark
Following documentation from the DAP CATS PySpark intro.
Overview
PySpark DataFrames are processed on a Spark cluster: a pool of linked machines, called nodes. DataFrames are split into partitions and processed in parallel across the nodes of the cluster. Spark gives you much greater memory capacity than a single machine, so it is suitable for big data.
Sessions
- Start a default session with SparkSession.builder (PySpark) / spark_connect() (sparklyr):
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.appName("default-session")
    .getOrCreate()
)
- Stop sessions with spark.stop() (PySpark) / spark_disconnect() (sparklyr).
Config
- Configuration is set in the spark-defaults.conf configuration file.
- Two modes: local and cluster.
- When using CDSW, the Spark config is already done for you when you launch the session.
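Session-level config can also be passed through the builder. A minimal sketch, with illustrative property values rather than recommendations:

from pyspark.sql import SparkSession

# Standard Spark config keys; the values here are made up for illustration.
spark = (
    SparkSession.builder.appName("configured-session")
    .config("spark.executor.memory", "1g")  # memory per executor
    .config("spark.executor.cores", "2")    # cores per executor
    .getOrCreate()
)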
Data Types
- Types are inferred in PySpark.
- Import types from pyspark.sql.types.
- Example schema:
root
|-- incident_number: string (nullable = true)
|-- date_time_of_call: string (nullable = true)
|-- cal_year: integer (nullable = true)
|-- fin_year: string (nullable = true)
- Data can be read from parquet files (schema included) and CSVs (schema not included, so it is inferred or provided explicitly; see the sketch after this list).
- Column methods: .cast() or .astype() change the type of a column.
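A minimal sketch of reading a CSV and fixing a column type with .cast(); the file path is hypothetical and the column name comes from the example schema above:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("data-types").getOrCreate()

# Hypothetical path. header=True reads the first row as column names;
# inferSchema=True asks Spark to guess column types (otherwise all strings).
incidents = spark.read.csv("/path/to/incidents.csv", header=True, inferSchema=True)

# Cast a column explicitly if the inferred type is wrong.
incidents = incidents.withColumn("cal_year", F.col("cal_year").cast("integer"))
incidents.printSchema()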
Creating Spark DataFrames
Create a Spark session:
import pandas as pd
from pyspark.sql import SparkSession, functions as F
spark = (
    SparkSession.builder.master("local[2]")
    .appName("create-DFs")
    .getOrCreate()
)
winners_pd = pd.DataFrame({
    "year": list(range(2021, 2016, -1)),  # descending, to match the winner order
    "winner": ["Minella Times", None, "Tiger Roll", "Tiger Roll", "One For Arthur"],
    "starting_price": ["11/1", None, "4/1 F", "10/1", "14/1"],
    "age": [8, None, 9, 8, 8],
    "jockey": ["Rachael Blackmore", None, "Davy Russell", "Davy Russell", "Derek Fox"]
})
winners_spark = spark.createDataFrame(winners_pd)
Manual creation of a DataFrame:
winners_spark = spark.createDataFrame(data=[
    [2021, "Minella Times", "11/1", 8, "Rachael Blackmore"],
    [2020, None, None, None, None],
    [2019, "Tiger Roll", "4/1 F", 9, "Davy Russell"],
    [2018, "Tiger Roll", "10/1", 8, "Davy Russell"],
    [2017, "One For Arthur", "14/1", 8, "Derek Fox"]],
    schema=["year", "winner", "starting_price", "age", "jockey"])
Quick reference
| functions | description |
|---|---|
| spark.read.csv(), spark.read.table() | read in data from CSV files or Hive tables |
| .printSchema() | print the schema |
| .show() | show the first few rows of the DataFrame in the terminal |
| .toPandas() | convert a Spark DataFrame to a pandas DataFrame |
| .select() | select columns, as in a SQL SELECT |
| .count() | count rows |
| .drop() | drop columns |
| .withColumnRenamed() | rename a column |
| .filter() | filter rows; use with F.col() |
| F.when(df.age > 3, 1).otherwise(0) | if/else logic for a column |
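A short sketch pulling several of these together on the winners_spark DataFrame from the previous section (the older_than_8 column name is made up for the example):

recent = (
    winners_spark
    .filter(F.col("year") >= 2019)                # keep matching rows
    .withColumnRenamed("starting_price", "odds")  # rename a column
    .withColumn("older_than_8", F.when(F.col("age") > 8, 1).otherwise(0))
    .select("year", "winner", "odds", "older_than_8")
)
recent.show()          # print the first rows
print(recent.count())  # number of rows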
Testing
When creating DataFrames during testing, you need a Spark session to reference. Instead of creating a new SparkSession inside each test, create a fixture for it.
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")  # one session shared across the test run
def spark_fixture():
    spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()
    yield spark
    spark.stop()  # tear down the session once the tests finish
Use the chispa library to check whether two DataFrames are equal.
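A minimal test sketch using the fixture above and chispa's assert_df_equality; the data here is made up:

from chispa import assert_df_equality

def test_winners_match(spark_fixture):
    expected = spark_fixture.createDataFrame(
        data=[["Tiger Roll", 9]], schema=["winner", "age"])
    actual = spark_fixture.createDataFrame(
        data=[["Tiger Roll", 9]], schema=["winner", "age"])
    # Raises a descriptive assertion error if schemas or rows differ.
    assert_df_equality(actual, expected)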