ML#

The pyspark package contains the ml module, which provides a set of classes for building typical machine learning pipelines. This page discusses the details of the pyspark.ml module.

Check the official description in the Machine Learning Library Guide.

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName("temp").getOrCreate()
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/23 15:57:13 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/23 15:57:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/23 15:57:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/23 15:57:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/09/23 15:57:15 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/09/23 15:57:15 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.

Features#

The features module contains tools for preparing input data for machine learning models. The following list overviews the classes most commonly used for that:

  • Binarizer: Converts continuous values into 0/1 based on a threshold.

  • Bucketizer: Splits a continuous feature into buckets (bins) given split points.

  • QuantileDiscretizer: Similar to Bucketizer but automatically computes bins by quantiles.

  • StringIndexer: Converts string labels into numeric indices.

  • IndexToString: Converts numeric indices back to the original string labels.

  • OneHotEncoder: Converts indexed categorical values into one-hot encoded vectors.

  • VectorAssembler: Combines multiple columns into a single vector column (features).

  • VectorIndexer: Identifies categorical features in a vector and indexes them automatically.

  • PolynomialExpansion: Generates polynomial features up to a specified degree.

  • Interaction: Generates interaction features between input columns.

  • Normalizer: Normalizes feature vectors to unit norm (L1, L2, max).

  • StandardScaler: Standardizes features by removing the mean and scaling to unit variance.

  • MinMaxScaler: Scales features to a specified range (default [0, 1]).

  • MaxAbsScaler: Scales features to [-1, 1] based on the max absolute value per feature.

  • Imputer: Fills missing values in numeric columns with the mean, median, or mode.

  • PCA: Performs Principal Component Analysis for dimensionality reduction.

  • ChiSqSelector: Selects categorical features based on Chi-Squared test results.

  • StopWordsRemover: Removes stop words from text columns.

  • Tokenizer: Splits text into individual words (tokens).

  • RegexTokenizer: Tokenizes text using a regular expression pattern.

  • HashingTF: Maps a sequence of terms to fixed-length feature vectors using hashing.

  • CountVectorizer: Converts text documents to term frequency vectors with a learned vocabulary.

  • IDF: Computes inverse document frequency for TF-IDF transformation.

  • Word2Vec: Learns word embeddings from text data.

  • ElementwiseProduct: Multiplies each element of a vector by a constant scaling vector.

  • DCT: Applies the Discrete Cosine Transform to a vector column.
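
Most of these classes follow the same transformer interface. As a minimal sketch, a stateless transformer such as the Tokenizer can be applied directly, without fitting (the dataframe and column names here are invented for illustration):

from pyspark.ml.feature import Tokenizer

df = spark_session.createDataFrame(
    data=[("some text to be tokenized",)],
    schema=["text"]
)

# splits the "text" column into a list of lowercase tokens
Tokenizer(inputCol="text", outputCol="tokens").transform(df).show(truncate=False)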

Imputer#

The Imputer replaces missing values in the input data with a per-column statistic: the mean, median, or mode of that column.


The following cell shows usage of the Imputer.

from pyspark.ml.feature import Imputer

df = spark_session.createDataFrame(
    data=[
        (2, None),
        (None, 4),
        (6, 6)
    ],
    schema=["a", "b"]
)

(
    Imputer(
        inputCols=["a", "b"],
        outputCols=["a_transf", "b_transf"]
    )
    .fit(df).transform(df).show()
)
+----+----+--------+--------+
|   a|   b|a_transf|b_transf|
+----+----+--------+--------+
|   2|NULL|       2|       5|
|NULL|   4|       4|       4|
|   6|   6|       6|       6|
+----+----+--------+--------+
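
By default the mean is used; the statistic can be changed through the strategy parameter. The following sketch reuses the dataframe from the cell above but imputes with the median instead:

(
    Imputer(
        inputCols=["a", "b"],
        outputCols=["a_transf", "b_transf"],
        # fill missing values with the per-column median instead of the mean
        strategy="median"
    )
    .fit(df).transform(df).show()
)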

OHE#

OHE is implemented by the OneHotEncoder. Some details associated with using it:

  • It handles only numeric values, so categorical values must first be replaced with numeric labels, for example using the StringIndexer (see the chained sketch after the example below).

  • The output is a column of sparse vectors; for example, (3,[1],[1.0]) denotes a length-3 vector with 1.0 at index 1.


The following cell demonstrates how to apply the OneHotEncoder to a simple Spark dataframe and prints the result.

from pyspark.ml.feature import OneHotEncoder
df = spark_session.createDataFrame(
    data=[(1,), (2,), (1,)],
    schema=["column"]
)
OneHotEncoder(
    inputCols=["column"],
    outputCols=["result"],
    dropLast=False
).fit(df).transform(df).show()
+------+-------------+
|column|       result|
+------+-------------+
|     1|(3,[1],[1.0])|
|     2|(3,[2],[1.0])|
|     1|(3,[1],[1.0])|
+------+-------------+
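
A typical chain for string categories is to index them first and only then encode the indices. The following sketch combines the StringIndexer with the OneHotEncoder (the color column is made up for illustration):

from pyspark.ml.feature import StringIndexer, OneHotEncoder

df = spark_session.createDataFrame(
    data=[("red",), ("blue",), ("red",)],
    schema=["color"]
)

# map string labels to numeric indices
indexed = StringIndexer(
    inputCol="color", outputCol="color_index"
).fit(df).transform(df)

# one-hot encode the numeric indices
OneHotEncoder(
    inputCols=["color_index"],
    outputCols=["color_ohe"],
    dropLast=False
).fit(indexed).transform(indexed).show()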

Pipelines#

Pipelines allow you to build a single object that takes relatively raw data and returns a prediction. All the typical processing associated with fitting the model is encapsulated in that object.

Check details in the ML Pipelines tutorial of the official documentation.


Consider building a simple pipeline.

from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler

pipeline = Pipeline(
    stages=[
        Imputer(inputCols=["col1", "col2"], outputCols=["col1", "col2"]),
        VectorAssembler(inputCols=["col1", "col2"], outputCol="assembled_columns"),
        StandardScaler(inputCol="assembled_columns", outputCol="output")
    ]
)

The defined pipeline:

  • Replaces missing values using the Imputer and saves the outputs to the same columns.

  • Assembles the values into a single feature vector with the VectorAssembler.

  • Standardizes the assembled_columns with the StandardScaler.

An example of applying this transformation is shown below:

df = spark_session.createDataFrame(
    data=[
        (10, 20),
        (40, 50),
        (None, 20)
    ],
    schema=["col1", "col2"]
)

pipeline.fit(df).transform(df).show()
+----+----+-----------------+--------------------+
|col1|col2|assembled_columns|              output|
+----+----+-----------------+--------------------+
|  10|  20|      [10.0,20.0]|[0.66666666666666...|
|  40|  50|      [40.0,50.0]|[2.66666666666666...|
|  25|  20|      [25.0,20.0]|[1.66666666666666...|
+----+----+-----------------+--------------------+
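
Note that fit returns a fitted PipelineModel, so the learned imputation values and scaling statistics can be reused on new data without refitting. A minimal sketch (the new dataframe is made up for illustration):

# keep the fitted pipeline instead of discarding it
model = pipeline.fit(df)

new_df = spark_session.createDataFrame(
    data=[(15, None), (None, 40)],
    schema=["col1", "col2"]
)

# applies the imputation and scaling learned on df to the new data
model.transform(new_df).show()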