Distributed task queue with full async support for Python applications
Event system for lifecycle management and global state container for coordinating between brokers, tasks, and application components. Enables custom logic execution at key points in the task processing lifecycle.
Event-driven hooks that allow custom code execution at specific points in the task processing lifecycle.
class TaskiqEvents:
    """
    Event types for broker and task lifecycle.

    Events are triggered at specific points during broker and task
    execution, allowing custom logic to be executed via event handlers.

    Every attribute is a string constant whose value equals its own name.
    """

    # NOTE(review): the handler-registration signatures annotate their event
    # arguments as ``TaskiqEvents`` while these attributes are plain ``str``;
    # the real library may define this type as an enum -- confirm.

    CLIENT_STARTUP: str = "CLIENT_STARTUP"
    """Triggered when broker starts up in client mode."""

    CLIENT_SHUTDOWN: str = "CLIENT_SHUTDOWN"
    """Triggered when broker shuts down in client mode."""

    WORKER_STARTUP: str = "WORKER_STARTUP"
    """Triggered when broker starts up in worker mode."""

    WORKER_SHUTDOWN: str = "WORKER_SHUTDOWN"
    """Triggered when broker shuts down in worker mode."""


# Methods for registering event handlers on brokers to respond to lifecycle events.
def on_event(
    self,
    *events: TaskiqEvents,
) -> Callable[[EventHandler], EventHandler]:
    """
    Decorator for registering event handlers.

    Args:
        *events: One or more events to handle

    Returns:
        Decorator function for handler registration
    """
def add_event_handler(
    self,
    event: TaskiqEvents,
    handler: EventHandler,
) -> None:
    """
    Programmatically add event handler.

    Args:
        event: Event type to handle
        handler: Handler function to execute
    """
def with_event_handlers(
    self,
    event: TaskiqEvents,
    *handlers: EventHandler,
) -> Self:
    """
    Builder method for adding event handlers.

    Args:
        event: Event type to handle
        *handlers: Handler functions to execute

    Returns:
        Updated broker instance
    """


# Global state container for sharing data across broker, tasks, and application components.
class TaskiqState:
    """
    Global state container for broker and task coordination.

    Provides thread-safe storage for sharing data between
    different parts of the taskiq system including brokers,
    tasks, middleware, and application components.
    """

    def __init__(self) -> None:
        """Initialize empty state container."""

    def set_value(self, key: str, value: Any) -> None:
        """
        Store value in global state.

        Args:
            key: State key identifier
            value: Value to store
        """

    def get_value(self, key: str, default: Any = None) -> Any:
        """
        Retrieve value from global state.

        Args:
            key: State key identifier
            default: Default value if key not found

        Returns:
            Stored value or default
        """

    def delete_value(self, key: str) -> None:
        """
        Remove value from global state.

        Args:
            key: State key identifier
        """

    def clear(self) -> None:
        """Clear all values from state."""

    def keys(self) -> List[str]:
        """Get list of all state keys."""

    def values(self) -> List[Any]:
        """Get list of all state values."""

    def items(self) -> List[Tuple[str, Any]]:
        """Get list of all state key-value pairs."""


from taskiq import InMemoryBroker, TaskiqEvents
broker = InMemoryBroker()


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def on_worker_startup(state):
    """Initialize worker resources.

    Creates a database connection and a cache client and stores them in
    ``state`` under the ``"db_connection"`` and ``"cache"`` keys.

    Args:
        state: State container the created resources are stored in.
    """
    print("Worker starting up...")

    # Initialize database connections
    state.set_value("db_connection", await create_db_connection())

    # Initialize cache
    state.set_value("cache", await create_cache_client())

    # Log startup
    print("Worker initialized successfully")
@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def on_worker_shutdown(state):
    """Cleanup worker resources.

    Closes whatever is stored in ``state`` under ``"db_connection"`` and
    ``"cache"``; a key that is missing (``None``) is skipped.

    Args:
        state: State container holding the resources to release.
    """
    print("Worker shutting down...")

    # Close database connections. Compare against None explicitly so a
    # resource object that happens to evaluate as falsy is still closed.
    db_conn = state.get_value("db_connection")
    if db_conn is not None:
        await db_conn.close()

    # Close cache connections
    cache = state.get_value("cache")
    if cache is not None:
        await cache.close()

    print("Worker shutdown complete")
@broker.task
async def database_task(query: str) -> dict:
    """Task that uses shared database connection.

    Looks the query up in the shared cache first and only runs it against
    the database on a miss, caching the fresh result with ``ttl=300``.

    Args:
        query: Query to execute; also used as the cache key.

    Returns:
        The cached result if one exists, otherwise the database result.
    """
    # Access shared resources from state
    db_conn = broker.state.get_value("db_connection")
    cache = broker.state.get_value("cache")

    # Check cache first. An empty result is falsy but still a valid cached
    # value, so test for None rather than truthiness.
    # NOTE(review): assumes cache.get() returns None on a miss -- confirm
    # for the cache client in use.
    cached_result = await cache.get(query)
    if cached_result is not None:
        return cached_result

    # Execute database query
    result = await db_conn.execute(query)

    # Cache result
    await cache.set(query, result, ttl=300)

    return result


# Register handler for multiple events
@broker.on_event(
    TaskiqEvents.CLIENT_STARTUP,
    TaskiqEvents.WORKER_STARTUP,
)
async def initialize_logging(state):
    """Initialize logging for both client and worker modes.

    Configures the root logger at INFO level and stores the ``'taskiq'``
    logger in ``state`` under the ``"logger"`` key.

    Args:
        state: State container the logger is stored in.
    """
    import logging

    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    )

    logger = logging.getLogger('taskiq')
    state.set_value("logger", logger)

    logger.info("Logging initialized")
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def setup_metrics(state):
    """Initialize metrics collection.

    Creates a task counter and a task-duration histogram and stores them in
    ``state`` under ``"task_counter"`` and ``"task_duration"``.

    Args:
        state: State container the metric objects are stored in.
    """
    import logging

    from prometheus_client import Counter, Histogram

    # Create metrics
    task_counter = Counter('taskiq_tasks_total', 'Total tasks executed')
    task_duration = Histogram('taskiq_task_duration_seconds', 'Task duration')

    state.set_value("task_counter", task_counter)
    state.set_value("task_duration", task_duration)

    # The "logger" key only exists if a logging handler stored it earlier;
    # fall back to the 'taskiq' logger instead of failing on None when this
    # handler is registered on its own.
    logger = state.get_value("logger")
    if logger is None:
        logger = logging.getLogger('taskiq')
    logger.info("Metrics initialized")


async def database_startup_handler(state):
    """Startup handler for database initialization.

    Creates an ``asyncpg`` connection pool and stores it in ``state`` under
    the ``"db_pool"`` key.

    Args:
        state: State container the pool is stored in.
    """
    import asyncpg

    # Create connection pool
    pool = await asyncpg.create_pool(
        "postgresql://user:password@localhost/mydb",
        min_size=5,
        max_size=20,
    )

    state.set_value("db_pool", pool)
async def database_shutdown_handler(state):
    """Shutdown handler for database cleanup.

    Reads the pool stored under ``"db_pool"`` and closes it when one is
    present; does nothing when the key is missing.

    Args:
        state: State container to read the pool from.
    """
    pool = state.get_value("db_pool")
    # Compare against None explicitly so that a pool object which happens to
    # evaluate as falsy is still closed.
    if pool is not None:
        await pool.close()
# Register handlers programmatically
broker.add_event_handler(
    TaskiqEvents.WORKER_STARTUP,
    database_startup_handler,
)
broker.add_event_handler(
    TaskiqEvents.WORKER_SHUTDOWN,
    database_shutdown_handler,
)

# Or using builder pattern
broker = (
    InMemoryBroker()
    .with_event_handlers(
        TaskiqEvents.WORKER_STARTUP,
        database_startup_handler,
        setup_metrics,
    )
    .with_event_handlers(
        TaskiqEvents.WORKER_SHUTDOWN,
        database_shutdown_handler,
    )
)


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def initialize_shared_data(state):
    """Initialize shared configuration and resources.

    Stores three entries in ``state``: the loaded configuration under
    ``"config"``, an empty dict under ``"shared_cache"``, and a statistics
    dict under ``"worker_stats"``.

    Args:
        state: State container the shared data is stored in.
    """
    # Imported locally, like the other examples, so the snippet runs on its own.
    import time

    # Load configuration
    config = await load_configuration()
    state.set_value("config", config)

    # Initialize shared cache
    cache_data = {}
    state.set_value("shared_cache", cache_data)

    # Track worker statistics
    stats = {
        "tasks_executed": 0,
        "start_time": time.time(),
        "errors": 0,
    }
    state.set_value("worker_stats", stats)
@broker.task
async def cached_computation(input_data: str) -> dict:
    """Task that uses shared cache.

    Returns the stored result when ``input_data`` is already in the shared
    cache. Otherwise runs the computation, stores the result, and increments
    the ``"tasks_executed"`` counter in the worker statistics.

    Args:
        input_data: Input of the computation; also used as the cache key.

    Returns:
        The cached or freshly computed result.
    """
    # Access shared cache
    cache = broker.state.get_value("shared_cache", {})

    # Check cache
    if input_data in cache:
        return cache[input_data]

    # Perform computation
    result = await expensive_computation(input_data)

    # Update cache. Writing the dict back also persists it when the key was
    # missing and the default dict above was used.
    cache[input_data] = result
    broker.state.set_value("shared_cache", cache)

    # Update statistics
    stats = broker.state.get_value("worker_stats", {})
    stats["tasks_executed"] = stats.get("tasks_executed", 0) + 1
    broker.state.set_value("worker_stats", stats)

    return result
@broker.task
async def get_worker_statistics() -> dict:
    """Task to retrieve worker statistics.

    Returns:
        A dict with the stored statistics, the stored configuration, and the
        keys currently present in the broker state.
    """
    stats = broker.state.get_value("worker_stats", {})
    config = broker.state.get_value("config", {})

    return {
        "statistics": stats,
        "configuration": config,
        "state_keys": broker.state.keys(),
    }


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def robust_startup_handler(state):
    """Startup handler with error handling.

    The database pool is treated as critical: a failure is logged and
    re-raised. The cache is treated as optional: a failure is logged as a
    warning and ``None`` is stored under ``"cache"`` instead.

    Args:
        state: State container the resources are stored in.

    Raises:
        Exception: Whatever ``create_database_pool`` raised.
    """
    # Imported locally, like the other examples, so the snippet runs on its own.
    import logging

    logger = logging.getLogger('taskiq.startup')

    try:
        # Initialize critical resources
        db_pool = await create_database_pool()
        state.set_value("db_pool", db_pool)
        logger.info("Database pool initialized")

    except Exception as e:
        # logger.exception logs at ERROR level and keeps the traceback.
        logger.exception("Failed to initialize database: %s", e)
        # Set fallback or raise to prevent worker start
        raise

    try:
        # Initialize optional resources
        cache = await create_cache_client()
        state.set_value("cache", cache)
        logger.info("Cache initialized")

    except Exception as e:
        logger.warning("Cache initialization failed: %s", e)
        # Continue without cache
        state.set_value("cache", None)
@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def safe_shutdown_handler(state):
    """Shutdown handler with safe resource cleanup.

    Closes the database pool and the cache independently: an error while
    closing one is logged and does not prevent the other from being closed.

    Args:
        state: State container holding the resources to release.
    """
    # Imported locally, like the other examples, so the snippet runs on its own.
    import logging

    logger = logging.getLogger('taskiq.shutdown')

    # Safe cleanup of database pool
    try:
        db_pool = state.get_value("db_pool")
        if db_pool is not None:
            await db_pool.close()
            logger.info("Database pool closed")
    except Exception as e:
        # logger.exception logs at ERROR level and keeps the traceback.
        logger.exception("Error closing database pool: %s", e)

    # Safe cleanup of cache
    try:
        cache = state.get_value("cache")
        if cache is not None:
            await cache.close()
            logger.info("Cache closed")
    except Exception as e:
        logger.exception("Error closing cache: %s", e)


EventHandler = Callable[[TaskiqState], Optional[Awaitable[None]]]
"""
Type for event handler functions.

Event handlers can be either sync or async functions that take
a TaskiqState object as their only parameter.
"""

# Install with Tessl CLI
npx tessl i tessl/pypi-taskiq