Preparations
The Apache Spark distribution does not ship with Delta Lake support. Start spark-shell with the --packages parameter so that it downloads the Delta modules from the internet and activates the Delta package. In my spark-tools project, the libraries are already packaged together with Apache Spark in a Docker container. You can simply start it with “docker run -ti bfblog/spark ./bin/spark-shell”.
Loading the Delta Package Extension
# ./bin/spark-shell --packages io.delta:delta-core_2.12:1.2.1
:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5ad0be03-7b7a-4dde-8823-3bff0dc0bd95;1.0
confs: [default]
found io.delta#delta-core_2.12;1.2.1 in central
found io.delta#delta-storage;1.2.1 in central
found org.antlr#antlr4-runtime;4.8 in central
found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.2.1/delta-core_2.12-1.2.1.jar ...
[SUCCESSFUL ] io.delta#delta-core_2.12;1.2.1!delta-core_2.12.jar (311ms)
downloading https://repo1.maven.org/maven2/io/delta/delta-storage/1.2.1/delta-storage-1.2.1.jar ...
[SUCCESSFUL ] io.delta#delta-storage;1.2.1!delta-storage.jar (19ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.8/antlr4-runtime-4.8.jar ...
[SUCCESSFUL ] org.antlr#antlr4-runtime;4.8!antlr4-runtime.jar (52ms)
downloading https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar ...
[SUCCESSFUL ] org.codehaus.jackson#jackson-core-asl;1.9.13!jackson-core-asl.jar (43ms)
:: resolution report :: resolve 1883ms :: artifacts dl 430ms
:: modules in use:
io.delta#delta-core_2.12;1.2.1 from central in [default]
io.delta#delta-storage;1.2.1 from central in [default]
org.antlr#antlr4-runtime;4.8 from central in [default]
org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 4 | 4 | 4 | 0 || 4 | 4 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-5ad0be03-7b7a-4dde-8823-3bff0dc0bd95
confs: [default]
4 artifacts copied, 0 already retrieved (3348kB/10ms)
22/06/01 16:52:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://8cd4c37d792d:4040
Spark context available as 'sc' (master = local[*], app id = local-1654102356302).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.1
/_/
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_332)
Type in expressions to have them evaluated.
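When Spark is started from a standalone application instead of spark-shell, the session has to be configured in code. The following is a minimal sketch, assuming delta-core 1.2.1 is already on the classpath; the application name is a placeholder, and the two settings are the ones the Delta Lake quickstart recommends for enabling Delta SQL commands and the Delta catalog.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: io.delta:delta-core_2.12:1.2.1 is on the classpath.
val spark = SparkSession.builder()
  .appName("delta-example") // placeholder name
  .master("local[*]")
  // registers the Delta SQL extensions
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  // lets the session catalog resolve Delta tables
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
```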
Read Delta Table
A Delta table is read with format(“delta”). A where(“…”) on the partition column restricts the read to the matching partitions (= folders on disk).
Read Delta Table
val path = "/data/samples/delta-2"
val partition = "COUNTRY = 'AI'"
val numFilesPerPartition = 1
spark.read
  .format("delta")
  .load(path)
  .where(partition)
Write Delta Table
Writing a Delta table works the other way around. format(“delta”) selects the Delta writer, and partitionBy(“…”) creates one folder per partition value.
With mode(“overwrite”) the existing data of the table is replaced: the old files are marked as removed in the transaction log and are physically deleted only by a later vacuum. With mode(“append”), in contrast, the newly created data is placed next to the existing data.
Write Delta Table
targetDF
  .sortWithinPartitions("country")
  .write
  .format("delta")
  .partitionBy("country")
  .mode("overwrite")
  .save("/data/samples/delta-2")
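The listing above shows only the overwrite mode. As a sketch of the other variants, the two helpers below (hypothetical names, not part of Delta) append data and overwrite only the rows matching a predicate via the replaceWhere option. They assume the target table is partitioned by "country", as in the example above, and that the DataFrame passed to overwriteWhere contains only rows that satisfy the predicate.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: add new rows next to the existing data.
def appendTo(df: DataFrame, path: String): Unit =
  df.write
    .format("delta")
    .partitionBy("country") // must match the partitioning of an existing table
    .mode("append")
    .save(path)

// Hypothetical helper: replace only the rows matching `predicate`,
// e.g. "country = 'AI'"; all other partitions stay untouched.
def overwriteWhere(df: DataFrame, path: String, predicate: String): Unit =
  df.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", predicate)
    .save(path)
```

For example, appendTo(targetDF, "/data/samples/delta-2") would add the rows of targetDF to the table written above.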
Reduce Files
Query performance depends on the number of files, and many small files hurt it: the overhead for finding, opening, and reading grows with the file count. Writing, on the other hand, benefits from the parallelism of the partitions. Sufficiently many, well-filled partitions distribute the work evenly across all worker nodes. Out-of-memory errors (“Out of heap”) are often caused by partitions that are too large or too few. A good compromise is a partition size of about 128 MB. This corresponds to the block size and fits uncompressed into the memory of a 1 GB worker node. The following example computes the number of records per file for each partition.
Gather Statistics
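import org.apache.spark.sql.functions.{count, countDistinct, expr, input_file_name}
import org.apache.spark.sql.types.LongType

spark
  .read
  .format("delta")
  .load(archiveBaseDir)
  .select("date")
  // remember which file each record was read from
  .withColumn("delta_filename", input_file_name())
  .groupBy("date")
  .agg(
    count("delta_filename").as("records_per_partition"),
    countDistinct("delta_filename").as("no_of_files")
  )
  .withColumn("avg_records_per_file", expr("records_per_partition / no_of_files").cast(LongType))
  // predicted file count at 1,000,000 records per file
  .withColumn("estimated_no_files", expr("INT(records_per_partition / 1000000) + 1"))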
In empirical tests, 1,000,000 records per file has proven to be optimal. Counting the records per file and per partition therefore yields a prediction of the optimal number of files. A deviation between this prediction and the current number of files indicates a need for action, as for the partition 2022-02-25 in the statistics below, which holds 81 files where 24 are predicted.
Partition Statistics
+----------+---------------------+-----------+--------------------+------------------+
| date|records_per_partition|no_of_files|avg_records_per_file|estimated_no_files|
+----------+---------------------+-----------+--------------------+------------------+
|2021-12-18| 23671118| 24| 986296| 24|
|2021-11-13| 23871053| 24| 994627| 24|
|2021-10-02| 31422964| 32| 981967| 32|
|2018-08-08| 32| 8| 4| 1|
|2022-01-31| 22104874| 23| 961081| 23|
|2018-08-11| 8| 4| 2| 1|
|2021-09-14| 23836697| 24| 993195| 24|
|2022-01-29| 23580085| 24| 982503| 24|
|2022-05-26| 23575350| 24| 982306| 24|
|2022-03-16| 23736937| 24| 989039| 24|
|2022-06-06| 24111697| 25| 964467| 25|
|2022-04-02| 23655082| 24| 985628| 24|
|2021-12-05| 24075930| 25| 963037| 25|
|2022-03-15| 23708672| 24| 987861| 24|
|2022-04-10| 23999533| 24| 999980| 24|
|2022-03-12| 23608498| 24| 983687| 24|
|2021-10-08| 32220077| 33| 976365| 33|
|2022-02-25| 23394173| 81| 288816| 24|
|2022-02-16| 23597495| 24| 983228| 24|
|2015-10-13| 5| 5| 1| 1|
+----------+---------------------+-----------+--------------------+------------------+
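The check against the prediction can be sketched in plain Scala. The case class and function names are illustrative; the target of 1,000,000 records per file is taken from the "estimated_no_files" expression, and the sample rows are copied from the table above. Since the goal is to reduce files, the sketch flags only partitions with more files than predicted.

```scala
// One row of the statistics table (illustrative type).
final case class PartitionStats(date: String, records: Long, files: Int)

// Same formula as the "estimated_no_files" column.
val TargetRecordsPerFile = 1000000L

def estimatedFiles(records: Long): Int =
  (records / TargetRecordsPerFile).toInt + 1

// A partition is a compaction candidate when it holds more files than predicted.
def needsCompaction(p: PartitionStats): Boolean =
  p.files > estimatedFiles(p.records)

val sample = Seq(
  PartitionStats("2021-12-18", 23671118L, 24),
  PartitionStats("2018-08-08", 32L, 8),
  PartitionStats("2022-02-25", 23394173L, 81),
  PartitionStats("2015-10-13", 5L, 5)
)

val candidates = sample.filter(needsCompaction).map(_.date)
// candidates: 2018-08-08, 2022-02-25, 2015-10-13
```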
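A coalesce reduces the number of Spark partitions, and therefore the number of files written, to the predicted value. This combines many small files into a few large ones. Because the prediction also keeps each file from becoming too large, no memory problems arise. The option dataChange = false marks the commit as a pure rearrangement of existing data, and replaceWhere limits the overwrite to the selected partition.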
Reduce Files per Disk Partition
spark
  .read
  .format("delta")
  .load(archiveBaseDir)
  .where(whereClause)
  .coalesce(numberOfFilesEst)
  .write
  .option("dataChange", "false")
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", whereClause)
  .save(archiveBaseDir)
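To apply the compaction to one partition at a time and to check its effect, the listing can be wrapped in two small helpers. This is a sketch with hypothetical function names; it assumes the table is partitioned by a "date" column, as in the statistics above, and that the partition value can be compared with a quoted literal.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{countDistinct, input_file_name}

// Hypothetical helper: rewrite a single "date" partition into `targetFiles` files.
def compactPartition(spark: SparkSession, path: String, date: String, targetFiles: Int): Unit = {
  val whereClause = s"date = '$date'"
  spark.read
    .format("delta")
    .load(path)
    .where(whereClause)
    .coalesce(targetFiles)
    .write
    .option("dataChange", "false") // layout change only, no new data
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", whereClause)
    .save(path)
}

// Hypothetical helper: number of data files currently backing the matching rows.
def fileCount(spark: SparkSession, path: String, whereClause: String): Long =
  spark.read
    .format("delta")
    .load(path)
    .where(whereClause)
    .agg(countDistinct(input_file_name()))
    .first()
    .getLong(0)
```

Calling fileCount before and after compactPartition shows whether the partition reached the predicted number of files.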
Delta Table and Time Travel
The time travel feature of Delta tables gives access to historical versions of a table. With the DeltaTable class and the functions “restoreToVersion” or “restoreToTimestamp”, a historical snapshot of the table is restored. The restored snapshot then becomes the new, permanent state of the table. In practice, this is an undo function for taking back accidental changes.
Permanent Time Travel
import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath(spark, pathToTable)
// roll back to a given version (see history output)
deltaTable.restoreToVersion(4711)
// roll back to a given timestamp (see history output);
// restoreToTimestamp expects the timestamp as a string
val timeStamp = "2020-06-29 22:41:30"
deltaTable.restoreToTimestamp(timeStamp)
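The version numbers and timestamps mentioned in the comments above come from the table history. A minimal sketch of how to look them up, assuming a spark-shell session with the Delta package loaded; the helper names are illustrative, while history() is part of the DeltaTable API.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Print the most recent commits with their version, timestamp, and operation.
def showHistory(spark: SparkSession, path: String, limit: Int = 10): Unit =
  DeltaTable.forPath(spark, path)
    .history(limit)
    .select("version", "timestamp", "operation")
    .show(truncate = false)

// The history is ordered newest first, so the first row is the current version.
def latestVersion(spark: SparkSession, path: String): Long =
  DeltaTable.forPath(spark, path).history(1).first().getAs[Long]("version")
```

For the table used earlier, showHistory(spark, "/data/samples/delta-2") lists the versions that restoreToVersion accepts.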
The alternative is temporary access to a historical state of the table. While the functions above change the state for all future accesses, a time travel read only returns the recorded version to the query that asks for it. Other clients continue to see the current state, while our query looks into the past.
Temporary Time Travel
// load snapshot by timestamp (t1: timestamp string, e.g. "2020-06-29 22:41:30")
val countAtTimestamp = spark
  .read
  .format("delta")
  .option("timestampAsOf", t1)
  .load(DELTA_PATH)
  .count()
// load snapshot by version (t2: version number from the history output)
val countAtVersion = spark
  .read
  .format("delta")
  .option("versionAsOf", t2)
  .load(DELTA_PATH)
  .count()
// concurrent clients still see the most recent version of the table
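A typical use of such a temporary read is to compare a past snapshot with the current state. The helper below is a sketch with a hypothetical name; it assumes a spark-shell session with the Delta package loaded and a version number that still exists in the table history.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: how many rows the table has gained (or lost, if negative)
// since the given version.
def rowCountChangeSince(spark: SparkSession, path: String, version: Long): Long = {
  val currentCount = spark.read.format("delta").load(path).count()
  val oldCount = spark.read
    .format("delta")
    .option("versionAsOf", version)
    .load(path)
    .count()
  currentCount - oldCount
}
```

Neither read modifies the table, so the comparison is safe to run while other clients are working with it.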
Vacuum
Old table versions and aborted transactions leave orphaned files in the folders. The vacuum command removes these leftovers and frees up disk space. Operations on a Delta table can take hours or days, so a retention period of several days (7 by default) protects the files of transactions that have not yet completed from being deleted by accident. To delete at shorter intervals (down to 0 hours), deactivate the safety check. Note that time travel to versions whose files have been vacuumed is no longer possible.
!!! Attention, only do this if you are aware of the consequences !!!
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/data/samples/delta-2")
// deactivate the retention check
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
// clean up files; the argument is the retention period in hours
deltaTable.vacuum(0)
// re-activate the retention check
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")
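If the vacuum call fails, the listing above leaves the safety check switched off for the rest of the session. One way to guard against that is a try/finally wrapper, sketched here with a hypothetical helper name. The same warning applies: a retention of 0 hours can delete files of transactions that are still running.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

// Hypothetical helper: vacuum with 0 hours retention and always restore the check.
def vacuumWithoutRetention(spark: SparkSession, path: String): Unit = {
  val key = "spark.databricks.delta.retentionDurationCheck.enabled"
  // remember the current setting; the check is enabled unless configured otherwise
  val previous = spark.conf.get(key, "true")
  spark.conf.set(key, "false")
  try {
    DeltaTable.forPath(spark, path).vacuum(0)
    ()
  } finally {
    // runs even if vacuum throws
    spark.conf.set(key, previous)
  }
}
```

Calling deltaTable.vacuum() without an argument keeps the default retention period and needs no change to the check.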
Conclusion
Databricks offers the ebook “Delta Lake: The Definitive Guide” as a free download, where you can find more information on the topic. My self-built Docker container already contains the Delta extension. You might also be interested in the article on editing text data with regular expressions.