Native Delta Lake Python binding based on delta-rs, with Pandas integration.

Schema definition, evolution, and type system for Delta Lake tables, including field definitions, data types, and schema operations for maintaining table structure over time.
class Schema:
    """Main schema class for defining table structure.

    Wraps an ordered list of `Field` definitions and supports conversion
    to/from PyArrow schemas and a JSON string representation.
    """

    def __init__(self, fields: list[Field]): ...

    @property
    def fields(self) -> list[Field]:
        """Ordered list of the schema's field definitions."""
        ...

    def to_pyarrow(self) -> pyarrow.Schema:
        """Convert this Delta schema to an equivalent PyArrow schema."""
        ...

    def to_json(self) -> str:
        """Serialize this schema to its JSON string representation."""
        ...

    @classmethod
    def from_pyarrow(cls, schema: pyarrow.Schema) -> Schema:
        """Build a Delta schema from a PyArrow schema."""
        ...

    @classmethod
    def from_json(cls, json_str: str) -> Schema:
        """Reconstruct a schema from a JSON string produced by ``to_json``."""
        ...
class Field:
    """Individual field definition within a schema.

    A field pairs a name with a data type, an optional nullability flag
    (defaults to nullable), and an optional free-form metadata dict.
    """

    def __init__(
        self,
        name: str,
        data_type: DataType,
        nullable: bool = True,
        metadata: dict[str, Any] | None = None,
    ): ...

    @property
    def name(self) -> str:
        """Column name."""
        ...

    @property
    def data_type(self) -> DataType:
        """The field's Delta data type."""
        ...

    @property
    def nullable(self) -> bool:
        """Whether the field may hold nulls."""
        ...

    @property
    def metadata(self) -> dict[str, Any]:
        """User-supplied metadata attached to the field."""
        ...

    def to_json(self) -> str:
        """Serialize this field to its JSON string representation."""
        ...

    @classmethod
    def from_json(cls, json_str: str) -> Field:
        """Reconstruct a field from a JSON string produced by ``to_json``."""
        ...
# Type system supporting primitive types, collections, and nested structures.


class PrimitiveType:
    """A scalar Delta type identified by its type-name string (e.g. "string")."""

    def __init__(self, data_type: str): ...

    @property
    def data_type(self) -> str:
        """The primitive type name (e.g. "integer", "timestamp")."""
        ...


class ArrayType:
    """A variable-length list of a single element type."""

    def __init__(self, element_type: DataType, contains_null: bool = True): ...

    @property
    def element_type(self) -> DataType:
        """Type of the array's elements."""
        ...

    @property
    def contains_null(self) -> bool:
        """Whether array elements may be null."""
        ...


class MapType:
    """A key/value mapping type."""

    def __init__(
        self,
        key_type: DataType,
        value_type: DataType,
        value_contains_null: bool = True,
    ): ...

    @property
    def key_type(self) -> DataType:
        """Type of the map's keys."""
        ...

    @property
    def value_type(self) -> DataType:
        """Type of the map's values."""
        ...

    @property
    def value_contains_null(self) -> bool:
        """Whether map values may be null."""
        ...


class StructType:
    """A nested record type composed of Field definitions."""

    def __init__(self, fields: list[Field]): ...

    @property
    def fields(self) -> list[Field]:
        """Ordered fields of the struct."""
        ...


# Union type for all data types. Defined AFTER the four classes so every
# name it references exists at evaluation time (placing it first would
# raise NameError when this module is executed top-to-bottom).
DataType = Union[PrimitiveType, ArrayType, MapType, StructType]
from deltalake import Schema, Field
from deltalake.schema import PrimitiveType, ArrayType, MapType, StructType

# Simple schema with primitive types
schema = Schema([
    Field("id", PrimitiveType("integer"), nullable=False),
    Field("name", PrimitiveType("string"), nullable=True),
    Field("age", PrimitiveType("integer"), nullable=True),
    Field("salary", PrimitiveType("double"), nullable=True),
    Field("is_active", PrimitiveType("boolean"), nullable=False),
    Field("created_at", PrimitiveType("timestamp"), nullable=False),
])

# Print each field's name, type, and nullability
for field in schema.fields:
    print(f"{field.name}: {field.data_type} (nullable: {field.nullable})")
# Array type
tags_field = Field(
    "tags",
    ArrayType(PrimitiveType("string"), contains_null=False),
    nullable=True,
)

# Map type for key-value pairs
metadata_field = Field(
    "metadata",
    MapType(
        key_type=PrimitiveType("string"),
        value_type=PrimitiveType("string"),
        value_contains_null=True,
    ),
    nullable=True,
)

# Nested struct type
address_struct = StructType([
    Field("street", PrimitiveType("string")),
    Field("city", PrimitiveType("string")),
    Field("zipcode", PrimitiveType("string")),
    Field("country", PrimitiveType("string")),
])
address_field = Field("address", address_struct, nullable=True)

# Combined complex schema
complex_schema = Schema([
    Field("id", PrimitiveType("integer"), nullable=False),
    Field("name", PrimitiveType("string"), nullable=False),
    tags_field,
    metadata_field,
    address_field,
])

import pyarrow as pa
# Create PyArrow schema
arrow_schema = pa.schema([
    pa.field("id", pa.int64(), nullable=False),
    pa.field("name", pa.string()),
    pa.field("scores", pa.list_(pa.float64())),
    pa.field("created_at", pa.timestamp('us')),
])

# Convert to Delta schema
delta_schema = Schema.from_pyarrow(arrow_schema)

# Convert back to PyArrow
converted_arrow = delta_schema.to_pyarrow()
# Convert schema to JSON
schema_json = schema.to_json()
print("Schema as JSON:")
print(schema_json)

# Recreate schema from JSON
recreated_schema = Schema.from_json(schema_json)

# Verify the round-trip preserved every field's name and nullability
assert len(schema.fields) == len(recreated_schema.fields)
for original, recreated in zip(schema.fields, recreated_schema.fields):
    assert original.name == recreated.name
    assert original.nullable == recreated.nullable
# Field with metadata
documented_field = Field(
    "user_id",
    PrimitiveType("integer"),
    nullable=False,
    metadata={
        "description": "Unique identifier for user",
        "source_system": "user_management",
        "pii": False,
        "format": "int64",
    },
)

# Access metadata
print(f"Field metadata: {documented_field.metadata}")
print(f"Description: {documented_field.metadata.get('description')}")
print(f"Contains PII: {documented_field.metadata.get('pii')}")

from deltalake import DeltaTable, write_deltalake
# Original schema
original_schema = Schema([
    Field("id", PrimitiveType("integer"), nullable=False),
    Field("name", PrimitiveType("string"), nullable=True),
    Field("age", PrimitiveType("integer"), nullable=True),
])

# Create table with original schema
dt = DeltaTable.create("path/to/evolving-table", schema=original_schema)

# Add data with additional column (schema evolution)
import pandas as pd

evolved_data = pd.DataFrame({
    'id': [4, 5, 6],
    'name': ['New Person 1', 'New Person 2', 'New Person 3'],
    'age': [25, 30, 35],
    'department': ['Engineering', 'Sales', 'Marketing']  # New column
})

# Write with schema merge mode
write_deltalake(
    "path/to/evolving-table",
    evolved_data,
    mode="append",
    schema_mode="merge"  # Allow schema evolution
)

# Re-open the table and inspect the evolved schema
dt = DeltaTable("path/to/evolving-table")
evolved_schema = dt.schema()
print("Evolved schema:")
for field in evolved_schema.fields:
    print(f"  {field.name}: {field.data_type}")
Available primitive types:

- `"boolean"` — Boolean values
- `"byte"` — 8-bit signed integer
- `"short"` — 16-bit signed integer
- `"integer"` — 32-bit signed integer
- `"long"` — 64-bit signed integer
- `"float"` — 32-bit floating point
- `"double"` — 64-bit floating point
- `"decimal"` — Arbitrary precision decimal
- `"string"` — UTF-8 string
- `"binary"` — Binary data
- `"date"` — Date (year, month, day)
- `"timestamp"` — Timestamp with microsecond precision
# Examples of all primitive types
all_types_schema = Schema([
    Field("bool_col", PrimitiveType("boolean")),
    Field("byte_col", PrimitiveType("byte")),
    Field("short_col", PrimitiveType("short")),
    Field("int_col", PrimitiveType("integer")),
    Field("long_col", PrimitiveType("long")),
    Field("float_col", PrimitiveType("float")),
    Field("double_col", PrimitiveType("double")),
    Field("decimal_col", PrimitiveType("decimal")),
    Field("string_col", PrimitiveType("string")),
    Field("binary_col", PrimitiveType("binary")),
    Field("date_col", PrimitiveType("date")),
    Field("timestamp_col", PrimitiveType("timestamp")),
])
# Schema validation when creating tables
try:
    # Creating the table validates the schema structure;
    # mode="error" fails if the table already exists.
    dt = DeltaTable.create(
        "path/to/validated-table",
        schema=complex_schema,
        mode="error",
    )
    print("Schema validation passed")
except Exception as e:
    # Broad catch is intentional in this example: report and continue.
    print(f"Schema validation failed: {e}")
def schemas_compatible(schema1: Schema, schema2: Schema) -> bool:
    """Check if two schemas are compatible for merging.

    Two schemas are considered compatible when every field name they
    share has an equal data type and no shared field goes from nullable
    in ``schema1`` to non-nullable in ``schema2``. Fields present in
    only one of the schemas are ignored.
    """
    schema1_fields = {f.name: f for f in schema1.fields}
    schema2_fields = {f.name: f for f in schema2.fields}

    # Check common fields have compatible types
    for name in schema1_fields.keys() & schema2_fields.keys():
        field1 = schema1_fields[name]
        field2 = schema2_fields[name]

        # Simple type comparison (real implementation would be more complex)
        if field1.data_type != field2.data_type:
            return False

        # Nullable compatibility: can't make nullable field non-nullable
        if field1.nullable and not field2.nullable:
            return False

    return True
# Test compatibility between the original and evolved schemas
compatible = schemas_compatible(original_schema, evolved_schema)
print(f"Schemas are compatible: {compatible}")
class TableAlterer:
    """Advanced schema and table modification, accessed through ``DeltaTable.alter``.

    Provides table and schema alteration capabilities including feature
    management, column operations, constraints, and metadata modifications.
    Each method commits a change to the table's transaction log.
    """

    def add_feature(
        self,
        feature: TableFeatures | list[TableFeatures],
        allow_protocol_versions_increase: bool = False,
        commit_properties: CommitProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
    ) -> None:
        """Enable one or more table features on the table."""
        ...

    def add_columns(
        self,
        fields: Field | list[Field],
        commit_properties: CommitProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
    ) -> None:
        """Add one or more new columns to the table schema."""
        ...

    def add_constraint(
        self,
        constraints: dict[str, str],
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> None:
        """Add named constraints (mapping of constraint name to expression)."""
        ...

    def drop_constraint(
        self,
        name: str,
        raise_if_not_exists: bool = True,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> None:
        """Remove the constraint with the given name."""
        ...

    def set_table_properties(
        self,
        properties: dict[str, str],
        raise_if_not_exists: bool = True,
        commit_properties: CommitProperties | None = None,
    ) -> None:
        """Set table properties (key/value string pairs)."""
        ...

    def set_table_name(
        self,
        name: str,
        commit_properties: CommitProperties | None = None,
    ) -> None:
        """Set the table's name in its metadata."""
        ...

    def set_table_description(
        self,
        description: str,
        commit_properties: CommitProperties | None = None,
    ) -> None:
        """Set the table's description in its metadata."""
        ...

    def set_column_metadata(
        self,
        column: str,
        metadata: dict[str, str],
        commit_properties: CommitProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
    ) -> None:
        """Attach metadata (key/value string pairs) to a single column."""
        ...
Install with the Tessl CLI:

    npx tessl i tessl/pypi-deltalake