Transformations#

This section explores a variety of tools for transforming data frames in PySpark.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark_session = SparkSession.builder.appName("Temp").getOrCreate()

With column#

The DataFrame object provides a withColumn method that adds or replaces a column. It takes two arguments:

  • The name of the column in which the result should be stored. If the column doesn’t exist, it is created in the output data frame.

  • The column object or computational expression that defines the new column.


The following cell creates the data frame that we will use for our experiments.

test_df = spark_session.createDataFrame(
    data=[
        (8, "value1"),
        (9, "value2")
    ],
    schema=["numbers", "strings"]
)
test_df.show()
+-------+-------+
|numbers|strings|
+-------+-------+
|      8| value1|
|      9| value2|
+-------+-------+

The following code uses the withColumn method to replace the numbers column of the example data frame. The method returns a new data frame and leaves test_df unchanged.

test_df.withColumn(
    "numbers",
    col("numbers") + 90
).show()
+-------+-------+
|numbers|strings|
+-------+-------+
|     98| value1|
|     99| value2|
+-------+-------+

Select expression#

The Spark DataFrame has a selectExpr method that builds a new data frame from a list of columns, each specified as a SQL expression.


The following cell defines and displays the data frame that will be used as an example.

df = spark_session.createDataFrame(
    [(1, "Alice", 10), (2, "Bob", 20), (3, "Charlie", 30)],
    ["id", "name", "score"]
)
df.show()
+---+-------+-----+
| id|   name|score|
+---+-------+-----+
|  1|  Alice|   10|
|  2|    Bob|   20|
|  3|Charlie|   30|
+---+-------+-----+

The following cell demonstrates three selectExpr patterns: multiplying a column by a constant, producing a boolean value, and performing an operation on two columns. An expression without an alias, such as id + score, is named after the expression itself.

df.selectExpr(
    "id",
    "name",
    "score * 2 as double_score",
    "score > 15 as is_high_score",
    "id + score"
).show()
+---+-------+------------+-------------+------------+
| id|   name|double_score|is_high_score|(id + score)|
+---+-------+------------+-------------+------------+
|  1|  Alice|          20|        false|          11|
|  2|    Bob|          40|         true|          22|
|  3|Charlie|          60|         true|          33|
+---+-------+------------+-------------+------------+

Missing values#

The Spark DataFrame has an na attribute that provides access to methods for handling missing values.


The following cell defines the dataset that we will use as an example when dealing with missing values.

df = spark_session.createDataFrame(
    data=[
        (20, 30, None),
        (None, None, None),
        (43, None, None),
        (58, 30, 12)
    ]
)

df.show()
+----+----+----+
|  _1|  _2|  _3|
+----+----+----+
|  20|  30|NULL|
|NULL|NULL|NULL|
|  43|NULL|NULL|
|  58|  30|  12|
+----+----+----+

Drop#

The na.drop method removes rows that contain missing values; the how and thresh arguments control which rows are removed.


The following cell illustrates the how='any' argument, which removes every row that has at least one missing value.

df.na.drop(how='any').show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 58| 30| 12|
+---+---+---+

With how='all', only the rows in which every value is missing are removed.

df.na.drop(how='all').show()
+---+----+----+
| _1|  _2|  _3|
+---+----+----+
| 20|  30|NULL|
| 43|NULL|NULL|
| 58|  30|  12|
+---+----+----+

When thresh is set, it overrides the how parameter: thresh=2 keeps only the rows that have at least 2 non-null values and drops all other rows.

df.na.drop(how="any", thresh=2).show()
+---+---+----+
| _1| _2|  _3|
+---+---+----+
| 20| 30|NULL|
| 58| 30|  12|
+---+---+----+