
tessl/pypi-delta-spark

Python APIs for using Delta Lake with Apache Spark


Time Travel and History

Version control capabilities for Delta Lake tables including time travel queries, table restoration, and comprehensive history exploration. Supports both version-based and timestamp-based operations for data lineage and recovery.

Capabilities

Table History

Explore the complete transaction history of Delta tables.

class DeltaTable:
    def history(self, limit: Optional[int] = None) -> DataFrame:
        """
        Get table commit history in reverse chronological order.
        
        Parameters:
        - limit: Optional maximum number of commits to return
        
        Returns:
        DataFrame with commit history including version, timestamp, operation, etc.
        """
class DeltaTable {
  def history(): DataFrame
  def history(limit: Int): DataFrame
}

The history DataFrame contains the following columns:

  • version: Table version number
  • timestamp: Commit timestamp
  • operation: Operation type (WRITE, UPDATE, DELETE, MERGE, etc.)
  • operationParameters: Operation-specific parameters
  • readVersion: Version read during operation
  • isBlindAppend: Whether operation was append-only
  • operationMetrics: Metrics like number of files, rows, etc.

Table Restoration

Restore tables to previous versions or timestamps.

class DeltaTable:
    def restoreToVersion(self, version: int) -> DataFrame:
        """
        Restore table to specific version number.
        
        Parameters:
        - version: Target version number to restore to
        
        Returns:
        DataFrame with restoration metrics
        """
    
    def restoreToTimestamp(self, timestamp: str) -> DataFrame:
        """
        Restore table to specific timestamp.
        
        Parameters:
        - timestamp: Target timestamp in format 'yyyy-MM-dd' or 'yyyy-MM-dd HH:mm:ss'
        
        Returns:
        DataFrame with restoration metrics
        """
class DeltaTable {
  def restoreToVersion(version: Long): DataFrame
  def restoreToTimestamp(timestamp: String): DataFrame
}

Time Travel Queries

Query historical versions of tables using either DataFrame reader options or SQL syntax.

# Time travel with DataFrame API
df = spark.read.format("delta").option("versionAsOf", 5).load("/path/to/table")
df = spark.read.format("delta").option("timestampAsOf", "2023-01-01").load("/path/to/table")

# Time travel with SQL
spark.sql("SELECT * FROM delta.`/path/to/table` VERSION AS OF 5")
spark.sql("SELECT * FROM delta.`/path/to/table` TIMESTAMP AS OF '2023-01-01'")
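The option-based reads and the SQL forms are interchangeable. As an illustration only, a small hypothetical helper (`time_travel_sql`, not part of the delta-spark API) that builds the SQL form from either a version or a timestamp:

```python
from typing import Optional

# Hypothetical helper (not part of delta-spark): builds a time travel
# SELECT statement for a path-based Delta table using the SQL syntax above.
def time_travel_sql(
    table_path: str,
    version: Optional[int] = None,
    timestamp: Optional[str] = None,
) -> str:
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return f"SELECT * FROM delta.`{table_path}` VERSION AS OF {version}"
    return f"SELECT * FROM delta.`{table_path}` TIMESTAMP AS OF '{timestamp}'"

# spark.sql(time_travel_sql("/path/to/table", version=5)) runs the same
# query as the versionAsOf read shown above.
```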

Clone Operations

Create table clones at specific versions or timestamps.

class DeltaTable:
    def clone(
        self,
        target: str,
        is_shallow: bool,
        replace: bool = False,
        properties: Optional[Dict[str, str]] = None
    ) -> DeltaTable:
        """
        Clone table to destination.
        
        Parameters:
        - target: Target path or table name for clone
        - is_shallow: True for shallow clone, False for deep clone
        - replace: Whether to replace existing target
        - properties: Optional table properties to override
        
        Returns:
        DeltaTable instance for cloned table
        """
    
    def cloneAtVersion(
        self,
        version: int,
        target: str,
        is_shallow: bool,
        replace: bool = False,
        properties: Optional[Dict[str, str]] = None
    ) -> DeltaTable:
        """
        Clone table at specific version.
        
        Parameters:
        - version: Source version to clone from
        - target: Target path or table name
        - is_shallow: Clone type (shallow vs deep)
        - replace: Whether to replace existing target
        - properties: Optional table properties to override
        
        Returns:
        DeltaTable instance for cloned table
        """
    
    def cloneAtTimestamp(
        self,
        timestamp: str,
        target: str,
        is_shallow: bool,
        replace: bool = False,
        properties: Optional[Dict[str, str]] = None
    ) -> DeltaTable:
        """
        Clone table at specific timestamp.
        
        Parameters:
        - timestamp: Source timestamp to clone from
        - target: Target path or table name
        - is_shallow: Clone type (shallow vs deep)
        - replace: Whether to replace existing target
        - properties: Optional table properties to override
        
        Returns:
        DeltaTable instance for cloned table
        """
class DeltaTable {
  def clone(target: String, isShallow: Boolean): DeltaTable
  def clone(target: String, isShallow: Boolean, replace: Boolean): DeltaTable
  def clone(
    target: String, 
    isShallow: Boolean, 
    replace: Boolean, 
    properties: Map[String, String]
  ): DeltaTable
  
  def cloneAtVersion(version: Long, target: String, isShallow: Boolean): DeltaTable
  def cloneAtVersion(
    version: Long,
    target: String, 
    isShallow: Boolean, 
    replace: Boolean
  ): DeltaTable
  def cloneAtVersion(
    version: Long,
    target: String,
    isShallow: Boolean, 
    replace: Boolean,
    properties: Map[String, String]
  ): DeltaTable
  
  def cloneAtTimestamp(timestamp: String, target: String, isShallow: Boolean): DeltaTable
  def cloneAtTimestamp(
    timestamp: String,
    target: String,
    isShallow: Boolean,
    replace: Boolean
  ): DeltaTable
  def cloneAtTimestamp(
    timestamp: String,
    target: String,
    isShallow: Boolean,
    replace: Boolean,
    properties: Map[String, String]
  ): DeltaTable
}

Usage Examples

Exploring Table History

from pyspark.sql.functions import col

# Get full history
history_df = delta_table.history()
history_df.select("version", "timestamp", "operation", "operationParameters").show()

# Get last 10 commits
recent_history = delta_table.history(10)
recent_history.show()

# Analyze specific operations
history_df.filter(col("operation") == "MERGE").show()

Time Travel Queries

# Query table as it was 5 versions ago
old_df = spark.read.format("delta").option("versionAsOf", 5).load("/path/to/table")

# Query table state from yesterday
yesterday_df = spark.read.format("delta").option("timestampAsOf", "2023-12-01").load("/path/to/table")

# Compare current vs historical data
current_count = delta_table.toDF().count()
historical_count = spark.read.format("delta").option("versionAsOf", 0).load("/path/to/table").count()
print(f"Rows added: {current_count - historical_count}")

Table Restoration

# Restore to specific version (e.g., before bad data was written)
restore_metrics = delta_table.restoreToVersion(10)
restore_metrics.show()

# Restore to timestamp (e.g., state from this morning)
restore_metrics = delta_table.restoreToTimestamp("2023-12-01 09:00:00")
restore_metrics.show()
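Restoration is also available through Delta Lake's SQL `RESTORE TABLE` statement. The statements below mirror the two Python calls above for a path-based table; run each with `spark.sql(...)` on an active session:

```python
# SQL equivalents of restoreToVersion / restoreToTimestamp
# (Delta Lake's RESTORE statement).
restore_to_version = "RESTORE TABLE delta.`/path/to/table` TO VERSION AS OF 10"
restore_to_timestamp = (
    "RESTORE TABLE delta.`/path/to/table` "
    "TO TIMESTAMP AS OF '2023-12-01 09:00:00'"
)
# spark.sql(restore_to_version).show()  # returns the same restore metrics
```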

Table Cloning

# Create shallow clone for testing
test_table = delta_table.clone(
    target="/path/to/test/table",
    is_shallow=True,
    replace=True
)

# Clone specific version for analysis
analysis_table = delta_table.cloneAtVersion(
    version=5,
    target="analysis_db.temp_table",
    is_shallow=False,
    properties={"owner": "data_team"}
)

# Clone yesterday's state for comparison
comparison_table = delta_table.cloneAtTimestamp(
    timestamp="2023-12-01",
    target="/path/to/comparison",
    is_shallow=True
)

Data Lineage Analysis

from pyspark.sql.functions import col

# Track changes over time
history = delta_table.history()

# Find when specific column was added
schema_changes = history.filter(
    col("operationParameters.newSchema").isNotNull()
).select("version", "timestamp", "operationParameters.newSchema")

# Analyze write patterns
write_operations = history.filter(col("operation").isin(["WRITE", "APPEND"]))
write_operations.select(
    "version", 
    "timestamp",
    col("operationMetrics.numFiles").alias("files_written"),
    col("operationMetrics.numOutputRows").alias("rows_written")
).show()

Clone Types

Shallow Clone

  • Copies only metadata and references to data files
  • Fast and storage-efficient
  • Changes to source files affect the clone
  • Ideal for testing and experimentation

Deep Clone

  • Copies both metadata and data files
  • Independent of source table
  • Requires more storage and time
  • Ideal for production backups and branching

Time Travel Limitations

  • History retention controlled by delta.logRetentionDuration (default: 30 days)
  • Data files retained based on delta.deletedFileRetentionDuration (default: 7 days)
  • Cannot time travel beyond available transaction logs
  • Vacuum operations may remove files needed for time travel
  • Performance impact for very old versions due to metadata overhead
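Both retention properties can be raised per table to extend the time travel window. A sketch of the corresponding `ALTER TABLE ... SET TBLPROPERTIES` statement (the interval values here are illustrative, not recommendations):

```python
# Extending retention so older versions stay queryable (illustrative values).
# Run with spark.sql(...) against an existing Delta table.
retention_sql = (
    "ALTER TABLE delta.`/path/to/table` SET TBLPROPERTIES ("
    "'delta.logRetentionDuration' = 'interval 60 days', "
    "'delta.deletedFileRetentionDuration' = 'interval 14 days')"
)
# Note: VACUUM honors delta.deletedFileRetentionDuration; files it removes
# are no longer available for time travel regardless of log retention.
```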

Install with Tessl CLI

npx tessl i tessl/pypi-delta-spark
