Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.
—
Quality: pending — best-practice conformance has not yet been assessed.
Impact: pending — no eval scenarios have been run.
Complete task lifecycle management including execution, monitoring, filtering, search, and control operations for Celery distributed tasks.
Advanced task filtering and iteration capabilities for processing large numbers of tasks with various criteria.
def iter_tasks(events, limit=None, offset=0, type=None, worker=None,
               state=None, sort_by=None, received_start=None, received_end=None,
               started_start=None, started_end=None, search=None):
    """Iterate over filtered tasks with pagination and sorting.

    Args:
        events: Events instance containing task data.
        limit (int, optional): Maximum number of tasks to return.
        offset (int, optional): Number of tasks to skip (pagination).
        type (str, optional): Filter by task name/type.  NOTE: shadows the
            ``type`` builtin, but the name is part of the public API.
        worker (str, optional): Filter by worker hostname.
        state (str, optional): Filter by task state (PENDING, SUCCESS,
            FAILURE, etc.).
        sort_by (str, optional): Sort field ('name', 'state', 'received',
            'started', 'runtime').
        received_start (datetime, optional): Only tasks received after this time.
        received_end (datetime, optional): Only tasks received before this time.
        started_start (datetime, optional): Only tasks started after this time.
        started_end (datetime, optional): Only tasks started before this time.
        search (str, optional): Search term for task content.

    Yields:
        dict: Filtered task objects matching the specified criteria.

    Supports complex filtering combinations and efficient pagination
    for large task datasets.
    """
def sort_tasks(tasks, sort_by):
    """Sort tasks by the specified field.

    Args:
        tasks (list): List of task objects to sort.
        sort_by (str): Sort field ('name', 'state', 'received', 'started',
            'runtime').

    Returns:
        list: Sorted list of tasks.

    Supports sorting by multiple fields with proper handling of None
    values and different data types.
    """
def get_task_by_id(events, task_id):
    """Retrieve a specific task by UUID.

    Args:
        events: Events instance containing task data.
        task_id (str): Task UUID to retrieve.

    Returns:
        dict or None: Task object if found, None otherwise.
    """
def as_dict(task):
    """Convert a task object to its dictionary representation.

    Args:
        task: Task object from Celery state.

    Returns:
        dict: Task data as a dictionary with all relevant fields.
    """

# Advanced search capabilities for finding tasks based on various criteria.
def parse_search_terms(raw_search_value):
    """Parse a search query string into structured search terms.

    Args:
        raw_search_value (str): Raw search query string.

    Returns:
        list: Parsed search terms with field specifications.

    Supports field-specific searches like 'name:my_task' or
    'state:FAILURE' as well as general text searches across task content.
    """
def satisfies_search_terms(task, search_terms):
    """Check whether a task matches the given search criteria.

    Args:
        task (dict): Task object to check.
        search_terms (list): Parsed search terms from parse_search_terms.

    Returns:
        bool: True if the task matches all search terms.

    Performs a comprehensive search across task name, arguments, result,
    traceback, and other task metadata.
    """
def task_args_contains_search_args(task_args, search_args):
    """Check whether task arguments contain the search terms.

    Args:
        task_args (list): Task arguments.
        search_args (list): Search terms to find.

    Returns:
        bool: True if the arguments contain the search terms.

    Searches within task arguments and keyword arguments for the
    specified terms.
    """

# Standard Celery task states used throughout the system.
# Standard task states reported by Celery.
TASK_STATES = [
    'PENDING',   # waiting for execution
    'STARTED',   # execution has begun
    'SUCCESS',   # completed successfully
    'FAILURE',   # failed with an exception
    'RETRY',     # being retried
    'REVOKED',   # revoked/cancelled
]
# Sort key mappings for task sorting.  Every key function substitutes a
# neutral default ('' or 0) for a missing/None attribute so sorting never
# compares None against a real value.
sort_keys = {
    'name': lambda task: task.name or '',
    'state': lambda task: task.state or '',
    'received': lambda task: task.received or 0,
    'started': lambda task: task.started or 0,
    'runtime': lambda task: task.runtime or 0,
    'worker': lambda task: task.worker.hostname if task.worker else '',
}

# Execute tasks remotely with various execution modes and options.
def task_apply(task_name, args=None, kwargs=None, **options):
    """Execute a task synchronously and wait for its result.

    Args:
        task_name (str): Name of task to execute.
        args (list, optional): Task positional arguments.
        kwargs (dict, optional): Task keyword arguments.
        **options: Additional task options (queue, countdown, eta, etc.).

    Returns:
        dict: Task result and metadata.

    Executes the task and waits for completion, returning the result
    or raising an exception if the task fails.
    """
def task_async_apply(task_name, args=None, kwargs=None, **options):
    """Execute a task asynchronously without waiting for the result.

    Args:
        task_name (str): Name of task to execute.
        args (list, optional): Task positional arguments.
        kwargs (dict, optional): Task keyword arguments.
        **options: Additional task options.

    Returns:
        dict: Task ID and submission metadata.

    Submits the task for execution and returns immediately with the
    task ID.
    """
def task_send_task(task_name, args=None, kwargs=None, **options):
    """Send a task without requiring the task definition on the sender.

    Args:
        task_name (str): Name of task to send.
        args (list, optional): Task positional arguments.
        kwargs (dict, optional): Task keyword arguments.
        **options: Additional task options.

    Returns:
        dict: Task ID and submission metadata.

    Sends the task using Celery's send_task, which does not require the
    task to be registered locally.
    """

# Retrieve and manage task results and execution status.
def get_task_result(task_id, timeout=None):
    """Get a task result by ID.

    Args:
        task_id (str): Task UUID.
        timeout (float, optional): Maximum time to wait for the result.

    Returns:
        dict: Task result data including:
            - result: task return value
            - state: current task state
            - traceback: error traceback if failed
            - success: boolean success status

    Retrieves the result from the configured result backend.
    """
def task_abort(task_id):
    """Abort a running task.

    Args:
        task_id (str): Task UUID to abort.

    Returns:
        dict: Abort operation status.

    Attempts to abort a running task if it supports abortion.
    Only works with AbortableTask instances.
    """

# Remote control operations for managing task execution and behavior.
def task_revoke(task_id, terminate=False, signal='SIGTERM'):
    """Revoke (cancel) a task.

    Args:
        task_id (str): Task UUID to revoke.
        terminate (bool): Whether to terminate if already running.
        signal (str): Signal to send if terminating ('SIGTERM', 'SIGKILL').

    Returns:
        dict: Revocation status.

    Revokes a task, optionally terminating it if already executing.
    """
def task_rate_limit(task_name, rate_limit, workername=None):
    """Set a rate limit for a task type.

    Args:
        task_name (str): Name of task to limit.
        rate_limit (str): Rate limit specification (e.g., '10/m', '1/s').
        workername (str, optional): Specific worker to apply the limit to.

    Returns:
        dict: Rate limit operation status.

    Sets the execution rate limit for the specified task type.
    """
def task_timeout(task_name, soft=None, hard=None, workername=None):
    """Set timeout limits for a task type.

    Args:
        task_name (str): Name of task to configure.
        soft (float, optional): Soft timeout in seconds.
        hard (float, optional): Hard timeout in seconds.
        workername (str, optional): Specific worker to apply timeouts to.

    Returns:
        dict: Timeout configuration status.

    Configures soft and hard timeout limits for task execution.
    """

# Comprehensive task data structure containing all relevant execution
# and metadata information.
from typing import Any  # required: 'result' is typed as Any below

# Schema of a task info record: field name -> expected Python type.
TaskInfo = {
    # Basic identification
    'uuid': str,          # Unique task identifier
    'name': str,          # Task name/type
    'state': str,         # Current task state
    'hostname': str,      # Worker hostname executing task
    # Timing information
    'timestamp': float,   # Task event timestamp
    'received': float,    # Time task was received by worker
    'started': float,     # Time task execution started
    'succeeded': float,   # Time task completed successfully
    'failed': float,      # Time task failed
    'retried': float,     # Time task was retried
    'revoked': float,     # Time task was revoked
    'runtime': float,     # Total execution time in seconds
    # Task parameters
    'args': list,         # Task positional arguments
    'kwargs': dict,       # Task keyword arguments
    'retries': int,       # Number of retry attempts
    'eta': str,           # Estimated time of arrival
    'expires': str,       # Task expiration time
    # Execution results
    'result': Any,        # Task return value (if successful)
    'traceback': str,     # Exception traceback (if failed)
    'exception': str,     # Exception message (if failed)
    # Routing information
    'queue': str,         # Queue name task was sent to
    'exchange': str,      # Exchange name
    'routing_key': str,   # Routing key used
    'priority': int,      # Message priority
    # Worker information
    'worker': {
        'hostname': str,  # Worker hostname
        'pid': int,       # Worker process ID
        'sw_ident': str,  # Software identifier
        'sw_ver': str,    # Software version
        'sw_sys': str,    # System information
    },
    # Additional metadata
    'clock': int,         # Logical clock value
    'client': str,        # Client that sent the task
    'root_id': str,       # Root task ID (for task chains)
    'parent_id': str,     # Parent task ID (for task groups)
    'children': [str],    # Child task IDs
}

# Different event types that can occur during the task execution lifecycle.
# Event types emitted during the task execution lifecycle, mapped to a
# human-readable description and the fields each event carries.
TaskEventTypes = {
    'task-sent': {
        'description': 'Task was sent to broker',
        'fields': ['uuid', 'name', 'args', 'kwargs', 'eta', 'expires']
    },
    'task-received': {
        'description': 'Worker received task from broker',
        'fields': ['uuid', 'name', 'hostname', 'timestamp']
    },
    'task-started': {
        'description': 'Worker started executing task',
        'fields': ['uuid', 'hostname', 'timestamp', 'pid']
    },
    'task-succeeded': {
        'description': 'Task completed successfully',
        'fields': ['uuid', 'result', 'runtime', 'hostname', 'timestamp']
    },
    'task-failed': {
        'description': 'Task execution failed',
        'fields': ['uuid', 'exception', 'traceback', 'hostname', 'timestamp']
    },
    'task-retried': {
        'description': 'Task is being retried',
        'fields': ['uuid', 'reason', 'traceback', 'hostname', 'timestamp']
    },
    'task-revoked': {
        'description': 'Task was revoked/cancelled',
        'fields': ['uuid', 'hostname', 'timestamp']
    }
}

# --- Usage example: filtering and iterating tasks --------------------
from flower.utils.tasks import iter_tasks, get_task_by_id
from flower.events import Events

# Assume we have an events instance with task data
events = Events(celery_app, io_loop)  # NOTE: celery_app/io_loop must exist

# Get all failed tasks
failed_tasks = list(iter_tasks(
    events,
    state='FAILURE',
    limit=100
))
print(f"Found {len(failed_tasks)} failed tasks")

# Get tasks from a specific worker
worker_tasks = list(iter_tasks(
    events,
    worker='celery@worker1',
    limit=50
))

# Get tasks received in the last hour, sorted by receive time
from datetime import datetime, timedelta
recent_tasks = list(iter_tasks(
    events,
    received_start=datetime.now() - timedelta(hours=1),
    sort_by='received'
))

from flower.utils.search import parse_search_terms, satisfies_search_terms
# Parse a field-qualified search query
search_terms = parse_search_terms('name:my_task state:FAILURE')

# Find matching tasks
matching_tasks = []
for task in iter_tasks(events):
    if satisfies_search_terms(task, search_terms):
        matching_tasks.append(task)

# Search with a free-text query
text_search = parse_search_terms('error database connection')
error_tasks = [
    task for task in iter_tasks(events, state='FAILURE')
    if satisfies_search_terms(task, text_search)
]

from flower.api.tasks import TaskApply, TaskAsyncApply, TaskRevoke
# Execute a task synchronously
async def execute_task():
    handler = TaskApply()
    result = await handler.post(
        'my_task',
        args=[1, 2, 3],
        kwargs={'timeout': 30}
    )
    print(f"Task result: {result}")

# Execute a task asynchronously
async def async_execute():
    handler = TaskAsyncApply()
    response = await handler.post(
        'long_running_task',
        args=['data'],
        kwargs={'priority': 5}
    )
    task_id = response['task-id']
    print(f"Task submitted: {task_id}")

# Revoke a task
async def revoke_task():
    handler = TaskRevoke()
    await handler.post('task-uuid-here', terminate=True)

from flower.api.tasks import TaskResult, TaskInfo
# Get a task result
async def get_result():
    handler = TaskResult()
    result = await handler.get('task-uuid-here', timeout=10)
    if result['state'] == 'SUCCESS':
        print(f"Task completed: {result['result']}")
    elif result['state'] == 'FAILURE':
        print(f"Task failed: {result['traceback']}")

# Get detailed task information
async def get_task_info():
    handler = TaskInfo()
    info = await handler.get('task-uuid-here')
    print(f"Task: {info['name']}")
    print(f"State: {info['state']}")
    print(f"Worker: {info['hostname']}")
    print(f"Runtime: {info.get('runtime', 'N/A')} seconds")

# Process tasks in batches
def process_tasks_batch(events, batch_size=1000):
    """Process tasks in fixed-size batches using offset pagination."""
    offset = 0
    while True:
        batch = list(iter_tasks(
            events,
            limit=batch_size,
            offset=offset,
            sort_by='received'
        ))
        if not batch:
            break
        # Process this batch before fetching the next page
        for task in batch:
            process_single_task(task)
        offset += batch_size

# Find and revoke failed tasks
async def cleanup_failed_tasks():
    failed_tasks = iter_tasks(events, state='FAILURE')
    for task in failed_tasks:
        if should_revoke_task(task):
            await task_revoke(task['uuid'])

from collections import Counter
from datetime import datetime, timedelta

def analyze_task_performance(events):
    """Generate task performance analytics for the last 24 hours.

    Args:
        events: Events instance containing task data.

    Returns:
        dict: Aggregate analytics with total task count, per-state and
        per-name distributions, mean runtime per task name, and the
        overall success rate (0 when there are no tasks).
    """
    yesterday = datetime.now() - timedelta(days=1)
    recent_tasks = list(iter_tasks(
        events,
        received_start=yesterday,
        sort_by='received'
    ))

    # Task counts by state and by name
    state_counts = Counter(task['state'] for task in recent_tasks)
    name_counts = Counter(task['name'] for task in recent_tasks)

    # Average runtime by task name (tasks without a runtime are skipped)
    runtime_by_name = {}
    for task_name in name_counts:
        runtimes = [
            task['runtime'] for task in recent_tasks
            if task['name'] == task_name and task.get('runtime')
        ]
        if runtimes:
            runtime_by_name[task_name] = sum(runtimes) / len(runtimes)

    return {
        'total_tasks': len(recent_tasks),
        'state_distribution': dict(state_counts),
        'task_distribution': dict(name_counts),
        'average_runtimes': runtime_by_name,
        'success_rate': state_counts['SUCCESS'] / len(recent_tasks) if recent_tasks else 0
    }

# Task management includes comprehensive error handling for various scenarios.
# Handle task execution errors (wrapped in a coroutine: 'await' is only
# valid inside an async function, not at module level)
async def handle_execution_errors():
    try:
        result = await task_apply('my_task', args=[1, 2, 3])
    except Exception as e:
        if 'timeout' in str(e).lower():
            print("Task execution timed out")
        elif 'not registered' in str(e).lower():
            print("Task not found on workers")
        else:
            print(f"Task execution failed: {e}")

# Handle missing task results
async def handle_missing_results():
    try:
        result = await get_task_result('task-id')
    except Exception as e:
        if 'no such task' in str(e).lower():
            print("Task not found in result backend")
        else:
            print(f"Could not retrieve result: {e}")

# Handle search and filtering edge cases; returns [] on any failure
def safe_task_search(events, **filters):
    try:
        return list(iter_tasks(events, **filters))
    except Exception as e:
        print(f"Task search failed: {e}")
        return []

# NOTE: iter_tasks supports pagination (limit and offset) for large task
# datasets.  Install with the Tessl CLI:
#   npx tessl i tessl/pypi-flower