Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Periodic task scheduling capabilities including cron-like scheduling, interval-based schedules, solar event timing, and beat scheduler management for running recurring tasks in distributed environments.
Unix cron-style scheduling with minute, hour, day, and month specifications for precise timing control.
class crontab:
    """Unix cron-style schedule (interface listing; bodies intentionally omitted)."""

    def __init__(
        self,
        minute='*',
        hour='*',
        day_of_week='*',
        day_of_month='*',
        month_of_year='*',
        nowfun=None,
        app=None
    ):
        """
        Create crontab schedule.

        Args:
            minute (str | int): Minute field (0-59, '*', '*/5', '10,20,30')
            hour (str | int): Hour field (0-23, '*', '*/2', '9-17')
            day_of_week (str | int): Day of week (0-6, mon-sun, '*')
            day_of_month (str | int): Day of month (1-31, '*', '1,15')
            month_of_year (str | int): Month (1-12, jan-dec, '*')
            nowfun (callable): Function returning current time
            app: Celery app instance
        """

    def is_due(self, last_run_at):
        """
        Check if schedule is due for execution.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            tuple: (is_due, next_time_to_run)
        """

    def remaining_estimate(self, last_run_at):
        """
        Estimate time until next execution.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            timedelta: Time remaining until next run
        """
def crontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'):
    """
    Create cron-style schedule.

    NOTE(review): this listing shows the call form of the ``crontab`` class;
    written as a ``def`` it rebinds the class name defined just before it.

    Args:
        minute (str): Minutes (0-59, *, */N, ranges, lists)
        hour (str): Hours (0-23, *, */N, ranges, lists)
        day_of_week (str): Days of week (0-6 or names, *, */N, ranges, lists)
        day_of_month (str): Days of month (1-31, *, */N, ranges, lists)
        month_of_year (str): Months (1-12 or names, *, */N, ranges, lists)

    Returns:
        crontab instance
    """


# Simple interval-based scheduling using timedelta objects for regular recurring execution.
class schedule:
    """Interval-based schedule (interface listing; bodies intentionally omitted)."""

    def __init__(self, run_every, relative=False, nowfun=None, app=None):
        """
        Create interval schedule.

        Args:
            run_every (timedelta): Interval between executions
            relative (bool): If True, schedule relative to last run
            nowfun (callable): Function returning current time
            app: Celery app instance
        """

    def is_due(self, last_run_at):
        """
        Check if schedule is due.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            tuple: (is_due, next_time_to_run)
        """

    def remaining_estimate(self, last_run_at):
        """
        Estimate time until next execution.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            timedelta: Time remaining
        """
def schedule(run_every, relative=False):
    """
    Create interval-based schedule.

    NOTE(review): this listing shows the call form of the ``schedule`` class;
    written as a ``def`` it rebinds the class name defined just before it.

    Args:
        run_every (timedelta): Time between executions
        relative (bool): Schedule relative to last run vs absolute intervals

    Returns:
        schedule instance
    """


# Astronomical event-based scheduling for sunrise, sunset, and other solar events at specific geographic locations.
class solar:
    """Solar-event schedule (interface listing; bodies intentionally omitted)."""

    def __init__(self, event, lat, lon, nowfun=None, app=None):
        """
        Create solar event schedule.

        Args:
            event (str): Solar event ('sunrise', 'sunset', 'dawn_civil', 'dusk_civil')
            lat (float): Latitude coordinate
            lon (float): Longitude coordinate
            nowfun (callable): Function returning current time
            app: Celery app instance
        """

    def is_due(self, last_run_at):
        """
        Check if solar event schedule is due.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            tuple: (is_due, next_time_to_run)
        """

    def remaining_estimate(self, last_run_at):
        """
        Estimate time until next solar event.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            timedelta: Time remaining
        """
def solar(event, lat, lon):
    """
    Create solar event schedule.

    NOTE(review): this listing shows the call form of the ``solar`` class;
    written as a ``def`` it rebinds the class name defined just before it.

    Args:
        event (str): Solar event type
        lat (float): Latitude (-90 to 90)
        lon (float): Longitude (-180 to 180)

    Returns:
        solar instance
    """


# Core beat scheduler components for managing and executing periodic tasks.
class Scheduler:
    """Base beat scheduler (interface listing; bodies intentionally omitted)."""

    def __init__(self, app, max_interval=None, Producer=None, lazy=False, **kwargs):
        """
        Base scheduler class.

        Args:
            app: Celery application instance
            max_interval (int): Maximum sleep interval between checks
            Producer: Message producer class
            lazy (bool): Don't start immediately
        """

    def setup_schedule(self):
        """Initialize schedule from configuration."""

    def sync(self):
        """Sync schedule with storage backend."""

    def close(self):
        """Close scheduler and cleanup resources."""

    def add(self, **kwargs):
        """
        Add schedule entry.

        Args:
            **kwargs: Schedule entry parameters
        """

    def tick(self):
        """
        Check schedule and run due tasks.

        Returns:
            float: Seconds until next check needed
        """

    @property
    def schedule(self):
        """Current schedule dictionary."""
class PersistentScheduler(Scheduler):
    """Scheduler variant backed by persistent storage (interface listing)."""

    def __init__(self, *args, **kwargs):
        """Scheduler that persists schedule to disk."""

    def setup_schedule(self):
        """Load schedule from persistent storage."""

    def sync(self):
        """Save schedule to persistent storage."""
class ScheduleEntry:
    """Single entry in the beat schedule (interface listing; bodies intentionally omitted)."""

    def __init__(
        self,
        task=None,
        schedule=None,
        args=None,
        kwargs=None,
        options=None,
        name=None,
        **kw
    ):
        """
        Individual schedule entry.

        Args:
            task (str): Task name to execute
            schedule: Schedule object (crontab, schedule, solar)
            args (tuple): Task arguments
            kwargs (dict): Task keyword arguments
            options (dict): Task execution options
            name (str): Entry name/identifier
        """

    def is_due(self):
        """
        Check if entry is due for execution.

        Returns:
            tuple: (is_due, next_time_to_run)
        """

    def __call__(self, sender=None):
        """Execute the scheduled task."""

    def update(self, other):
        """
        Update entry with values from another entry.

        Args:
            other (ScheduleEntry): Entry to copy from
        """
class Service:
    """Beat service wrapper (interface listing; bodies intentionally omitted)."""

    def __init__(self, app, max_interval=None, schedule_filename=None, scheduler_cls=None):
        """
        Beat service for running periodic tasks.

        Args:
            app: Celery application instance
            max_interval (int): Maximum sleep between schedule checks
            schedule_filename (str): Persistent schedule file path
            scheduler_cls: Custom scheduler class
        """

    def start(self, embedded_process=False):
        """
        Start beat service.

        Args:
            embedded_process (bool): Run as embedded process
        """

    def sync(self):
        """Sync scheduler state."""

    def stop(self, graceful=True):
        """
        Stop beat service.

        NOTE(review): upstream ``celery.beat.Service.stop`` is believed to take
        ``wait=False`` rather than ``graceful`` - confirm against the installed
        Celery version before passing this argument by keyword.

        Args:
            graceful (bool): Allow graceful shutdown
        """


# Configuration options and utilities for periodic task management.
# Configuration via app.conf.beat_schedule
# Each key is the entry name; the value describes which task to send, when,
# and with which arguments and execution options.
BEAT_SCHEDULE = {
    'task-name': {
        'task': 'myapp.tasks.my_task',
        'schedule': crontab(minute=0, hour=4),  # Execute daily at 4:00 AM
        'args': (16, 16),
        'kwargs': {'verbose': True},
        'options': {'expires': 15.0}
    }
}
# Add periodic task programmatically
app.add_periodic_task(
    crontab(hour=7, minute=30, day_of_week=1),
    send_weekly_report.s('Weekly Report'),
    name='weekly report'
)


from celery import Celery
from celery.schedules import crontab
app = Celery('scheduler_example')
@app.task(name='cleanup_temp_files')
def cleanup_temp_files(deep_clean=False):
    """Remove temporary files and report completion.

    The task is registered under the explicit name ``cleanup_temp_files``
    because the beat schedule entries in this document refer to it by that
    bare name; an auto-generated name would be module-qualified and the
    entries would not resolve.

    Args:
        deep_clean (bool): Accepted because the ``weekend-maintenance``
            schedule entry passes ``deep_clean=True``. The placeholder logic
            below does not use it yet.

    Returns:
        str: Completion message.
    """
    # Cleanup logic here
    return "Cleanup completed"
@app.task(name='generate_report')
def generate_report(report_type=None):
    """Generate a report and return a completion message.

    Registered under the explicit name ``generate_report`` so the schedule
    entries that reference that bare name resolve.

    Args:
        report_type (str | None): Accepted because the ``month-end-report``
            schedule entry passes ``report_type='monthly'``. The placeholder
            logic does not use it yet.

    Returns:
        str: Completion message.
    """
    return "Report generated"
@app.task(name='backup_database')
def backup_database():
    """Back up the database and return a completion message.

    Registered under the explicit name ``backup_database`` so the
    ``monthly-backup`` schedule entry, which uses that bare name, resolves.
    """
    return "Database backed up"
# Configure periodic tasks
app.conf.beat_schedule = {
    # Run every day at 2:30 AM
    'daily-cleanup': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(hour=2, minute=30),
    },
    # Run every Monday at 7:30 AM
    'weekly-report': {
        'task': 'generate_report',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    },
    # Run every 15 minutes during business hours
    # (hour='9-17' is inclusive, so the last run is at 17:45)
    'frequent-check': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(minute='*/15', hour='9-17'),
    },
    # Run on specific days of month
    'monthly-backup': {
        'task': 'backup_database',
        'schedule': crontab(hour=1, minute=0, day_of_month='1,15'),
    },
    # Run multiple times per day
    'multiple-daily': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(hour='6,12,18', minute=0),
    }
}
app.conf.timezone = 'UTC'


from datetime import timedelta
from celery.schedules import schedule
@app.task(name='periodic_health_check')
def periodic_health_check():
    """Run a health check and return a completion message.

    Registered under the explicit name ``periodic_health_check`` so the
    ``health-check`` schedule entry, which uses that bare name, resolves.
    """
    return "Health check completed"
@app.task(name='process_queue')
def process_queue(priority=None):
    """Process the queue and return a completion message.

    Registered under the explicit name ``process_queue`` so the
    ``process-queue`` schedule entry, which uses that bare name, resolves.

    Args:
        priority (str | None): Accepted because the ``process-queue``
            schedule entry passes ``('high_priority',)`` as positional
            arguments. The placeholder logic does not use it yet.

    Returns:
        str: Completion message.
    """
    return "Queue processed"
# Interval-based scheduling
app.conf.beat_schedule = {
    # Run every 30 seconds
    'health-check': {
        'task': 'periodic_health_check',
        'schedule': schedule(run_every=timedelta(seconds=30)),
    },
    # Run every 5 minutes
    'process-queue': {
        'task': 'process_queue',
        'schedule': schedule(run_every=timedelta(minutes=5)),
        'args': ('high_priority',),
        'options': {'expires': 60}  # Task expires after 60 seconds
    },
    # Run every hour with relative timing
    'hourly-task': {
        'task': 'cleanup_temp_files',
        'schedule': schedule(run_every=timedelta(hours=1), relative=True),
    }
}


from celery.schedules import solar
@app.task(name='morning_activation')
def morning_activation():
    """Activate morning systems and return a status message.

    Registered under the explicit name ``morning_activation`` so the
    ``morning-startup`` schedule entry, which uses that bare name, resolves.
    """
    return "Morning systems activated"
@app.task(name='evening_shutdown')
def evening_shutdown():
    """Shut down evening systems and return a status message.

    Registered under the explicit name ``evening_shutdown`` so the
    ``evening-shutdown`` schedule entry, which uses that bare name, resolves.
    """
    return "Evening systems shutdown"
# Solar event scheduling (San Francisco coordinates)
app.conf.beat_schedule = {
    # Run at sunrise
    'morning-startup': {
        'task': 'morning_activation',
        'schedule': solar('sunrise', 37.7749, -122.4194),
    },
    # Run at sunset
    'evening-shutdown': {
        'task': 'evening_shutdown',
        'schedule': solar('sunset', 37.7749, -122.4194),
    }
}


from celery.schedules import crontab
from datetime import timedelta
@app.task
def dynamic_task(message):
    """Return a confirmation string that echoes ``message``.

    Args:
        message (str): Text to embed in the result.

    Returns:
        str: ``"Executed: <message>"``.
    """
    return f"Executed: {message}"
# Add periodic tasks programmatically
def setup_dynamic_schedule():
    """Register the example periodic tasks on ``app``.

    Adds a daily 08:00 task, a 10-minute interval task and a Monday 09:00
    task, each sending ``dynamic_task`` with a descriptive message.
    """
    # Add daily task
    app.add_periodic_task(
        crontab(hour=8, minute=0),
        dynamic_task.s('Daily morning task'),
        name='morning task'
    )
    # Add interval task
    app.add_periodic_task(
        timedelta(minutes=10),
        dynamic_task.s('Every 10 minutes'),
        name='frequent task'
    )
    # Add Monday-only task.
    # The crontab's day_of_week=1 already restricts execution to Mondays.
    # Gating the registration on the weekday at start-up would silently
    # drop the task whenever the process is started on any other day, so
    # the entry is registered unconditionally.
    app.add_periodic_task(
        crontab(hour=9, minute=0, day_of_week=1),
        dynamic_task.s('Monday special task'),
        name='monday task'
    )
# Call during app initialization
setup_dynamic_schedule()


from celery.beat import Scheduler, ScheduleEntry
import json
import os
class DatabaseScheduler(Scheduler):
    """Custom scheduler that stores schedule in database.

    Skeleton only: the load/save helpers are placeholders to be replaced
    with real database access.
    """

    def __init__(self, *args, **kwargs):
        """Extract ``database_url`` and delegate the rest to ``Scheduler``.

        Args:
            database_url (str): Optional keyword; defaults to
                ``'sqlite:///schedule.db'``. It is popped before calling the
                base initializer so the base class never sees it.
        """
        self.database_url = kwargs.pop('database_url', 'sqlite:///schedule.db')
        super().__init__(*args, **kwargs)

    def setup_schedule(self):
        """Load schedule from database."""
        # Implementation would load from database
        self.schedule = self.load_schedule_from_db()

    def sync(self):
        """Save schedule to database."""
        # Implementation would save to database
        self.save_schedule_to_db()

    def load_schedule_from_db(self):
        """Load schedule entries from database.

        Returns:
            dict: Schedule entries; the placeholder returns an empty dict.
        """
        # Database loading logic
        return {}

    def save_schedule_to_db(self):
        """Save schedule entries to database (placeholder, does nothing)."""
        # Database saving logic
        pass
# Use custom scheduler
app.conf.beat_scheduler = 'myapp.schedulers:DatabaseScheduler'


from celery.beat import Service
def run_beat_service():
    """Run beat service programmatically.

    Builds a ``Service`` for ``app``, starts it, and always calls ``stop()``
    on the way out, whether ``start()`` returns normally, is interrupted
    from the keyboard, or raises.
    """
    # Create beat service
    beat_service = Service(
        app=app,
        max_interval=60,  # Check schedule every 60 seconds max
        schedule_filename='celerybeat-schedule'
    )
    try:
        # Start beat service (blocking)
        beat_service.start()
    except KeyboardInterrupt:
        print("Beat service interrupted")
    finally:
        # Cleanup
        beat_service.stop()
# Run beat as embedded process
def run_embedded_beat():
    """Run beat embedded in worker process.

    NOTE(review): the other example in this document labels
    ``Service.start()`` as blocking. If that also holds with
    ``embedded_process=True``, nothing after the call runs until the service
    shuts down - confirm before relying on "other work" executing
    concurrently.
    """
    beat_service = Service(app=app)
    try:
        beat_service.start(embedded_process=True)
        # Continue with other work...
    finally:
        # Stop when needed; in ``finally`` so the service is also stopped
        # when start() or the work that follows it raises.
        beat_service.stop()


from celery.schedules import crontab, schedule
from datetime import datetime, timedelta
@app.task(name='conditional_task')
def conditional_task():
    """Report whether the task ran inside business hours.

    Registered under the explicit name ``conditional_task`` so the
    ``business-hours-only`` schedule entry, which uses that bare name,
    resolves.

    Returns:
        str: One of two messages, chosen by the current local hour
        (09 through 17 inclusive counts as business hours).
    """
    # Task that only runs under certain conditions
    current_hour = datetime.now().hour
    if 9 <= current_hour <= 17:  # Business hours
        return "Task executed during business hours"
    return "Task skipped outside business hours"
@app.task
def escalating_task(attempt=1):
    """Demonstrate re-scheduling a task with growing delays.

    Attempts 1 and 2 raise a simulated failure; each failure enqueues the
    task again with ``attempt + 1`` after ``attempt * 60`` seconds. From
    attempt 3 onwards the task succeeds.

    Args:
        attempt (int): 1-based attempt counter.

    Returns:
        str | None: Success message on a successful attempt; ``None`` when
        the attempt failed and a follow-up was enqueued.
    """
    # Task with escalating retry intervals
    try:
        # Some operation that might fail
        if attempt < 3:
            raise Exception(f"Simulated failure, attempt {attempt}")
        return f"Success on attempt {attempt}"
    except Exception:
        # Retry with increasing delay
        escalating_task.apply_async(
            args=(attempt + 1,),
            countdown=attempt * 60  # 1min, 2min, 3min delays
        )
# Complex scheduling configuration
app.conf.beat_schedule = {
# Task with complex cron expression
'business-hours-only': {
'task': 'conditional_task',
'schedule': crontab(minute='*/30', hour='9-17', day_of_week='1-5'),
'options': {
'expires': 15 * 60, # Expire after 15 minutes
'retry': True,
'retry_policy': {
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
}
}
},
# Weekend-only task
'weekend-maintenance': {
'task': 'cleanup_temp_files',
'schedule': crontab(hour=3, minute=0, day_of_week='6,0'), # Saturday and Sunday
'kwargs': {'deep_clean': True}
},
# End of month reporting
'month-end-report': {
'task': 'generate_report',
'schedule': crontab(hour=23, minute=59, day_of_month='28-31'), # Last days of month
'kwargs': {'report_type': 'monthly'}
}
}@app.task(bind=True)
def monitored_periodic_task(self):
    """Periodic task with built-in monitoring.

    Publishes a custom, non-terminal ``PROGRESS`` state while running and
    then returns the result.

    The terminal states are deliberately NOT set by hand: the worker records
    ``SUCCESS`` with the return value, or ``FAILURE`` with the exception and
    traceback, once the task finishes. A manual ``SUCCESS`` is overwritten
    anyway, and a manual ``FAILURE`` carrying an ad-hoc meta dict replaces
    the exception information result backends expect.

    Args:
        self: Bound task instance, which provides ``update_state``.

    Returns:
        str: Completion message.

    Raises:
        Exception: Anything raised by the task logic propagates unchanged so
            the worker can record the failure.
    """
    # Update task state (custom in-progress marker for monitoring tools)
    self.update_state(
        state='PROGRESS',
        meta={'timestamp': datetime.now().isoformat()}
    )
    # Task logic here
    result = "Task completed successfully"
    return result
# Configure with detailed options
# 'options' is forwarded as execution options when the task is sent, so it
# must hold per-call options (queue, serializer, expires, ...) rather than
# configuration setting names such as task_track_started / task_serializer /
# task_routes, which belong on app.conf.
app.conf.beat_schedule = {
    'monitored-task': {
        'task': 'monitored_periodic_task',
        'schedule': crontab(minute='*/5'),
        'options': {
            'queue': 'periodic',
            'serializer': 'json',
        }
    }
}


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-celery