CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-taskiq

Distributed task queue with full async support for Python applications

Overview
Eval results
Files

docs/events-state.md

Events and State

Event system for lifecycle management and global state container for coordinating between brokers, tasks, and application components. Enables custom logic execution at key points in the task processing lifecycle.

Capabilities

Event System

Event-driven hooks that allow custom code execution at specific points in the task processing lifecycle.

class TaskiqEvents:
    """
    Names of broker and task lifecycle events.

    Each constant identifies a point during broker startup or shutdown
    (in client or worker mode) at which registered event handlers run.
    """

    # Broker started in client mode.
    CLIENT_STARTUP: str = "CLIENT_STARTUP"

    # Broker stopped in client mode.
    CLIENT_SHUTDOWN: str = "CLIENT_SHUTDOWN"

    # Broker started in worker mode.
    WORKER_STARTUP: str = "WORKER_STARTUP"

    # Broker stopped in worker mode.
    WORKER_SHUTDOWN: str = "WORKER_SHUTDOWN"

Event Handler Registration

Methods for registering event handlers on brokers to respond to lifecycle events.

def on_event(
    self,
    *events: TaskiqEvents,
) -> Callable[[EventHandler], EventHandler]:
    """
    Decorator factory for registering lifecycle event handlers.

    Args:
        *events: One or more lifecycle events the handler responds to.

    Returns:
        A decorator that registers the wrapped handler for each event.
    """

def add_event_handler(
    self,
    event: TaskiqEvents,
    handler: EventHandler,
) -> None:
    """
    Register an event handler without using the decorator API.

    Args:
        event: Lifecycle event the handler responds to.
        handler: Callable invoked when the event fires.
    """

def with_event_handlers(
    self,
    event: TaskiqEvents,
    *handlers: EventHandler,
) -> Self:
    """
    Attach one or more handlers to an event, builder-style.

    Args:
        event: Lifecycle event the handlers respond to.
        *handlers: Callables invoked when the event fires.

    Returns:
        The broker instance, enabling call chaining.
    """

State Management

Global state container for sharing data across broker, tasks, and application components.

class TaskiqState:
    """
    Shared key/value container coordinating brokers, tasks, and app code.

    Used to pass data (connections, configuration, caches) between
    lifecycle event handlers, middleware, and running tasks.
    """

    def __init__(self) -> None:
        """Create a container with no stored values."""

    def set_value(self, key: str, value: Any) -> None:
        """
        Associate ``value`` with ``key`` in the shared state.

        Args:
            key: Identifier under which the value is stored.
            value: Arbitrary object to keep in state.
        """

    def get_value(self, key: str, default: Any = None) -> Any:
        """
        Look up a previously stored value.

        Args:
            key: Identifier to look up.
            default: Fallback returned when ``key`` is absent.

        Returns:
            The stored value, or ``default`` when not present.
        """

    def delete_value(self, key: str) -> None:
        """
        Discard the value stored under ``key``.

        Args:
            key: Identifier of the entry to remove.
        """

    def clear(self) -> None:
        """Remove every entry from the container."""

    def keys(self) -> List[str]:
        """Return all keys currently stored."""

    def values(self) -> List[Any]:
        """Return all stored values."""

    def items(self) -> List[Tuple[str, Any]]:
        """Return all (key, value) pairs."""

Usage Examples

Basic Event Handlers

from taskiq import InMemoryBroker, TaskiqEvents

broker = InMemoryBroker()

@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def on_worker_startup(state):
    """Create shared resources when a worker process boots."""
    print("Worker starting up...")

    # Publish a database connection and a cache client through global
    # state so tasks can reuse them.
    state.set_value("db_connection", await create_db_connection())
    state.set_value("cache", await create_cache_client())

    print("Worker initialized successfully")

@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def on_worker_shutdown(state):
    """Release shared resources when a worker stops."""
    print("Worker shutting down...")

    # Close whichever resources startup managed to create.
    connection = state.get_value("db_connection")
    if connection:
        await connection.close()

    cache_client = state.get_value("cache")
    if cache_client:
        await cache_client.close()

    print("Worker shutdown complete")

@broker.task
async def database_task(query: str) -> dict:
    """Run a query, consulting the shared cache before hitting the DB."""
    connection = broker.state.get_value("db_connection")
    cache_client = broker.state.get_value("cache")

    # Serve from cache when a previous run already answered this query.
    cached = await cache_client.get(query)
    if cached:
        return cached

    # Otherwise query the database and cache the answer for 5 minutes.
    result = await connection.execute(query)
    await cache_client.set(query, result, ttl=300)
    return result

Multiple Event Handlers

# Register handler for multiple events
@broker.on_event(
    TaskiqEvents.CLIENT_STARTUP,
    TaskiqEvents.WORKER_STARTUP,
)
async def initialize_logging(state):
    """Initialize logging for both client and worker modes."""
    import logging
    
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    logger = logging.getLogger('taskiq')
    state.set_value("logger", logger)
    
    logger.info("Logging initialized")

@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def setup_metrics(state):
    """Initialize metrics collection."""
    from prometheus_client import Counter, Histogram
    
    # Create metrics
    task_counter = Counter('taskiq_tasks_total', 'Total tasks executed')
    task_duration = Histogram('taskiq_task_duration_seconds', 'Task duration')
    
    state.set_value("task_counter", task_counter)
    state.set_value("task_duration", task_duration)
    
    logger = state.get_value("logger")
    logger.info("Metrics initialized")

Programmatic Event Handler Registration

async def database_startup_handler(state):
    """Open an asyncpg connection pool and publish it via shared state."""
    import asyncpg

    state.set_value(
        "db_pool",
        await asyncpg.create_pool(
            "postgresql://user:password@localhost/mydb",
            min_size=5,
            max_size=20,
        ),
    )

async def database_shutdown_handler(state):
    """Close the pool opened at startup, if one exists."""
    pool = state.get_value("db_pool")
    if pool:
        await pool.close()

# Handlers can be attached without using the decorator form.
broker.add_event_handler(TaskiqEvents.WORKER_STARTUP, database_startup_handler)
broker.add_event_handler(TaskiqEvents.WORKER_SHUTDOWN, database_shutdown_handler)

# The same wiring, expressed with the fluent builder API.
broker = (
    InMemoryBroker()
    .with_event_handlers(
        TaskiqEvents.WORKER_STARTUP,
        database_startup_handler,
        setup_metrics,
    )
    .with_event_handlers(
        TaskiqEvents.WORKER_SHUTDOWN,
        database_shutdown_handler,
    )
)

State Management Examples

@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def initialize_shared_data(state):
    """Initialize shared configuration and resources.

    Stores the app config, an in-process cache dict, and a statistics
    record in global state for tasks to read and update.
    """
    import time  # fix: time.time() below was used without any import

    # Load configuration (load_configuration is an app-provided helper).
    config = await load_configuration()
    state.set_value("config", config)

    # Initialize shared cache
    state.set_value("shared_cache", {})

    # Track worker statistics
    state.set_value("worker_stats", {
        "tasks_executed": 0,
        "start_time": time.time(),
        "errors": 0,
    })

@broker.task
async def cached_computation(input_data: str) -> dict:
    """Task that memoizes expensive_computation results in shared state.

    Args:
        input_data: Key identifying the computation.

    Returns:
        The (possibly cached) computation result.
    """
    # Check the shared in-process cache first.
    cache = broker.state.get_value("shared_cache", {})
    if input_data in cache:
        return cache[input_data]

    # Cache miss: run the expensive computation and store the result.
    result = await expensive_computation(input_data)
    cache[input_data] = result
    broker.state.set_value("shared_cache", cache)

    # Update execution statistics.
    stats = broker.state.get_value("worker_stats", {})
    stats["tasks_executed"] = stats.get("tasks_executed", 0) + 1
    broker.state.set_value("worker_stats", stats)

    return result

@broker.task
async def get_worker_statistics() -> dict:
    """Task to retrieve worker statistics, configuration, and state keys."""
    return {
        "statistics": broker.state.get_value("worker_stats", {}),
        "configuration": broker.state.get_value("config", {}),
        "state_keys": broker.state.keys(),
    }

Error Handling in Event Handlers

@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def robust_startup_handler(state):
    """Startup handler with error handling.

    Critical resources (database) abort worker startup on failure;
    optional resources (cache) degrade gracefully to None.
    """
    import logging  # fix: logging was used without an import in this example

    logger = logging.getLogger('taskiq.startup')

    try:
        # Initialize critical resources
        db_pool = await create_database_pool()
        state.set_value("db_pool", db_pool)
        logger.info("Database pool initialized")

    except Exception as e:
        logger.error(f"Failed to initialize database: {e}")
        # Re-raise so the worker does not start without a database.
        raise

    try:
        # Initialize optional resources
        cache = await create_cache_client()
        state.set_value("cache", cache)
        logger.info("Cache initialized")

    except Exception as e:
        logger.warning(f"Cache initialization failed: {e}")
        # Continue without cache
        state.set_value("cache", None)

@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def safe_shutdown_handler(state):
    """Shutdown handler with safe resource cleanup.

    Each resource is closed in its own try block so one failure
    cannot prevent the remaining cleanup from running.
    """
    import logging  # fix: logging was used without an import in this example

    logger = logging.getLogger('taskiq.shutdown')

    # Safe cleanup of database pool
    try:
        db_pool = state.get_value("db_pool")
        if db_pool:
            await db_pool.close()
            logger.info("Database pool closed")
    except Exception as e:
        logger.error(f"Error closing database pool: {e}")

    # Safe cleanup of cache
    try:
        cache = state.get_value("cache")
        if cache:
            await cache.close()
            logger.info("Cache closed")
    except Exception as e:
        logger.error(f"Error closing cache: {e}")

Types

# Type alias for lifecycle event handlers: a callable that receives the
# global TaskiqState and returns either None (sync handler) or an
# awaitable resolving to None (async handler).
EventHandler = Callable[[TaskiqState], Optional[Awaitable[None]]]

Install with Tessl CLI

npx tessl i tessl/pypi-taskiq

docs

brokers.md

events-state.md

exceptions.md

index.md

middleware.md

result-backends.md

scheduling.md

tasks-results.md

tile.json