CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-django-celery-beat

Database-backed periodic task scheduling for Django and Celery integration

Pending
Overview
Eval results
Files

docs/task-management.md

Task Management

Core models and utilities for defining, configuring, and tracking periodic tasks including task arguments, routing, scheduling, and execution management.

Capabilities

Periodic Task Model

Central model for defining periodic tasks with full configuration options.

class PeriodicTask(models.Model):
    """
    Model representing a periodic task to be executed by Celery.
    
    Must be associated with exactly one schedule type (interval, crontab, solar, or clocked).
    """
    # Task identification
    name: str  # Unique task name, max 200 chars
    task: str  # Celery task name to execute, max 200 chars
    
    # Schedule associations (exactly one must be set)
    interval: Optional[ForeignKey[IntervalSchedule]]
    crontab: Optional[ForeignKey[CrontabSchedule]]
    solar: Optional[ForeignKey[SolarSchedule]]
    clocked: Optional[ForeignKey[ClockedSchedule]]
    
    # Task arguments
    args: str  # JSON encoded positional arguments, default '[]'
    kwargs: str  # JSON encoded keyword arguments, default '{}'
    
    # Routing and delivery
    queue: Optional[str]  # Queue override, max 200 chars
    exchange: Optional[str]  # AMQP exchange override, max 200 chars
    routing_key: Optional[str]  # AMQP routing key override, max 200 chars
    headers: str  # JSON encoded AMQP headers, default '{}'
    priority: Optional[int]  # Task priority 0-255
    
    # Execution control
    expires: Optional[datetime.datetime]  # Absolute expiry time
    expire_seconds: Optional[int]  # Expiry in seconds from now
    one_off: bool  # Execute only once, default False
    start_time: Optional[datetime.datetime]  # When to start executing
    enabled: bool  # Whether task is enabled, default True
    
    # Execution tracking (auto-managed)
    last_run_at: Optional[datetime.datetime]  # Last execution time
    total_run_count: int  # Number of times executed, default 0
    date_changed: datetime.datetime  # Last modified timestamp
    
    # Documentation
    description: str  # Task description
    
    # Manager and internal attributes
    objects: PeriodicTaskQuerySet  # Custom QuerySet manager
    no_changes: bool  # Internal flag for change tracking
    
    # The schedule model instance attached to this task, i.e. whichever of
    # the four schedule associations above is set.
    @property
    def scheduler(self) -> Union[IntervalSchedule, CrontabSchedule, SolarSchedule, ClockedSchedule]: ...
    
    # Schedule object for this task; presumably derived from the associated
    # schedule model (see `scheduler`) - confirm against the library source.
    @property
    def schedule(self) -> schedules.BaseSchedule: ...
    
    # Effective expiry as a single datetime; presumably resolves `expires`
    # and `expire_seconds` - TODO confirm precedence against the library.
    @property
    def expires_(self) -> Optional[datetime.datetime]: ...
    
    # Returns the datetime at which the task is due to start; `tz` is an
    # optional timezone (a `datetime.tzinfo` instance) - semantics of the
    # default (None) are not shown here, verify before relying on them.
    def due_start_time(self, tz: Optional[datetime.tzinfo] = None) -> datetime.datetime: ...
    
    # Validation hook; the "exactly one schedule type" and "unique name"
    # rules listed under Validation Errors surface as ValidationError when
    # `full_clean()` is called.
    def validate_unique(self): ...

Usage Examples:

from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
import json
from datetime import datetime, timedelta, timezone

# Create task with interval schedule
schedule = IntervalSchedule.objects.create(every=30, period=IntervalSchedule.MINUTES)
task = PeriodicTask.objects.create(
    name='Process queue every 30 minutes',
    task='myapp.tasks.process_queue',
    interval=schedule,
    args=json.dumps([]),
    kwargs=json.dumps({'priority': 'high'}),
    enabled=True
)

# Create task with cron schedule and expiry
cron_schedule = CrontabSchedule.objects.create(
    minute='0',
    hour='2',
    day_of_week='*',
    day_of_month='*',
    month_of_year='*'
)
task = PeriodicTask.objects.create(
    name='Daily backup at 2 AM',
    task='myapp.tasks.backup_database',
    crontab=cron_schedule,
    # Use a timezone-aware UTC timestamp: datetime.utcnow() returns a naive
    # datetime (and is deprecated since Python 3.12), which is ambiguous
    # when the value is stored or compared with aware datetimes.
    expires=datetime.now(timezone.utc) + timedelta(days=30),
    queue='backup_queue',
    description='Automated daily database backup'
)

# Create one-off task (runs once on the first tick of its 1-second interval)
task = PeriodicTask.objects.create(
    name='One-time data migration',
    task='myapp.tasks.migrate_data',
    interval=IntervalSchedule.objects.create(every=1, period=IntervalSchedule.SECONDS),
    one_off=True,
    args=json.dumps(['migration_v2'])
)

Task Change Tracking

Model for tracking when periodic tasks are modified to trigger scheduler updates.

class PeriodicTasks(models.Model):
    """
    Helper model to track when periodic task schedules change.
    
    Contains a single row with ID=1 that gets updated whenever tasks are modified.
    """
    ident: int  # Primary key, always 1
    last_update: datetime.datetime  # Timestamp of last change
    
    # Signal receiver: invoked automatically by Django signals when an
    # individual PeriodicTask (`instance`) is saved or deleted.
    @classmethod
    def changed(cls, instance: PeriodicTask, **kwargs): ...
    
    # Refreshes the change timestamp; call it manually after bulk
    # operations so the scheduler is notified of the changes.
    @classmethod
    def update_changed(cls, **kwargs): ...
    
    # Returns when tasks were last changed; the Optional return type
    # indicates it may be None (presumably when no change was recorded yet).
    @classmethod
    def last_change(cls) -> Optional[datetime.datetime]: ...

Usage Examples:

from django_celery_beat.models import PeriodicTasks, PeriodicTask

# Check when tasks were last changed
last_change = PeriodicTasks.last_change()
print(f"Tasks last modified: {last_change}")

# Bulk operations: QuerySet.update() writes straight to the database without
# calling save() or sending save/delete signals, so the change timestamp is
# NOT refreshed automatically here and must be updated by hand.
PeriodicTask.objects.filter(enabled=False).update(enabled=True)
PeriodicTasks.update_changed()  # Notify scheduler of bulk changes

# By contrast, the changed() method is called automatically by Django
# signals when individual tasks are saved or deleted, so no manual call is
# needed in that case.

Custom QuerySet

Custom queryset for periodic tasks that filters to enabled tasks and prefetches their related schedule objects, avoiding one extra query per task.

class PeriodicTaskQuerySet(QuerySet):
    """
    Custom queryset for PeriodicTask with optimizations.
    """
    # Returns tasks filtered with enabled=True, with the 'interval',
    # 'crontab', 'solar' and 'clocked' relations prefetched.
    def enabled(self) -> 'PeriodicTaskQuerySet': ...

Usage Examples:

from django_celery_beat.models import PeriodicTask

# Get enabled tasks with optimized queries
enabled_tasks = PeriodicTask.objects.enabled()

# The enabled() method automatically includes:
# - prefetch_related('interval', 'crontab', 'solar', 'clocked')
# - filter(enabled=True)

for task in enabled_tasks:
    print(f"Task: {task.name}, Schedule: {task.scheduler}")
    # Accessing task.scheduler triggers no additional query per task,
    # because the schedule relations were loaded by prefetch_related

Task Configuration Patterns

JSON Argument Handling

Positional arguments, keyword arguments, and headers are stored as JSON-encoded strings, so encode them with `json.dumps` before assigning them to a task.

import json

# NOTE: this snippet assumes `PeriodicTask` is already imported from
# django_celery_beat.models and that `schedule` is an existing
# IntervalSchedule instance (neither is defined in this snippet).

# Positional arguments
args = json.dumps(['arg1', 'arg2', 123])

# Keyword arguments  
kwargs = json.dumps({
    'param1': 'value1',
    'param2': True,
    'param3': {'nested': 'dict'}
})

# AMQP headers
headers = json.dumps({
    'priority': 9,
    'retry_policy': {'max_retries': 3}
})

# All three fields receive the already-encoded JSON strings, not the raw
# Python objects
task = PeriodicTask.objects.create(
    name='Complex task',
    task='myapp.tasks.complex_task',
    interval=schedule,
    args=args,
    kwargs=kwargs,
    headers=headers
)

Routing Configuration

Tasks can be routed to specific queues, exchanges, and routing keys.

# NOTE: assumes `PeriodicTask` is imported and `schedule` is an existing
# IntervalSchedule instance (neither is defined in this snippet).

# Route to specific queue; priority must be within 0-255
task = PeriodicTask.objects.create(
    name='High priority task',
    task='myapp.tasks.urgent_task',
    interval=schedule,
    queue='high_priority',
    priority=9
)

# Use custom exchange and routing key (AMQP overrides)
task = PeriodicTask.objects.create(
    name='Custom routing task',
    task='myapp.tasks.custom_task',
    interval=schedule,
    exchange='custom_exchange',
    routing_key='custom.routing.key'
)

Task Lifecycle Management

from datetime import datetime, timedelta, timezone

# NOTE: assumes `PeriodicTask` is imported from django_celery_beat.models
# and `schedule` is an existing IntervalSchedule instance.
#
# Timestamps below use datetime.now(timezone.utc) rather than
# datetime.utcnow(): utcnow() returns a naive datetime (and is deprecated
# since Python 3.12), which is ambiguous once stored or compared with
# timezone-aware values.

# Temporarily disable task
task = PeriodicTask.objects.get(name='my_task')
task.enabled = False
task.save()

# Set task expiry
task.expires = datetime.now(timezone.utc) + timedelta(hours=24)
task.save()

# Create task that starts in the future
task = PeriodicTask.objects.create(
    name='Future task',
    task='myapp.tasks.future_task',
    interval=schedule,
    start_time=datetime.now(timezone.utc) + timedelta(hours=2)
)

# Reset task execution history
task.last_run_at = None
task.total_run_count = 0
task.save()

Error Handling

Validation Errors

The PeriodicTask model enforces several validation rules:

  • Exactly one schedule type must be specified
  • Task name must be unique
  • JSON fields must contain valid JSON
  • Priority must be between 0-255 if specified

from django.core.exceptions import ValidationError

# NOTE: assumes `PeriodicTask` is imported from django_celery_beat.models.

try:
    # This will raise ValidationError - no schedule specified
    task = PeriodicTask(
        name='Invalid task',
        task='myapp.tasks.test'
    )
    task.full_clean()  # Triggers validation
except ValidationError as e:
    print(f"Validation error: {e}")

# NOTE: `interval_schedule` and `crontab_schedule` below are assumed to be
# existing IntervalSchedule / CrontabSchedule instances; they are not
# defined in this snippet.
try:
    # This will raise ValidationError - multiple schedules
    task = PeriodicTask(
        name='Invalid task',
        task='myapp.tasks.test',
        interval=interval_schedule,
        crontab=crontab_schedule  # Can't have both
    )
    task.full_clean()
except ValidationError as e:
    print(f"Validation error: {e}")

Common Patterns

# Safe task creation with error handling
def create_periodic_task(name, task_name, schedule, **kwargs):
    """Create a PeriodicTask bound to ``schedule``; return None on failure.

    The schedule field to populate is chosen as follows:

    * If ``kwargs`` contains any of the keys ``interval``, ``crontab``,
      ``solar`` or ``clocked``, each such key is treated as a selector and
      the corresponding field is set to ``schedule`` (the value passed for
      the key is ignored). This preserves the original calling convention.
    * Otherwise the field is inferred from the class name of ``schedule``
      (``IntervalSchedule`` -> ``interval`` and so on). Previously the
      schedule was silently dropped in this case, so creation could never
      succeed without a selector key.

    Args:
        name: Unique name for the periodic task.
        task_name: Name of the Celery task to execute.
        schedule: Schedule model instance to attach to the task.
        **kwargs: Additional PeriodicTask field values; schedule selector
            keys are stripped before being forwarded.

    Returns:
        The created PeriodicTask, or None if the schedule type is not
        supported or creation failed (the reason is printed).
    """
    schedule_fields = ('interval', 'crontab', 'solar', 'clocked')

    selected_fields = [field for field in schedule_fields if field in kwargs]
    if not selected_fields:
        # No explicit selector: infer the field from the schedule's class
        # name so the function works with just the schedule instance.
        field_by_model_name = {
            'IntervalSchedule': 'interval',
            'CrontabSchedule': 'crontab',
            'SolarSchedule': 'solar',
            'ClockedSchedule': 'clocked',
        }
        schedule_type_name = type(schedule).__name__
        inferred_field = field_by_model_name.get(schedule_type_name)
        if inferred_field is None:
            print(
                f"Failed to create task {name}: "
                f"unsupported schedule type {schedule_type_name}"
            )
            return None
        selected_fields = [inferred_field]

    # Selector keys must not be forwarded as field values.
    task_options = {
        key: value for key, value in kwargs.items()
        if key not in schedule_fields
    }

    try:
        return PeriodicTask.objects.create(
            name=name,
            task=task_name,
            **dict.fromkeys(selected_fields, schedule),
            **task_options
        )
    except ValidationError as e:
        print(f"Failed to create task {name}: {e}")
        return None
    except Exception as e:
        # Deliberate best-effort boundary: callers rely on a None return
        # rather than an exception for any creation failure.
        print(f"Unexpected error creating task {name}: {e}")
        return None

# Get or create pattern with proper error handling
def get_or_create_task(name, task_name, schedule_kwargs, task_kwargs=None):
    """Fetch the PeriodicTask called ``name``, creating it if it is missing.

    When the task has to be created, its fields come from ``task_name``,
    then ``schedule_kwargs``, then ``task_kwargs`` (later entries win on
    duplicate keys).

    Returns:
        ``(task, created)`` on success, or ``(None, False)`` if anything
        goes wrong (the error is printed).
    """
    try:
        fetch_or_create = PeriodicTask.objects.get_or_create
        extra_fields = task_kwargs if task_kwargs else {}
        creation_defaults = {'task': task_name, **schedule_kwargs, **extra_fields}
        task, created = fetch_or_create(name=name, defaults=creation_defaults)
    except Exception as e:
        print(f"Error getting/creating task {name}: {e}")
        return None, False
    return task, created

Install with Tessl CLI

npx tessl i tessl/pypi-django-celery-beat

docs

admin-interface.md

database-scheduler.md

index.md

schedule-models.md

task-management.md

validation-utilities.md

tile.json