Spark#

This page considers the Python SDK for Spark. For more information, check out the PySpark Overview tutorial on the official website.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark_session = SparkSession.builder.appName('Temp').getOrCreate()
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/23 11:20:27 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/23 11:20:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/23 11:20:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Configuration#

Some configuration is required to start experimenting with Spark in local mode:

  • pip3 install pyspark: for the Spark installation.

  • Install Java: the openjdk-17-jdk package in apt. Point the $JAVA_HOME variable to the JDK; on Ubuntu, export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64.


If you have completed the specified configuration correctly, you will be able to run the script below, which creates a local SparkContext: a way to experiment with Spark without any cluster.

SparkContext is a low-level API for managing the computational resources provided by Spark.

from pyspark import SparkContext, SparkConf

sc = SparkContext(conf=SparkConf().setMaster("local"))
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/19 09:04:44 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/19 09:04:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/19 09:04:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Session#

SparkSession is built on top of SparkContext and implements the way users interact with Spark SQL.

The following list shows the functions that manage the session lifecycle:

  • SparkSession.builder.getOrCreate() creates the session (or returns the already existing one).

  • SparkSession.getActiveSession() returns the active session, or None if there is no active session.

  • The stop method stops the current session. Note: Spark does not allow running two independent sessions (i.e., two SparkContexts) on the same JVM.


The following cell illustrates an example of how to create a Spark session.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Temp").getOrCreate()
type(spark)
pyspark.sql.session.SparkSession

After that, SparkSession.getActiveSession returns an object representing the session.

session = SparkSession.getActiveSession()
type(session)
pyspark.sql.session.SparkSession

After calling stop on the Spark session, getActiveSession returns None.

spark.stop()
SparkSession.getActiveSession() is None
True
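
The builder can also take explicit configuration before getOrCreate is called. The following cell is a minimal sketch of recreating a local session; the master URL and the spark.sql.shuffle.partitions setting are only illustrative choices.

spark = (
    SparkSession.builder
    .appName("Temp")
    .master("local[*]")  # run locally using all available cores
    .config("spark.sql.shuffle.partitions", "4")  # an ordinary Spark SQL config, picked for illustration
    .getOrCreate()
)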

Dataframe#

Spark SQL provides the DataFrame object, a way to interact with tabular data.

You can define a data frame:

  • Directly from your code using the createDataFrame method of the session object.

  • Using the special methods for reading from external sources, stored in the read attribute of the session.


The following cell defines a Spark data frame where each row is a tuple whose values correspond to the columns, and shows it.

df = spark_session.createDataFrame(
    data=[("Alice", 25), ("Bob", 30), ("Cathy", 35)]
)
df.show()
                                                                                
+-----+---+
|   _1| _2|
+-----+---+
|Alice| 25|
|  Bob| 30|
|Cathy| 35|
+-----+---+

The following cell shows an alternative way to define the same data frame. Each row here is represented as a dictionary, and the values are specified under keys that correspond to the column names.

df = spark_session.createDataFrame(
    data=[
        {"name": "Alice", "age": 25},
        {"name": "Bob", "age": 30},
        {"name": "Cathy", "age": 35}
    ]
)
df.show()
+---+-----+
|age| name|
+---+-----+
| 25|Alice|
| 30|  Bob|
| 35|Cathy|
+---+-----+
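
It is also possible to pass an explicit schema instead of relying on inference. The following cell is a minimal sketch using StructType from pyspark.sql.types with the same data.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# explicit schema: column names and types are fixed up front
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])
spark_session.createDataFrame(
    data=[("Alice", 25), ("Bob", 30), ("Cathy", 35)],
    schema=schema
).show()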

Data sources#

Spark is a typical tool for building ETL pipelines, which involve repeatedly saving and loading data. Moreover, Spark has special tools for data versioning.

The Spark data frame provides the write interface for saving data to storage. The following table lists important methods of this interface; a usage sketch follows the table.

| Method | Description | Primary Use Case | Key Features |
|---|---|---|---|
| save() | Writes the DataFrame to a file system path. You specify the format (e.g., Parquet, CSV, JSON). | Simple, file-based persistence of data. | Creates unmanaged data; Spark does not track its metadata. Dropping the table (if you create one) does not delete the files. |
| csv(), json(), parquet() | Writes the DataFrame to a file system path in the corresponding format. | Simple, file-based persistence of data. | Creates unmanaged data; Spark does not track its metadata. Dropping the table (if you create one) does not delete the files. |
| parquet() | A specific and highly-recommended way to save data in the Parquet format. | High-performance, schema-aware storage for analytics. | Columnar storage, automatic schema preservation, and efficient compression. This is the default for save(). |
| saveAsTable() | Creates a managed table in the Hive Metastore. | Creating a named table for easy querying with Spark SQL. | Spark manages both the data and its metadata. Dropping the table deletes both the catalog entry and the data files. |
| jdbc() | Writes the DataFrame to a relational database using a JDBC connection. | Storing data in a traditional database for transactional or reporting purposes. | Requires a JDBC driver and connection string. Allows you to specify table names, modes, and connection properties. |
| format('...').save() | A more generic way to save data, explicitly specifying the format and path. | When using a format that isn't a dedicated method (e.g., Avro, ORC). | Gives you full control over the data source format. Also used for setting format-specific options. |
| partitionBy() | A method to partition the data on disk based on one or more columns. | Optimizing queries that frequently filter on specific columns. | Creates subdirectories for each unique value of the specified partition column(s), significantly speeding up read operations. |
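
As an illustration, the following sketch writes the df defined in the Dataframe section to local files; the /tmp paths and the chosen options are only illustrative.

df.write.mode("overwrite").parquet("/tmp/people_parquet")  # dedicated Parquet writer
df.write.format("csv").option("header", True).mode("overwrite").save("/tmp/people_csv")  # generic save()
df.write.partitionBy("age").mode("overwrite").parquet("/tmp/people_partitioned")  # one subfolder per age value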

The Spark session provides the read interface, which has the following methods; a usage sketch follows the table:

| Method | Description | Primary Use Case | Key Options |
|---|---|---|---|
| load() | A generic method to read data from a source. You must explicitly specify the format. | Reading from a data source when you need full control over the format and options. | format(), path(), and any format-specific options (e.g., header). It defaults to Parquet if no format is specified. |
| parquet() | A dedicated, highly-optimized method for reading Parquet files. | Loading high-performance data that was previously saved by Spark. | Automatically infers the schema from the Parquet file's metadata, so it requires fewer options. |
| csv() | Loads data from CSV files. | Reading human-readable, simple-structured data where the schema is not embedded. | header=True (to use the first row as column names) and inferSchema=True (to automatically detect data types). |
| json() | Loads data from line-delimited JSON files. | Reading semi-structured data from web logs, APIs, or data dumps. | Automatically infers the schema, but you can provide a schema to avoid inference. |
| jdbc() | Connects to and reads data from a relational database table. | Loading data from a traditional database for ETL or analysis. | Requires a url, table, and driver string. You can also specify partitionColumn, lowerBound, and upperBound for parallel reads. |
| table() | Reads a managed table from the Spark/Hive Metastore. | Reading a table that was previously created using df.write.saveAsTable(). | The easiest way to load data since you only need the table name. Spark handles locating the data and its schema automatically. |
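
The following sketch reads the files written above back into data frames; the paths are the same illustrative ones, and the CSV options are the typical header/inferSchema pair.

spark_session.read.parquet("/tmp/people_parquet").show()  # schema comes from the Parquet metadata
spark_session.read.option("header", True).option("inferSchema", True).csv("/tmp/people_csv").show()
spark_session.read.format("parquet").load("/tmp/people_parquet").show()  # generic load()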

It's also important to know that Spark's most native data source is its Spark SQL catalog, whose backing storage differs between configurations; a small sketch follows the list. Possible options are:

  • Just a folder that stores all the necessary files.

  • Hive storage.

  • Delta Lake.

  • Apache Iceberg.
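
For example, with a plain local setup the catalog is just the spark-warehouse folder next to the script. The following sketch saves the df defined earlier as a managed table and reads it back by name; the table name is arbitrary.

df.write.mode("overwrite").saveAsTable("people")  # managed table in the session catalog
spark_session.table("people").show()  # read it back by name
spark_session.catalog.listTables()  # inspect the catalog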

Columns#

A data frame consists of a set of columns. There are two important concepts for referring to columns:

  • There is a corresponding attribute of the data frame.

  • The pyspark.sql.functions.col function defines a reference to a column that, when applied to a particular dataset, is interpreted as a specific column of that dataset.

The following table shows the typical use cases in which you may be required to reference a column.

| Category | Method / Operator | Example | Description |
|---|---|---|---|
| Comparison | ==, !=, >, <, >=, <= | col("age") > 18 | Compares column values. Returns a boolean column. |
| Boolean | &, \|, ~ | (col("age") > 18) & (col("vip") == True) | Logical AND (&), OR (\|), and NOT (~). |
| Arithmetic | +, -, *, /, % | (col("price") * col("qty")) | Arithmetic operations between columns or with literals. |
| Aliasing | .alias(name) | col("age").alias("user_age") | Renames the column in the resulting DataFrame. |
| Casting | .cast(dataType) | col("age").cast("string") | Changes the column type. |
| Null Handling | .isNull(), .isNotNull() | col("name").isNotNull() | Tests for NULL values. |
| String Ops | .contains(), .startswith(), .endswith() | col("name").contains("Al") | String matching and filtering. |
| Math | (via pyspark.sql.functions) | sqrt(col("value")) | Use functions like abs, log, sqrt, exp, pow. |
| Aggregation | (via pyspark.sql.functions) | sum(col("value")) | Use avg, min, max, sum, count. |
| Conditional | (via when) | when(col("age") > 18, "adult") | Build conditional expressions. |
| Window Ops | (via over) | row_number().over(windowSpec) | Used for ranking, lead/lag, etc. |
| Collection | .getItem(index) | col("array_col").getItem(0) | Access element of array column. |
| Struct Access | .getField(name) | col("struct_col").getField("x") | Access field of struct column. |

Spark uses these column references when performing operations like withColumn and filter.


The following cell defines the data frame that will be used as an example.

test_df = spark_session.createDataFrame(
    data=[
        (8, 20),
        (9, 43),
        (15, 88)
    ],
    schema=["column1", "column2"]
)
test_df.show()
+-------+-------+
|column1|column2|
+-------+-------+
|      8|     20|
|      9|     43|
|     15|     88|
+-------+-------+

The following cell applies a filter whose condition is specified using a direct reference to test_df.column1.

condition = (test_df.column1 > 10)
print(type(condition))
test_df.filter(condition=condition).show()
<class 'pyspark.sql.classic.column.Column'>
+-------+-------+
|column1|column2|
+-------+-------+
|     15|     88|
+-------+-------+

Alternatively, the next cell specifies the calculation using an abstract col("column2") reference. The withColumn method of test_df interprets it as a reference to the column2 it contains.

calculation = col("column2") + 8
test_df.withColumn("result", calculation).show()
+-------+-------+------+
|column1|column2|result|
+-------+-------+------+
|      8|     20|    28|
|      9|     43|    51|
|     15|     88|    96|
+-------+-------+------+
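
A few more operations from the table above can be applied to the same test_df. The following is a minimal sketch combining alias, cast and when; the output column names are arbitrary.

from pyspark.sql.functions import when

test_df.select(
    col("column1").alias("c1"),  # rename the column
    col("column2").cast("string").alias("c2_str"),  # change the type
    when(col("column1") > 10, "big").otherwise("small").alias("size")  # conditional value
).show()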

Transformations#

This section considers the general data transformations available in PySpark; a brief sketch is shown below:

  • withColumn: allows you to specify a result column and the computation used to produce its values.

  • selectExpr: allows you to specify operations on the columns using SQL syntax to produce new ones.

  • The na attribute gathers methods for handling missing values: na.fill, na.drop and na.replace.

  • The replace method allows you to specify which values to replace.

Check the Transformations page.
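
The following sketch applies some of these methods to the test_df from the previous section; there are no missing values in it, so na.fill(0) simply returns the frame unchanged.

test_df.selectExpr("column1", "column2 * 2 as doubled").show()  # SQL-style projection
test_df.na.fill(0).show()  # would replace nulls with 0
test_df.replace(8, 80).show()  # replace the value 8 with 80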

Group by#

The data frame provides the groupBy method, which returns a special GroupedData object. This object contains a set of tools for building aggregations over the data:

| Method | Description |
|---|---|
| agg | General aggregation with one or more expressions. |
| avg | Computes the average of the given columns. |
| mean | Alias for avg(). |
| max | Maximum value for each column. |
| min | Minimum value for each column. |
| sum | Sum of values for each column. |
| count | Count of rows for each group. |
| pivot | Performs a pivot (like SQL PIVOT) on the specified column, turning its values into new columns. |
| applyInPandas | Apply a function to each group as a Pandas DataFrame and return a new DataFrame. |
| apply | Apply a pandas UDF to each group (an alias of applyInPandas; less common). |

Check the groupby page for more details.


The following cell defines an example data frame, then constructs and shows a GroupedData object based on it.

test_df = spark_session.createDataFrame(
    data=[
        ("a", 3),
        ("a", 2),
        ("c", 4),
        ("c", 7)
    ],
    schema=['group', 'value']
)

grouped_expression = test_df.groupBy('group')
grouped_expression
GroupedData[grouping expressions: [group], value: [group: string, value: bigint], type: GroupBy]

The following code shows how to use the agg function to compute aggregations over the groups.

from pyspark.sql.functions import sum, avg, min, max
grouped_expression.agg(
    sum('value'),
    avg('value'),
    min('value'),
    max('value')
).show()
+-----+----------+----------+----------+----------+
|group|sum(value)|avg(value)|min(value)|max(value)|
+-----+----------+----------+----------+----------+
|    a|         5|       2.5|         2|         3|
|    c|        11|       5.5|         4|         7|
+-----+----------+----------+----------+----------+
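
The pivot method from the table above can be sketched on the same data: it turns the distinct values of the group column into separate columns that hold the aggregated sums.

# groupBy() with no columns aggregates over the whole frame;
# pivot("group") then creates one column per distinct group value
test_df.groupBy().pivot("group").sum("value").show()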

ML#

The pyspark.ml package contains a set of tools that implement typical ML transformations. The following table provides an overview of its capabilities:

| Module / Class | Description |
|---|---|
| Pipeline | Combines multiple stages (transformers + estimators) into a single workflow. |
| Transformer | An object that transforms a DataFrame (e.g., feature scaling). |
| Estimator | An object that fits a model from data and returns a Transformer. |
| Evaluator | Computes metrics to evaluate model performance. |
| tuning | Tools for hyperparameter tuning (CrossValidator, TrainValidationSplit). |
| feature | Feature engineering utilities (e.g., StringIndexer, VectorAssembler). |
| classification | Classification algorithms (e.g., LogisticRegression, RandomForest). |
| regression | Regression algorithms (e.g., LinearRegression, GBTRegressor). |
| clustering | Clustering algorithms (e.g., KMeans, GaussianMixture). |
| recommendation | Collaborative filtering (e.g., ALS for matrix factorization). |
| stat | Statistical functions (e.g., ChiSquareTest, Correlation). |
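
As a minimal sketch of how these pieces fit together (assuming the spark_session created earlier is still active; the toy data and column names are arbitrary):

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# toy training data: two numeric features and a binary label
train = spark_session.createDataFrame(
    data=[(1.0, 2.0, 0.0), (2.0, 3.0, 0.0), (10.0, 12.0, 1.0), (11.0, 13.0, 1.0)],
    schema=["f1", "f2", "label"]
)

assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")  # Transformer
lr = LogisticRegression(featuresCol="features", labelCol="label")  # Estimator

model = Pipeline(stages=[assembler, lr]).fit(train)  # fit() returns a PipelineModel
model.transform(train).select("f1", "f2", "prediction").show()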
