CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-deltalake

Native Delta Lake Python binding based on delta-rs with Pandas integration

Pending
Overview
Eval results
Files

docs/transaction-management.md

Transaction Management

This page covers transaction properties, commit configuration, and ACID transaction control. Use them to keep data consistent and to manage concurrent access to Delta Lake tables.

Capabilities

Commit Properties

@dataclass
class CommitProperties:
    """Per-commit settings: a retry limit plus metadata recorded with the commit.

    An instance is passed through the ``commit_properties`` keyword of
    write_deltalake and of DeltaTable.update / merge / delete.

    NOTE(review): upstream deltalake's ``CommitProperties`` appears to be
    spelled ``CommitProperties(custom_metadata=..., max_commit_retries=...,
    app_transactions=...)`` with ``custom_metadata: dict[str, str]``. The
    names below may not match the installed version; verify before use.
    """

    def __init__(
        self,
        # Maximum number of commit retries; None when not set (library default
        # presumably applies — confirm).
        max_retry_commit_attempts: int | None = None,
        # Free-form key/value metadata attached to the commit.
        app_metadata: dict[str, Any] | None = None,
        # Identifier of the application performing the commit.
        app_id: str | None = None
    ) -> None: ...
    
    # Retry limit as passed to the constructor.
    @property
    def max_retry_commit_attempts(self) -> int | None: ...
    
    # Commit metadata as passed to the constructor.
    @property
    def app_metadata(self) -> dict[str, Any] | None: ...
    
    # Application identifier as passed to the constructor.
    @property
    def app_id(self) -> str | None: ...

Configuration for commit behavior and retry logic.

Post-Commit Hook Properties

@dataclass
class PostCommitHookProperties:
    """Controls the housekeeping hooks that run after a commit succeeds.

    An instance is passed through the ``post_commithook_properties`` keyword
    of write_deltalake and of DeltaTable.update / merge / delete.
    """

    def __init__(
        self,
        # Whether the post-commit hook may write a checkpoint. NOTE(review):
        # presumably only at the table's checkpoint interval rather than on
        # every commit — confirm.
        create_checkpoint: bool = True,
        # Whether expired transaction-log files are cleaned up after the commit;
        # None presumably defers to the table's own configuration — confirm.
        cleanup_expired_logs: bool | None = None,
    ) -> None: ...
    
    # Checkpoint flag as passed to the constructor.
    @property
    def create_checkpoint(self) -> bool: ...
    
    # Log-cleanup flag as passed to the constructor.
    @property
    def cleanup_expired_logs(self) -> bool | None: ...

Configuration for operations that run after successful commits.

Transaction Class

class Transaction:
    """Low-level handle for committing hand-built actions (advanced use).

    NOTE(review): in upstream deltalake, ``deltalake.transaction.Transaction``
    appears to be an application-transaction record (``app_id``, ``version``,
    ``last_updated``) without a ``commit`` method. Hand-built AddActions appear
    to be committed through ``DeltaTable.create_write_transaction`` or
    ``create_table_with_add_actions`` instead. Verify before relying on this.
    """

    # Commit ``actions`` in one transaction, optionally overriding commit and
    # post-commit-hook settings; callers treat the returned int as the new
    # table version.
    # NOTE(review): every other API here spells the keyword
    # ``post_commithook_properties``; confirm which spelling this method uses.
    def commit(
        self,
        actions: list[AddAction],
        commit_properties: CommitProperties | None = None,
        post_commit_hook_properties: PostCommitHookProperties | None = None
    ) -> int: ...

Low-level transaction interface for advanced operations.

Add Action

@dataclass
class AddAction:
    """One data file to register in the Delta transaction log."""

    # File location, e.g. "data/part-00001.parquet" (presumably relative to
    # the table root — confirm).
    path: str
    # File size; presumably in bytes — confirm.
    size: int
    # Partition column name -> partition value as a string; None presumably
    # encodes a null partition value.
    partition_values: Mapping[str, str | None]
    # Modification time as a Unix timestamp in milliseconds.
    modification_time: int
    # True when the file changes table data (False presumably marks
    # data-neutral rewrites such as compaction — confirm).
    data_change: bool
    # JSON-encoded file statistics, e.g. '{"numRecords": 1000}'.
    stats: str

Represents a file addition in the transaction log.

Transaction Helper Functions

def create_table_with_add_actions(
    table_uri: str,
    schema: Schema,
    add_actions: list[AddAction],
    partition_by: list[str] | None = None,
    name: str | None = None,
    description: str | None = None,
    configuration: dict[str, str | None] | None = None,
    storage_options: dict[str, str] | None = None
) -> DeltaTable:
    """Create a new Delta table at ``table_uri`` from pre-built add actions.

    Args:
        table_uri: Location at which to create the table.
        schema: Schema of the new table.
        add_actions: Data files (``AddAction`` entries) to register in the table.
        partition_by: Partition column names, if any.
        name: Optional table name.
        description: Optional table description.
        configuration: Optional table configuration key/value pairs.
        storage_options: Optional storage-backend options.

    Returns:
        The created table, per the return annotation.

    NOTE(review): upstream ``deltalake.transaction.create_table_with_add_actions``
    appears to also accept ``mode``, ``commit_properties`` and
    ``post_commithook_properties``, and to return None. Verify the signature
    against the installed version.
    """
    ...

Creates a new table directly from a list of add actions, i.e. from existing data files described by `AddAction` entries (advanced use case).

Usage Examples

Basic Transaction Configuration

from deltalake import DeltaTable, write_deltalake
from deltalake.transaction import CommitProperties, PostCommitHookProperties

# Commit-level settings: a retry budget plus metadata recorded with the commit.
commit_props = CommitProperties(
    app_id="my_application",
    app_metadata={"application": "data_pipeline", "version": "1.0.0"},
    max_retry_commit_attempts=5,
)

# Housekeeping to run once the commit has succeeded.
post_commit_props = PostCommitHookProperties(
    cleanup_expired_logs=True,  # Prune old log files
    create_checkpoint=True,  # Checkpoint automatically
)

# Append `data` to the table, passing both property sets to this write.
write_deltalake(
    "path/to/table",
    data,
    commit_properties=commit_props,
    post_commithook_properties=post_commit_props,
    mode="append",
)

Transaction Properties in Updates

dt = DeltaTable("path/to/table")

# Update with transaction properties.
# `updates` takes SQL expressions, so string literals must be quoted and
# functions such as current_timestamp() are evaluated. `new_values` takes plain
# Python values and would treat "current_timestamp()" as a literal instead.
result = dt.update(
    predicate="status = 'pending'",
    updates={"status": "'processed'", "processed_at": "current_timestamp()"},
    commit_properties=CommitProperties(
        max_retry_commit_attempts=3,
        app_metadata={"operation": "batch_update", "batch_id": "12345"}
    ),
    post_commithook_properties=PostCommitHookProperties(
        create_checkpoint=False,  # Skip checkpoint for this update
        cleanup_expired_logs=False
    )
)

print(f"Updated {result['num_updated_rows']} rows")

Merge with Transaction Control

import pyarrow as pa

source_data = pa.table({
    'id': [1, 2, 3],
    'status': ['active', 'inactive', 'pending'],
    'last_modified': ['2023-01-01', '2023-01-02', '2023-01-03']
})

# Merge with custom transaction properties.
# The predicate refers to `target` and `source`, so both aliases must be
# declared; otherwise `target.id` / `source.id` cannot be resolved.
merge_result = (
    dt.merge(
        source=source_data,
        predicate="target.id = source.id",
        source_alias="source",
        target_alias="target",
        commit_properties=CommitProperties(
            max_retry_commit_attempts=10,  # High retry for critical operation
            app_metadata={
                "operation": "daily_sync",
                "source": "external_system",
                "sync_timestamp": "2023-01-15T10:00:00Z"
            }
        ),
        post_commithook_properties=PostCommitHookProperties(
            create_checkpoint=True,
            cleanup_expired_logs=True
        )
    )
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute()
)

Delete with Transaction Properties

# Retention cleanup: remove old rows and tag the commit so it can be traced later.
delete_result = dt.delete(
    predicate="created_at < '2022-01-01'",
    post_commithook_properties=PostCommitHookProperties(
        cleanup_expired_logs=True,  # Clean logs after the delete commits
        create_checkpoint=True,  # Checkpoint after the cleanup commit
    ),
    commit_properties=CommitProperties(
        app_metadata={
            "operation": "data_retention_cleanup",
            "retention_policy": "delete_older_than_1_year",
            "executed_by": "automated_cleanup_job",
        },
    ),
)

print(f"Deleted {delete_result['num_deleted_rows']} old records")

Advanced Transaction Handling

from deltalake.transaction import Transaction, AddAction

# Low-level transaction for advanced scenarios
def custom_transaction_example():
    """Register one hand-built data file with the table via a low-level commit.

    Advanced use case: the AddAction is constructed manually rather than
    produced by a writer, then committed with custom commit properties, and
    the resulting version is printed.

    NOTE(review): ``_table.create_transaction()`` is a private,
    underscore-prefixed API; confirm it exists in the deltalake release you
    target before copying this pattern.
    """
    table = DeltaTable("path/to/table")

    new_file = AddAction(
        path="data/part-00001.parquet",
        size=1024000,
        partition_values={"year": "2023", "month": "01"},
        modification_time=1672531200000,  # epoch milliseconds
        data_change=True,
        stats='{"numRecords": 1000}'
    )

    txn = table._table.create_transaction()  # Internal API

    version = txn.commit(
        [new_file],
        commit_properties=CommitProperties(
            max_retry_commit_attempts=3,
            app_metadata={"custom_operation": "manual_file_addition"}
        )
    )

    print(f"Committed transaction at version {version}")

Retry and Conflict Handling

import time
from deltalake.exceptions import CommitFailedError

def robust_update_with_retry(
    table_uri: str = "path/to/table",
    max_retries: int = 5,
    base_delay: float = 1.0,
):
    """Mark 'processing' rows as 'completed', retrying on commit conflicts.

    When a commit fails with ``CommitFailedError``, the function waits
    ``base_delay * 2**attempt`` seconds (exponential backoff), reloads the
    table to pick up the latest state, and tries again.

    Args:
        table_uri: Location of the Delta table to update.
        max_retries: Total number of attempts; must be at least 1.
        base_delay: Delay in seconds after the first failure; doubles each time.

    Returns:
        The metrics dict returned by ``DeltaTable.update``.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        CommitFailedError: If every attempt fails to commit.
    """
    if max_retries < 1:
        # Without this guard the loop would never run and the caller would
        # silently receive None instead of a result.
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    dt = DeltaTable(table_uri)

    for attempt in range(max_retries):
        try:
            result = dt.update(
                predicate="status = 'processing'",
                new_values={"status": "completed"},
                commit_properties=CommitProperties(
                    # Keep the library's internal retries low; this loop
                    # handles the remaining retries with backoff.
                    max_retry_commit_attempts=1,
                    app_metadata={
                        # Stringified so the metadata stays string-valued
                        # (NOTE(review): upstream typing is dict[str, str]).
                        "attempt": str(attempt + 1),
                        "operation": "status_update"
                    }
                )
            )
        except CommitFailedError:
            if attempt == max_retries - 1:
                print(f"Update failed after {max_retries} attempts")
                raise

            # Exponential backoff
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s...")
            time.sleep(delay)

            # Reload table to get latest state
            dt = DeltaTable(dt.table_uri)
        else:
            print(f"Update succeeded on attempt {attempt + 1}")
            return result

# Use the robust update function. Only exhausted commit retries are reported
# here; unrelated errors (e.g. a missing table) propagate with their own message
# instead of being mislabelled as retry failures.
try:
    result = robust_update_with_retry()
except CommitFailedError as e:
    print(f"All retry attempts failed: {e}")
else:
    print(f"Successfully updated {result['num_updated_rows']} rows")

Monitoring Transaction Metadata

def analyze_transaction_history():
    """Print a readable summary of the table's 10 most recent commits.

    For every history entry, this prints:
    - the version, the operation (``"unknown"`` when absent) and the timestamp;
    - any ``app_metadata`` found under ``operationParameters``;
    - each ``operationMetrics`` key/value pair.
    A blank line follows each entry.

    NOTE(review): delta-rs appears to store custom commit metadata as
    top-level commitInfo keys rather than under ``operationParameters``. If
    so, the ``app_metadata`` lookup never matches; verify against your version.
    """
    table = DeltaTable("path/to/table")
    recent_commits = table.history(limit=10)

    print("Recent transaction history:")
    for entry in recent_commits:
        version = entry.get("version")
        operation = entry.get("operation", "unknown")
        timestamp = entry.get("timestamp")

        params = entry.get("operationParameters", {})
        app_metadata = params.get("app_metadata")

        print(f"Version {version}: {operation} at {timestamp}")

        if app_metadata:
            print(f"  App metadata: {app_metadata}")

        if "operationMetrics" in entry:
            for metric_name, metric_value in entry["operationMetrics"].items():
                print(f"  {metric_name}: {metric_value}")

        print()


# Run analysis
analyze_transaction_history()

Checkpoint and Log Management

# Manual checkpoint creation
dt.create_checkpoint()
print("Checkpoint created successfully")

# Cleanup metadata (removes old log files)
dt.cleanup_metadata()
print("Old metadata cleaned up")

# Hook settings for automatic checkpointing and log cleanup after a commit
post_commit_props = PostCommitHookProperties(
    create_checkpoint=True,
    cleanup_expired_logs=True
)

# These settings apply only to the operation they are passed to; pass
# post_commithook_properties again on every later write that should use them.
write_deltalake(
    dt,
    new_data,
    mode="append",
    post_commithook_properties=post_commit_props
)

Install with Tessl CLI

npx tessl i tessl/pypi-deltalake

docs

data-reading.md

index.md

query-operations.md

schema-management.md

table-maintenance.md

table-operations.md

transaction-management.md

writing-modification.md

tile.json