CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-deltalake

Native Delta Lake Python binding based on delta-rs with Pandas integration

Pending
Overview
Eval results
Files

docs/schema-management.md

Schema Management

Schema definition, evolution, and type system for Delta Lake tables including field definitions, data types, and schema operations for maintaining table structure over time.

Capabilities

Schema Definition

class Schema:
    def __init__(self, fields: list[Field]): ...
    
    @property
    def fields(self) -> list[Field]: ...
    
    def to_pyarrow(self) -> pyarrow.Schema: ...
    
    def to_json(self) -> str: ...
    
    @classmethod
    def from_pyarrow(cls, schema: pyarrow.Schema) -> Schema: ...
    
    @classmethod
    def from_json(cls, json_str: str) -> Schema: ...

Main schema class for defining table structure.

Field Definition

class Field:
    def __init__(
        self,
        name: str,
        data_type: DataType,
        nullable: bool = True,
        metadata: dict[str, Any] | None = None
    ): ...
    
    @property
    def name(self) -> str: ...
    
    @property
    def data_type(self) -> DataType: ...
    
    @property
    def nullable(self) -> bool: ...
    
    @property
    def metadata(self) -> dict[str, Any]: ...
    
    def to_json(self) -> str: ...
    
    @classmethod
    def from_json(cls, json_str: str) -> Field: ...

Individual field definition within a schema.

Data Types

# Union type for all data types
DataType = Union[PrimitiveType, ArrayType, MapType, StructType]

class PrimitiveType:
    def __init__(self, data_type: str): ...
    
    @property
    def data_type(self) -> str: ...

class ArrayType:
    def __init__(self, element_type: DataType, contains_null: bool = True): ...
    
    @property
    def element_type(self) -> DataType: ...
    
    @property
    def contains_null(self) -> bool: ...

class MapType:
    def __init__(
        self,
        key_type: DataType,
        value_type: DataType,
        value_contains_null: bool = True
    ): ...
    
    @property
    def key_type(self) -> DataType: ...
    
    @property
    def value_type(self) -> DataType: ...
    
    @property
    def value_contains_null(self) -> bool: ...

class StructType:
    def __init__(self, fields: list[Field]): ...
    
    @property
    def fields(self) -> list[Field]: ...

Type system supporting primitive types, collections, and nested structures.

Usage Examples

Basic Schema Creation

from deltalake import Schema, Field
from deltalake.schema import PrimitiveType, ArrayType, MapType, StructType

# Simple schema with primitive types
schema = Schema([
    Field("id", PrimitiveType("integer"), nullable=False),
    Field("name", PrimitiveType("string"), nullable=True),
    Field("age", PrimitiveType("integer"), nullable=True),
    Field("salary", PrimitiveType("double"), nullable=True),
    Field("is_active", PrimitiveType("boolean"), nullable=False),
    Field("created_at", PrimitiveType("timestamp"), nullable=False)
])

# Print schema information
for field in schema.fields:
    print(f"{field.name}: {field.data_type} (nullable: {field.nullable})")

Complex Data Types

# Array type
tags_field = Field(
    "tags",
    ArrayType(PrimitiveType("string"), contains_null=False),
    nullable=True
)

# Map type for key-value pairs
metadata_field = Field(
    "metadata",
    MapType(
        key_type=PrimitiveType("string"),
        value_type=PrimitiveType("string"),
        value_contains_null=True
    ),
    nullable=True
)

# Nested struct type
address_struct = StructType([
    Field("street", PrimitiveType("string")),
    Field("city", PrimitiveType("string")),
    Field("zipcode", PrimitiveType("string")),
    Field("country", PrimitiveType("string"))
])

address_field = Field("address", address_struct, nullable=True)

# Combined complex schema
complex_schema = Schema([
    Field("id", PrimitiveType("integer"), nullable=False),
    Field("name", PrimitiveType("string"), nullable=False),
    tags_field,
    metadata_field,
    address_field
])

Schema from PyArrow

import pyarrow as pa

# Create PyArrow schema
arrow_schema = pa.schema([
    pa.field("id", pa.int64(), nullable=False),
    pa.field("name", pa.string()),
    pa.field("scores", pa.list_(pa.float64())),
    pa.field("created_at", pa.timestamp('us'))
])

# Convert to Delta schema
delta_schema = Schema.from_pyarrow(arrow_schema)

# Convert back to PyArrow
converted_arrow = delta_schema.to_pyarrow()

Schema Serialization

# Convert schema to JSON
schema_json = schema.to_json()
print("Schema as JSON:")
print(schema_json)

# Recreate schema from JSON
recreated_schema = Schema.from_json(schema_json)

# Verify fields match
assert len(schema.fields) == len(recreated_schema.fields)
for original, recreated in zip(schema.fields, recreated_schema.fields):
    assert original.name == recreated.name
    assert original.nullable == recreated.nullable

Working with Field Metadata

# Field with metadata
documented_field = Field(
    "user_id",
    PrimitiveType("integer"),
    nullable=False,
    metadata={
        "description": "Unique identifier for user",
        "source_system": "user_management",
        "pii": False,
        "format": "int64"
    }
)

# Access metadata
print(f"Field metadata: {documented_field.metadata}")
print(f"Description: {documented_field.metadata.get('description')}")
print(f"Contains PII: {documented_field.metadata.get('pii')}")

Schema Evolution Examples

from deltalake import DeltaTable, write_deltalake

# Original schema
original_schema = Schema([
    Field("id", PrimitiveType("integer"), nullable=False),
    Field("name", PrimitiveType("string"), nullable=True),
    Field("age", PrimitiveType("integer"), nullable=True)
])

# Create table with original schema
dt = DeltaTable.create("path/to/evolving-table", schema=original_schema)

# Add data with additional column (schema evolution)
import pandas as pd

evolved_data = pd.DataFrame({
    'id': [4, 5, 6],
    'name': ['New Person 1', 'New Person 2', 'New Person 3'],
    'age': [25, 30, 35],
    'department': ['Engineering', 'Sales', 'Marketing']  # New column
})

# Write with schema merge mode
write_deltalake(
    "path/to/evolving-table",
    evolved_data,
    mode="append",
    schema_mode="merge"  # Allow schema evolution
)

# Check evolved schema
dt = DeltaTable("path/to/evolving-table")
evolved_schema = dt.schema()
print("Evolved schema:")
for field in evolved_schema.fields:
    print(f"  {field.name}: {field.data_type}")

Primitive Data Types

Available primitive types:

  • "boolean" - Boolean values
  • "byte" - 8-bit signed integer
  • "short" - 16-bit signed integer
  • "integer" - 32-bit signed integer
  • "long" - 64-bit signed integer
  • "float" - 32-bit floating point
  • "double" - 64-bit floating point
  • "decimal" - Arbitrary precision decimal
  • "string" - UTF-8 string
  • "binary" - Binary data
  • "date" - Date (year, month, day)
  • "timestamp" - Timestamp with microsecond precision
# Examples of all primitive types
all_types_schema = Schema([
    Field("bool_col", PrimitiveType("boolean")),
    Field("byte_col", PrimitiveType("byte")),
    Field("short_col", PrimitiveType("short")),
    Field("int_col", PrimitiveType("integer")),
    Field("long_col", PrimitiveType("long")),
    Field("float_col", PrimitiveType("float")),
    Field("double_col", PrimitiveType("double")),
    Field("decimal_col", PrimitiveType("decimal")),
    Field("string_col", PrimitiveType("string")),
    Field("binary_col", PrimitiveType("binary")),
    Field("date_col", PrimitiveType("date")),
    Field("timestamp_col", PrimitiveType("timestamp"))
])

Validation and Constraints

# Schema validation when creating tables
try:
    # This will validate the schema structure
    dt = DeltaTable.create(
        "path/to/validated-table",
        schema=complex_schema,
        mode="error"
    )
    print("Schema validation passed")
except Exception as e:
    print(f"Schema validation failed: {e}")

# Check schema compatibility
def schemas_compatible(schema1: Schema, schema2: Schema) -> bool:
    """Check if two schemas are compatible for merging"""
    schema1_fields = {f.name: f for f in schema1.fields}
    schema2_fields = {f.name: f for f in schema2.fields}
    
    # Check common fields have compatible types
    for name in schema1_fields.keys() & schema2_fields.keys():
        field1 = schema1_fields[name]
        field2 = schema2_fields[name]
        
        # Simple type comparison (real implementation would be more complex)
        if field1.data_type != field2.data_type:
            return False
            
        # Nullable compatibility: can't make nullable field non-nullable
        if field1.nullable and not field2.nullable:
            return False
    
    return True

# Test compatibility
compatible = schemas_compatible(original_schema, evolved_schema)
print(f"Schemas are compatible: {compatible}")

TableAlterer Class

The TableAlterer class provides advanced schema and table modification capabilities accessed through DeltaTable.alter.

class TableAlterer:
    def add_feature(
        self,
        feature: TableFeatures | list[TableFeatures],
        allow_protocol_versions_increase: bool = False,
        commit_properties: CommitProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
    ) -> None: ...
    
    def add_columns(
        self,
        fields: Field | list[Field],
        commit_properties: CommitProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
    ) -> None: ...
    
    def add_constraint(
        self,
        constraints: dict[str, str],
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> None: ...
    
    def drop_constraint(
        self,
        name: str,
        raise_if_not_exists: bool = True,
        post_commithook_properties: PostCommitHookProperties | None = None,
        commit_properties: CommitProperties | None = None,
    ) -> None: ...
    
    def set_table_properties(
        self,
        properties: dict[str, str],
        raise_if_not_exists: bool = True,
        commit_properties: CommitProperties | None = None,
    ) -> None: ...
    
    def set_table_name(
        self,
        name: str,
        commit_properties: CommitProperties | None = None,
    ) -> None: ...
    
    def set_table_description(
        self,
        description: str,
        commit_properties: CommitProperties | None = None,
    ) -> None: ...
    
    def set_column_metadata(
        self,
        column: str,
        metadata: dict[str, str],
        commit_properties: CommitProperties | None = None,
        post_commithook_properties: PostCommitHookProperties | None = None,
    ) -> None: ...

Provides comprehensive table and schema alteration capabilities including feature management, column operations, constraints, and metadata modifications.

Install with Tessl CLI

npx tessl i tessl/pypi-deltalake

docs

data-reading.md

index.md

query-operations.md

schema-management.md

table-maintenance.md

table-operations.md

transaction-management.md

writing-modification.md

tile.json