Native Delta Lake Python binding based on delta-rs, with Pandas integration.

Core table management, including creation, reading, and metadata access. The DeltaTable class provides the primary interface for interacting with Delta Lake tables across various storage backends.
class DeltaTable:
    """Primary interface for interacting with Delta Lake tables.

    Provides table creation, reading, metadata access, and time travel
    across various storage backends (local, S3, Azure, GCS).
    """

    def __init__(
        self,
        table_uri: str | Path | os.PathLike[str],
        version: int | None = None,
        storage_options: dict[str, str] | None = None,
        without_files: bool = False,
        log_buffer_size: int | None = None,
    ) -> None:
        """Load a Delta table.

        Parameters:
            table_uri: Path to the Delta table location (local, S3, Azure, GCS).
            version: Specific version to load (None for latest).
            storage_options: Backend-specific configuration (credentials, endpoints).
            without_files: Load metadata only, skip file tracking for memory efficiency.
            log_buffer_size: Number of files to buffer when reading the transaction log.
        """
        ...

    @classmethod
    def create(
        cls,
        table_uri: str | Path,
        schema: Schema | ArrowSchemaExportable,
        mode: Literal["error", "append", "overwrite", "ignore"] = "error",
        partition_by: list[str] | str | None = None,
        name: str | None = None,
        description: str | None = None,
        configuration: Mapping[str, str | None] | None = None,
        storage_options: dict[str, str] | None = None,
    ) -> DeltaTable:
        """Creates a new Delta table with the specified schema and configuration."""
        ...

    @staticmethod
    def is_deltatable(
        table_uri: str,
        storage_options: dict[str, str] | None = None,
    ) -> bool:
        """Check if a Delta table exists at the specified location."""
        ...

    @property
    def version(self) -> int: ...
    @property
    def table_uri(self) -> str: ...
    @property
    def table_config(self) -> DeltaTableConfig: ...
    def schema(self) -> Schema: ...
    def metadata(self) -> Metadata: ...
    def protocol(self) -> ProtocolVersions: ...
    def files(self, partition_filters: list[tuple[str, str, str | list[str]]] | None = None) -> list[str]: ...
    def partitions(self, partition_filters: list[tuple[str, str, Any]] | None = None) -> list[dict[str, str]]: ...
    def history(self, limit: int | None = None) -> list[dict[str, Any]]: ...

    # Load and navigate between different versions of the table for
    # time travel queries.
    def load_as_version(self, version: int | str | datetime) -> None: ...
    def get_latest_version(self) -> int: ...
    def transaction_version(self, app_id: str) -> int | None: ...
    def update_incremental(self) -> None: ...
@dataclass
class Metadata:
    """Metadata of a Delta table: identity, partitioning, and configuration."""

    # NOTE(review): @dataclass combined with an explicit __init__ and only
    # property accessors generates nothing useful here (the user-defined
    # __init__ is preserved) -- kept to avoid changing the published
    # interface; confirm against the delta-rs binding.
    def __init__(self, table: RawDeltaTable) -> None: ...

    @property
    def id(self) -> int: ...
    @property
    def name(self) -> str: ...
    @property
    def description(self) -> str: ...
    @property
    def partition_columns(self) -> list[str]: ...
    @property
    def created_time(self) -> int: ...
    @property
    def configuration(self) -> dict[str, str]: ...
class ProtocolVersions(NamedTuple):
    """Reader/writer protocol versions and feature lists of a Delta table."""

    min_reader_version: int
    min_writer_version: int
    writer_features: list[str] | None
    reader_features: list[str] | None
class DeltaTableConfig(NamedTuple):
    """Load-time options captured by a DeltaTable instance."""

    without_files: bool
    log_buffer_size: int
from deltalake import DeltaTable, Schema, Field
from deltalake.schema import PrimitiveType

# --- Create a new table ---------------------------------------------------
schema = Schema([
    Field("id", PrimitiveType("integer"), nullable=False),
    Field("name", PrimitiveType("string"), nullable=True),
    Field("age", PrimitiveType("integer"), nullable=True),
])

dt = DeltaTable.create(
    "path/to/table",
    schema=schema,
    mode="error",
    partition_by=["age"],
)

# --- Load an existing table and check its properties ----------------------
dt = DeltaTable("path/to/existing-table")
print(f"Table version: {dt.version}")
print(f"Table URI: {dt.table_uri}")
print(f"Schema: {dt.schema()}")
print(f"Files: {len(dt.files())}")

# Get metadata
metadata = dt.metadata()
print(f"Table ID: {metadata.id}")
print(f"Partition columns: {metadata.partition_columns}")

# --- Time travel ----------------------------------------------------------
# Get current version
current_version = dt.version

# Load specific version for time travel
dt.load_as_version(0)  # First version
historical_data = dt.to_pandas()

# Return to latest
dt.load_as_version(current_version)

# View history
history = dt.history(limit=10)
for commit in history:
    print(f"Version {commit['version']}: {commit.get('operation', 'unknown')}")

# --- Storage backend configuration ----------------------------------------
# S3 configuration
s3_options = {
    "AWS_REGION": "us-west-2",
    "AWS_ACCESS_KEY_ID": "your-key",
    "AWS_SECRET_ACCESS_KEY": "your-secret",
}
dt = DeltaTable("s3://bucket/path/to/table", storage_options=s3_options)

# Azure configuration
azure_options = {
    "AZURE_STORAGE_ACCOUNT_NAME": "account",
    "AZURE_STORAGE_ACCESS_KEY": "key",
}
dt = DeltaTable("abfss://container@account.dfs.core.windows.net/path",
                storage_options=azure_options)


class TableFeatures(Enum):
    """Enumeration of Delta Lake table features that can be enabled on
    tables to extend the base Delta protocol."""

    ColumnMapping = "ColumnMapping"
    DeletionVectors = "DeletionVectors"
    TimestampWithoutTimezone = "TimestampWithoutTimezone"
    V2Checkpoint = "V2Checkpoint"
    AppendOnly = "AppendOnly"
    Invariants = "Invariants"
    CheckConstraints = "CheckConstraints"
    ChangeDataFeed = "ChangeDataFeed"
    GeneratedColumns = "GeneratedColumns"
    IdentityColumns = "IdentityColumns"
    RowTracking = "RowTracking"
    DomainMetadata = "DomainMetadata"
    IcebergCompatV1 = "IcebergCompatV1"
class Transaction:
    """Represents an application transaction for Delta Lake table operations,
    used to coordinate concurrent operations and ensure transaction isolation."""

    app_id: str
    version: int
    last_updated: int | None

    def __init__(
        self,
        app_id: str,
        version: int,
        last_updated: int | None = None,
    ) -> None: ...
Install with the Tessl CLI: `npx tessl i tessl/pypi-deltalake`