CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-django-celery-beat

Database-backed periodic task scheduling for Django and Celery integration

Pending
Overview
Eval results
Files

docs/database-scheduler.md

Database Scheduler

Celery beat scheduler implementation that reads periodic tasks from the database instead of configuration files, enabling dynamic task management and real-time schedule updates.

Capabilities

Database Scheduler Class

Main scheduler class. It replaces Celery's default file-based scheduler (`celery.beat.PersistentScheduler`, which keeps its state in a local shelve file).

class DatabaseScheduler(Scheduler):
    """
    Database-backed beat scheduler for Celery.

    Reads periodic tasks from the Django database instead of configuration
    files, allowing dynamic task management through the Django ORM and the
    Django Admin interface.

    This listing shows the public surface only; method bodies are elided.
    """
    # Class attributes
    Entry = ModelEntry  # Entry class wrapping each database-backed task
    Model = PeriodicTask  # Django model for periodic tasks
    Changes = PeriodicTasks  # Model for tracking changes to the task set

    # Configuration constants (units are not stated here; presumably
    # seconds, matching beat's --max-interval option -- confirm)
    DEFAULT_MAX_INTERVAL = 5  # Default max interval between database checks
    SCHEDULE_SYNC_MAX_INTERVAL = 300  # Max interval for schedule synchronization

    # Set up the initial schedule.
    def setup_schedule(self): ...

    # Return all enabled tasks as a schedule dict (presumably keyed by task
    # name -- confirm).
    def all_as_schedule(self) -> dict: ...

    # Return the enabled PeriodicTask rows as a list.
    def enabled_models(self) -> list[PeriodicTask]: ...

    # Return the queryset of enabled tasks; subclasses may override it to
    # narrow which tasks get scheduled.
    def enabled_models_qs(self) -> QuerySet[PeriodicTask]: ...

    # Return True when the tracked task set changed since the last check.
    # NOTE(review): upstream records the timestamp it saw on every call, so
    # calling this manually may hide a change from the scheduler -- confirm.
    def schedule_changed(self) -> bool: ...

    # Synchronize scheduler state with the database.
    def sync(self): ...

    # Add or update entries from a mapping of entry definitions (presumably
    # {name: beat_schedule-style dict} -- confirm).
    def update_from_dict(self, mapping: dict): ...

    # Install the scheduler's default entries; ``data`` is the current
    # schedule. NOTE(review): upstream does not install entries taken from
    # ``data`` -- add extra entries with update_from_dict() instead.
    def install_default_entries(self, data: dict): ...

    # Reserve ``entry`` for execution and return the entry to use.
    def reserve(self, entry: ModelEntry) -> ModelEntry: ...

    # Return whether two schedules are equal (arguments elided here).
    def schedules_equal(self, *args, **kwargs) -> bool: ...

Usage Examples:

# Configure Celery to use database scheduler
from celery import Celery

app = Celery('myapp')
app.conf.update(
    beat_scheduler='django_celery_beat.schedulers:DatabaseScheduler',
    # or use the shorthand registered by django-celery-beat's entry point:
    # beat_scheduler='django',
)

# Start beat service with database scheduler
# Command line:
# celery -A myapp beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# or:
# celery -A myapp beat -l info -S django

# Programmatic scheduler usage (advanced).
# Django must already be configured (django.setup()) before this import.
from django_celery_beat.schedulers import DatabaseScheduler

# The constructor already runs setup_schedule() (Celery's Scheduler does so
# unless lazy=True), so no second explicit call is needed.
scheduler = DatabaseScheduler(app=app)

# Pick up database changes. Reading the ``schedule`` property runs the change
# check itself and, when tasks changed, syncs pending run state and reloads
# the enabled tasks. Calling schedule_changed() followed by sync() instead
# would record the new change timestamp without reloading, so the scheduler
# would silently miss the change.
current_schedule = scheduler.schedule

# Get all enabled tasks as schedule dict (queries the database directly)
schedule_dict = scheduler.all_as_schedule()
print(f"Found {len(schedule_dict)} enabled tasks")

Model Entry Class

Database-backed schedule entry that represents individual periodic tasks.

class ModelEntry(ScheduleEntry):
    """
    Database-backed schedule entry for periodic tasks.

    Represents a single periodic task with its schedule and configuration,
    managing execution state and database synchronization.

    This listing shows the public surface only; method bodies are elided.
    """
    # Maps each supported schedule type to a storage name (presumably the
    # matching PeriodicTask schedule field -- confirm).
    model_schedules = {
        schedules.crontab: 'crontab',
        schedules.schedule: 'interval', 
        schedules.solar: 'solar',
        clocked: 'clocked',
    }

    # Names of the run-state fields this entry persists (presumably the
    # fields written by save() -- confirm).
    save_fields = [
        'last_run_at', 'total_run_count', 'no_changes',
    ]

    # Wrap one PeriodicTask row; ``app`` is optional.
    def __init__(self, model: PeriodicTask, app: Celery | None = None): ...

    # Return an (is_due, next_time) pair; next_time is presumably a number
    # of seconds -- confirm.
    def is_due(self) -> tuple[bool, float]: ...

    # Persist this entry's state to the database.
    def save(self): ...

    # Convert a Celery schedule object into a dict describing the stored
    # model schedule.
    @classmethod
    def to_model_schedule(cls, schedule: schedules.BaseSchedule) -> dict: ...

    # Build an entry for task ``name`` from beat_schedule-style keyword
    # arguments (task, schedule, args, kwargs, options).
    # NOTE(review): upstream creates or updates the PeriodicTask row named
    # ``name`` here -- confirm before using it as a dry run.
    @classmethod
    def from_entry(cls, name: str, app: Celery | None = None, **entry_kwargs) -> 'ModelEntry': ...

Usage Examples:

from celery import schedules

from django_celery_beat.schedulers import ModelEntry
from django_celery_beat.models import PeriodicTask

# Create model entry from database task
task = PeriodicTask.objects.get(name='my_task')
entry = ModelEntry(task)

# Check if task is due
is_due, next_time = entry.is_due()
if is_due:
    print(f"Task {entry.name} is due for execution")
    # Neither is_due() nor save() advances the run state: save() only writes
    # what the entry already holds. next(entry) returns a new entry with
    # last_run_at set to now and total_run_count incremented (beat does this
    # itself when it sends a task); save() then persists it.
    entry = next(entry)
    entry.save()
else:
    print(f"Task {entry.name} next due in {next_time} seconds")

# Create entry from configuration dict (advanced usage).
# Note: from_entry() is not a dry run -- it creates or updates the
# PeriodicTask row named 'example_task' in the database.
entry_config = {
    'task': 'myapp.tasks.example_task',
    'schedule': schedules.schedule(run_every=30),  # Every 30 seconds
    'args': (),
    'kwargs': {},
    'options': {'queue': 'default'},
}
entry = ModelEntry.from_entry('example_task', **entry_config)

Scheduler Configuration

Beat Service Setup

Configure and run the Celery beat service with database scheduler.

# settings.py or celery.py
from celery import Celery

app = Celery('myproject')

# Pick ONE of the two methods below -- the second assignment simply
# overwrites the first.

# Method 1: Full scheduler path
app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'

# Method 2: Short form (requires django-celery-beat entry point)
app.conf.beat_scheduler = 'django'

# Additional beat scheduler configuration
app.conf.update(
    # Max seconds beat sleeps between schedule checks. With the database
    # scheduler this bounds how long database edits take to be noticed.
    beat_max_loop_interval=60,
    # Number of sent tasks between syncs of run state to the database.
    # 0 (the default) means "sync on a timer" (Scheduler.sync_every),
    # NOT "sync immediately"; 1 syncs after every task sent.
    beat_sync_every=0,
)

Django Integration

# settings.py
INSTALLED_APPS = [
    # ... other apps
    'django_celery_beat',
]
# After adding the app, run ``python manage.py migrate`` so the periodic
# task tables exist before beat starts.

# Optional: timezone for cron schedules. Celery itself only reads
# CELERY_-prefixed settings when configured with
# app.config_from_object('django.conf:settings', namespace='CELERY');
# recent django-celery-beat versions also use it as the default timezone of
# newly created crontab schedules.
CELERY_TIMEZONE = 'America/New_York'

# Database configuration (standard Django)
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'myproject',
        # ... other database settings
    }
}

Command Line Usage

# Start beat service with database scheduler (full import path)
celery -A myproject beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

# Short form using the "django" entry point (same scheduler as above)
celery -A myproject beat -l info -S django

# Combined worker and beat (development only; presumably because every
# worker started with --beat would run its own beat -- see Celery's
# periodic-tasks docs)
celery -A myproject worker --beat --scheduler django --loglevel=info

# Beat service with custom settings; --max-interval caps the time between
# schedule checks (presumably seconds -- confirm)
celery -A myproject beat -l info -S django --max-interval=30

Advanced Scheduler Operations

Schedule Synchronization

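The scheduler automatically detects changes in the database. On each schedule check it compares the `PeriodicTasks` change timestamp with the last one it saw, and reloads the enabled tasks when the timestamp has moved. Saving tasks through the ORM or the Admin bumps that timestamp automatically. Bulk `QuerySet.update()` calls do not, so follow them with `PeriodicTasks.update_changed()`.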

from django_celery_beat.schedulers import DatabaseScheduler
from django_celery_beat.models import PeriodicTasks

# ``app`` is the project's Celery application. Constructing the scheduler
# already runs setup_schedule(), which (re)installs default and configured
# entries -- it does not reload tasks from the database.
scheduler = DatabaseScheduler(app)

# Check the database for changes and reload if needed. The ``schedule``
# property does the change check itself and, when tasks changed, syncs
# pending run state and reloads the enabled tasks. Calling
# schedule_changed() + sync() by hand would consume the change signal
# without reloading anything.
current_schedule = scheduler.schedule
print(f"Scheduler holds {len(current_schedule)} entries")

# Tell running schedulers that the task set changed. Saving a PeriodicTask
# through the ORM or Admin already does this; bulk QuerySet.update() calls
# do not, so call this after them.
PeriodicTasks.update_changed()

Custom Scheduler Extensions

import logging

from celery import schedules
from django_celery_beat.schedulers import DatabaseScheduler
from django_celery_beat.models import PeriodicTask

logger = logging.getLogger(__name__)


class CustomDatabaseScheduler(DatabaseScheduler):
    """
    Extended database scheduler with custom behavior.

    Narrows which tasks are scheduled, logs around syncs and installs an
    extra default task on start-up.
    """

    def enabled_models_qs(self):
        """Return only enabled tasks whose description mentions 'production'."""
        qs = super().enabled_models_qs()
        # Add custom filtering, e.g., by environment
        return qs.filter(description__contains='production')

    def sync(self):
        """Run the base sync, logging before and after it."""
        # Logging rather than print(): sync() runs repeatedly inside the beat
        # process, where stdout is usually not monitored.
        logger.debug("Performing custom sync operations...")
        super().sync()
        logger.debug("Custom sync completed")

    def install_default_entries(self, data):
        """
        Install the scheduler's default entries plus project-specific ones.

        ``data`` is the scheduler's current schedule. The base implementation
        does not create tasks from it, so adding definitions to ``data`` would
        install nothing (and would put plain dicts where schedule entries are
        expected). update_from_dict() turns each definition into a
        database-backed entry instead; like entries from ``beat_schedule``,
        the stored row is overwritten with this definition on every start.
        """
        super().install_default_entries(data)
        custom_defaults = {
            'custom-cleanup': {
                'task': 'myapp.tasks.custom_cleanup',
                'schedule': schedules.crontab(hour=1, minute=0),
                'options': {'queue': 'maintenance'},
            },
        }
        self.update_from_dict(custom_defaults)


# Use custom scheduler (``app`` is the project's Celery application)
app.conf.beat_scheduler = 'myproject.schedulers:CustomDatabaseScheduler'

Monitoring and Debugging

from django_celery_beat.schedulers import DatabaseScheduler
from django_celery_beat.models import PeriodicTask, PeriodicTasks

def debug_scheduler_state():
    """Print enabled tasks and the schedule change-tracking timestamp.

    This only reads from the database and deliberately does not construct a
    scheduler, for two reasons. Constructing one installs entries. A new
    scheduler also only compares change timestamps it recorded itself, so it
    cannot report edits made before it existed.
    """
    # select_related fetches each task's schedule row (read via
    # task.scheduler below) in the same query instead of one query per task;
    # list() evaluates once instead of a separate COUNT plus iteration.
    enabled_tasks = list(
        PeriodicTask.objects.filter(enabled=True).select_related(
            'interval', 'crontab', 'solar', 'clocked',
        )
    )
    print(f"Enabled tasks: {len(enabled_tasks)}")

    for task in enabled_tasks:
        print(f"  - {task.name}: {task.task}")
        print(f"    Schedule: {task.scheduler}")
        print(f"    Last run: {task.last_run_at}")
        print(f"    Run count: {task.total_run_count}")

    # Check change tracking; compare this value between two runs to see
    # whether the task set changed in between.
    last_change = PeriodicTasks.last_change()
    print(f"Last schedule change: {last_change}")

# Performance monitoring
def monitor_scheduler_performance():
    """Report how many SQL queries loading the enabled tasks takes.

    ``app`` is the project's Celery application, defined elsewhere.
    """
    from django.db import connection
    from django.test.utils import CaptureQueriesContext

    scheduler = DatabaseScheduler(app)

    # connection.queries is only populated when settings.DEBUG is True;
    # CaptureQueriesContext records queries regardless of DEBUG.
    # To log the SQL itself, configure the 'django.db.backends' logger in
    # settings.LOGGING (applied once at startup, so reassigning it at
    # runtime has no effect; that logger only emits when DEBUG is True).
    with CaptureQueriesContext(connection) as captured:
        tasks = scheduler.enabled_models()
    print(
        f"Scheduler loaded {len(tasks)} tasks with "
        f"{len(captured.captured_queries)} queries"
    )

Error Handling

Common Scheduler Errors

from django_celery_beat.schedulers import DatabaseScheduler
from django.db import OperationalError
import logging

logger = logging.getLogger(__name__)

def robust_scheduler_setup():
    """Create a DatabaseScheduler, returning None if it cannot be set up.

    ``app`` is the project's Celery application, defined elsewhere.

    Returns:
        The scheduler, or None when construction failed (the error is
        logged together with its traceback).
    """
    try:
        # The constructor already calls setup_schedule() (unless lazy=True),
        # so a second explicit call would only repeat the same work.
        return DatabaseScheduler(app)
    except OperationalError as e:
        logger.exception("Database connection error in scheduler: %s", e)
        # Fallback to empty schedule or retry logic
        return None
    except Exception as e:
        logger.exception("Unexpected scheduler error: %s", e)
        return None

def safe_schedule_sync(scheduler):
    """Refresh ``scheduler`` from the database, logging instead of raising.

    Reading ``scheduler.schedule`` performs the change check itself and, when
    the stored tasks changed, syncs pending run state and reloads the enabled
    tasks. Calling schedule_changed() followed by sync() would record the new
    change timestamp without reloading, so the change would be missed.

    Args:
        scheduler: A DatabaseScheduler, or None (nothing is done).

    Returns:
        The scheduler's current schedule mapping, or None if ``scheduler`` is
        None or the refresh failed.
    """
    if scheduler is None:
        return None
    try:
        current = scheduler.schedule
    except Exception as e:
        logger.exception("Error syncing schedule: %s", e)
        # Continue with current schedule
        return None
    logger.debug("Schedule synchronized successfully (%d entries)", len(current))
    return current
        # Continue with current schedule

Scheduler Health Checks

from django_celery_beat.models import PeriodicTask, PeriodicTasks
from django.core.management.base import BaseCommand, CommandError

class Command(BaseCommand):
    """Management command to check scheduler health.

    Successes are written to stdout and failures to stderr. If any check
    fails the command raises CommandError, so it exits with a non-zero
    status that monitoring and deploy scripts can act on.
    """

    help = "Check django-celery-beat database access, schedules and change tracking."

    def handle(self, *args, **options):
        # Check database connectivity; nothing else can be checked without it.
        try:
            task_count = PeriodicTask.objects.count()
        except Exception as e:
            raise CommandError(f"✗ Database error: {e}") from e
        self.stdout.write(f"✓ Database accessible, {task_count} tasks found")

        failed = False

        # Check for tasks with invalid schedules. select_related loads the
        # schedule rows in the same query instead of one query per task.
        enabled_tasks = PeriodicTask.objects.filter(enabled=True).select_related(
            'interval', 'crontab', 'solar', 'clocked',
        )
        invalid_tasks = []
        for task in enabled_tasks:
            try:
                _ = task.schedule  # This will raise if schedule is invalid
            except Exception as e:
                invalid_tasks.append((task.name, str(e)))

        if invalid_tasks:
            failed = True
            self.stderr.write("✗ Invalid schedules found:")
            for name, error in invalid_tasks:
                self.stderr.write(f"  - {name}: {error}")
        else:
            self.stdout.write("✓ All enabled tasks have valid schedules")

        # Check change tracking
        try:
            last_change = PeriodicTasks.last_change()
        except Exception as e:
            failed = True
            self.stderr.write(f"✗ Change tracking error: {e}")
        else:
            self.stdout.write(f"✓ Change tracking working, last change: {last_change}")

        if failed:
            raise CommandError("Scheduler health check failed")

Install with Tessl CLI

npx tessl i tessl/pypi-django-celery-beat

docs

admin-interface.md

database-scheduler.md

index.md

schedule-models.md

task-management.md

validation-utilities.md

tile.json