Spark#
This page considers the Python SDK for Spark. For more information, check out the PySpark Overview tutorial on the official website.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark_session = SparkSession.builder.appName('Temp').getOrCreate()
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/23 11:20:27 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/23 11:20:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/23 11:20:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Configuration#
Some configuration is required to start experimenting with Spark in local mode:
Install Spark: pip3 install pyspark.
Install Java: the openjdk-17-jdk package in apt.
Set the path to the JDK in the $JAVA_HOME variable; on Ubuntu: export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64.
If you have completed the configuration correctly, you will be able to run the script below, which creates a local SparkContext: a way to experiment with Spark without any cluster.
SparkContext is a low-level API for managing the computational resources provided by Spark.
from pyspark import SparkContext, SparkConf
sc = SparkContext(conf=SparkConf().setMaster("local"))
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/19 09:04:44 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/19 09:04:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/19 09:04:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Session#
SparkSession is built on top of SparkContext and implements the way users interact with Spark SQL.
The following list shows the different functions that allow manipulation of the session lifecycle:
SparkSession.builder.getOrCreate() creates the session.
SparkSession.getActiveSession() returns the active session, or None if there is no active session.
The stop method stops the current session. Note: Spark does not allow two sessions to exist on the same JVM.
The following cell shows how to create a Spark session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Temp").getOrCreate()
type(spark)
pyspark.sql.session.SparkSession
After that, SparkSession.getActiveSession
returns an object representing the session.
session = SparkSession.getActiveSession()
type(session)
pyspark.sql.session.SparkSession
After calling stop on the Spark session, getActiveSession returns None.
spark.stop()
SparkSession.getActiveSession() is None
True
Dataframe#
Spark SQL provides the DataFrame object, which offers a way to interact with tabular data.
You can define a data frame:
Directly from your code, using the createDataFrame method of the session object.
From external sources, using the reading methods available in the read attribute of the session.
The following cell defines a Spark data frame in which each row is given as a tuple whose values correspond to the columns, and shows it.
df = spark_session.createDataFrame(
data=[("Alice", 25), ("Bob", 30), ("Cathy", 35)]
)
df.show()
+-----+---+
| _1| _2|
+-----+---+
|Alice| 25|
| Bob| 30|
|Cathy| 35|
+-----+---+
The following cell shows an alternative way to define the same data frame. Each row here is represented as a dictionary, and the values are specified under keys that correspond to the column names.
df = spark_session.createDataFrame(
data=[
{"name": "Alice", "age": 25},
{"name": "Bob", "age": 30},
{"name": "Cathy", "age": 35}
]
)
df.show()
+---+-----+
|age| name|
+---+-----+
| 25|Alice|
| 30| Bob|
| 35|Cathy|
+---+-----+
Data sources#
Spark is a typical tool for building ETL pipelines, which are improved iteratively through the process of saving and loading data. Moreover, Spark has special tools for data versioning.
For more details check:
The comprehensive Data Sources guide on the corresponding page.
The Data Sources page for more practical examples.
The Spark data frame exposes the write interface for saving data to storage. The following table lists the important methods of this interface.
| Method | Description | Primary Use Case | Key Features |
|---|---|---|---|
| save | Writes the DataFrame to a file system path. You specify the format (e.g., Parquet, CSV, JSON). | Simple, file-based persistence of data. | Creates unmanaged data; Spark does not track its metadata. Dropping the table (if you create one) does not delete the files. |
| csv / json / text | Writes the DataFrame to a file system path in the corresponding format. | Simple, file-based persistence of data. | Creates unmanaged data; Spark does not track its metadata. Dropping the table (if you create one) does not delete the files. |
| parquet | A specific and highly recommended way to save data in the Parquet format. | High-performance, schema-aware storage for analytics. | Columnar storage, automatic schema preservation, and efficient compression. This is the default format for save. |
| saveAsTable | Creates a managed table in the Hive Metastore. | Creating a named table for easy querying with Spark SQL. | Spark manages both the data and its metadata. Dropping the table deletes both the catalog entry and the data files. |
| jdbc | Writes the DataFrame to a relational database using a JDBC connection. | Storing data in a traditional database for transactional or reporting purposes. | Requires a JDBC driver and connection string. Allows you to specify table names, modes, and connection properties. |
| format(...).save(...) | A more generic way to save data, explicitly specifying the format and path. | When using a format that isn’t a dedicated method (e.g., Avro, ORC). | Gives you full control over the data source format. Also used for setting format-specific options. |
| partitionBy | A method to partition the data on disk based on one or more columns. | Optimizing queries that frequently filter on specific columns. | Creates subdirectories for each unique value of the specified partition column(s), significantly speeding up read operations. |
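As a minimal sketch of this interface, the following cell writes the data frame defined above in several formats; the /tmp output paths are illustrative assumptions, not part of the original example.
df.write.mode("overwrite").parquet("/tmp/people_parquet")  # columnar, schema-preserving format
df.write.mode("overwrite").csv("/tmp/people_csv", header=True)  # plain CSV files
df.write.format("json").mode("overwrite").save("/tmp/people_json")  # generic save with an explicit format
df.write.partitionBy("age").mode("overwrite").parquet("/tmp/people_partitioned")  # one subdirectory per age value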
The Spark session provides the read interface, which has the following methods:
| Method | Description | Primary Use Case | Key Options |
|---|---|---|---|
| format(...).load(...) | A generic method to read data from a source. You must explicitly specify the format. | Reading from a data source when you need full control over the format and options. | The format name plus format-specific settings passed through option. |
| parquet | A dedicated, highly optimized method for reading Parquet files. | Loading high-performance data that was previously saved by Spark. | This method automatically infers the schema from the Parquet file’s metadata, so it requires fewer options. |
| csv | Loads data from CSV files. | Reading human-readable, simple-structured data where the schema is not embedded. | header, inferSchema, sep. |
| json | Loads data from line-delimited JSON files. | Reading semi-structured data from web logs, APIs, or data dumps. | The method automatically infers the schema, but you can provide a schema to avoid inference. |
| jdbc | Connects to and reads data from a relational database table. | Loading data from a traditional database for ETL or analysis. | Requires a JDBC connection URL, table name, and connection properties. |
| table | Reads a managed table from the Spark/Hive Metastore. | Reading a table that was previously created using saveAsTable. | This is the easiest way to load data since you only need the table name. Spark handles locating the data and its schema automatically. |
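As a sketch of the read interface, the following cell loads the files written in the example above; the paths are the same assumed /tmp locations.
parquet_df = spark_session.read.parquet("/tmp/people_parquet")  # schema restored from Parquet metadata
csv_df = spark_session.read.csv("/tmp/people_csv", header=True, inferSchema=True)  # schema inferred from the text
json_df = spark_session.read.format("json").load("/tmp/people_json")  # generic load with an explicit format
parquet_df.show()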
It’s also important to know that Spark’s most native data source is its Spark SQL catalog, whose backend can differ between configurations. Possible options are:
A plain folder that stores all the necessary files.
Hive storage.
Delta Lake.
Apache Iceberg.
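For example, a data frame registered with saveAsTable becomes visible through the catalog and can be read back by name; a minimal sketch, assuming the hypothetical table name people.
df.write.mode("overwrite").saveAsTable("people")  # managed table registered in the catalog
spark_session.catalog.listTables()  # the new table appears in the catalog listing
spark_session.read.table("people").show()  # read the managed table back by name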
Columns#
A data frame consists of a set of columns. There are two important concepts to know for referring to the columns:
There is a corresponding attribute of the data frame for each column.
The pyspark.sql.functions.col function defines a reference to a column that, when applied to a particular dataset, is interpreted as the corresponding column of that dataset.
The following table shows the typical use cases in which you may be required to reference a column.
| Category | Method / Operator | Example | Description |
|---|---|---|---|
| Comparison | ==, !=, >, <, >=, <= | col("age") > 30 | Compares column values. Returns a boolean column. |
| Boolean | &, \|, ~ | (col("age") > 30) & (col("city") == "NY") | Logical AND (&), OR (\|), and NOT (~) of boolean columns. |
| Arithmetic | +, -, *, / | col("price") * 1.2 | Arithmetic operations between columns or with literals. |
| Aliasing | alias | col("age").alias("age_years") | Renames the column in the resulting DataFrame. |
| Casting | cast | col("age").cast("string") | Changes the column type. |
| Null Handling | isNull, isNotNull | col("email").isNull() | Tests for NULL and non-NULL values. |
| String Ops | contains, startswith, like | col("name").like("A%") | String matching and filtering. |
| Math | (via pyspark.sql.functions) | sqrt(col("value")) | Use functions like sqrt, abs, round applied to columns. |
| Aggregation | (via pyspark.sql.functions) | sum(col("value")) | Use aggregation functions such as sum, avg, min, max over columns. |
| Conditional | (via when / otherwise) | when(col("age") > 30, "senior").otherwise("junior") | Build conditional expressions. |
| Window Ops | (via Window) | row_number().over(window) | Used for ranking, lead/lag, etc. |
| Collection | getItem | col("items").getItem(0) | Access an element of an array column. |
| Struct Access | getField | col("address").getField("city") | Access a field of a struct column. |
Spark uses these column references when performing operations such as withColumn and filter.
The following cell defines the data frame that will be used as an example.
test_df = spark_session.createDataFrame(
data=[
(8, 20),
(9, 43),
(15, 88)
],
schema=["column1", "column2"]
)
test_df.show()
+-------+-------+
|column1|column2|
+-------+-------+
| 8| 20|
| 9| 43|
| 15| 88|
+-------+-------+
The following cell applies filter with a condition specified using a direct reference to test_df.column1.
condition = (test_df.column1 > 10)
print(type(condition))
test_df.filter(condition=condition).show()
<class 'pyspark.sql.classic.column.Column'>
+-------+-------+
|column1|column2|
+-------+-------+
| 15| 88|
+-------+-------+
Alternatively, the next cell specifies the calculation using an abstract reference to column2. The withColumn method of test_df interprets it as a reference to the column2 it contains.
calculation = col("column2") + 8
test_df.withColumn("result", calculation).show()
+-------+-------+------+
|column1|column2|result|
+-------+-------+------+
| 8| 20| 28|
| 9| 43| 51|
| 15| 88| 96|
+-------+-------+------+
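The other column operations from the table above follow the same pattern. Here is a brief sketch on the same test_df; the result column names and the threshold are illustrative.
from pyspark.sql.functions import when

test_df.select(
    col("column1").alias("c1"),  # rename the column
    col("column2").cast("double"),  # change the column type
    (col("column1") + col("column2")).alias("total"),  # arithmetic between columns
    when(col("column1") > 10, "big").otherwise("small").alias("size")  # conditional expression
).show()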
Transformations#
This section considers the general data transformations available in PySpark:
withColumn: specifies a result column and the computation used to produce its values.
selectExpr: specifies operations on the columns using SQL syntax to produce new ones.
The na attribute gathers methods that define how missing values are handled: na.fill, na.drop and na.replace.
The replace method specifies which values to replace.
For more details, check the Transformations page.
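The cell below is a minimal sketch of these transformations applied to the test_df defined earlier; the new column names and fill values are illustrative.
test_df.withColumn("doubled", col("column2") * 2).show()  # add a computed column
test_df.selectExpr("column1", "column2 + 8 as shifted").show()  # SQL-style column expressions
test_df.na.fill(0).show()  # fill missing values with 0 (no effect here, as there are no nulls)
test_df.replace(8, 80, subset=["column1"]).show()  # replace the value 8 with 80 in column1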
Group by#
The data frame provides the groupBy method, which returns a special GroupedData object. This object contains a set of tools for building aggregations over the data:
| Method | Description |
|---|---|
| agg | General aggregation with one or more expressions. |
| avg | Computes the average of the given columns. |
| mean | Alias for avg. |
| max | Maximum value for each column. |
| min | Minimum value for each column. |
| sum | Sum of values for each column. |
| count | Count of rows for each group. |
| pivot | Performs a pivot (like SQL PIVOT), turning the distinct values of a column into separate columns. |
| applyInPandas | Apply a function to each group as a Pandas DataFrame and return a new DataFrame. |
| apply | Apply a user-defined function to each group (returns an RDD, not a DataFrame; less common). |
Check the groupby page for more details.
The following cell defines an example data frame. It constructs and shows the GroupedData
object based on it.
test_df = spark_session.createDataFrame(
data=[
("a", 3),
("a", 2),
("c", 4),
("c", 7)
],
schema=['group', 'value']
)
grouped_expression = test_df.groupBy('group')
grouped_expression
GroupedData[grouping expressions: [group], value: [group: string, value: bigint], type: GroupBy]
The following code shows how to use the agg
function to compute the aggregations based on the groups.
from pyspark.sql.functions import sum, avg, min, max
grouped_expression.agg(
sum('value'),
avg('value'),
min('value'),
max('value')
).show()
+-----+----------+----------+----------+----------+
|group|sum(value)|avg(value)|min(value)|max(value)|
+-----+----------+----------+----------+----------+
| a| 5| 2.5| 2| 3|
| c| 11| 5.5| 4| 7|
+-----+----------+----------+----------+----------+
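The other GroupedData methods from the table follow the same pattern; for example, count and pivot (the choice of the pivot column here is only illustrative).
test_df.groupBy('group').count().show()  # number of rows per group
test_df.groupBy('group').pivot('value').count().show()  # one column per distinct value of 'value'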
ML#
There is a pyspark.ml package that contains a set of tools implementing typical ML transformations. The following table provides an overview of its capabilities:
| Module / Class | Description |
|---|---|
| Pipeline | Combines multiple stages (transformers + estimators) into a single workflow. |
| Transformer | An object that transforms a DataFrame (e.g., feature scaling). |
| Estimator | An object that fits a model from data and returns a Transformer. |
| Evaluator | Computes metrics to evaluate model performance. |
| tuning | Tools for hyperparameter tuning (CrossValidator, TrainValidationSplit). |
| feature | Feature engineering utilities (e.g., VectorAssembler, StringIndexer, StandardScaler). |
| classification | Classification algorithms (e.g., LogisticRegression, RandomForestClassifier). |
| regression | Regression algorithms (e.g., LinearRegression, RandomForestRegressor). |
| clustering | Clustering algorithms (e.g., KMeans, GaussianMixture). |
| recommendation | Collaborative filtering (e.g., ALS). |
| stat | Statistical functions (e.g., Correlation, ChiSquareTest). |
For more details check:
The Machine Learning Library Guide in the Spark documentation.
The ML page.
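The following cell is a minimal sketch of an ML pipeline that combines a feature transformer with an estimator; the data set and column names are illustrative assumptions.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Illustrative data: two numeric features and a binary label
train_df = spark_session.createDataFrame(
    data=[(1.0, 2.0, 0), (2.0, 1.0, 0), (5.0, 7.0, 1), (6.0, 8.0, 1)],
    schema=["f1", "f2", "label"]
)

assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")  # Transformer
lr = LogisticRegression(featuresCol="features", labelCol="label")  # Estimator

model = Pipeline(stages=[assembler, lr]).fit(train_df)  # fitting returns a Transformer (PipelineModel)
model.transform(train_df).select("features", "label", "prediction").show()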