Spark#
This page considers the Python SDK for Spark. For more information, check out the PySpark Overview tutorial on the official website.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark_session = SparkSession.builder.appName('Temp').getOrCreate()
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/23 11:20:27 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/23 11:20:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/23 11:20:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Configuration#
Some configuration is required to start experimenting with Spark in local mode:
- `pip3 install pyspark`: installs Spark.
- Install Java: the `openjdk-17-jdk` package in `apt`. Set the path to the JDK in the `$JAVA_HOME` variable. On Ubuntu: `export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64`.
If you have completed the configuration correctly, you will be able to run the script below, which creates a local SparkContext: a way to experiment with Spark without any cluster.
SparkContext is a low-level API for managing the computational resources provided by Spark.
from pyspark import SparkContext, SparkConf
sc = SparkContext(conf=SparkConf().setMaster("local"))
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/19 09:04:44 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/19 09:04:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/19 09:04:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Session#
SparkSession is built on top of SparkContext and implements the way users interact with Spark SQL.
The following list shows the different functions that allow manipulation of the session lifecycle:
- `SparkSession.builder.getOrCreate()` creates the session.
- `SparkSession.getActiveSession()` returns the active session, or `None` if there is no active session.
- The `stop` method stops the current session. Note: Spark does not allow keeping two sessions on the same JVM.
The following cell shows how to create a Spark session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Temp").getOrCreate()
type(spark)
pyspark.sql.session.SparkSession
After that, SparkSession.getActiveSession returns an object representing the session.
session = SparkSession.getActiveSession()
type(session)
pyspark.sql.session.SparkSession
After calling stop on the Spark session, getActiveSession returns None.
spark.stop()
SparkSession.getActiveSession() is None
True
Dataframe#
Spark SQL provides the DataFrame object, which offers a way to interact with tabular data.
You can define a data frame:
- Directly from your code, using the `createDataFrame` method of the session object.
- By reading from external sources, using the special methods stored in the `read` attribute of the session.
The following cell defines and shows a Spark data frame formatted so that each row is a tuple whose values correspond to the columns.
df = spark_session.createDataFrame(
data=[("Alice", 25), ("Bob", 30), ("Cathy", 35)]
)
df.show()
+-----+---+
| _1| _2|
+-----+---+
|Alice| 25|
| Bob| 30|
|Cathy| 35|
+-----+---+
The following cell shows an alternative way to define the same data frame. Each row here is represented as a dictionary, and the values are specified under the keys, which correspond to the column names.
df = spark_session.createDataFrame(
data=[
{"name": "Alice", "age": 25},
{"name": "Bob", "age": 30},
{"name": "Cathy", "age": 35}
]
)
df.show()
+---+-----+
|age| name|
+---+-----+
| 25|Alice|
| 30| Bob|
| 35|Cathy|
+---+-----+
Data sources#
Spark is a typical tool for building ETL pipelines, which involve cyclical improvement through saving and loading data. Moreover, Spark has special tools for data versioning.
For more details check:
- The comprehensive Data Sources guide on the corresponding page.
- Data Sources for more practical examples.
The Spark data frame exposes the write interface for saving data to storage. The following table lists important methods of the interface; a short usage sketch follows the table.
| Method | Description | Primary Use Case | Key Features |
|---|---|---|---|
| `save()` | Writes the DataFrame to a file system path. You specify the format (e.g., Parquet, CSV, JSON). | Simple, file-based persistence of data. | Creates unmanaged data; Spark does not track its metadata. Dropping the table (if you create one) does not delete the files. |
| `csv()`, `json()`, `text()` | Writes the DataFrame to a file system path in the corresponding format. | Simple, file-based persistence of data. | Creates unmanaged data; Spark does not track its metadata. Dropping the table (if you create one) does not delete the files. |
| `parquet()` | A specific and highly recommended way to save data in the Parquet format. | High-performance, schema-aware storage for analytics. | Columnar storage, automatic schema preservation, and efficient compression. Parquet is the default format for `save()`. |
| `saveAsTable()` | Creates a managed table in the Hive Metastore. | Creating a named table for easy querying with Spark SQL. | Spark manages both the data and its metadata. Dropping the table deletes both the catalog entry and the data files. |
| `jdbc()` | Writes the DataFrame to a relational database using a JDBC connection. | Storing data in a traditional database for transactional or reporting purposes. | Requires a JDBC driver and connection string. Allows you to specify table names, modes, and connection properties. |
| `format(...).save()` | A more generic way to save data, explicitly specifying the format and path. | When using a format that isn't a dedicated method (e.g., Avro, ORC). | Gives you full control over the data source format. Also used for setting format-specific options. |
| `partitionBy()` | A method to partition the data on disk based on one or more columns. | Optimizing queries that frequently filter on specific columns. | Creates subdirectories for each unique value of the specified partition column(s), significantly speeding up read operations. |
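The cell below is a minimal sketch of a few of these methods, applied to the `df` defined earlier. The `/tmp` paths and the `overwrite` mode are arbitrary choices for local experimentation.

```python
# Dedicated Parquet writer; `mode("overwrite")` replaces any data at the path.
df.write.mode("overwrite").parquet("/tmp/spark_write_example")

# The generic form, useful for formats without a dedicated method.
df.write.format("parquet").mode("overwrite").save("/tmp/spark_write_example")

# Partitioning creates one subdirectory per distinct value of the column.
df.write.mode("overwrite").partitionBy("age").parquet("/tmp/spark_write_partitioned")
```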
The Spark session exposes the read interface, which has the following specific methods; a short usage sketch follows the table.
| Method | Description | Primary Use Case | Key Options |
|---|---|---|---|
| `format(...).load()` | A generic method to read data from a source. You must explicitly specify the format. | Reading from a data source when you need full control over the format and options. | |
| `parquet()` | A dedicated, highly optimized method for reading Parquet files. | Loading high-performance data that was previously saved by Spark. | This method automatically infers the schema from the Parquet file's metadata, so it requires fewer options. |
| `csv()` | Loads data from CSV files. | Reading human-readable, simple-structured data where the schema is not embedded. | |
| `json()` | Loads data from line-delimited JSON files. | Reading semi-structured data from web logs, APIs, or data dumps. | The method automatically infers the schema, but you can provide a schema to avoid inference. |
| `jdbc()` | Connects to and reads data from a relational database table. | Loading data from a traditional database for ETL or analysis. | Requires a JDBC URL, table name, and connection properties. |
| `table()` | Reads a managed table from the Spark/Hive Metastore. | Reading a table that was previously created using `saveAsTable()`. | This is the easiest way to load data since you only need the table name. Spark handles locating the data and its schema automatically. |
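As a counterpart, the following sketch reads back the Parquet files written in the previous sketch; the path is the same illustrative one.

```python
# Dedicated Parquet reader: the schema is taken from the file metadata.
loaded = spark_session.read.parquet("/tmp/spark_write_example")
loaded.show()

# The generic form, equivalent for any registered data source format.
loaded = spark_session.read.format("parquet").load("/tmp/spark_write_example")
```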
It is also important to know that Spark's most native data source is its Spark SQL catalog, which can be backed by different implementations depending on the configuration. Possible options are listed below; a short sketch of inspecting the catalog through the session follows the list.
- Just a folder that stores all the necessary files.
- Hive storage.
- Delta Lake.
- Apache Iceberg.
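A small sketch of inspecting the catalog through the session's `catalog` attribute; in local mode the catalog defaults to the in-memory implementation.

```python
# Inspect the catalog the session is currently attached to.
print(spark_session.catalog.currentDatabase())
print(spark_session.catalog.listDatabases())
print(spark_session.catalog.listTables())
```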
Columns#
A data frame consists of a set of columns. There are two important concepts for referring to columns:
- The corresponding attribute of the data frame.
- The `pyspark.sql.functions.col` function, which defines a reference to a column that, when applied to a particular dataset, is interpreted as the corresponding column of that dataset.
The following table shows the typical use cases in which you may be required to reference a column.
| Category | Method / Operator | Example | Description |
|---|---|---|---|
| Comparison | `==`, `!=`, `>`, `<`, `>=`, `<=` | `col("age") > 30` | Compares column values. Returns a boolean column. |
| Boolean | `&`, `\|`, `~` | `(col("a") > 1) & (col("b") < 5)` | Logical AND (`&`), OR (`\|`), NOT (`~`). |
| Arithmetic | `+`, `-`, `*`, `/`, `%` | `col("price") * 1.2` | Arithmetic operations between columns or with literals. |
| Aliasing | `alias()` | `col("age").alias("age_years")` | Renames the column in the resulting DataFrame. |
| Casting | `cast()` | `col("age").cast("double")` | Changes the column type. |
| Null Handling | `isNull()`, `isNotNull()` | `col("email").isNull()` | Tests for `NULL` values. |
| String Ops | `contains()`, `startswith()`, `like()`, `rlike()` | `col("name").contains("Ali")` | String matching and filtering. |
| Math | (via `pyspark.sql.functions`) | `sqrt(col("x"))` | Use functions like `sqrt`, `abs`, `round` that take a column as an argument. |
| Aggregation | (via `pyspark.sql.functions`) | `sum(col("value"))` | Use `sum`, `avg`, `min`, `max`, `count` inside `agg`. |
| Conditional | (via `when()`) | `when(col("x") > 0, 1).otherwise(0)` | Build conditional expressions. |
| Window Ops | (via `Window`) | `row_number().over(window)` | Used for ranking, lead/lag, etc. |
| Collection | `getItem()` | `col("items").getItem(0)` | Access element of array column. |
| Struct Access | `getField()` | `col("address").getField("city")` | Access field of struct column. |
Spark uses these column references when performing operations such as withColumn and filter.
The following cell defines the data frame that will be used as an example.
test_df = spark_session.createDataFrame(
data=[
(8, 20),
(9, 43),
(15, 88)
],
schema=["column1", "column2"]
)
test_df.show()
+-------+-------+
|column1|column2|
+-------+-------+
| 8| 20|
| 9| 43|
| 15| 88|
+-------+-------+
The following cell applies a filter with the condition specified through a direct reference, test_df.column1.
condition = (test_df.column1 > 10)
print(type(condition))
test_df.filter(condition=condition).show()
<class 'pyspark.sql.classic.column.Column'>
+-------+-------+
|column1|column2|
+-------+-------+
| 15| 88|
+-------+-------+
Alternatively, the next cell specifies the calculation using an abstract col("column2") reference. The withColumn function of test_df interprets it as a reference to the column2 it contains.
calculation = col("column2") + 8
test_df.withColumn("result", calculation).show()
+-------+-------+------+
|column1|column2|result|
+-------+-------+------+
| 8| 20| 28|
| 9| 43| 51|
| 15| 88| 96|
+-------+-------+------+
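To illustrate a few more rows of the table above (aliasing, casting, and a conditional expression) on the same test_df, here is a short sketch; the result column names are arbitrary.

```python
from pyspark.sql.functions import col, when

test_df.select(
    col("column1").alias("renamed"),                                    # aliasing
    col("column2").cast("double").alias("as_double"),                   # casting
    when(col("column1") > 10, "big").otherwise("small").alias("size"),  # conditional
).show()
```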
Transformations#
This section considers general data transformations available in PySpark:
- `withColumn`: specifies the result column and the computation used to produce its values.
- `selectExpr`: specifies operations on the columns using SQL syntax to produce new ones.
- The `na` attribute gathers methods that specify how to handle missing values: `na.fill`, `na.drop`, and `na.replace`.
- The `replace` method specifies which values to replace.

Check the Transformations page.
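A minimal sketch of these transformations, applied to the test_df defined in the Columns section; the derived column names and the fill/replace values are arbitrary.

```python
from pyspark.sql.functions import col

# withColumn: add a computed column.
test_df.withColumn("total", col("column1") + col("column2")).show()

# selectExpr: SQL expressions over the columns.
test_df.selectExpr("column1", "column2 * 2 AS doubled").show()

# na.fill / replace: handle missing values and substitute specific values.
test_df.na.fill(0).show()
test_df.replace(8, 80).show()
```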
Group by#
The data frame contains the groupBy method, which returns a special GroupedData object. This object provides a set of tools for building aggregations over the data:
| Method | Description |
|---|---|
| `agg()` | General aggregation with one or more expressions. |
| `avg()` | Computes the average of the given columns. |
| `mean()` | Alias for `avg()`. |
| `max()` | Maximum value for each column. |
| `min()` | Minimum value for each column. |
| `sum()` | Sum of values for each column. |
| `count()` | Count of rows for each group. |
| `pivot()` | Performs a pivot (like SQL `PIVOT`) on a column. |
| `applyInPandas()` | Apply a function to each group as a Pandas DataFrame and return a new DataFrame. |
| `apply()` | Apply a user-defined function to each group (returns an RDD, not a DataFrame; less common). |
Check more details in the groupby page.
The following cell defines an example data frame. It constructs and shows the GroupedData object based on it.
test_df = spark_session.createDataFrame(
data=[
("a", 3),
("a", 2),
("c", 4),
("c", 7)
],
schema=['group', 'value']
)
grouped_expression = test_df.groupBy('group')
grouped_expression
GroupedData[grouping expressions: [group], value: [group: string, value: bigint], type: GroupBy]
The following code shows how to use the agg function to compute aggregations over the groups.
from pyspark.sql.functions import sum, avg, min, max
grouped_expression.agg(
sum('value'),
avg('value'),
min('value'),
max('value')
).show()
+-----+----------+----------+----------+----------+
|group|sum(value)|avg(value)|min(value)|max(value)|
+-----+----------+----------+----------+----------+
| a| 5| 2.5| 2| 3|
| c| 11| 5.5| 4| 7|
+-----+----------+----------+----------+----------+
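GroupedData also provides shortcut methods that compute a single aggregation without agg; a brief sketch reusing the same grouped_expression:

```python
# Per-group average of the `value` column.
grouped_expression.avg('value').show()

# Number of rows in each group; `count` takes no column argument.
grouped_expression.count().show()
```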
ML#
The pyspark.ml package contains a set of tools that implement typical ML transformations. The following table provides an overview of its capabilities; a short pipeline sketch follows the table.
| Module / Class | Description |
|---|---|
| Pipeline | Combines multiple stages (transformers + estimators) into a single workflow. |
| Transformer | An object that transforms a DataFrame (e.g., feature scaling). |
| Estimator | An object that fits a model from data and returns a Transformer. |
| Evaluator | Computes metrics to evaluate model performance. |
| tuning | Tools for hyperparameter tuning (`CrossValidator`, `TrainValidationSplit`). |
| feature | Feature engineering utilities (e.g., `VectorAssembler`, `StringIndexer`, `StandardScaler`). |
| classification | Classification algorithms (e.g., `LogisticRegression`, `RandomForestClassifier`). |
| regression | Regression algorithms (e.g., `LinearRegression`, `GBTRegressor`). |
| clustering | Clustering algorithms (e.g., `KMeans`). |
| recommendation | Collaborative filtering (e.g., `ALS`). |
| stat | Statistical functions (e.g., `Correlation`, `ChiSquareTest`). |
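The following cell is a minimal sketch of combining these pieces: a VectorAssembler feature stage and a LogisticRegression estimator chained in a Pipeline. The toy data, column names, and model choice are illustrative.

```python
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

train = spark_session.createDataFrame(
    data=[(1.0, 2.0, 0.0), (2.0, 1.0, 1.0), (3.0, 4.0, 0.0), (4.0, 3.0, 1.0)],
    schema=["f1", "f2", "label"]
)

# Transformer stage: packs the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
# Estimator stage: fits a model and returns a Transformer.
lr = LogisticRegression(featuresCol="features", labelCol="label")

# The pipeline chains both stages; `fit` returns a PipelineModel.
model = Pipeline(stages=[assembler, lr]).fit(train)
model.transform(train).select("f1", "f2", "label", "prediction").show()
```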
For more details check:
- The Machine Learning Library Guide in the Spark documentation.
- The ML page.
Declarative Pipelines#
Spark Declarative Pipelines is a facility for building data pipelines in Spark. For more, check the Spark Declarative Pipelines Programming Guide.
You can create different Spark structures from the outputs of functions wrapped in the corresponding decorators of the `pyspark.pipelines` module (imported as `dp` in the generated example below):

| Decorator | Description |
|---|---|
| `@dp.materialized_view` | Creates a materialized view. |
| `@dp.temporary_view` | Creates a temporary view. |
| `@dp.table` | Creates a streaming table. |
| `@dp.append_flow` | Creates flows appending data to the target. |
The spark-pipelines command is used to manage the pipelines:
- `spark-pipelines init --name <name>` initializes the project.
- `spark-pipelines run` runs the project.
Consider the process of creating and running Spark's declarative pipelines.
The following cell creates a folder and initializes the project within it.
%%bash
rm -rf /tmp/spark_declarative_pipe
mkdir /tmp/spark_declarative_pipe
cd /tmp/spark_declarative_pipe
spark-pipelines init --name my_first_pipeline
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/01 09:15:01 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/10/01 09:15:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Pipeline project 'my_first_pipeline' created successfully. To run your pipeline:
cd 'my_first_pipeline'
spark-pipelines run
25/10/01 09:15:04 INFO ShutdownHookManager: Shutdown hook called
25/10/01 09:15:04 INFO ShutdownHookManager: Deleting directory /tmp/spark-8bbe0540-0ad6-418d-88af-185649eb6aef
The following cell represents the structure of the generated project.
!tree /tmp/spark_declarative_pipe
/tmp/spark_declarative_pipe
└── my_first_pipeline
├── pipeline.yml
└── transformations
├── example_python_materialized_view.py
└── example_sql_materialized_view.sql
3 directories, 3 files
The pipeline.yml defines the behaviour of the pipelines:
!cat /tmp/spark_declarative_pipe/my_first_pipeline/pipeline.yml
name: my_first_pipeline
libraries:
- glob:
include: transformations/**
The following cell shows the generated example Python transformation.
!cat /tmp/spark_declarative_pipe/my_first_pipeline/transformations/example_python_materialized_view.py
from pyspark import pipelines as dp
from pyspark.sql import DataFrame, SparkSession
spark = SparkSession.active()
@dp.materialized_view
def example_python_materialized_view() -> DataFrame:
return spark.range(10)
This script generates a table with numbers from 0 to 9, whose name corresponds to the name of the wrapped function: example_python_materialized_view.
The next code runs the pipeline.
%%bash
cd /tmp/spark_declarative_pipe/my_first_pipeline
spark-pipelines run
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/01 09:21:29 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/10/01 09:21:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2025-10-01 09:21:31: Loading pipeline spec from /tmp/spark_declarative_pipe/my_first_pipeline/pipeline.yml...
2025-10-01 09:21:31: Creating Spark session...
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/01 09:21:33 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/10/01 09:21:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/01 09:21:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
/home/user/.virtualenvironments/python/lib/python3.13/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/connect/conf.py:64: UserWarning: Failed to set spark.sql.catalogImplementation to Some(in-memory) due to [CANNOT_MODIFY_CONFIG] Cannot modify the value of the Spark config: "spark.sql.catalogImplementation".
See also 'https://spark.apache.org/docs/latest/sql-migration-guide.html#ddl-statements'. SQLSTATE: 46110
2025-10-01 09:21:37: Creating dataflow graph...
2025-10-01 09:21:38: Registering graph elements...
2025-10-01 09:21:38: Loading definitions. Root directory: '/tmp/spark_declarative_pipe/my_first_pipeline'.
2025-10-01 09:21:38: Found 2 files matching glob 'transformations/**/*'
2025-10-01 09:21:38: Registering SQL file /tmp/spark_declarative_pipe/my_first_pipeline/transformations/example_sql_materialized_view.sql...
2025-10-01 09:21:38: Importing /tmp/spark_declarative_pipe/my_first_pipeline/transformations/example_python_materialized_view.py...
2025-10-01 09:21:38: Starting run...
2025-10-01 07:21:39: Flow spark_catalog.default.example_python_materialized_view is QUEUED.
2025-10-01 07:21:39: Flow spark_catalog.default.example_sql_materialized_view is QUEUED.
2025-10-01 07:21:39: Flow spark_catalog.default.example_python_materialized_view is PLANNING.
2025-10-01 07:21:39: Flow spark_catalog.default.example_python_materialized_view is STARTING.
2025-10-01 07:21:39: Flow spark_catalog.default.example_python_materialized_view is RUNNING.
2025-10-01 07:21:41: Flow spark_catalog.default.example_python_materialized_view has COMPLETED.
2025-10-01 07:21:42: Flow spark_catalog.default.example_sql_materialized_view is PLANNING.
2025-10-01 07:21:42: Flow spark_catalog.default.example_sql_materialized_view is STARTING.
2025-10-01 07:21:42: Flow spark_catalog.default.example_sql_materialized_view is RUNNING.
2025-10-01 07:21:43: Flow spark_catalog.default.example_sql_materialized_view has COMPLETED.
2025-10-01 07:21:44: Run is COMPLETED.
25/10/01 09:21:45 INFO ShutdownHookManager: Shutdown hook called
25/10/01 09:21:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-f82ca8cb-3e75-42fb-ba08-0d4b7b65a10f
As a result, the spark-warehouse folder is generated.
!tree /tmp/spark_declarative_pipe
/tmp/spark_declarative_pipe
└── my_first_pipeline
├── pipeline.yml
├── spark-warehouse
│ ├── example_python_materialized_view
│ │ ├── part-00000-183d4b9e-e121-4d6b-849c-a320e7e99683-c000.snappy.parquet
│ │ └── _SUCCESS
│ └── example_sql_materialized_view
│ ├── part-00000-5ecdee8c-bcb8-4e9e-a7ef-38dd8f6d9dfd-c000.snappy.parquet
│ └── _SUCCESS
└── transformations
├── example_python_materialized_view.py
├── example_sql_materialized_view.sql
└── __pycache__
└── example_python_materialized_view.cpython-313.pyc
7 directories, 8 files
The following cell loads the table generated by the run.
import pandas as pd
pd.read_parquet('/tmp/spark_declarative_pipe/my_first_pipeline/spark-warehouse/example_python_materialized_view')
|   | id |
|---|---|
| 0 | 0 |
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
| 5 | 5 |
| 6 | 6 |
| 7 | 7 |
| 8 | 8 |
| 9 | 9 |