Database-backed periodic task scheduling for Django and Celery integration
—
Core models and utilities for defining, configuring, and tracking periodic tasks including task arguments, routing, scheduling, and execution management.
Central model for defining periodic tasks with full configuration options.
class PeriodicTask(models.Model):
"""
Model representing a periodic task to be executed by Celery.
Must be associated with exactly one schedule type (interval, crontab, solar, or clocked).
"""
# Task identification
name: str # Unique task name, max 200 chars
task: str # Celery task name to execute, max 200 chars
# Schedule associations (exactly one must be set)
interval: Optional[ForeignKey[IntervalSchedule]]
crontab: Optional[ForeignKey[CrontabSchedule]]
solar: Optional[ForeignKey[SolarSchedule]]
clocked: Optional[ForeignKey[ClockedSchedule]]
# Task arguments
args: str # JSON encoded positional arguments, default '[]'
kwargs: str # JSON encoded keyword arguments, default '{}'
# Routing and delivery
queue: Optional[str] # Queue override, max 200 chars
exchange: Optional[str] # AMQP exchange override, max 200 chars
routing_key: Optional[str] # AMQP routing key override, max 200 chars
headers: str # JSON encoded AMQP headers, default '{}'
priority: Optional[int] # Task priority 0-255
# Execution control
expires: Optional[datetime.datetime] # Absolute expiry time
expire_seconds: Optional[int] # Expiry in seconds from now
one_off: bool # Execute only once, default False
start_time: Optional[datetime.datetime] # When to start executing
enabled: bool # Whether task is enabled, default True
# Execution tracking (auto-managed)
last_run_at: Optional[datetime.datetime] # Last execution time
total_run_count: int # Number of times executed, default 0
date_changed: datetime.datetime # Last modified timestamp
# Documentation
description: str # Task description
# Manager and internal attributes
objects: PeriodicTaskQuerySet # Custom QuerySet manager
no_changes: bool # Internal flag for change tracking
@property
def scheduler(self) -> Union[IntervalSchedule, CrontabSchedule, SolarSchedule, ClockedSchedule]: ...
@property
def schedule(self) -> schedules.BaseSchedule: ...
@property
def expires_(self) -> Optional[datetime.datetime]: ...
def due_start_time(self, tz: Optional[tzinfo.tzinfo] = None) -> datetime.datetime: ...
def validate_unique(self): ...Usage Examples:
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
import json
from datetime import datetime, timedelta
# Create task with interval schedule
schedule = IntervalSchedule.objects.create(every=30, period=IntervalSchedule.MINUTES)
task = PeriodicTask.objects.create(
name='Process queue every 30 minutes',
task='myapp.tasks.process_queue',
interval=schedule,
args=json.dumps([]),
kwargs=json.dumps({'priority': 'high'}),
enabled=True
)
# Create task with cron schedule and expiry
cron_schedule = CrontabSchedule.objects.create(
minute='0',
hour='2',
day_of_week='*',
day_of_month='*',
month_of_year='*'
)
task = PeriodicTask.objects.create(
name='Daily backup at 2 AM',
task='myapp.tasks.backup_database',
crontab=cron_schedule,
expires=datetime.utcnow() + timedelta(days=30),
queue='backup_queue',
description='Automated daily database backup'
)
# Create one-off task
task = PeriodicTask.objects.create(
name='One-time data migration',
task='myapp.tasks.migrate_data',
interval=IntervalSchedule.objects.create(every=1, period=IntervalSchedule.SECONDS),
one_off=True,
args=json.dumps(['migration_v2'])
)Model for tracking when periodic tasks are modified to trigger scheduler updates.
class PeriodicTasks(models.Model):
"""
Helper model to track when periodic task schedules change.
Contains a single row with ID=1 that gets updated whenever tasks are modified.
"""
ident: int # Primary key, always 1
last_update: datetime.datetime # Timestamp of last change
@classmethod
def changed(cls, instance: PeriodicTask, **kwargs): ...
@classmethod
def update_changed(cls, **kwargs): ...
@classmethod
def last_change(cls) -> Optional[datetime.datetime]: ...Usage Examples:
from django_celery_beat.models import PeriodicTasks, PeriodicTask
# Check when tasks were last changed
last_change = PeriodicTasks.last_change()
print(f"Tasks last modified: {last_change}")
# Manually update change timestamp (for bulk operations)
# Note: This is automatically called when tasks are saved/deleted
PeriodicTask.objects.filter(enabled=False).update(enabled=True)
PeriodicTasks.update_changed() # Notify scheduler of bulk changes
# The changed() method is automatically called by Django signals
# when individual tasks are saved or deletedOptimized queryset for periodic task queries with prefetch relationships.
class PeriodicTaskQuerySet(QuerySet):
"""
Custom queryset for PeriodicTask with optimizations.
"""
def enabled(self) -> 'PeriodicTaskQuerySet': ...Usage Examples:
from django_celery_beat.models import PeriodicTask
# Get enabled tasks with optimized queries
enabled_tasks = PeriodicTask.objects.enabled()
# The enabled() method automatically includes:
# - prefetch_related('interval', 'crontab', 'solar', 'clocked')
# - filter(enabled=True)
for task in enabled_tasks:
print(f"Task: {task.name}, Schedule: {task.scheduler}")
# No additional queries needed due to prefetch_relatedTasks use JSON encoding for arguments to ensure database compatibility.
import json
# Positional arguments
args = json.dumps(['arg1', 'arg2', 123])
# Keyword arguments
kwargs = json.dumps({
'param1': 'value1',
'param2': True,
'param3': {'nested': 'dict'}
})
# AMQP headers
headers = json.dumps({
'priority': 9,
'retry_policy': {'max_retries': 3}
})
task = PeriodicTask.objects.create(
name='Complex task',
task='myapp.tasks.complex_task',
interval=schedule,
args=args,
kwargs=kwargs,
headers=headers
)Tasks can be routed to specific queues, exchanges, and routing keys.
# Route to specific queue
task = PeriodicTask.objects.create(
name='High priority task',
task='myapp.tasks.urgent_task',
interval=schedule,
queue='high_priority',
priority=9
)
# Use custom exchange and routing key
task = PeriodicTask.objects.create(
name='Custom routing task',
task='myapp.tasks.custom_task',
interval=schedule,
exchange='custom_exchange',
routing_key='custom.routing.key'
)from datetime import datetime, timedelta
# Temporarily disable task
task = PeriodicTask.objects.get(name='my_task')
task.enabled = False
task.save()
# Set task expiry
task.expires = datetime.utcnow() + timedelta(hours=24)
task.save()
# Create task that starts in the future
task = PeriodicTask.objects.create(
name='Future task',
task='myapp.tasks.future_task',
interval=schedule,
start_time=datetime.utcnow() + timedelta(hours=2)
)
# Reset task execution history
task.last_run_at = None
task.total_run_count = 0
task.save()The PeriodicTask model enforces several validation rules:
from django.core.exceptions import ValidationError
try:
# This will raise ValidationError - no schedule specified
task = PeriodicTask(
name='Invalid task',
task='myapp.tasks.test'
)
task.full_clean() # Triggers validation
except ValidationError as e:
print(f"Validation error: {e}")
try:
# This will raise ValidationError - multiple schedules
task = PeriodicTask(
name='Invalid task',
task='myapp.tasks.test',
interval=interval_schedule,
crontab=crontab_schedule # Can't have both
)
task.full_clean()
except ValidationError as e:
print(f"Validation error: {e}")# Safe task creation with error handling
def create_periodic_task(name, task_name, schedule, **kwargs):
try:
task = PeriodicTask.objects.create(
name=name,
task=task_name,
**{schedule_type: schedule for schedule_type in ['interval', 'crontab', 'solar', 'clocked']
if schedule_type in kwargs},
**{k: v for k, v in kwargs.items()
if k not in ['interval', 'crontab', 'solar', 'clocked']}
)
return task
except ValidationError as e:
print(f"Failed to create task {name}: {e}")
return None
except Exception as e:
print(f"Unexpected error creating task {name}: {e}")
return None
# Get or create pattern with proper error handling
def get_or_create_task(name, task_name, schedule_kwargs, task_kwargs=None):
try:
task, created = PeriodicTask.objects.get_or_create(
name=name,
defaults={
'task': task_name,
**schedule_kwargs,
**(task_kwargs or {})
}
)
return task, created
except Exception as e:
print(f"Error getting/creating task {name}: {e}")
return None, FalseInstall with Tessl CLI
npx tessl i tessl/pypi-django-celery-beat