Group by#
Spark SQL supports a typical “group by” operations. The corresponding tools are provided by the grouping data object that comes from the data frame’s groupBy
method. This page discusses the options for using the grouped data object.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
spark_session = SparkSession.builder.appName('Temp').getOrCreate()
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/20 23:20:38 WARN Utils: Your hostname, fedor-NUC10i7FNK, resolves to a loopback address: 127.0.1.1; using 192.168.100.19 instead (on interface wlp0s20f3)
25/09/20 23:20:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/20 23:20:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Direct methos#
There is a set of methods that directly return just one specific aggregation: min
, max
, avg
, mean
, and count
. You can list the columns for which you want to compute these aggregations. The meaning completely matches the functions names. They will calculate the aggregations by all available columns by default, but you can also specify the specific columns to be used in the output.
The following cell defines the data frame and grouped data that will be used as the example.
df = spark_session.createDataFrame(
[
("a", 10, 7, 9),
("a", 18, 3, 1),
("b", 12, 9, 1),
("b", 15, 7, 0),
("c", 4, 9, 12),
("c", 12, 15, 5)
],
schema=['group', "value1", "value2", "value3"]
)
gb = df.groupBy("group")
The following cell shows the application of the min
function, without specifying wich column to use.
gb.min().show()
+-----+-----------+-----------+-----------+
|group|min(value1)|min(value2)|min(value3)|
+-----+-----------+-----------+-----------+
| a| 10| 3| 1|
| b| 12| 7| 0|
| c| 4| 9| 5|
+-----+-----------+-----------+-----------+
The max
function is only used for the value1
column:
gb.max("value1").show()
+-----+-----------+
|group|max(value1)|
+-----+-----------+
| a| 18|
| b| 15|
| c| 12|
+-----+-----------+
The application of the avg
to value1
and value2
:
gb.avg("value1", "value2").show()
+-----+-----------+-----------+
|group|avg(value1)|avg(value2)|
+-----+-----------+-----------+
| a| 14.0| 5.0|
| b| 13.5| 8.0|
| c| 8.0| 12.0|
+-----+-----------+-----------+
Agg#
The agg
method of the grouped data object provides general aggregations. You only need to list the expressions that instruct Spark what to compute. The following table lists the functions that can be used to design an aggregation:
Function |
Description |
---|---|
|
Number of rows for the given column (non-null only). |
|
Count of distinct values across one or more columns. |
|
Approximate count of distinct values using HyperLogLog (faster than |
|
Sum of values in a column. |
|
Sum of distinct values in a column. |
|
Average (mean) of column values. |
|
Maximum value in the column. |
|
Minimum value in the column. |
|
First value in the group. |
|
Last value in the group. |
|
Collects values into a Python list (duplicates preserved). |
|
Collects unique values into a Python set (duplicates removed). |
|
Sample variance of values in the group. |
|
Population variance of values in the group. |
|
Sample standard deviation of values in the group. |
|
Population standard deviation of values in the group. |
|
Pearson correlation coefficient between two columns. |
|
Sample covariance between two columns. |
|
Population covariance between two columns. |
|
Skewness of values in the group. |
|
Kurtosis of values in the group. |
|
Approximate percentile of column values (for quantile analysis). |
|
Bitwise AND of all values in the group. |
|
Bitwise OR of all values in the group. |
|
Bitwise XOR of all values in the group. |
|
Returns the most frequent value (mode) in the column. |
The following cell defines a data frame that will be used as an example:
df = spark_session.createDataFrame(
[
("a", 10),
("a", 18),
("b", 12),
("b", 15),
("c", 4),
("c", 12)
],
schema=['group', "value"]
)
gb = df.groupBy("group")
There is also the usage of the agg
method with a few aggregation functions.
gb.agg(
F.sum("value"),
F.avg("value").alias("new name"),
F.mode("value")
).show()
+-----+----------+--------+-----------+
|group|sum(value)|new name|mode(value)|
+-----+----------+--------+-----------+
| a| 28| 14.0| 18|
| b| 27| 13.5| 15|
| c| 16| 8.0| 4|
+-----+----------+--------+-----------+
Pivot#
The grouped data objects have a special method, pivot
, that creates an additional grouping along a new axis. This method returns a new grouped data object that can generally be operated on as if it were a regular grouped data object. However, if a grouped data object has already undergone a pivot transformation, it can’t be applied again.
The following cell creates a data frame that we will use as an example. We applied the groupBy
and pivot
so you can see what the output object looks like.
df = spark_session.createDataFrame(
[
("a", "x", 13, 12),
("a", "y", 15, 17),
("b", "y", 18, 4),
("b", "x", 11, 5),
("c", "x", 10, 33),
("c", "x", 7, 1)
],
schema=['group1', "group2", "value1", "value2"]
)
gb = df.groupBy("group1")
pivot = gb.pivot("group2")
print(pivot)
GroupedData[grouping expressions: [group1], value: [group1: string, group2: string ... 2 more fields], type: Pivot]
The simpliest case, with just one output value for each group, looks like as presented in the following cell:
pivot.sum("value1").show()
+------+---+----+
|group1| x| y|
+------+---+----+
| c| 17|NULL|
| b| 11| 18|
| a| 13| 15|
+------+---+----+
This case infolves aggregting sevaral values for each group of different variables using different aggregation functions:
pivot.agg(F.sum("value1"), F.avg("value2")).show()
+------+-------------+-------------+-------------+-------------+
|group1|x_sum(value1)|x_avg(value2)|y_sum(value1)|y_avg(value2)|
+------+-------------+-------------+-------------+-------------+
| c| 17| 17.0| NULL| NULL|
| b| 11| 5.0| 18| 4.0|
| a| 13| 12.0| 15| 17.0|
+------+-------------+-------------+-------------+-------------+
Apply#
The apply
function in Spark allows you to apply a function built by the user-built to each group of the grouped data. Use the applyInPandas()
method on the grouped data to process the groups as Pandas data frames.
The following cell defines the data frame will be used for the examples and groups by the “group” column.
df = spark_session.createDataFrame(
[
("a", 12),
("a", 17),
("b", 4),
("b", 5),
("c", 33),
("c", 1)
],
schema=['group', "value"]
)
gb = df.groupBy("group")
The some_function
simly add up the value
for each group and packs the result pack into a dataframe as required by applyInPandas
. It also prints the input data to demonstrate the input format.
def some_function(inp: pd.DataFrame) -> pd.DataFrame:
print(inp)
return pd.DataFrame({
"group": [inp["group"][0]],
"value": [inp["value"].sum()]
})
schema = StructType([
StructField("group", StringType()),
StructField("value", IntegerType())
])
gb.applyInPandas(some_function, schema=schema).show()
+-----+-----+
|group|value|
+-----+-----+
| a| 29|
| b| 9|
| c| 34|
+-----+-----+
group value
0 a 12
1 a 17
group value
0 b 4
1 b 5
group value
0 c 33
1 c 1