CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-daft

Distributed Dataframes for Multimodal Data with high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.

Pending
Overview
Eval results
Files

docs/session.md

Session Management

Session-based configuration and resource management for distributed computing. Handles catalog connections, temporary tables, execution settings, and provides a unified context for all Daft operations.

Capabilities

Session Interface

Core session management for distributed DataFrame operations.

class Session:
    """Main session class for distributed computing configuration.

    Owns the set of attached catalogs and provides the shared context used
    by the module-level session functions documented in this file.
    """
    
    def __init__(self): ...
    
    def get_catalog(self, name: str) -> Catalog:
        """Get attached catalog by name.

        Parameters:
        - name: Name the catalog was attached under

        Returns:
        Catalog: The attached catalog instance
        """
    
    def list_catalogs(self) -> List[str]:
        """List all attached catalogs.

        Returns:
        List[str]: Names of every catalog attached to this session
        """
    
    def attach_catalog(self, name: str, catalog: Catalog) -> None:
        """Attach catalog to session.

        Parameters:
        - name: Name to register the catalog under
        - catalog: Catalog instance to attach
        """
    
    def detach_catalog(self, name: str) -> None:
        """Detach catalog from session.

        Parameters:
        - name: Name of the catalog to remove
        """

def current_session() -> Session:
    """
    Get current session instance.
    
    Compare session(), which creates a session when none exists.
    
    Returns:
    Session: Current active session
    """

def set_session(session: Session) -> None:
    """
    Set current session.
    
    Parameters:
    - session: Session instance to make current
    """

def session() -> Session:
    """
    Get or create session instance.
    
    Returns:
    Session: Session instance (creates if none exists)
    """

Catalog Management

Attach and manage data catalogs within sessions.

# Module-level catalog helpers; each operates on the current session.

def attach_catalog(name: str, catalog: Catalog) -> None:
    """
    Attach catalog to current session.
    
    Parameters:
    - name: Catalog name for reference
    - catalog: Catalog instance to attach
    """

def detach_catalog(name: str) -> None:
    """
    Detach catalog from current session.
    
    Parameters:
    - name: Name of catalog to detach
    """

def current_catalog() -> str:
    """
    Get current catalog name.
    
    Returns:
    str: Name of current catalog
    """

def set_catalog(name: str) -> None:
    """
    Set current catalog.
    
    Parameters:
    - name: Name of catalog to make current
    """

def get_catalog(name: str) -> Catalog:
    """
    Get catalog by name.
    
    If the name may not be attached, check with has_catalog() first.
    
    Parameters:
    - name: Catalog name
    
    Returns:
    Catalog: Catalog instance
    """

def has_catalog(name: str) -> bool:
    """
    Check if catalog exists in session.
    
    Parameters:
    - name: Catalog name to check
    
    Returns:
    bool: True if catalog exists
    """

def list_catalogs() -> List[str]:
    """
    List all attached catalogs.
    
    Returns:
    List[str]: List of catalog names
    """

Namespace Management

Manage catalog namespaces within the session context.

# Namespace helpers; each applies to the current catalog.

def create_namespace(name: str) -> None:
    """
    Create namespace in current catalog.
    
    Parameters:
    - name: Namespace name to create
    """

def create_namespace_if_not_exists(name: str) -> None:
    """
    Create namespace if it doesn't exist.
    
    Idempotent variant of create_namespace().
    
    Parameters:
    - name: Namespace name
    """

def drop_namespace(name: str) -> None:
    """
    Drop namespace from current catalog.
    
    Parameters:
    - name: Namespace name to drop
    """

def current_namespace() -> str:
    """
    Get current namespace name.
    
    Returns:
    str: Current namespace name
    """

def set_namespace(name: str) -> None:
    """
    Set current namespace.
    
    Parameters:
    - name: Namespace name to set as current
    """

def has_namespace(name: str) -> bool:
    """
    Check if namespace exists.
    
    Parameters:
    - name: Namespace name to check
    
    Returns:
    bool: True if namespace exists
    """

Table Management

Register and manage temporary tables and catalog tables.

# Table helpers; unqualified names resolve against the current catalog.

def attach_table(df: DataFrame, name: str) -> None:
    """
    Attach DataFrame as temporary table.
    
    NOTE(review): parameter order (df, name) is the reverse of
    attach_catalog(name, catalog) — confirm this asymmetry is intended.
    
    Parameters:
    - df: DataFrame to attach
    - name: Table name for reference
    """

def detach_table(name: str) -> None:
    """
    Detach temporary table.
    
    Parameters:
    - name: Table name to detach
    """

def create_table(name: str, source: Union[Schema, DataFrame]) -> Table:
    """
    Create table in current catalog.
    
    Parameters:
    - name: Table name
    - source: Schema or DataFrame to create table from
    
    Returns:
    Table: Created table instance
    """

def create_table_if_not_exists(name: str, source: Union[Schema, DataFrame]) -> Table:
    """
    Create table if it doesn't exist.
    
    Idempotent variant of create_table().
    
    Parameters:
    - name: Table name
    - source: Schema or DataFrame
    
    Returns:
    Table: Table instance (existing or newly created)
    """

def create_temp_table(name: str, df: DataFrame) -> None:
    """
    Create temporary table from DataFrame.
    
    NOTE(review): appears to overlap with attach_table() — confirm how
    the two differ.
    
    Parameters:
    - name: Temporary table name
    - df: DataFrame to use as table data
    """

def drop_table(name: str) -> None:
    """
    Drop table from current catalog.
    
    Parameters:
    - name: Table name to drop
    """

def get_table(name: str) -> Table:
    """
    Get table by name.
    
    Parameters:
    - name: Table name
    
    Returns:
    Table: Table instance
    """

def has_table(name: str) -> bool:
    """
    Check if table exists.
    
    Parameters:
    - name: Table name to check
    
    Returns:
    bool: True if table exists
    """

def list_tables() -> List[str]:
    """
    List all available tables.
    
    Returns:
    List[str]: List of table names
    """

def read_table(name: str, **options: Any) -> DataFrame:
    """
    Read table as DataFrame.
    
    Parameters:
    - name: Table name to read
    - options: Additional read options
    
    Returns:
    DataFrame: Table data as DataFrame
    """

def write_table(name: str, df: DataFrame, **options: Any) -> None:
    """
    Write DataFrame to table.
    
    Parameters:
    - name: Table name
    - df: DataFrame to write
    - options: Additional write options
    """

Provider Management

Manage data providers and external service connections.

# Provider helpers; each operates on the current session.

def attach_provider(name: str, provider: Any) -> None:
    """
    Attach data provider to session.
    
    Parameters:
    - name: Provider name
    - provider: Provider instance (untyped here — the expected provider
      interface is not shown in this document)
    """

def detach_provider(name: str) -> None:
    """
    Detach data provider.
    
    Parameters:
    - name: Provider name to detach
    """

def current_provider() -> str:
    """
    Get current provider name.
    
    Returns:
    str: Current provider name
    """

def set_provider(name: str) -> None:
    """
    Set current provider.
    
    Parameters:
    - name: Provider name to set as current
    """

def get_provider(name: str) -> Any:
    """
    Get provider by name.
    
    Parameters:
    - name: Provider name
    
    Returns:
    Any: Provider instance
    """

def has_provider(name: str) -> bool:
    """
    Check if provider exists.
    
    Parameters:
    - name: Provider name to check
    
    Returns:
    bool: True if provider exists
    """

Function Management

Register custom functions for use across the session.

def attach_function(name: str, func: Callable) -> None:
    """
    Attach function to session for global use.
    
    The registered name becomes callable from SQL queries (see the
    usage examples in this document).
    
    Parameters:
    - name: Function name for reference
    - func: Callable function to attach
    """

def detach_function(name: str) -> None:
    """
    Detach function from session.
    
    Parameters:
    - name: Function name to detach
    """

Model Management

Manage AI/ML models within the session context.

def current_model() -> str:
    """
    Get current model name.
    
    Returns:
    str: Current model name
    """

def set_model(name: str) -> None:
    """
    Set current model for AI operations.
    
    NOTE(review): how model names resolve to model instances is not
    shown in this document — confirm against the AI/ML docs.
    
    Parameters:
    - name: Model name to set as current
    """

Configuration Management

Configure execution and planning settings for the session.

def set_execution_config(config: ExecutionConfig) -> None:
    """
    Set execution configuration for the session.
    
    Parameters:
    - config: Execution configuration settings
    """

def set_planning_config(config: PlanningConfig) -> None:
    """
    Set query planning configuration.
    
    Parameters:
    - config: Planning configuration settings
    """

def execution_config_ctx(config: ExecutionConfig) -> ContextManager:
    """
    Context manager for temporary execution config.
    
    The configuration applies only inside the managed `with` scope.
    
    Parameters:
    - config: Temporary execution configuration
    
    Returns:
    ContextManager: Context manager for config scope
    """

def planning_config_ctx(config: PlanningConfig) -> ContextManager:
    """
    Context manager for temporary planning config.
    
    The configuration applies only inside the managed `with` scope.
    
    Parameters:
    - config: Temporary planning configuration
    
    Returns:
    ContextManager: Context manager for config scope
    """

General Attachment

Generic attachment mechanism for session objects.

def attach(obj: Any, name: str) -> None:
    """
    Attach generic object to session.
    
    NOTE(review): presumably dispatches on the object's type to the
    specific attach_* helpers — confirm. Also note the parameter
    ordering: attach(obj, name) and attach_table(df, name) take the
    object first, while attach_catalog/attach_provider/attach_function
    take the name first.
    
    Parameters:
    - obj: Object to attach
    - name: Name for reference
    """

Usage Examples

Basic Session Setup

import daft
from daft.catalog import Catalog

# Fetch the current session.
active_session = daft.current_session()

# Describe two in-memory catalogs as plain dictionaries.
catalog_data = {
    "sales": {
        "customers": {"id": [1, 2, 3], "name": ["A", "B", "C"]},
        "orders": {"order_id": [101, 102], "customer_id": [1, 2]},
    },
    "inventory": {
        "products": {"id": [1, 2], "name": ["Widget", "Gadget"]},
    },
}

# Build each catalog and attach it to the session under its name.
for catalog_name, tables in catalog_data.items():
    daft.attach_catalog(catalog_name, Catalog.from_pydict(tables))

# Show what is now attached.
print(f"Available catalogs: {daft.list_catalogs()}")

Working with Multiple Catalogs

# Set current catalog
daft.set_catalog("sales")

# Work with tables in current catalog
customers_df = daft.read_table("customers")
orders_df = daft.read_table("orders")

# Switch to different catalog
daft.set_catalog("inventory")
products_df = daft.read_table("products")

# Join data across catalogs
# NOTE(review): passing a boolean expression as `on=` should be verified
# against DataFrame.join's documented signature.
result = customers_df.join(
    orders_df,
    on=daft.col("id") == daft.col("customer_id")
).join(
    # NOTE(review): this condition compares "product_id" with itself, so
    # it is always true (effectively a cross join), and the sample orders
    # table defines no product key — confirm the intended join columns.
    products_df.rename({"id": "product_id"}),
    on=daft.col("product_id") == daft.col("product_id")
)

Temporary Tables and SQL

# Build an in-memory DataFrame and expose it as a temporary table.
regions = ["North", "South", "East", "West"]
totals = [1000, 1500, 1200, 800]
regional_df = daft.from_pydict({"region": regions, "sales": totals})
daft.attach_table(regional_df, "regional_sales")

# The temporary table is addressable from SQL alongside catalog tables.
query = """
    SELECT r.region, r.sales, c.name
    FROM regional_sales r
    JOIN sales.customers c ON c.id <= 2
    ORDER BY r.sales DESC
"""
sql_result = daft.sql(query)

# Drop the temporary table once finished.
daft.detach_table("regional_sales")

Session Configuration

from daft.context import ExecutionConfig, PlanningConfig

# Session-wide execution settings.
daft.set_execution_config(
    ExecutionConfig(default_morsel_size=1000000, num_scan_tasks=16)
)

# Session-wide planning settings.
daft.set_planning_config(
    PlanningConfig(broadcast_join_size_threshold=100000)
)

# Override the execution config only within this scope (32 scan tasks).
scoped_config = ExecutionConfig(num_scan_tasks=32)
with daft.execution_config_ctx(scoped_config):
    large_df = daft.read_parquet("s3://bucket/large-dataset/*.parquet")
    result = large_df.groupby("category").count().collect()

Function Registration

# A UDF that uppercases its input and appends a marker suffix.
@daft.func
def uppercase_tag(value: str) -> str:
    tagged = value.upper()
    return tagged + "_PROCESSED"

# Make the UDF available session-wide under a SQL-visible name.
daft.attach_function("global_transform", uppercase_tag)

# Call it from SQL by its registered name.
sql_with_udf = daft.sql("""
    SELECT name, global_transform(name) as processed_name
    FROM sales.customers
""")

# Unregister once it is no longer needed.
daft.detach_function("global_transform")

Multi-Environment Session Management

# NOTE(review): this snippet assumes daft, Catalog, and ExecutionConfig were
# imported by the earlier examples; add those imports when running standalone.

def setup_development_session():
    """Setup session for development environment."""
    # Development catalogs with sample data
    dev_catalog = Catalog.from_pydict({
        "users": {"id": [1, 2], "name": ["Dev User 1", "Dev User 2"]}
    })
    
    daft.attach_catalog("main", dev_catalog)
    daft.set_catalog("main")
    
    # Development-specific configuration
    daft.set_execution_config(ExecutionConfig(
        default_morsel_size=10000  # Smaller for dev
    ))

def setup_production_session():
    """Setup session for production environment."""
    # Production catalogs from external systems
    from pyiceberg.catalog import load_catalog
    
    prod_iceberg = load_catalog("prod_catalog")
    prod_catalog = Catalog.from_iceberg(prod_iceberg)
    
    daft.attach_catalog("main", prod_catalog)
    daft.set_catalog("main")
    
    # Production-optimized configuration
    daft.set_execution_config(ExecutionConfig(
        default_morsel_size=10000000  # Larger for production
    ))

# Environment-specific setup
import os
if os.getenv("ENVIRONMENT") == "production":
    setup_production_session()
else:
    setup_development_session()

Advanced Session Management

class DataPipeline:
    """Coordinates catalogs, UDFs, and table reads/writes for one pipeline run."""

    def __init__(self):
        self.session = daft.session()
        self.setup_catalogs()
        self.register_functions()
    
    def setup_catalogs(self):
        """Setup all required catalogs."""
        # Raw data catalog
        raw_catalog = Catalog.from_s3tables("arn:aws:s3:::raw-data-bucket")
        daft.attach_catalog("raw", raw_catalog)
        
        # Processed data catalog
        # NOTE(review): unity_client is not defined in this snippet — it
        # must be constructed before this example runs; confirm its setup.
        processed_catalog = Catalog.from_unity(unity_client)
        daft.attach_catalog("processed", processed_catalog)
        
        # Set default catalog
        daft.set_catalog("processed")
    
    def register_functions(self):
        """Register pipeline-specific functions."""
        @daft.func
        def clean_text(text: str) -> str:
            return text.strip().lower()
        
        @daft.func
        def validate_email(email: str) -> bool:
            return "@" in email and "." in email
        
        daft.attach_function("clean_text", clean_text)
        daft.attach_function("validate_email", validate_email)
    
    def run_pipeline(self):
        """Execute data pipeline."""
        # Read from raw data
        daft.set_catalog("raw")
        raw_df = daft.read_table("user_data")
        
        # Process data
        cleaned_df = raw_df.select(
            daft.col("id"),
            daft.sql_expr("clean_text(name)").alias("name"),
            daft.col("email")
        ).filter(
            daft.sql_expr("validate_email(email)")
        )
        
        # Write to processed catalog
        daft.set_catalog("processed")
        daft.write_table("clean_users", cleaned_df)
    
    def cleanup(self):
        """Clean up session resources."""
        daft.detach_function("clean_text")
        daft.detach_function("validate_email")
        daft.detach_catalog("raw")
        daft.detach_catalog("processed")

# Use pipeline
# cleanup() runs even when run_pipeline() raises.
pipeline = DataPipeline()
try:
    pipeline.run_pipeline()
finally:
    pipeline.cleanup()

Session State Inspection

def inspect_session():
    """Print the current session state: active session, catalog,
    namespace, and the attached catalogs and tables."""
    probes = [
        ("Current session", daft.current_session),
        ("Current catalog", daft.current_catalog),
        ("Current namespace", daft.current_namespace),
        ("Available catalogs", daft.list_catalogs),
        ("Available tables", daft.list_tables),
    ]
    for label, probe in probes:
        print(f"{label}: {probe()}")

    # Probe for specific resources.
    if daft.has_catalog("main"):
        print("Main catalog is available")
    if daft.has_table("users"):
        print("Users table is available")

# Inspect current state
inspect_session()

Error Handling and Recovery

def safe_session_operation():
    """Perform session operations with error handling."""
    try:
        # Setup catalogs
        # NOTE(review): iceberg_instance is not defined in this snippet —
        # it must exist (a PyIceberg catalog) before this example runs.
        catalog = Catalog.from_iceberg(iceberg_instance)
        daft.attach_catalog("main", catalog)
        
        # Perform operations
        df = daft.read_table("main.sales.transactions")
        result = df.groupby("region").sum("amount").collect()
        
        return result
        
    except Exception as e:
        print(f"Session operation failed: {e}")
        
        # Cleanup on error: detach only if the attach above succeeded.
        if daft.has_catalog("main"):
            daft.detach_catalog("main")
        
        raise
    
    finally:
        # Ensure cleanup — runs on success and failure alike.
        print("Session operation completed")

# Safe execution
try:
    result = safe_session_operation()
except Exception:
    print("Operation failed, session cleaned up")

Session management in Daft provides a comprehensive framework for organizing and coordinating distributed data operations across multiple catalogs, tables, and computational resources with proper lifecycle management and configuration control.

Install with Tessl CLI

npx tessl i tessl/pypi-daft

docs

ai-ml.md

catalog.md

data-io.md

dataframe-operations.md

expressions.md

index.md

session.md

sql.md

udf.md

tile.json