Daft: distributed DataFrames for multimodal data, with a high-performance query engine and support for complex nested data structures, AI/ML operations, and seamless cloud storage integration.

This page covers core session management for distributed DataFrame operations: session-based configuration and resource management for distributed computing. A session handles catalog connections, temporary tables, and execution settings, and provides a unified context for all Daft operations.
class Session:
    """Main session class for distributed computing configuration."""

    def __init__(self): ...

    def get_catalog(self, name: str) -> Catalog:
        """Get attached catalog by name."""

    def list_catalogs(self) -> List[str]:
        """List all attached catalogs."""

    def attach_catalog(self, name: str, catalog: Catalog) -> None:
        """Attach catalog to session."""

    def detach_catalog(self, name: str) -> None:
        """Detach catalog from session."""
def current_session() -> Session:
    """Get current session instance.

    Returns:
        Session: Current active session
    """


def set_session(session: Session) -> None:
    """Set current session.

    Parameters:
        session: Session instance to make current
    """


def session() -> Session:
    """Get or create session instance.

    Returns:
        Session: Session instance (creates if none exists)
    """


# --- Attach and manage data catalogs within sessions. ---
def attach_catalog(name: str, catalog: Catalog) -> None:
    """Attach catalog to current session.

    Parameters:
        name: Catalog name for reference
        catalog: Catalog instance to attach
    """


def detach_catalog(name: str) -> None:
    """Detach catalog from current session.

    Parameters:
        name: Name of catalog to detach
    """


def current_catalog() -> str:
    """Get current catalog name.

    Returns:
        str: Name of current catalog
    """


def set_catalog(name: str) -> None:
    """Set current catalog.

    Parameters:
        name: Name of catalog to make current
    """


def get_catalog(name: str) -> Catalog:
    """Get catalog by name.

    Parameters:
        name: Catalog name

    Returns:
        Catalog: Catalog instance
    """


def has_catalog(name: str) -> bool:
    """Check if catalog exists in session.

    Parameters:
        name: Catalog name to check

    Returns:
        bool: True if catalog exists
    """


def list_catalogs() -> List[str]:
    """List all attached catalogs.

    Returns:
        List[str]: List of catalog names
    """


# --- Manage catalog namespaces within the session context. ---
def create_namespace(name: str) -> None:
    """Create namespace in current catalog.

    Parameters:
        name: Namespace name to create
    """


def create_namespace_if_not_exists(name: str) -> None:
    """Create namespace if it doesn't exist.

    Parameters:
        name: Namespace name
    """


def drop_namespace(name: str) -> None:
    """Drop namespace from current catalog.

    Parameters:
        name: Namespace name to drop
    """


def current_namespace() -> str:
    """Get current namespace name.

    Returns:
        str: Current namespace name
    """


def set_namespace(name: str) -> None:
    """Set current namespace.

    Parameters:
        name: Namespace name to set as current
    """


def has_namespace(name: str) -> bool:
    """Check if namespace exists.

    Parameters:
        name: Namespace name to check

    Returns:
        bool: True if namespace exists
    """


# --- Register and manage temporary tables and catalog tables. ---
def attach_table(df: DataFrame, name: str) -> None:
    """Attach DataFrame as temporary table.

    Parameters:
        df: DataFrame to attach
        name: Table name for reference
    """


def detach_table(name: str) -> None:
    """Detach temporary table.

    Parameters:
        name: Table name to detach
    """


def create_table(name: str, source: Union[Schema, DataFrame]) -> Table:
    """Create table in current catalog.

    Parameters:
        name: Table name
        source: Schema or DataFrame to create table from

    Returns:
        Table: Created table instance
    """


def create_table_if_not_exists(name: str, source: Union[Schema, DataFrame]) -> Table:
    """Create table if it doesn't exist.

    Parameters:
        name: Table name
        source: Schema or DataFrame

    Returns:
        Table: Table instance (existing or newly created)
    """


def create_temp_table(name: str, df: DataFrame) -> None:
    """Create temporary table from DataFrame.

    Parameters:
        name: Temporary table name
        df: DataFrame to use as table data
    """


def drop_table(name: str) -> None:
    """Drop table from current catalog.

    Parameters:
        name: Table name to drop
    """


def get_table(name: str) -> Table:
    """Get table by name.

    Parameters:
        name: Table name

    Returns:
        Table: Table instance
    """


def has_table(name: str) -> bool:
    """Check if table exists.

    Parameters:
        name: Table name to check

    Returns:
        bool: True if table exists
    """


def list_tables() -> List[str]:
    """List all available tables.

    Returns:
        List[str]: List of table names
    """


def read_table(name: str, **options: Any) -> DataFrame:
    """Read table as DataFrame.

    Parameters:
        name: Table name to read
        options: Additional read options

    Returns:
        DataFrame: Table data as DataFrame
    """


def write_table(name: str, df: DataFrame, **options: Any) -> None:
    """Write DataFrame to table.

    Parameters:
        name: Table name
        df: DataFrame to write
        options: Additional write options
    """


# --- Manage data providers and external service connections. ---
def attach_provider(name: str, provider: Any) -> None:
    """Attach data provider to session.

    Parameters:
        name: Provider name
        provider: Provider instance
    """


def detach_provider(name: str) -> None:
    """Detach data provider.

    Parameters:
        name: Provider name to detach
    """


def current_provider() -> str:
    """Get current provider name.

    Returns:
        str: Current provider name
    """


def set_provider(name: str) -> None:
    """Set current provider.

    Parameters:
        name: Provider name to set as current
    """


def get_provider(name: str) -> Any:
    """Get provider by name.

    Parameters:
        name: Provider name

    Returns:
        Any: Provider instance
    """


def has_provider(name: str) -> bool:
    """Check if provider exists.

    Parameters:
        name: Provider name to check

    Returns:
        bool: True if provider exists
    """


# --- Register custom functions for use across the session. ---
def attach_function(name: str, func: Callable) -> None:
    """Attach function to session for global use.

    Parameters:
        name: Function name for reference
        func: Callable function to attach
    """


def detach_function(name: str) -> None:
    """Detach function from session.

    Parameters:
        name: Function name to detach
    """


# --- Manage AI/ML models within the session context. ---
def current_model() -> str:
    """Get current model name.

    Returns:
        str: Current model name
    """


def set_model(name: str) -> None:
    """Set current model for AI operations.

    Parameters:
        name: Model name to set as current
    """


# --- Configure execution and planning settings for the session. ---
def set_execution_config(config: ExecutionConfig) -> None:
    """Set execution configuration for the session.

    Parameters:
        config: Execution configuration settings
    """


def set_planning_config(config: PlanningConfig) -> None:
    """Set query planning configuration.

    Parameters:
        config: Planning configuration settings
    """


def execution_config_ctx(config: ExecutionConfig) -> ContextManager:
    """Context manager for temporary execution config.

    Parameters:
        config: Temporary execution configuration

    Returns:
        ContextManager: Context manager for config scope
    """


def planning_config_ctx(config: PlanningConfig) -> ContextManager:
    """Context manager for temporary planning config.

    Parameters:
        config: Temporary planning configuration

    Returns:
        ContextManager: Context manager for config scope
    """


# --- Generic attachment mechanism for session objects. ---
def attach(obj: Any, name: str) -> None:
    """Attach generic object to session.

    Parameters:
        obj: Object to attach
        name: Name for reference
    """


import daft
from daft.catalog import Catalog

# Create or get current session
session = daft.current_session()

# Create catalogs backed by in-memory dicts
sales_catalog = Catalog.from_pydict({
    "customers": {"id": [1, 2, 3], "name": ["A", "B", "C"]},
    "orders": {"order_id": [101, 102], "customer_id": [1, 2]},
})
inventory_catalog = Catalog.from_pydict({
    "products": {"id": [1, 2], "name": ["Widget", "Gadget"]},
})

# Attach catalogs to session
daft.attach_catalog("sales", sales_catalog)
daft.attach_catalog("inventory", inventory_catalog)

# List available catalogs
print(f"Available catalogs: {daft.list_catalogs()}")

# Set current catalog
daft.set_catalog("sales")

# Work with tables in current catalog
customers_df = daft.read_table("customers")
orders_df = daft.read_table("orders")

# Switch to different catalog
daft.set_catalog("inventory")
products_df = daft.read_table("products")

# Join data across catalogs
result = customers_df.join(
    orders_df,
    on=daft.col("id") == daft.col("customer_id"),
).join(
    products_df.rename({"id": "product_id"}),
    # NOTE(review): this predicate compares product_id with itself and is
    # therefore always true — confirm the intended join key on the left side.
    on=daft.col("product_id") == daft.col("product_id"),
)

# Create temporary table
temp_data = daft.from_pydict({
    "region": ["North", "South", "East", "West"],
    "sales": [1000, 1500, 1200, 800],
})
daft.attach_table(temp_data, "regional_sales")

# Use in SQL queries
sql_result = daft.sql("""
SELECT r.region, r.sales, c.name
FROM regional_sales r
JOIN sales.customers c ON c.id <= 2
ORDER BY r.sales DESC
""")

# Clean up temporary table
daft.detach_table("regional_sales")

from daft.context import ExecutionConfig, PlanningConfig
# Configure execution settings
exec_config = ExecutionConfig(
    default_morsel_size=1000000,
    num_scan_tasks=16,
)
daft.set_execution_config(exec_config)

# Configure planning settings
plan_config = PlanningConfig(
    broadcast_join_size_threshold=100000,
)
daft.set_planning_config(plan_config)

# Use temporary configuration, scoped to the with-block
with daft.execution_config_ctx(ExecutionConfig(num_scan_tasks=32)):
    # This operation uses 32 scan tasks
    large_df = daft.read_parquet("s3://bucket/large-dataset/*.parquet")
    result = large_df.groupby("category").count().collect()

# Define custom function
@daft.func
def custom_transform(value: str) -> str:
    """Uppercase *value* and append a processed marker."""
    return value.upper() + "_PROCESSED"


# Register function globally
daft.attach_function("global_transform", custom_transform)

# Use registered function in SQL
sql_with_udf = daft.sql("""
SELECT name, global_transform(name) as processed_name
FROM sales.customers
""")

# Detach when no longer needed
daft.detach_function("global_transform")


def setup_development_session():
    """Setup session for development environment."""
    # Development catalogs with sample data
    dev_catalog = Catalog.from_pydict({
        "users": {"id": [1, 2], "name": ["Dev User 1", "Dev User 2"]}
    })
    daft.attach_catalog("main", dev_catalog)
    daft.set_catalog("main")
    # Development-specific configuration
    daft.set_execution_config(ExecutionConfig(
        default_morsel_size=10000  # Smaller for dev
    ))
def setup_production_session():
    """Setup session for production environment."""
    # Production catalogs from external systems
    from pyiceberg.catalog import load_catalog

    prod_iceberg = load_catalog("prod_catalog")
    prod_catalog = Catalog.from_iceberg(prod_iceberg)
    daft.attach_catalog("main", prod_catalog)
    daft.set_catalog("main")
    # Production-optimized configuration
    daft.set_execution_config(ExecutionConfig(
        default_morsel_size=10000000  # Larger for production
    ))
# Environment-specific setup
import os
if os.getenv("ENVIRONMENT") == "production":
setup_production_session()
else:
setup_development_session()class DataPipeline:
def __init__(self):
self.session = daft.session()
self.setup_catalogs()
self.register_functions()
def setup_catalogs(self):
"""Setup all required catalogs."""
# Raw data catalog
raw_catalog = Catalog.from_s3tables("arn:aws:s3:::raw-data-bucket")
daft.attach_catalog("raw", raw_catalog)
# Processed data catalog
processed_catalog = Catalog.from_unity(unity_client)
daft.attach_catalog("processed", processed_catalog)
# Set default catalog
daft.set_catalog("processed")
def register_functions(self):
"""Register pipeline-specific functions."""
@daft.func
def clean_text(text: str) -> str:
return text.strip().lower()
@daft.func
def validate_email(email: str) -> bool:
return "@" in email and "." in email
daft.attach_function("clean_text", clean_text)
daft.attach_function("validate_email", validate_email)
def run_pipeline(self):
"""Execute data pipeline."""
# Read from raw data
daft.set_catalog("raw")
raw_df = daft.read_table("user_data")
# Process data
cleaned_df = raw_df.select(
daft.col("id"),
daft.sql_expr("clean_text(name)").alias("name"),
daft.col("email")
).filter(
daft.sql_expr("validate_email(email)")
)
# Write to processed catalog
daft.set_catalog("processed")
daft.write_table("clean_users", cleaned_df)
def cleanup(self):
"""Clean up session resources."""
daft.detach_function("clean_text")
daft.detach_function("validate_email")
daft.detach_catalog("raw")
daft.detach_catalog("processed")
# Use pipeline
pipeline = DataPipeline()
try:
pipeline.run_pipeline()
finally:
pipeline.cleanup()def inspect_session():
"""Inspect current session state."""
print(f"Current session: {daft.current_session()}")
print(f"Current catalog: {daft.current_catalog()}")
print(f"Current namespace: {daft.current_namespace()}")
print(f"Available catalogs: {daft.list_catalogs()}")
print(f"Available tables: {daft.list_tables()}")
# Check specific resources
if daft.has_catalog("main"):
print("Main catalog is available")
if daft.has_table("users"):
print("Users table is available")
# Inspect current state
inspect_session()


def safe_session_operation():
    """Perform session operations with error handling."""
    try:
        # Setup catalogs
        # NOTE(review): iceberg_instance is not defined in this snippet —
        # confirm where it is constructed before running this example.
        catalog = Catalog.from_iceberg(iceberg_instance)
        daft.attach_catalog("main", catalog)
        # Perform operations
        df = daft.read_table("main.sales.transactions")
        result = df.groupby("region").sum("amount").collect()
        return result
    except Exception as e:
        print(f"Session operation failed: {e}")
        # Cleanup on error
        if daft.has_catalog("main"):
            daft.detach_catalog("main")
        raise
    finally:
        # Ensure cleanup
        print("Session operation completed")
# Safe execution
try:
result = safe_session_operation()
except Exception:
print("Operation failed, session cleaned up")Session management in Daft provides a comprehensive framework for organizing and coordinating distributed data operations across multiple catalogs, tables, and computational resources with proper lifecycle management and configuration control.
Install with Tessl CLI
npx tessl i tessl/pypi-daft