Database-backed periodic task scheduling for Django and Celery integration
—
Celery beat scheduler implementation that reads periodic tasks from the database instead of configuration files, enabling dynamic task management and real-time schedule updates.
Main scheduler class that replaces Celery's default file-based scheduler.
class DatabaseScheduler(Scheduler):
"""
Database-backed beat scheduler for Celery.
Reads periodic tasks from Django database instead of configuration files,
allowing dynamic task management through Django ORM and Admin interface.
"""
# Class attributes
Entry = ModelEntry # Entry class for database-backed entries
Model = PeriodicTask # Django model for periodic tasks
Changes = PeriodicTasks # Model for tracking changes
# Configuration constants
DEFAULT_MAX_INTERVAL = 5 # Default max interval between database checks
SCHEDULE_SYNC_MAX_INTERVAL = 300 # Max interval for schedule synchronization
def setup_schedule(self): ...
def all_as_schedule(self) -> dict: ...
def enabled_models(self) -> list[PeriodicTask]: ...
def enabled_models_qs(self) -> QuerySet[PeriodicTask]: ...
def schedule_changed(self) -> bool: ...
def sync(self): ...
def update_from_dict(self, mapping: dict): ...
def install_default_entries(self, data: dict): ...
def reserve(self, entry: ModelEntry) -> ModelEntry: ...
def schedules_equal(self, *args, **kwargs) -> bool: ...Usage Examples:
# Configure Celery to use database scheduler
from celery import Celery

app = Celery('myapp')
app.conf.update(
    beat_scheduler='django_celery_beat.schedulers:DatabaseScheduler',
    # or use the shorthand:
    # beat_scheduler='django',
)

# Start beat service with database scheduler
# Command line:
#   celery -A myapp beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# or:
#   celery -A myapp beat -l info -S django

# Programmatic scheduler usage (advanced)
from django_celery_beat.schedulers import DatabaseScheduler

scheduler = DatabaseScheduler(app=app)
scheduler.setup_schedule()

# Check if schedule has changed
if scheduler.schedule_changed():
    scheduler.sync()

# Get all enabled tasks as schedule dict
schedule_dict = scheduler.all_as_schedule()
print(f"Found {len(schedule_dict)} enabled tasks")

Database-backed schedule entry that represents individual periodic tasks.
class ModelEntry(ScheduleEntry):
"""
Database-backed schedule entry for periodic tasks.
Represents a single periodic task with its schedule and configuration,
managing execution state and database synchronization.
"""
model_schedules = {
schedules.crontab: 'crontab',
schedules.schedule: 'interval',
schedules.solar: 'solar',
clocked: 'clocked',
}
save_fields = [
'last_run_at', 'total_run_count', 'no_changes',
]
def __init__(self, model: PeriodicTask, app: Celery = None): ...
def is_due(self) -> tuple[bool, float]: ...
def save(self): ...
@classmethod
def to_model_schedule(cls, schedule: schedules.BaseSchedule) -> dict: ...
@classmethod
def from_entry(cls, name: str, app: Celery = None, **entry_kwargs) -> 'ModelEntry': ...Usage Examples:
# `schedules` is needed for the configuration dict below.
from celery import schedules
from django_celery_beat.schedulers import ModelEntry
from django_celery_beat.models import PeriodicTask

# Create model entry from database task
task = PeriodicTask.objects.get(name='my_task')
entry = ModelEntry(task)

# Check if task is due
is_due, next_time = entry.is_due()
if is_due:
    print(f"Task {entry.name} is due for execution")
    entry.save()  # Persist run-state fields such as last_run_at and total_run_count
else:
    print(f"Task {entry.name} next due in {next_time} seconds")

# Create entry from configuration dict (advanced usage)
entry_config = {
    'task': 'myapp.tasks.example_task',
    'schedule': schedules.schedule(run_every=30),  # Every 30 seconds
    'args': (),
    'kwargs': {},
    'options': {'queue': 'default'},
}
entry = ModelEntry.from_entry('example_task', **entry_config)

Configure and run the Celery beat service with database scheduler.
# settings.py or celery.py
from celery import Celery

app = Celery('myproject')

# Method 1: Full scheduler path
app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'

# Method 2: Short form (requires django-celery-beat entry point)
app.conf.beat_scheduler = 'django'

# Additional beat scheduler configuration
app.conf.update(
    beat_max_loop_interval=60,  # Max seconds between database checks
    # Number of tasks sent before a sync is forced. 0 (the default) does NOT
    # mean "immediately": it means sync on the scheduler's timer instead.
    # Use 1 to sync after every task sent.
    beat_sync_every=0,
)

# settings.py
# Register the app so Django picks up its models and migrations.
INSTALLED_APPS = [
    # ... other apps
    'django_celery_beat',
]

# Optional: Configure timezone for cron schedules
CELERY_TIMEZONE = 'America/New_York'

# Database configuration (standard Django)
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'myproject',
        # ... other database settings
    },
}

# Start beat service with database scheduler
celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# Short form using entry point
celery -A myproject beat -l info -S django
# Combined worker and beat (development only)
celery -A myproject worker --beat --scheduler django --loglevel=info
# Beat service with custom settings
celery -A myproject beat -l info -S django --max-interval=30

The scheduler automatically detects and synchronizes changes from the database.
from django_celery_beat.schedulers import DatabaseScheduler
from django_celery_beat.models import PeriodicTasks

# Manual schedule sync trigger
# NOTE: `app` is not defined in this snippet; it is expected to be the
# project's Celery application instance.
scheduler = DatabaseScheduler(app)

# Check if database has changes
if scheduler.schedule_changed():
    print("Database schedule has changed, syncing...")
    scheduler.sync()

# Force schedule reload
scheduler.setup_schedule()

# Manually trigger change detection
PeriodicTasks.update_changed()

from django_celery_beat.schedulers import DatabaseScheduler
from django_celery_beat.models import PeriodicTask
class CustomDatabaseScheduler(DatabaseScheduler):
    """
    Extended database scheduler with custom behavior.
    """

    def enabled_models_qs(self):
        """Override to add custom filtering."""
        qs = super().enabled_models_qs()
        # Add custom filtering, e.g., by environment
        return qs.filter(description__contains='production')

    def sync(self):
        """Override to add custom sync behavior."""
        print("Performing custom sync operations...")
        super().sync()
        print("Custom sync completed")

    def install_default_entries(self, data):
        """Override to customize default task installation.

        Installs the base scheduler's defaults first, then registers the
        extra entries defined here.
        """
        # Imported locally because this snippet never imports `schedules`.
        from celery import schedules

        # Add custom default tasks
        custom_defaults = {
            'custom-cleanup': {
                'task': 'myapp.tasks.custom_cleanup',
                'schedule': schedules.crontab(hour=1, minute=0),
                'options': {'queue': 'maintenance'},
            },
        }
        super().install_default_entries(data)
        # Register via update_from_dict instead of merging into `data`: this
        # leaves the caller's dict untouched and does not depend on the base
        # implementation actually reading `data`.
        # NOTE(review): upstream appears to build its own entries dict and
        # ignore `data` - confirm against the installed version.
        self.update_from_dict(custom_defaults)
# Use custom scheduler
app.conf.beat_scheduler = 'myproject.schedulers:CustomDatabaseScheduler'

from django_celery_beat.schedulers import DatabaseScheduler
from django_celery_beat.models import PeriodicTask, PeriodicTasks
def debug_scheduler_state():
    """Debug function to inspect scheduler state.

    Prints every enabled periodic task with its schedule and run statistics,
    the timestamp of the last recorded schedule change, and whether a freshly
    constructed scheduler reports pending changes.

    Relies on a module-level Celery ``app`` that this snippet does not define.
    """
    # Check enabled tasks. Evaluate the queryset once so the count and the
    # loop share a single database query.
    enabled_tasks = list(PeriodicTask.objects.filter(enabled=True))
    print(f"Enabled tasks: {len(enabled_tasks)}")
    for task in enabled_tasks:
        print(f" - {task.name}: {task.task}")
        print(f" Schedule: {task.scheduler}")
        print(f" Last run: {task.last_run_at}")
        print(f" Run count: {task.total_run_count}")

    # Check change tracking
    last_change = PeriodicTasks.last_change()
    print(f"Last schedule change: {last_change}")

    # Test scheduler detection
    scheduler = DatabaseScheduler(app)
    changed = scheduler.schedule_changed()
    print(f"Scheduler detects changes: {changed}")
# Performance monitoring
def monitor_scheduler_performance():
    """Monitor scheduler database query performance.

    Counts the database queries issued while the scheduler loads its enabled
    tasks and prints the total.

    Relies on a module-level Celery ``app`` that this snippet does not define.
    """
    from django.db import connection
    from django.test.utils import CaptureQueriesContext

    scheduler = DatabaseScheduler(app)

    # Monitor enabled_models_qs performance.
    # CaptureQueriesContext records queries for the duration of the block even
    # when settings.DEBUG is False; reading connection.queries directly would
    # report zero queries in that case. Assigning settings.LOGGING at runtime
    # is not used here because it does not reconfigure logging after startup.
    # The captured SQL is available in `captured.captured_queries`.
    with CaptureQueriesContext(connection) as captured:
        tasks = scheduler.enabled_models()

    query_count = len(captured.captured_queries)
    print(f"Scheduler loaded {len(tasks)} tasks with {query_count} queries")

from django_celery_beat.schedulers import DatabaseScheduler
from django.db import OperationalError
import logging
logger = logging.getLogger(__name__)
def robust_scheduler_setup():
    """Set up scheduler with error handling.

    Relies on a module-level Celery ``app`` and ``logger``.

    Returns:
        The ``DatabaseScheduler`` after ``setup_schedule()`` has run, or
        ``None`` if construction or setup raised. Failures are logged with
        their traceback rather than propagated.
    """
    try:
        scheduler = DatabaseScheduler(app)
        scheduler.setup_schedule()
    except OperationalError as e:
        # logger.exception keeps the traceback; lazy %-args avoid formatting
        # the message when the record is filtered out.
        logger.exception("Database connection error in scheduler: %s", e)
        # Fallback to empty schedule or retry logic
        return None
    except Exception as e:
        logger.exception("Unexpected scheduler error: %s", e)
        return None
    # Success path kept outside the try so only the risky calls are guarded.
    return scheduler
def safe_schedule_sync(scheduler):
    """Safely sync schedule with error handling.

    Args:
        scheduler: Object exposing ``schedule_changed()`` and ``sync()``.
            A falsy value (e.g. ``None``) makes the call a no-op.

    Returns:
        None. ``sync()`` is called only when ``schedule_changed()`` is truthy.
        Any exception raised while checking or syncing is logged with its
        traceback and swallowed, so the caller is never interrupted.
    """
    try:
        if scheduler and scheduler.schedule_changed():
            scheduler.sync()
            logger.info("Schedule synchronized successfully")
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Error syncing schedule: %s", e)
# Continue with current schedule

from django_celery_beat.models import PeriodicTask, PeriodicTasks
from django.core.management.base import BaseCommand
class Command(BaseCommand):
"""Management command to check scheduler health."""
def handle(self, *args, **options):
# Check database connectivity
try:
task_count = PeriodicTask.objects.count()
self.stdout.write(f"✓ Database accessible, {task_count} tasks found")
except Exception as e:
self.stdout.write(f"✗ Database error: {e}")
return
# Check for tasks with invalid schedules
invalid_tasks = []
for task in PeriodicTask.objects.filter(enabled=True):
try:
_ = task.schedule # This will raise if schedule is invalid
except Exception as e:
invalid_tasks.append((task.name, str(e)))
if invalid_tasks:
self.stdout.write("✗ Invalid schedules found:")
for name, error in invalid_tasks:
self.stdout.write(f" - {name}: {error}")
else:
self.stdout.write("✓ All enabled tasks have valid schedules")
# Check change tracking
try:
last_change = PeriodicTasks.last_change()
self.stdout.write(f"✓ Change tracking working, last change: {last_change}")
except Exception as e:
self.stdout.write(f"✗ Change tracking error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-django-celery-beat