ML#
The pyspark package contains the ml module, which provides a set of classes for building typical machine learning pipelines. This page discusses the details of the pyspark.ml module.
Check the official description in the Machine Learning Library Guide.
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName("temp").getOrCreate()
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/23 15:57:13 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/23 15:57:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/23 15:57:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/23 15:57:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/09/23 15:57:15 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/09/23 15:57:15 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
Features#
The pyspark.ml.feature module contains tools for preparing input data for machine learning models. The following table gives an overview of the main classes associated with it:
| Class / Transformer | Description |
|---|---|
| Binarizer | Converts continuous values into 0/1 based on a threshold. |
| Bucketizer | Splits a continuous feature into buckets (bins) given split points. |
| QuantileDiscretizer | Similar to Bucketizer but automatically computes bins by quantiles. |
| StringIndexer | Converts string labels into numeric indices. |
| IndexToString | Converts numeric indices back to original string labels. |
| OneHotEncoder | Converts indexed categorical values into one-hot encoded vectors. |
| VectorAssembler | Combines multiple columns into a single vector column (features). |
| VectorIndexer | Identifies categorical features in a vector and indexes them automatically. |
| PolynomialExpansion | Generates polynomial features up to a specified degree. |
| Interaction | Generates interaction features between input columns. |
| Normalizer | Normalizes feature vectors to unit norm (L1, L2, max). |
| StandardScaler | Standardizes features by removing mean and scaling to unit variance. |
| MinMaxScaler | Scales features to a specified range (default [0, 1]). |
| MaxAbsScaler | Scales features to [-1, 1] based on max absolute value per feature. |
| Imputer | Fills missing values in numeric columns with mean, median, or mode. |
| PCA | Performs Principal Component Analysis for dimensionality reduction. |
| ChiSqSelector | Selects categorical features based on Chi-Squared test results. |
| StopWordsRemover | Removes stop words from text columns. |
| Tokenizer | Splits text into individual words (tokens). |
| RegexTokenizer | Tokenizes text using a regular expression pattern. |
| HashingTF | Maps a sequence of terms to fixed-length feature vectors using hashing. |
| CountVectorizer | Converts text documents to term frequency vectors with a learned vocab. |
| IDF | Computes inverse document frequency for TF-IDF transformation. |
| Word2Vec | Learns word embeddings from text data. |
| ElementwiseProduct | Multiplies each element of a vector by a constant scaling vector. |
| DCT | Applies Discrete Cosine Transform to a vector column. |
Imputer#
The Imputer replaces missing values in the input data with a constant computed from the column: its mean, median, or mode.
The following cell shows the usage of the Imputer.
from pyspark.ml.feature import Imputer

# toy data with a missing value in each column
df = spark_session.createDataFrame(
    data=[
        (2, None),
        (None, 4),
        (6, 6)
    ],
    schema=["a", "b"]
)

# by default the missing values are replaced with the column mean
(
    Imputer(
        inputCols=["a", "b"],
        outputCols=["a_transf", "b_transf"]
    )
    .fit(df).transform(df).show()
)
+----+----+--------+--------+
| a| b|a_transf|b_transf|
+----+----+--------+--------+
| 2|NULL| 2| 5|
|NULL| 4| 4| 4|
| 6| 6| 6| 6|
+----+----+--------+--------+
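The strategy parameter switches the imputed constant from the default mean to the median or the mode. A minimal sketch reusing the DataFrame from above:
# replace the missing values with the column median instead of the mean
(
    Imputer(
        inputCols=["a", "b"],
        outputCols=["a_transf", "b_transf"],
        strategy="median"
    )
    .fit(df).transform(df).show()
)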
OHE#
OHE is implemented through the OneHotEncoder. Some details associated with using it:

- It only handles numeric values, so you must replace categorical values with numeric labels beforehand, for example using the StringIndexer (see the sketch after the example below).
- The output is a column of sparse vectors.
The following cell demonstrates how to apply the OneHotEncoder to a simple Spark DataFrame and prints the results.
from pyspark.ml.feature import OneHotEncoder

df = spark_session.createDataFrame(
    data=[(1,), (2,), (1,)],
    schema=["column"]
)

# dropLast=False keeps a slot for the last category instead of dropping it
OneHotEncoder(
    inputCols=["column"],
    outputCols=["result"],
    dropLast=False
).fit(df).transform(df).show()
+------+-------------+
|column| result|
+------+-------------+
| 1|(3,[1],[1.0])|
| 2|(3,[2],[1.0])|
| 1|(3,[1],[1.0])|
+------+-------------+
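To encode a genuinely categorical column, chain the StringIndexer in front of the encoder. The following is a minimal sketch with a made-up color column:
from pyspark.ml.feature import StringIndexer

df = spark_session.createDataFrame(
    data=[("red",), ("blue",), ("red",)],
    schema=["color"]
)

# map the strings to numeric indices, then one-hot encode the indices
indexed = StringIndexer(
    inputCol="color", outputCol="color_index"
).fit(df).transform(df)

OneHotEncoder(
    inputCols=["color_index"],
    outputCols=["color_ohe"],
    dropLast=False
).fit(indexed).transform(indexed).show()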
Pipelines#
Pipelines allow you to build a single object that takes relatively raw data and returns a prediction. All the typical processing associated with fitting the model is encapsulated in that object.
Check details in the ML Pipelines tutorial of the official documentation.
Consider building a simple pipeline.
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
pipeline = Pipeline(
stages=[
Imputer(inputCols=["col1", "col2"], outputCols=["col1", "col2"]),
VectorAssembler(inputCols=["col1", "col2"], outputCol="assembled_columns"),
StandardScaler(inputCol="assembled_columns", outputCol="output")
]
)
The defined pipeline:

- Replaces missing values using the Imputer and saves the outputs to the same columns.
- Combines the values into a single vector column with the VectorAssembler.
- Standardizes the assembled_columns with the StandardScaler.
An example of applying this transformation is shown below:
df = spark_session.createDataFrame(
data=[
(10, 20),
(40, 50),
(None, 20)
],
schema=["col1", "col2"]
)
pipeline.fit(df).transform(df).show()
+----+----+-----------------+--------------------+
|col1|col2|assembled_columns| output|
+----+----+-----------------+--------------------+
| 10| 20| [10.0,20.0]|[0.66666666666666...|
| 40| 50| [40.0,50.0]|[2.66666666666666...|
| 25| 20| [25.0,20.0]|[1.66666666666666...|
+----+----+-----------------+--------------------+
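The pipeline above only prepares the features. To make it actually return a prediction, an estimator can be appended as the final stage. The following is a minimal sketch with a LinearRegression model and a made-up label column:
from pyspark.ml.regression import LinearRegression

# the same preprocessing stages plus a model as the final stage
full_pipeline = Pipeline(
    stages=[
        Imputer(inputCols=["col1", "col2"], outputCols=["col1", "col2"]),
        VectorAssembler(inputCols=["col1", "col2"], outputCol="assembled_columns"),
        StandardScaler(inputCol="assembled_columns", outputCol="features"),
        LinearRegression(featuresCol="features", labelCol="label")
    ]
)

train_df = spark_session.createDataFrame(
    data=[(10, 20, 1.0), (40, 50, 0.0), (None, 20, 1.0)],
    schema=["col1", "col2", "label"]
)

# fit trains every stage; transform adds a "prediction" column
full_pipeline.fit(train_df).transform(train_df).show()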