Data sources

Data sources#

This section describes the tools implemented in Spark for saving, loading, and versioning the data.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark_session = SparkSession.builder.appName("Temp").getOrCreate()
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/22 14:20:40 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/22 14:20:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/22 14:20:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Saving#

Spark has a set of methods for saving data frames for future use. Consider the most important ones. For that data frame has an attribute write that refers to a set of methods: parquiet, csv, and json; with really transparent naming.

There is also a saveAsTable method, that saves the data to the special storage managed by the Spark. This storage could be: Hive Metastore or DeltaLake.


Consider saving the simple table as a JSON table.

df = spark_session.createDataFrame(
    data=[("Alice", 25), ("Bob", 30), ("Cathy", 35)],
    schema=["Name", "Age"]
)

data_path = "/tmp/my_data"
df.write.json(data_path, mode='overwrite')
                                                                                

A set of .json files is created in the destination folder to store the data.

import os
os.listdir(data_path)
['part-00011-87a1ac33-4c79-4760-b7f5-16147b167620-c000.json',
 '.part-00000-87a1ac33-4c79-4760-b7f5-16147b167620-c000.json.crc',
 '.part-00005-87a1ac33-4c79-4760-b7f5-16147b167620-c000.json.crc',
 '.part-00011-87a1ac33-4c79-4760-b7f5-16147b167620-c000.json.crc',
 '._SUCCESS.crc',
 'part-00000-87a1ac33-4c79-4760-b7f5-16147b167620-c000.json',
 'part-00017-87a1ac33-4c79-4760-b7f5-16147b167620-c000.json',
 '.part-00017-87a1ac33-4c79-4760-b7f5-16147b167620-c000.json.crc',
 'part-00005-87a1ac33-4c79-4760-b7f5-16147b167620-c000.json',
 '_SUCCESS']

Consider important for now:

import json

for file_name in os.listdir(data_path):
    if file_name.endswith(".json"):
        with open(data_path + "/" + file_name, "r") as f:
            try:
                print(json.load(f))
            except: pass
{'Name': 'Bob', 'Age': 30}
{'Name': 'Cathy', 'Age': 35}
{'Name': 'Alice', 'Age': 25}

Read csv#

Use the read.csv method of the spark session to read a CSV file.


The following cell reads the spark.csv file that I prepared earlier.

df = spark_session.read.csv(
    "data_sources_files/scv_example.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape=','
)
display(df)
DataFrame[Name: string,  Age: double,  Salary: double]

Shcema#

Use the schema argument to define the schema. The schema can be specified as a simple string that matches column names with their expected data types.


The following cell shows the matching of the int data type to the Age column instead of the default double data type.

schema = """
Name string,
Age int,
Salary double
"""

spark_session.read.csv(
    "data_sources_files/scv_example.csv",
    schema=schema
)
DataFrame[Name: string, Age: int, Salary: double]

SQL catalog#

The Spark SQL catalog is a special file system that provies SQL access and data is described by a special metadata provided by PySpark. This section demonstrates how to access the capabilitites of the SQL catalog from the python SDK.


The SQL catalog is specified by the spark.sql.warehouse.dir attribute in the Spark configuration. The following cell displays the SQL catalog for the current Spark session.

Note It should be specified when creating of the session.

The following cell recreates the session with required configuration:

if SparkSession.getActiveSession() is not None:
    spark_session.stop()

spark_session = (
    SparkSession
    .builder
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
    .getOrCreate()
)
spark_session.conf.get("spark.sql.warehouse.dir")
stopping
'file:/tmp/spark-warehouse'

The following cell uses the write.saveAsTable method to store the data frame.

df = spark_session.createDataFrame(
    data=[("Alice", 25), ("Bob", 30), ("Cathy", 35)],
    schema=["Name", "Age"]
)
df.write.saveAsTable("example_save")

The corresponding folder should now be in the warehouse storage:

!ls /tmp/spark-warehouse
example_save

This folder contains the partitions of the saved dataset.

!ls /tmp/spark-warehouse/example_save
part-00000-74ba1525-2161-4c89-94b1-c925b52a41ff-c000.snappy.parquet
part-00005-74ba1525-2161-4c89-94b1-c925b52a41ff-c000.snappy.parquet
part-00011-74ba1525-2161-4c89-94b1-c925b52a41ff-c000.snappy.parquet
part-00017-74ba1525-2161-4c89-94b1-c925b52a41ff-c000.snappy.parquet
_SUCCESS

You can now access the saved using the SQL syntax provided by the sql method of the session.

spark_session.sql("SELECT * FROM example_save;").show()
+-----+---+
| Name|Age|
+-----+---+
|Cathy| 35|
|Alice| 25|
|  Bob| 30|
+-----+---+

Delta lake#

You can specify the delta lake to be a SQL catalog for your spark session you need:

  • Install delta-lake python package.

  • Create session from builded by using delta.configure_sparak_with_delta_pip.

  • Set parameters:

    • spark.sql.extensions to the io.delta.sql.DeltaSparkSessionExtension.

    • spark.sql.catalog.spark_catalog to the org.apache.spark.sql.delta.catalog.DeltaCatalog.

Delta Lake only provides just a way to store information, so all the other parameters that configure the SQL catalog are still valid.


The following cell build session which configuration includes the delta lake.

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder
    .appName("DeltaLocalExample")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/22 15:14:05 WARN Utils: Your hostname, user-ThinkPad-E16-Gen-2, resolves to a loopback address: 127.0.1.1; using 10.202.22.210 instead (on interface enp0s31f6)
25/09/22 15:14:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/user/.virtualenvironments/python/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/user/.ivy2.5.2/cache
The jars for the packages stored in: /home/user/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f8cbeb2d-5543-4026-b5b3-d384e23d527e;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
:: resolution report :: resolve 176ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-spark_2.13;4.0.0 from central in [default]
	io.delta#delta-storage;4.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.13.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-f8cbeb2d-5543-4026-b5b3-d384e23d527e
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/8ms)
25/09/22 15:14:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

The following cell creates a table in the delta format, allowing it to take advantage of delta lake benefits.

df = spark.createDataFrame(
    data=[
        (1, 2),
        (4, 1),
        (2, 3)
    ],
    schema=["val1", "val2"]
)
df.write.format('delta').saveAsTable("version_example")
25/09/22 15:15:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

One important feature of the delta lake is its ability to version data. The following cell adds a new column and saves the resulting data.

(
    df
    .withColumn("val3", df["val1"] + df["val2"])
    .write.format("delta").mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable("version_example")
)

Using the sql command DESCRIBE HISTORY <table_name>; allows you to load the table’s change log.

spark.sql("DESCRIBE HISTORY version_example;").show()
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      1|2025-09-22 15:15:...|  NULL|    NULL|CREATE OR REPLACE...|{partitionBy -> [...|NULL|    NULL|     NULL|          0|  Serializable|        false|{numFiles -> 4, n...|        NULL|Apache-Spark/4.0....|
|      0|2025-09-22 15:15:...|  NULL|    NULL|CREATE TABLE AS S...|{partitionBy -> [...|NULL|    NULL|     NULL|       NULL|  Serializable|         true|{numFiles -> 4, n...|        NULL|Apache-Spark/4.0....|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+

To retrieve the corresponding data version as a data frame, specify the desired version you want to get in the versionAsOf option of the read constructor. The two following cells load both versions created earlier.

spark.read.format("delta").option("versionAsOf", 0).table("version_example").show()
+----+----+
|val1|val2|
+----+----+
|   1|   2|
|   2|   3|
|   4|   1|
+----+----+
spark.read.format("delta").option("versionAsOf", 1).table("version_example").show()
+----+----+----+
|val1|val2|val3|
+----+----+----+
|   2|   3|   5|
|   1|   2|   3|
|   4|   1|   5|
+----+----+----+