CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-daft

Distributed Dataframes for Multimodal Data with high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.

Pending
Overview
Eval results
Files

catalog.mddocs/

Data Catalog Integration

Integration with data catalogs for metadata management, table discovery, and governance. Supports Unity Catalog, Apache Iceberg, AWS Glue, S3 Tables, and custom catalog implementations with comprehensive namespace and table management.

Capabilities

Catalog Interface

Abstract catalog interface for connecting to various catalog systems.

class Catalog:
    """Interface for data catalog implementations."""
    
    @property
    def name(self) -> str:
        """Returns the catalog's name."""
    
    def list_namespaces(self, pattern: Optional[str] = None) -> List[Identifier]:
        """
        List namespaces in the catalog.
        
        Parameters:
        - pattern: Optional pattern to filter namespaces
        
        Returns:
        List[Identifier]: List of namespace identifiers
        """
    
    def list_tables(self, pattern: Optional[str] = None) -> List[Identifier]:
        """
        List tables in the catalog.
        
        Parameters:
        - pattern: Optional pattern to filter tables
        
        Returns:
        List[Identifier]: List of table identifiers
        """
    
    def get_table(self, identifier: Union[Identifier, str]) -> Table:
        """
        Get table by identifier.
        
        Parameters:
        - identifier: Table identifier or name
        
        Returns:
        Table: Table instance
        """
    
    def create_table(
        self, 
        identifier: Union[Identifier, str], 
        source: Union[Schema, DataFrame],
        properties: Optional[Dict[str, Any]] = None
    ) -> Table:
        """
        Create new table in catalog.
        
        Parameters:
        - identifier: Table identifier or name
        - source: Schema or DataFrame to create table from
        - properties: Additional table properties
        
        Returns:
        Table: Created table instance
        """
    
    def drop_table(self, identifier: Union[Identifier, str]) -> None:
        """
        Drop table from catalog.
        
        Parameters:
        - identifier: Table identifier or name
        """
    
    def has_table(self, identifier: Union[Identifier, str]) -> bool:
        """
        Check if table exists in catalog.
        
        Parameters:
        - identifier: Table identifier or name
        
        Returns:
        bool: True if table exists
        """

Catalog Factory Methods

Create catalog instances from various systems.

class Catalog:
    @staticmethod
    def from_pydict(tables: Dict[Union[Identifier, str], Any], name: str = "default") -> Catalog:
        """
        Create in-memory catalog from dictionary.
        
        Parameters:
        - tables: Dictionary of table-like objects
        - name: Catalog name
        
        Returns:
        Catalog: In-memory catalog instance
        """
    
    @staticmethod
    def from_iceberg(catalog: Any) -> Catalog:
        """
        Create catalog from PyIceberg catalog.
        
        Parameters:
        - catalog: PyIceberg catalog instance
        
        Returns:
        Catalog: Daft catalog wrapping Iceberg catalog
        """
    
    @staticmethod
    def from_unity(catalog: Any) -> Catalog:
        """
        Create catalog from Unity Catalog client.
        
        Parameters:
        - catalog: Unity Catalog client instance
        
        Returns:
        Catalog: Daft catalog wrapping Unity catalog
        """
    
    @staticmethod
    def from_glue(
        name: str,
        client: Optional[Any] = None,
        session: Optional[Any] = None
    ) -> Catalog:
        """
        Create catalog from AWS Glue.
        
        Parameters:
        - name: Glue database name
        - client: Optional boto3 Glue client
        - session: Optional boto3 session
        
        Returns:
        Catalog: Daft catalog wrapping Glue catalog
        """
    
    @staticmethod
    def from_s3tables(
        table_bucket_arn: str,
        client: Optional[Any] = None,
        session: Optional[Any] = None
    ) -> Catalog:
        """
        Create catalog from S3 Tables.
        
        Parameters:
        - table_bucket_arn: S3 Tables bucket ARN
        - client: Optional boto3 client
        - session: Optional boto3 session
        
        Returns:
        Catalog: Daft catalog for S3 Tables
        """

Table Interface

Abstract table interface for catalog tables.

class Table:
    """Interface for catalog table implementations."""
    
    @property
    def name(self) -> str:
        """Returns the table's name."""
    
    def schema(self) -> Schema:
        """
        Returns the table's schema.
        
        Returns:
        Schema: Table schema definition
        """
    
    def read(self, **options: Any) -> DataFrame:
        """
        Read table as DataFrame.
        
        Parameters:
        - options: Additional read options
        
        Returns:
        DataFrame: Table data as DataFrame
        """
    
    def write(
        self, 
        df: DataFrame, 
        mode: Literal["append", "overwrite"] = "append",
        **options: Any
    ) -> None:
        """
        Write DataFrame to table.
        
        Parameters:
        - df: DataFrame to write
        - mode: Write mode ('append' or 'overwrite')
        - options: Additional write options
        """
    
    def append(self, df: DataFrame, **options: Any) -> None:
        """
        Append DataFrame to table.
        
        Parameters:
        - df: DataFrame to append
        - options: Additional options
        """
    
    def overwrite(self, df: DataFrame, **options: Any) -> None:
        """
        Overwrite table with DataFrame.
        
        Parameters:
        - df: DataFrame to overwrite with
        - options: Additional options
        """

Identifier System

Hierarchical identifiers for catalog objects.

class Identifier:
    """Reference to catalog object (namespace.table or catalog.namespace.table)."""
    
    def __init__(self, *parts: str):
        """
        Create identifier from parts.
        
        Parameters:
        - parts: Identifier components (namespace, table name, etc.)
        """
    
    @staticmethod
    def from_str(input: str) -> Identifier:
        """
        Parse identifier from dot-delimited string.
        
        Parameters:
        - input: Dot-delimited identifier string
        
        Returns:
        Identifier: Parsed identifier
        """
    
    @staticmethod
    def from_sql(input: str, normalize: bool = False) -> Identifier:
        """
        Parse identifier from SQL string.
        
        Parameters:
        - input: SQL identifier string
        - normalize: Whether to normalize case
        
        Returns:
        Identifier: Parsed SQL identifier
        """
    
    def drop(self, n: int = 1) -> Identifier:
        """
        Drop first n parts from identifier.
        
        Parameters:
        - n: Number of parts to drop
        
        Returns:
        Identifier: New identifier with parts dropped
        """

Usage Examples

In-Memory Catalog

import daft
from daft.catalog import Catalog

# Create catalog from Python data
data = {
    "users": {
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "email": ["alice@example.com", "bob@example.com", "charlie@example.com"]
    },
    "orders": {
        "order_id": [101, 102, 103],
        "user_id": [1, 2, 1],
        "amount": [250.0, 180.0, 320.0]
    }
}

catalog = Catalog.from_pydict(data, name="sales_catalog")

# List available tables
tables = catalog.list_tables()
print(f"Available tables: {tables}")

# Read table as DataFrame
users_df = catalog.get_table("users").read()
orders_df = catalog.get_table("orders").read()

Iceberg Catalog Integration

# Connect to Iceberg catalog
try:
    from pyiceberg.catalog import load_catalog
    
    # Load Iceberg catalog configuration
    iceberg_catalog = load_catalog("my_iceberg_catalog", 
                                   uri="http://localhost:8181",
                                   warehouse="s3://my-warehouse/")
    
    # Create Daft catalog wrapper
    catalog = Catalog.from_iceberg(iceberg_catalog)
    
    # List namespaces and tables
    namespaces = catalog.list_namespaces()
    tables = catalog.list_tables()
    
    # Read Iceberg table
    sales_table = catalog.get_table("sales.transactions")
    sales_df = sales_table.read()
    
except ImportError:
    print("Install iceberg support: pip install 'daft[iceberg]'")

Unity Catalog Integration

# Connect to Unity Catalog
try:
    from unitycatalog import UnityCatalogClient
    
    # Create Unity Catalog client
    unity_client = UnityCatalogClient(
        base_url="https://unity-catalog-server.com",
        token="your-access-token"
    )
    
    # Create Daft catalog
    catalog = Catalog.from_unity(unity_client)
    
    # Work with Unity Catalog tables
    table = catalog.get_table("main.sales.customers")
    customers_df = table.read()
    
    # Write back to Unity Catalog
    processed_df = customers_df.filter(daft.col("active") == True)
    table.append(processed_df)
    
except ImportError:
    print("Install Unity support: pip install 'daft[unity]'")

AWS Glue Catalog

# Connect to AWS Glue
try:
    import boto3
    
    # Create Glue catalog
    catalog = Catalog.from_glue(
        name="my-glue-database",
        session=boto3.Session(region_name="us-west-2")
    )
    
    # List Glue tables
    tables = catalog.list_tables()
    
    # Read from Glue table
    glue_table = catalog.get_table("customer_data")
    df = glue_table.read()
    
except ImportError:
    print("Install AWS support: pip install 'daft[aws]'")

S3 Tables Integration

# Connect to S3 Tables
try:
    catalog = Catalog.from_s3tables(
        table_bucket_arn="arn:aws:s3:::my-s3tables-bucket"
    )
    
    # List S3 Tables
    tables = catalog.list_tables()
    
    # Read S3 Table
    s3_table = catalog.get_table("analytics.user_events")
    events_df = s3_table.read()
    
except ImportError:
    print("Install AWS support: pip install 'daft[aws]'")

Multi-Catalog Operations

# Work with multiple catalogs
iceberg_catalog = Catalog.from_iceberg(iceberg_instance)
unity_catalog = Catalog.from_unity(unity_instance) 

# Read from different catalogs
source_df = iceberg_catalog.get_table("source.raw_data").read()
reference_df = unity_catalog.get_table("reference.lookup_table").read()

# Join data from different catalogs
joined_df = source_df.join(
    reference_df,
    on=daft.col("key") == daft.col("reference_key")
)

# Write result to another catalog
result_table = iceberg_catalog.create_table("processed.joined_data", joined_df)
result_table.append(joined_df)

Namespace Management

from daft.catalog import Identifier

# Create hierarchical namespace
namespace_id = Identifier("analytics", "customer_data")

# Check if namespace exists
if catalog.has_namespace(namespace_id):
    print("Namespace exists")
    
# Create namespace if needed
catalog.create_namespace_if_not_exists(namespace_id)

# List tables in namespace
tables_in_namespace = catalog.list_tables(pattern="analytics.customer_data.*")

Table Management

# Create table from DataFrame
df = daft.from_pydict({
    "product_id": [1, 2, 3],
    "name": ["Widget A", "Widget B", "Widget C"],
    "price": [19.99, 29.99, 39.99]
})

# Create table in catalog
table_id = Identifier("inventory", "products")
products_table = catalog.create_table(table_id, df)

# Check table properties
schema = products_table.schema()
table_name = products_table.name

# Update table data
new_products = daft.from_pydict({
    "product_id": [4, 5],
    "name": ["Widget D", "Widget E"],
    "price": [49.99, 59.99]
})

products_table.append(new_products)

Advanced Table Operations

# Table with partitioning and properties
partitioned_df = daft.read_parquet("s3://data/sales/*.parquet")

# Create partitioned table
properties = {
    "format-version": "2",
    "write.parquet.compression-codec": "snappy"
}

sales_table = catalog.create_table(
    "sales.transactions",
    partitioned_df,
    properties=properties
)

# Read with predicate pushdown
filtered_df = sales_table.read(
    columns=["date", "amount", "customer_id"],
    predicate=daft.col("date") >= "2024-01-01"
)

Catalog Discovery and Metadata

# Discover catalog structure
def explore_catalog(catalog: Catalog):
    print(f"Catalog: {catalog.name}")
    
    # List all namespaces
    namespaces = catalog.list_namespaces()
    for namespace in namespaces:
        print(f"  Namespace: {namespace}")
        
        # List tables in namespace
        tables = catalog.list_tables(pattern=f"{namespace}.*")
        for table_id in tables:
            table = catalog.get_table(table_id)
            schema = table.schema()
            print(f"    Table: {table.name}")
            print(f"      Columns: {schema.column_names}")

# Explore all connected catalogs
explore_catalog(catalog)

Error Handling

from daft.catalog import NotFoundError

try:
    # Attempt to get non-existent table
    table = catalog.get_table("non_existent.table")
except NotFoundError:
    print("Table not found")

# Safe table access
if catalog.has_table("sales.customers"):
    customers = catalog.get_table("sales.customers").read()
else:
    print("Creating customers table...")
    # Create table logic here

Integration with Session Management

# Register catalog in session
daft.attach_catalog("main_catalog", catalog)

# Use catalog tables in SQL
result = daft.sql("SELECT * FROM main_catalog.sales.customers WHERE active = true")

# List registered catalogs
catalogs = daft.list_catalogs()

Data Catalog Types

class DataCatalogTable:
    """Representation of table in data catalog."""

class DataCatalogType:
    """Enumeration of supported catalog types."""

Daft's catalog integration provides a unified interface for working with diverse data catalog systems, enabling metadata-driven data discovery and governance across different platforms and storage systems.

Install with Tessl CLI

npx tessl i tessl/pypi-daft

docs

ai-ml.md

catalog.md

data-io.md

dataframe-operations.md

expressions.md

index.md

session.md

sql.md

udf.md

tile.json