Python APIs for using Delta Lake with Apache Spark

Version control capabilities for Delta Lake tables, including time travel queries, table restoration, and comprehensive history exploration. Supports both version-based and timestamp-based operations for data lineage and recovery.

Explore the complete transaction history of Delta tables.
class DeltaTable:
def history(self, limit: Optional[int] = None) -> DataFrame:
"""
Get table commit history in reverse chronological order.
Parameters:
- limit: Optional maximum number of commits to return
Returns:
DataFrame with commit history including version, timestamp, operation, etc.
"""class DeltaTable {
def history(): DataFrame
def history(limit: Int): DataFrame
}

History DataFrame contains columns:

- version: Table version number
- timestamp: Commit timestamp
- operation: Operation type (WRITE, UPDATE, DELETE, MERGE, etc.)
- operationParameters: Operation-specific parameters
- readVersion: Version read during operation
- isBlindAppend: Whether operation was append-only
- operationMetrics: Metrics like number of files, rows, etc.

Restore tables to previous versions or timestamps.
class DeltaTable:
def restoreToVersion(self, version: int) -> DataFrame:
"""
Restore table to specific version number.
Parameters:
- version: Target version number to restore to
Returns:
DataFrame with restoration metrics
"""
def restoreToTimestamp(self, timestamp: str) -> DataFrame:
"""
Restore table to specific timestamp.
Parameters:
- timestamp: Target timestamp in format 'yyyy-MM-dd' or 'yyyy-MM-dd HH:mm:ss'
Returns:
DataFrame with restoration metrics
"""class DeltaTable {
def restoreToVersion(version: Long): DataFrame
def restoreToTimestamp(timestamp: String): DataFrame
}

Query historical versions of tables using SQL syntax.
# Time travel with DataFrame API
df = spark.read.format("delta").option("versionAsOf", 5).load("/path/to/table")
df = spark.read.format("delta").option("timestampAsOf", "2023-01-01").load("/path/to/table")
# Time travel with SQL
spark.sql("SELECT * FROM delta.`/path/to/table` VERSION AS OF 5")
spark.sql("SELECT * FROM delta.`/path/to/table` TIMESTAMP AS OF '2023-01-01'")Create table clones at specific versions or timestamps.
class DeltaTable:
def clone(
self,
target: str,
is_shallow: bool,
replace: bool = False,
properties: Optional[Dict[str, str]] = None
) -> DeltaTable:
"""
Clone table to destination.
Parameters:
- target: Target path or table name for clone
- is_shallow: True for shallow clone, False for deep clone
- replace: Whether to replace existing target
- properties: Optional table properties to override
Returns:
DeltaTable instance for cloned table
"""
def cloneAtVersion(
self,
version: int,
target: str,
is_shallow: bool,
replace: bool = False,
properties: Optional[Dict[str, str]] = None
) -> DeltaTable:
"""
Clone table at specific version.
Parameters:
- version: Source version to clone from
- target: Target path or table name
- is_shallow: Clone type (shallow vs deep)
- replace: Whether to replace existing target
- properties: Optional table properties to override
Returns:
DeltaTable instance for cloned table
"""
def cloneAtTimestamp(
self,
timestamp: str,
target: str,
is_shallow: bool,
replace: bool = False,
properties: Optional[Dict[str, str]] = None
) -> DeltaTable:
"""
Clone table at specific timestamp.
Parameters:
- timestamp: Source timestamp to clone from
- target: Target path or table name
- is_shallow: Clone type (shallow vs deep)
- replace: Whether to replace existing target
- properties: Optional table properties to override
Returns:
DeltaTable instance for cloned table
"""class DeltaTable {
def clone(target: String, isShallow: Boolean): DeltaTable
def clone(target: String, isShallow: Boolean, replace: Boolean): DeltaTable
def clone(
target: String,
isShallow: Boolean,
replace: Boolean,
properties: Map[String, String]
): DeltaTable
def cloneAtVersion(version: Long, target: String, isShallow: Boolean): DeltaTable
def cloneAtVersion(
version: Long,
target: String,
isShallow: Boolean,
replace: Boolean
): DeltaTable
def cloneAtVersion(
version: Long,
target: String,
isShallow: Boolean,
replace: Boolean,
properties: Map[String, String]
): DeltaTable
def cloneAtTimestamp(timestamp: String, target: String, isShallow: Boolean): DeltaTable
def cloneAtTimestamp(
timestamp: String,
target: String,
isShallow: Boolean,
replace: Boolean
): DeltaTable
def cloneAtTimestamp(
timestamp: String,
target: String,
isShallow: Boolean,
replace: Boolean,
properties: Map[String, String]
): DeltaTable
}

# Get full history
history_df = delta_table.history()
history_df.select("version", "timestamp", "operation", "operationParameters").show()
# Get last 10 commits
recent_history = delta_table.history(10)
recent_history.show()
# Analyze specific operations
history_df.filter(col("operation") == "MERGE").show()# Query table as it was 5 versions ago
old_df = spark.read.format("delta").option("versionAsOf", 5).load("/path/to/table")
# Query table state from yesterday
yesterday_df = spark.read.format("delta").option("timestampAsOf", "2023-12-01").load("/path/to/table")
# Compare current vs historical data
current_count = delta_table.toDF().count()
historical_count = spark.read.format("delta").option("versionAsOf", 0).load("/path/to/table").count()
print(f"Rows added: {current_count - historical_count}")# Restore to specific version (e.g., before bad data was written)
restore_metrics = delta_table.restoreToVersion(10)
restore_metrics.show()
# Restore to timestamp (e.g., state from this morning)
restore_metrics = delta_table.restoreToTimestamp("2023-12-01 09:00:00")
restore_metrics.show()

# Create shallow clone for testing
test_table = delta_table.clone(
target="/path/to/test/table",
is_shallow=True,
replace=True
)
# Clone specific version for analysis
analysis_table = delta_table.cloneAtVersion(
version=5,
target="analysis_db.temp_table",
is_shallow=False,
properties={"owner": "data_team"}
)
# Clone yesterday's state for comparison
comparison_table = delta_table.cloneAtTimestamp(
timestamp="2023-12-01",
target="/path/to/comparison",
is_shallow=True
)

# Track changes over time
history = delta_table.history()
# Find when specific column was added
schema_changes = history.filter(
col("operationParameters.newSchema").isNotNull()
).select("version", "timestamp", "operationParameters.newSchema")
# Analyze write patterns
write_operations = history.filter(col("operation").isin(["WRITE", "APPEND"]))
write_operations.select(
"version",
"timestamp",
col("operationMetrics.numFiles").alias("files_written"),
col("operationMetrics.numOutputRows").alias("rows_written")
).show()

Table properties controlling how much history is retained:

- delta.logRetentionDuration (default: 30 days)
- delta.deletedFileRetentionDuration (default: 7 days)

Install with Tessl CLI
npx tessl i tessl/pypi-delta-spark