
tessl/pypi-flower

Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.


Task Management

Complete task lifecycle management including execution, monitoring, filtering, search, and control operations for Celery distributed tasks.

Capabilities

Task Filtering and Iteration

Filter, sort, and paginate the tasks held by an Events instance. Filters can be combined, and limit and offset let callers walk large task sets one page at a time.

def iter_tasks(events, limit=None, offset=0, type=None, worker=None, 
               state=None, sort_by=None, received_start=None, received_end=None,
               started_start=None, started_end=None, search=None):
    """
    Iterator for filtered tasks with pagination and sorting.
    
    Args:
        events: Events instance containing task data
        limit (int, optional): Maximum number of tasks to return
        offset (int, optional): Number of tasks to skip (pagination)
        type (str, optional): Filter by task name/type
        worker (str, optional): Filter by worker hostname
        state (str, optional): Filter by task state (PENDING, SUCCESS, FAILURE, etc.)
        sort_by (str, optional): Sort field ('name', 'state', 'received', 'started', 'runtime')
        received_start (datetime, optional): Filter tasks received after this time
        received_end (datetime, optional): Filter tasks received before this time
        started_start (datetime, optional): Filter tasks started after this time
        started_end (datetime, optional): Filter tasks started before this time
        search (str, optional): Search term for task content
        
    Yields:
        dict: Filtered task objects matching the specified criteria
        
    Supports complex filtering combinations and efficient pagination
    for large task datasets.
    """

def sort_tasks(tasks, sort_by):
    """
    Sort tasks by specified field.
    
    Args:
        tasks (list): List of task objects to sort
        sort_by (str): Sort field ('name', 'state', 'received', 'started', 'runtime')
        
    Returns:
        list: Sorted list of tasks
        
    Sorts by any one of the listed fields. None values fall back to a
    default ('' or 0) so that mixed data still compares cleanly.
    """

def get_task_by_id(events, task_id):
    """
    Retrieve specific task by UUID.
    
    Args:
        events: Events instance containing task data
        task_id (str): Task UUID to retrieve
        
    Returns:
        dict or None: Task object if found, None otherwise
    """

def as_dict(task):
    """
    Convert task object to dictionary representation.
    
    Args:
        task: Task object from Celery state
        
    Returns:
        dict: Task data as dictionary with all relevant fields
    """
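The filter-then-paginate behaviour described for iter_tasks can be sketched as a plain generator. This is a minimal illustration over in-memory dicts, not Flower's implementation: `iter_filtered` and the sample records are hypothetical, and only the `state` and `worker` filters are shown.

```python
from itertools import islice


def iter_filtered(tasks, limit=None, offset=0, state=None, worker=None):
    """Yield tasks matching the filters, skipping the first `offset` matches."""
    matches = (
        task for task in tasks
        if (state is None or task["state"] == state)
        and (worker is None or task["hostname"] == worker)
    )
    stop = None if limit is None else offset + limit
    # islice applies offset and limit lazily, after filtering
    yield from islice(matches, offset, stop)


# Hypothetical sample data
sample = [
    {"uuid": "a", "state": "FAILURE", "hostname": "celery@worker1"},
    {"uuid": "b", "state": "SUCCESS", "hostname": "celery@worker1"},
    {"uuid": "c", "state": "FAILURE", "hostname": "celery@worker2"},
    {"uuid": "d", "state": "FAILURE", "hostname": "celery@worker1"},
]

page = list(iter_filtered(sample, state="FAILURE", limit=2, offset=1))
print([task["uuid"] for task in page])
```

Applying the offset after filtering means a page always holds matching tasks only, which is the behaviour a caller paging through results would usually expect.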

Task Search and Filtering

Parse search queries into structured terms and match them against task fields such as name, arguments, result, and traceback.

def parse_search_terms(raw_search_value):
    """
    Parse search query string into structured search terms.
    
    Args:
        raw_search_value (str): Raw search query string
        
    Returns:
        list: Parsed search terms with field specifications
        
    Supports field-specific searches like 'name:my_task' or 'state:FAILURE'
    and general text searches across task content.
    """

def satisfies_search_terms(task, search_terms):
    """
    Check if task matches search criteria.
    
    Args:
        task (dict): Task object to check
        search_terms (list): Parsed search terms from parse_search_terms
        
    Returns:
        bool: True if task matches all search terms
        
    Performs comprehensive search across task name, arguments, result,
    traceback, and other task metadata.
    """

def task_args_contains_search_args(task_args, search_args):
    """
    Check if task arguments contain search terms.
    
    Args:
        task_args (list): Task arguments
        search_args (list): Search terms to find
        
    Returns:
        bool: True if arguments contain search terms
        
    Searches within task arguments and keyword arguments for specified terms.
    """
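To make the parse-then-match flow concrete, here is a small sketch of how a query such as 'name:my_task state:FAILURE' might be split into terms and checked against a task. The helpers `parse_terms` and `matches` are hypothetical stand-ins written for this illustration; they are not Flower's functions, and the term format is an assumption.

```python
def parse_terms(query):
    """Split a query into (field, value) pairs; field is None for free text."""
    terms = []
    for token in query.split():
        field, sep, value = token.partition(":")
        if sep and field and value:
            terms.append((field, value))
        else:
            terms.append((None, token))
    return terms


def matches(task, terms):
    """Return True only if every term matches the task."""
    for field, value in terms:
        if field is not None:
            # Field-specific term: exact match on that field
            if str(task.get(field, "")) != value:
                return False
        else:
            # Free-text term: case-insensitive substring over all values
            haystack = " ".join(str(v) for v in task.values()).lower()
            if value.lower() not in haystack:
                return False
    return True


terms = parse_terms("name:my_task state:FAILURE timeout")
task = {"name": "my_task", "state": "FAILURE", "exception": "Timeout reached"}
print(terms)
print(matches(task, terms))
```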

Task State Constants

Standard Celery task states used throughout the system.

# Task states from Celery
TASK_STATES = [
    'PENDING',     # Task is waiting for execution
    'STARTED',     # Task has been started
    'SUCCESS',     # Task completed successfully  
    'FAILURE',     # Task failed with an exception
    'RETRY',       # Task is being retried
    'REVOKED',     # Task has been revoked/cancelled
]

# Sort key mappings for task sorting
sort_keys = {
    'name': lambda task: task.name or '',
    'state': lambda task: task.state or '',
    'received': lambda task: task.received or 0,
    'started': lambda task: task.started or 0,
    'runtime': lambda task: task.runtime or 0,
    'worker': lambda task: task.worker.hostname if task.worker else '',
}
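The key functions above can be plugged straight into `sorted`. The sketch below uses a reduced copy of the mapping and hypothetical task objects to show why the `or` fallbacks matter: without them, a None timestamp would be compared with a float and raise TypeError. The leading '-' for descending order is an assumed convention for this example.

```python
from types import SimpleNamespace

# Fallback values keep None from being compared with str or float
sort_keys = {
    "name": lambda task: task.name or "",
    "received": lambda task: task.received or 0,
    "runtime": lambda task: task.runtime or 0,
}


def sort_by_field(tasks, sort_by):
    """Sort by one field; a leading '-' (assumed convention) reverses the order."""
    reverse = sort_by.startswith("-")
    key = sort_keys[sort_by.lstrip("-")]
    return sorted(tasks, key=key, reverse=reverse)


# Hypothetical task objects with some missing values
tasks = [
    SimpleNamespace(name="b", received=20.0, runtime=None),
    SimpleNamespace(name="a", received=None, runtime=1.5),
    SimpleNamespace(name="c", received=10.0, runtime=0.2),
]

print([t.name for t in sort_by_field(tasks, "name")])
print([t.name for t in sort_by_field(tasks, "received")])
print([t.name for t in sort_by_field(tasks, "-runtime")])
```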

Task Control Operations

Task Execution

Execute tasks remotely with various execution modes and options.

def task_apply(task_name, args=None, kwargs=None, **options):
    """
    Execute task synchronously and wait for result.
    
    Args:
        task_name (str): Name of task to execute
        args (list, optional): Task arguments
        kwargs (dict, optional): Task keyword arguments
        **options: Additional task options (queue, countdown, eta, etc.)
        
    Returns:
        dict: Task result and metadata
        
    Executes the task and waits for completion, returning the result
    or raising an exception if the task fails.
    """

def task_async_apply(task_name, args=None, kwargs=None, **options):
    """
    Execute task asynchronously without waiting for result.
    
    Args:
        task_name (str): Name of task to execute
        args (list, optional): Task arguments
        kwargs (dict, optional): Task keyword arguments
        **options: Additional task options
        
    Returns:
        dict: Task ID and submission metadata
        
    Submits the task for execution and returns immediately with task ID.
    """

def task_send_task(task_name, args=None, kwargs=None, **options):
    """
    Send task without requiring task definition on sender.
    
    Args:
        task_name (str): Name of task to send
        args (list, optional): Task arguments  
        kwargs (dict, optional): Task keyword arguments
        **options: Additional task options
        
    Returns:
        dict: Task ID and submission metadata
        
    Sends task using Celery's send_task, which doesn't require the
    task to be registered locally.
    """

Task Result Management

Retrieve and manage task results and execution status.

def get_task_result(task_id, timeout=None):
    """
    Get task result by ID.
    
    Args:
        task_id (str): Task UUID
        timeout (float, optional): Maximum time to wait for result
        
    Returns:
        dict: Task result data including:
            - result: Task return value
            - state: Current task state
            - traceback: Error traceback if failed
            - success: Boolean success status
            
    Retrieves result from the configured result backend.
    """

def task_abort(task_id):
    """
    Abort running task.
    
    Args:
        task_id (str): Task UUID to abort
        
    Returns:
        dict: Abort operation status
        
    Attempts to abort a running task if it supports abortion.
    Only works with AbortableTask instances.
    """

Task Control Commands

Remote control operations for managing task execution and behavior.

def task_revoke(task_id, terminate=False, signal='SIGTERM'):
    """
    Revoke/cancel a task.
    
    Args:
        task_id (str): Task UUID to revoke
        terminate (bool): Whether to terminate if already running
        signal (str): Signal to send if terminating ('SIGTERM', 'SIGKILL')
        
    Returns:
        dict: Revocation status
        
    Revokes a task, optionally terminating it if already executing.
    """

def task_rate_limit(task_name, rate_limit, workername=None):
    """
    Set rate limit for task type.
    
    Args:
        task_name (str): Name of task to limit
        rate_limit (str): Rate limit specification (e.g., '10/m', '1/s')
        workername (str, optional): Specific worker to apply limit
        
    Returns:
        dict: Rate limit operation status
        
    Sets execution rate limit for the specified task type.
    """

def task_timeout(task_name, soft=None, hard=None, workername=None):
    """
    Set timeout limits for task type.
    
    Args:
        task_name (str): Name of task to configure
        soft (float, optional): Soft timeout in seconds
        hard (float, optional): Hard timeout in seconds  
        workername (str, optional): Specific worker to apply timeouts
        
    Returns:
        dict: Timeout configuration status
        
    Configures soft and hard timeout limits for task execution.
    """
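Rate limit specifications such as '10/m' or '1/s' pair a count with a time unit. As a rough sketch of how such a string maps to a numeric rate, the hypothetical helper below converts a specification into tasks per second. It is written for illustration only and is not part of Flower or Celery.

```python
def parse_rate_limit(spec):
    """Convert a spec such as '10/m' into tasks per second."""
    seconds_per_unit = {"s": 1, "m": 60, "h": 3600}
    count, sep, unit = spec.partition("/")
    if not sep:
        # A bare number is treated as tasks per second
        return float(count)
    if unit not in seconds_per_unit:
        raise ValueError(f"unknown rate unit: {unit!r}")
    return float(count) / seconds_per_unit[unit]


print(parse_rate_limit("1/s"))
print(parse_rate_limit("30/m"))
```

Normalising to a single unit makes it easy to compare limits written in different units before applying them.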

Task Data Structure

Core Task Information

The fields recorded for each task, grouped by purpose. TaskInfo is shown as a mapping from field name to type.

TaskInfo = {
    # Basic identification
    'uuid': str,              # Unique task identifier
    'name': str,              # Task name/type
    'state': str,             # Current task state
    'hostname': str,          # Worker hostname executing task
    
    # Timing information
    'timestamp': float,       # Task event timestamp
    'received': float,        # Time task was received by worker
    'started': float,         # Time task execution started
    'succeeded': float,       # Time task completed successfully
    'failed': float,          # Time task failed
    'retried': float,        # Time task was retried
    'revoked': float,        # Time task was revoked
    'runtime': float,        # Total execution time in seconds
    
    # Task parameters
    'args': list,            # Task positional arguments
    'kwargs': dict,          # Task keyword arguments
    'retries': int,          # Number of retry attempts
    'eta': str,              # Estimated time of arrival
    'expires': str,          # Task expiration time
    
    # Execution results
    'result': Any,           # Task return value (if successful)
    'traceback': str,        # Exception traceback (if failed)
    'exception': str,        # Exception message (if failed)
    
    # Routing information
    'queue': str,            # Queue name task was sent to
    'exchange': str,         # Exchange name
    'routing_key': str,      # Routing key used
    'priority': int,         # Message priority
    
    # Worker information
    'worker': {
        'hostname': str,     # Worker hostname
        'pid': int,          # Worker process ID
        'sw_ident': str,     # Software identifier
        'sw_ver': str,       # Software version
        'sw_sys': str,       # System information
    },
    
    # Additional metadata
    'clock': int,            # Logical clock value
    'client': str,           # Client that sent the task
    'root_id': str,          # ID of the first task in the workflow
    'parent_id': str,        # ID of the task that called this task
    'children': [str],       # Child task IDs
}
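The timing fields can be combined to derive simple latency figures. The sketch below assumes a task dict with the 'received', 'started', and 'runtime' fields listed above; `timing_summary` and the sample values are hypothetical.

```python
def timing_summary(task):
    """Derive worker-side wait and total latency from task timing fields."""
    received = task.get("received")
    started = task.get("started")
    runtime = task.get("runtime")

    # Time between the worker receiving the task and starting to execute it
    wait = None
    if received is not None and started is not None:
        wait = started - received

    # Wait plus execution time, when both are known
    total = None
    if wait is not None and runtime is not None:
        total = wait + runtime

    return {"worker_wait": wait, "total": total}


finished = {"uuid": "abc", "received": 100.0, "started": 102.5, "runtime": 4.0}
waiting = {"uuid": "def", "received": 100.0, "started": None, "runtime": None}

print(timing_summary(finished))
print(timing_summary(waiting))
```

Fields for stages a task has not reached yet are treated as missing, so the helper returns None rather than a misleading number.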

Task Event Types

Event types emitted over a task's lifecycle, with the fields each event carries.

TaskEventTypes = {
    'task-sent': {
        'description': 'Task was sent to broker',
        'fields': ['uuid', 'name', 'args', 'kwargs', 'eta', 'expires']
    },
    'task-received': {
        'description': 'Worker received task from broker',
        'fields': ['uuid', 'name', 'hostname', 'timestamp']
    },
    'task-started': {
        'description': 'Worker started executing task',
        'fields': ['uuid', 'hostname', 'timestamp', 'pid']
    },
    'task-succeeded': {
        'description': 'Task completed successfully',
        'fields': ['uuid', 'result', 'runtime', 'hostname', 'timestamp']
    },
    'task-failed': {
        'description': 'Task execution failed',
        'fields': ['uuid', 'exception', 'traceback', 'hostname', 'timestamp']
    },
    'task-retried': {
        'description': 'Task is being retried',
        'fields': ['uuid', 'reason', 'traceback', 'hostname', 'timestamp']
    },
    'task-revoked': {
        'description': 'Task was revoked/cancelled',
        'fields': ['uuid', 'hostname', 'timestamp']
    }
}
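One way to see how these events build up a task record is to fold them, in order, into a dict keyed by uuid. The sketch below is an illustration under stated assumptions, not Flower's event handling: `apply_event` and the sample events are hypothetical, and RECEIVED is the Celery state name used for a task that a worker has received.

```python
EVENT_TO_STATE = {
    "task-sent": "PENDING",
    "task-received": "RECEIVED",
    "task-started": "STARTED",
    "task-succeeded": "SUCCESS",
    "task-failed": "FAILURE",
    "task-retried": "RETRY",
    "task-revoked": "REVOKED",
}


def apply_event(tasks, event):
    """Merge one event into the per-uuid task record and update its state."""
    task = tasks.setdefault(event["uuid"], {"uuid": event["uuid"]})
    # Later events overwrite shared fields such as 'timestamp'
    task.update({k: v for k, v in event.items() if k != "type"})
    task["state"] = EVENT_TO_STATE[event["type"]]
    return task


tasks = {}
events = [
    {"type": "task-received", "uuid": "abc", "name": "my_task",
     "hostname": "celery@worker1", "timestamp": 100.0},
    {"type": "task-started", "uuid": "abc",
     "hostname": "celery@worker1", "timestamp": 100.5, "pid": 4242},
    {"type": "task-succeeded", "uuid": "abc", "result": "3", "runtime": 0.25,
     "hostname": "celery@worker1", "timestamp": 100.75},
]
for event in events:
    apply_event(tasks, event)

print(tasks["abc"]["state"], tasks["abc"]["name"], tasks["abc"]["runtime"])
```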

Usage Examples

Basic Task Filtering

from datetime import datetime, timedelta

from flower.events import Events
from flower.utils.tasks import iter_tasks, get_task_by_id

# Assume celery_app (a Celery application) and io_loop (a Tornado IOLoop)
# already exist; the Events instance holds the task data
events = Events(celery_app, io_loop)

# Get up to 100 failed tasks
failed_tasks = list(iter_tasks(
    events,
    state='FAILURE',
    limit=100
))

print(f"Found {len(failed_tasks)} failed tasks")

# Get tasks from specific worker
worker_tasks = list(iter_tasks(
    events,
    worker='celery@worker1',
    limit=50
))

# Get tasks received in the last hour
recent_tasks = list(iter_tasks(
    events,
    received_start=datetime.now() - timedelta(hours=1),
    sort_by='received'
))

Advanced Task Search

from flower.utils.search import parse_search_terms, satisfies_search_terms
from flower.utils.tasks import iter_tasks

# Reuses the events instance from the filtering example

# Parse search query
search_terms = parse_search_terms('name:my_task state:FAILURE')

# Find matching tasks
matching_tasks = [
    task for task in iter_tasks(events)
    if satisfies_search_terms(task, search_terms)
]

# Search with text query
text_search = parse_search_terms('error database connection')
error_tasks = [
    task for task in iter_tasks(events, state='FAILURE')
    if satisfies_search_terms(task, text_search)
]

Task Execution and Control

from flower.api.tasks import TaskApply, TaskAsyncApply, TaskRevoke

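# Execute task synchronously
# (handler construction is simplified here; Tornado normally builds handlers
# from the running Flower application and the incoming request)
async def execute_task():
    handler = TaskApply()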
    result = await handler.post(
        'my_task',
        args=[1, 2, 3],
        kwargs={'timeout': 30}
    )
    print(f"Task result: {result}")

# Execute task asynchronously
async def async_execute():
    handler = TaskAsyncApply()
    response = await handler.post(
        'long_running_task',
        args=['data'],
        kwargs={'priority': 5}
    )
    task_id = response['task-id']
    print(f"Task submitted: {task_id}")

# Revoke task
async def revoke_task():
    handler = TaskRevoke()
    await handler.post('task-uuid-here', terminate=True)
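The handlers above are normally reached over HTTP on a running Flower instance rather than constructed directly. As a hedged sketch, the code below builds (but does not send) POST requests for the apply and async-apply endpoints using only the standard library. The address `http://localhost:5555`, the task name `tasks.add`, and the helper `build_task_request` are assumptions made for this example.

```python
import json
from urllib.parse import quote
from urllib.request import Request

FLOWER_URL = "http://localhost:5555"  # assumed address of a running Flower instance


def build_task_request(action, task_name, payload=None):
    """Build (but do not send) a JSON POST request for a task endpoint."""
    url = f"{FLOWER_URL}/api/task/{action}/{quote(task_name, safe='')}"
    body = json.dumps(payload or {}).encode("utf-8")
    return Request(url, data=body, method="POST",
                   headers={"Content-Type": "application/json"})


# 'tasks.add' is a hypothetical task name
apply_req = build_task_request("apply", "tasks.add", {"args": [1, 2]})
async_req = build_task_request("async-apply", "tasks.add",
                               {"args": [1, 2], "kwargs": {}})

print(apply_req.get_method(), apply_req.full_url)
print(async_req.get_method(), async_req.full_url)

# Sending requires a running Flower instance, for example:
# from urllib.request import urlopen
# with urlopen(async_req) as response:
#     print(json.load(response))
```

Building the request separately from sending it keeps the example runnable without a server and makes the URL and body easy to inspect.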

Task Result Monitoring

from flower.api.tasks import TaskResult, TaskInfo

# Get task result
async def get_result():
    handler = TaskResult()
    result = await handler.get('task-uuid-here', timeout=10)
    
    if result['state'] == 'SUCCESS':
        print(f"Task completed: {result['result']}")
    elif result['state'] == 'FAILURE':
        print(f"Task failed: {result['traceback']}")

# Get detailed task information
async def get_task_info():
    handler = TaskInfo()
    info = await handler.get('task-uuid-here')
    
    print(f"Task: {info['name']}")
    print(f"State: {info['state']}")
    print(f"Worker: {info['hostname']}")
    print(f"Runtime: {info.get('runtime', 'N/A')} seconds")

Bulk Task Operations

from flower.utils.tasks import iter_tasks

# Process tasks in batches.
# process_single_task is a placeholder for application-specific handling.
def process_tasks_batch(events, batch_size=1000):
    offset = 0

    while True:
        batch = list(iter_tasks(
            events,
            limit=batch_size,
            offset=offset,
            sort_by='received'
        ))

        if not batch:
            break

        # Process batch
        for task in batch:
            process_single_task(task)

        offset += batch_size

# Find and revoke failed tasks.
# should_revoke_task is a placeholder predicate supplied by the caller.
async def cleanup_failed_tasks(events):
    failed_tasks = iter_tasks(events, state='FAILURE')

    for task in failed_tasks:
        if should_revoke_task(task):
            await task_revoke(task['uuid'])

Task Analytics

from collections import Counter
from datetime import datetime, timedelta

from flower.utils.tasks import iter_tasks

def analyze_task_performance(events):
    """Generate task performance analytics."""
    
    # Get tasks from last 24 hours
    yesterday = datetime.now() - timedelta(days=1)
    recent_tasks = list(iter_tasks(
        events,
        received_start=yesterday,
        sort_by='received'
    ))
    
    # Task counts by state
    state_counts = Counter(task['state'] for task in recent_tasks)
    
    # Task counts by name
    name_counts = Counter(task['name'] for task in recent_tasks)
    
    # Average runtime by task name
    runtime_by_name = {}
    for task_name in name_counts:
        runtimes = [
            task['runtime'] for task in recent_tasks
            if task['name'] == task_name and task.get('runtime')
        ]
        if runtimes:
            runtime_by_name[task_name] = sum(runtimes) / len(runtimes)
    
    return {
        'total_tasks': len(recent_tasks),
        'state_distribution': dict(state_counts),
        'task_distribution': dict(name_counts),
        'average_runtimes': runtime_by_name,
        'success_rate': state_counts['SUCCESS'] / len(recent_tasks) if recent_tasks else 0
    }

Error Handling and Edge Cases

The patterns below handle three common failure cases: task execution errors, missing results, and failed searches.

# await is only valid inside a coroutine, so each case is wrapped
# in an async function

# Handle task execution errors
async def run_task_safely():
    try:
        return await task_apply('my_task', args=[1, 2, 3])
    except Exception as e:
        if 'timeout' in str(e).lower():
            print("Task execution timed out")
        elif 'not registered' in str(e).lower():
            print("Task not found on workers")
        else:
            print(f"Task execution failed: {e}")
        return None

# Handle missing task results
async def fetch_result_safely(task_id):
    try:
        return await get_task_result(task_id)
    except Exception as e:
        if 'no such task' in str(e).lower():
            print("Task not found in result backend")
        else:
            print(f"Could not retrieve result: {e}")
        return None

# Handle search and filtering edge cases
def safe_task_search(events, **filters):
    try:
        return list(iter_tasks(events, **filters))
    except Exception as e:
        print(f"Task search failed: {e}")
        return []

Performance Considerations

  • Use pagination (limit and offset) for large task datasets
  • Consider memory usage when processing many tasks
  • Use specific filters to reduce dataset size before processing
  • Cache frequent queries when possible
  • Monitor search performance with complex queries
  • Use appropriate sorting fields based on query patterns
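For the pagination and memory points above, one alternative to repeated limit/offset queries is to consume a single iterator in fixed-size batches, so the source is scanned once and only one batch is held in memory at a time. The sketch below uses a hypothetical `batched` helper and a stand-in generator in place of a real task iterator.

```python
from itertools import islice


def batched(iterable, batch_size):
    """Yield lists of up to batch_size items from a single pass over iterable."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Stand-in for a task iterator; any iterable of tasks would work here
task_ids = (f"task-{n}" for n in range(7))
sizes = [len(batch) for batch in batched(task_ids, 3)]
print(sizes)
```

Whether this beats offset-based paging depends on how the underlying iterator is produced, so it is worth measuring on a realistic task volume.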

Install with Tessl CLI

npx tessl i tessl/pypi-flower
