Web-based tool for monitoring and administrating Celery clusters with real-time task tracking and worker management.
—
Utility functions for search, template formatting, and general operations supporting Flower's monitoring and management capabilities.
Advanced search functionality for finding tasks and filtering data.
def parse_search_terms(raw_search_value):
"""
Parse search query string into structured search terms.
Args:
raw_search_value (str): Raw search query string
Returns:
list: Parsed search terms with field specifications
Supports field-specific searches like 'name:my_task state:FAILURE'
and general text searches across task content.
"""
def satisfies_search_terms(task, search_terms):
"""
Check if task matches search criteria.
Args:
task (dict): Task object to check
search_terms (list): Parsed search terms
Returns:
bool: True if task matches all search terms
"""
def stringified_dict_contains_value(key, value, str_dict):
"""
Search for value in stringified dictionary.
Args:
key (str): Dictionary key to search
value (str): Value to find
str_dict (str): Stringified dictionary content
Returns:
bool: True if value found in specified key
"""
def preprocess_search_value(raw_value):
"""
Clean and normalize search values.
Args:
raw_value (str): Raw search value
Returns:
str: Cleaned search value
"""
def task_args_contains_search_args(task_args, search_args):
"""
Check if task arguments contain search terms.
Args:
task_args (list): Task arguments to search
search_args (list): Search terms to find
Returns:
bool: True if arguments contain search terms
"""Formatting and display utilities for the web interface.
def format_time(time, tz):
"""
Format timestamp with timezone.
Args:
time (float): Unix timestamp
tz (str): Timezone identifier
Returns:
str: Formatted time string
"""
def humanize(obj, type, length):
"""
Humanize various object types for display.
Args:
obj: Object to humanize
type (str): Object type hint
length (int): Maximum display length
Returns:
str: Human-readable representation
"""
# Template constants
KEYWORDS_UP = ['PENDING', 'SUCCESS', 'FAILURE', 'RETRY', 'REVOKED']
KEYWORDS_DOWN = ['null', 'none', 'undefined']
UUID_REGEX = r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}'Common utility functions for configuration and operations.
def gen_cookie_secret():
"""
Generate secure cookie secret.
Returns:
str: Base64-encoded random secret suitable for cookies
"""
def bugreport(app):
"""
Generate bug report information.
Args:
app: Celery application instance
Returns:
str: Formatted bug report with system and configuration info
"""
def abs_path(path):
"""
Convert relative path to absolute path.
Args:
path (str): File path (relative or absolute)
Returns:
str: Absolute path
"""
def prepend_url(url, prefix):
"""
Add URL prefix for reverse proxy setups.
Args:
url (str): Original URL
prefix (str): URL prefix to prepend
Returns:
str: URL with prefix added
"""
def strtobool(val):
"""
Convert string to boolean value.
Args:
val (str): String value to convert
Returns:
bool: Boolean representation
Recognizes: 'true', 'false', 'yes', 'no', '1', '0' (case-insensitive)
"""from flower.utils.search import parse_search_terms, satisfies_search_terms
# Parse complex search query
search_query = "name:my_task state:FAILURE error database"
search_terms = parse_search_terms(search_query)
# Filter tasks based on search
matching_tasks = []
for task in all_tasks:
if satisfies_search_terms(task, search_terms):
matching_tasks.append(task)
# Search in task arguments
task_args = [1, 2, "database_connection", {"host": "db.example.com"}]
search_args = ["database", "connection"]
if task_args_contains_search_args(task_args, search_args):
print("Task arguments contain search terms")from flower.utils.template import format_time, humanize
from datetime import datetime
# Format timestamp
timestamp = datetime.now().timestamp()
formatted = format_time(timestamp, 'UTC')
print(f"Task received: {formatted}")
# Humanize objects
large_data = {"key": "value"} * 1000
humanized = humanize(large_data, 'dict', 100)
print(f"Task args: {humanized}")from flower.utils import gen_cookie_secret, abs_path, strtobool
# Generate secure cookie secret
secret = gen_cookie_secret()
print(f"Cookie secret: {secret}")
# Convert relative paths
config_file = abs_path("../config/flower.conf")
print(f"Config file: {config_file}")
# String to boolean conversion
debug_mode = strtobool(os.environ.get('DEBUG', 'false'))
persistent_storage = strtobool(os.environ.get('PERSISTENT', 'true'))from flower.utils import bugreport
# Generate bug report
report = bugreport(celery_app)
print("Bug Report:")
print(report)
# Bug report includes:
# - Flower version
# - Celery version
# - Python version
# - Platform information
# - Broker configuration
# - Registered tasks
# - Configuration optionsThe search system supports advanced query syntax:
name:my_task # Search in task name
state:FAILURE # Filter by task state
worker:celery@host1 # Filter by worker
uuid:task-id-123 # Search by task IDdatabase error # Search across all text fields
"connection timeout" # Exact phrase search
error OR timeout # Boolean OR operationname:import_data state:FAILURE database
state:SUCCESS runtime:>10
worker:celery@worker1 name:process_*Template utilities provide variables for web interface templates:
template_vars = {
'time_formatter': format_time,
'humanizer': humanize,
'KEYWORDS_UP': KEYWORDS_UP,
'KEYWORDS_DOWN': KEYWORDS_DOWN,
'UUID_REGEX': UUID_REGEX,
}Utilities include robust error handling:
# Safe search term parsing
try:
terms = parse_search_terms(user_query)
except ValueError as e:
print(f"Invalid search query: {e}")
terms = []
# Safe boolean conversion
try:
value = strtobool(env_var)
except ValueError:
value = False # Default fallback
# Safe path conversion
try:
path = abs_path(relative_path)
except (OSError, ValueError):
path = None # Handle invalid pathsInstall with Tessl CLI
npx tessl i tessl/pypi-flower