Database-backed periodic task scheduling for Django and Celery integration
—
Django Admin integration providing web-based management interface for periodic tasks and schedules with custom forms, actions, validation, and task execution capabilities.
Main admin interface for managing periodic tasks with custom actions and forms.
class PeriodicTaskAdmin(ModelAdmin):
    """
    Django Admin interface for PeriodicTask model.

    Provides form-based task management with custom widgets,
    bulk actions, and task execution capabilities.

    This is an API outline: every method body is elided (``...``), so only
    the names and signatures below are authoritative.
    """
    form = PeriodicTaskForm  # ModelForm used on the add/change pages
    model = PeriodicTask

    # Admin actions
    # Each takes the current request and the queryset of selected tasks.
    def enable_tasks(self, request, queryset): ...  # "Enable selected periodic tasks"
    def disable_tasks(self, request, queryset): ...  # "Disable selected periodic tasks"
    def toggle_tasks(self, request, queryset): ...  # "Toggle selected periodic tasks"
    def run_tasks(self, request, queryset): ...  # "Run selected periodic tasks"

    # Custom admin methods
    # Overrides of the standard ModelAdmin hooks with the same names.
    def get_queryset(self, request): ...
    def save_model(self, request, obj, form, change): ...

Available Admin Actions:
Usage Examples:
# Admin actions are available in the Django Admin interface
# Select tasks and use the action dropdown to:
# 1. Enable multiple tasks at once
# - Select disabled tasks
# - Choose "Enable selected periodic tasks" from actions dropdown
# 2. Disable tasks for maintenance
# - Select enabled tasks
# - Choose "Disable selected periodic tasks" from actions dropdown
# 3. Toggle task states
# - Select any tasks
# - Choose "Toggle selected periodic tasks" from actions dropdown
# 4. Run tasks immediately (for testing)
# - Select tasks to execute
# - Choose "Run selected periodic tasks" from actions dropdown
# - Tasks will be sent to Celery workers immediately

Admin interfaces for managing different schedule types.
class CrontabScheduleAdmin(ModelAdmin):
    """
    Admin interface for CrontabSchedule with human-readable display.
    """
    # Change-list columns: the schedule's string form, its timezone, and a
    # 'human_readable' column (not defined in this outline).
    list_display = ('__str__', 'timezone', 'human_readable')
    # Sidebar filter on timezone.
    list_filter = ('timezone',)
    # The search box matches against each individual crontab field.
    search_fields = ('minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year')
class IntervalScheduleAdmin(ModelAdmin):
    """
    Admin interface for IntervalSchedule.
    """
    # Change-list columns: string form plus the two fields that define the interval.
    list_display = ('__str__', 'every', 'period')
    # Sidebar filter on the period.
    list_filter = ('period',)
    # The search box matches against the 'every' field only.
    search_fields = ('every',)
class SolarScheduleAdmin(ModelAdmin):
    """
    Admin interface for SolarSchedule.
    """
    # Change-list columns: string form, the solar event, and the coordinates.
    list_display = ('__str__', 'event', 'latitude', 'longitude')
    # Sidebar filter on the event.
    list_filter = ('event',)
    # The search box matches against the coordinates.
    search_fields = ('latitude', 'longitude')
class ClockedScheduleAdmin(ModelAdmin):
    """
    Admin interface for ClockedSchedule.
    """
    # Change-list columns: string form and the clocked time.
    list_display = ('__str__', 'clocked_time')
    # Sidebar filter on the clocked time; no search fields are declared.
    list_filter = ('clocked_time',)
class PeriodicTaskInline(TabularInline):
    """
    Inline admin interface for displaying PeriodicTasks within schedule admins.
    """
    model = PeriodicTask
    fields = ('name', 'task', 'args', 'kwargs')
    # Same names as `fields`, so every displayed field is read-only.
    readonly_fields = ('name', 'task', 'args', 'kwargs')
    # Declared by type only; the concrete values are not shown in this outline.
    can_delete: bool
    extra: int
    show_change_link: bool

    # Body elided in this outline; only the signature is documented here.
    def has_add_permission(self, request, obj: Optional[Model] = None) -> bool: ...

Specialized form components for task and schedule management.
class TaskSelectWidget(Select):
    """
    Widget for selecting Celery tasks from registered task list.

    Dynamically populates choices with available Celery tasks.
    Method bodies are elided (``...``) in this outline.
    """
    celery_app: Celery  # Current Celery app instance
    _choices: Optional[tuple]  # Cached task choices

    # Returns the choices as a tuple of (str, str) pairs.
    def tasks_as_choices(self) -> tuple[tuple[str, str], ...]: ...

    # `choices` is exposed as a property rather than a plain attribute.
    @property
    def choices(self) -> tuple: ...

    # NOTE(review): the parameter is named `_`, which suggests the assigned
    # value is ignored — confirm against the implementation.
    @choices.setter
    def choices(self, _) -> None: ...

    # Computed once per instance and then cached (cached_property).
    @cached_property
    def _modules(self): ...
class TaskChoiceField(ChoiceField):
    """
    Form field for selecting Celery tasks with validation.

    Validates that selected task is registered with Celery.
    Method bodies are elided (``...``) in this outline.
    """
    widget = TaskSelectWidget  # rendered with the task-selection widget

    def __init__(self, *args, **kwargs): ...

    # Takes the submitted task name and reports whether it is acceptable.
    def valid_value(self, value: str) -> bool: ...
class PeriodicTaskForm(ModelForm):
    """
    ModelForm for PeriodicTask with custom validation and widgets.

    Provides enhanced task selection, JSON validation, and schedule validation.
    Method bodies are elided (``...``) in this outline.
    """
    task = TaskChoiceField()  # replaces the default widget for the `task` field

    class Meta:
        model = PeriodicTask
        fields = '__all__'  # expose every model field on the form

    # Form-wide validation hook.
    def clean(self) -> dict: ...

    # Per-field validation hooks for the three JSON-carrying fields.
    def clean_args(self) -> str: ...
    def clean_kwargs(self) -> str: ...
    def clean_headers(self) -> str: ...

Usage Examples:
# Custom form usage in Django Admin (automatic)
# The PeriodicTaskForm provides:
# 1. Task selection dropdown
# - Automatically populated with registered Celery tasks
# - Validates task exists in Celery registry
# 2. JSON validation for arguments
# - Validates args field contains valid JSON list
# - Validates kwargs field contains valid JSON object
# - Validates headers field contains valid JSON object
# 3. Schedule validation
# - Ensures exactly one schedule type is selected
# - Validates schedule parameters are correct
# Example form data validation:
# NOTE(review): `interval_schedule` is not defined in this snippet — presumably
# an already-saved IntervalSchedule instance; confirm before running.
form_data = {
    'name': 'Test Task',
    'task': 'myapp.tasks.example_task', # Must be registered with Celery
    'args': '["arg1", "arg2"]', # Must be valid JSON list
    'kwargs': '{"param": "value"}', # Must be valid JSON object
    'headers': '{"priority": 9}', # Must be valid JSON object
    'interval': interval_schedule.id, # Must select exactly one schedule
    'enabled': True
}
# Bind the data, then either persist the task or report why it was rejected.
form = PeriodicTaskForm(data=form_data)
if form.is_valid():
    task = form.save()
else:
    print("Validation errors:", form.errors)

The Django Admin provides a comprehensive interface for task management:
List View Features:
Detail View Features:
Each schedule type has its own admin interface:
Crontab Schedule Admin:
Interval Schedule Admin:
Solar Schedule Admin:
Clocked Schedule Admin:
# Custom admin registration with extended functionality
from django.contrib import admin
from django_celery_beat.models import PeriodicTask, CrontabSchedule
from django_celery_beat.admin import PeriodicTaskAdmin, CrontabScheduleAdmin
class CustomPeriodicTaskAdmin(PeriodicTaskAdmin):
    """Extended admin with custom features.

    Adds run statistics to the change list, an extra ``date_changed`` filter,
    eager loading of the schedule relations, and a default description for
    newly created tasks.
    """

    # Django accepts either a list or a tuple for these options, so unpack the
    # parent's value instead of using `+` (list + tuple raises TypeError).
    # dict.fromkeys() drops duplicates while keeping order, in case the parent
    # already lists one of the added names.
    list_display = tuple(dict.fromkeys(
        (*PeriodicTaskAdmin.list_display, 'total_run_count', 'last_run_at')
    ))
    list_filter = tuple(dict.fromkeys(
        (*PeriodicTaskAdmin.list_filter, 'date_changed')
    ))
    readonly_fields = ('total_run_count', 'last_run_at', 'date_changed')

    def get_queryset(self, request):
        """Return the parent queryset with all schedule relations joined in."""
        qs = super().get_queryset(request)
        # One JOINed query instead of one extra query per row per schedule type.
        return qs.select_related('interval', 'crontab', 'solar', 'clocked')

    def save_model(self, request, obj, form, change):
        """Save ``obj``, stamping new tasks that have no description.

        ``change`` is False when the object is being added. A description
        entered in the form is kept; only an empty one is filled in.
        """
        if not change and not obj.description:
            obj.description = f"Created by {request.user.username}"
        super().save_model(request, obj, form, change)
# Re-register with custom admin
# The model must be unregistered first because the next line registers it again
# with a different admin class.
admin.site.unregister(PeriodicTask)
admin.site.register(PeriodicTask, CustomPeriodicTaskAdmin)

The package includes custom admin templates for enhanced functionality:
Custom Change Form Template:
Custom Change List Template:
from django_celery_beat.admin import PeriodicTaskAdmin
from django_celery_beat.models import PeriodicTask
from django.contrib.admin.sites import site
from django.contrib.messages.storage.cookie import CookieStorage
from django.http import HttpRequest

# Simulate admin actions programmatically
admin_instance = PeriodicTaskAdmin(PeriodicTask, site)
request = HttpRequest()  # Mock request object

# Admin actions normally report their outcome through ModelAdmin.message_user(),
# which needs message storage on the request. A bare HttpRequest never passed
# through MessageMiddleware, so attach storage by hand; otherwise the messages
# framework raises MessageFailure.
request._messages = CookieStorage(request)

# Enable tasks
queryset = PeriodicTask.objects.filter(enabled=False)
admin_instance.enable_tasks(request, queryset)

# Disable tasks
queryset = PeriodicTask.objects.filter(name__startswith='test_')
admin_instance.disable_tasks(request, queryset)

# Run tasks immediately
queryset = PeriodicTask.objects.filter(name='urgent_task')
admin_instance.run_tasks(request, queryset)

from django.contrib import admin
from django.urls import path
from django.shortcuts import render
from django.http import JsonResponse
from django_celery_beat.models import PeriodicTask
class ExtendedPeriodicTaskAdmin(PeriodicTaskAdmin):
    """Admin with custom views for advanced task management.

    Adds two views next to the stock admin pages: ``task-status/`` (counts of
    tasks by state) and ``bulk-operations/`` (enable, disable or delete many
    tasks in one request).
    """

    def get_urls(self):
        """Add custom admin URLs.

        Every custom view is wrapped in ``admin_site.admin_view`` so that it
        gets the admin's permission check; an unwrapped view is reachable by
        anyone who knows the URL.
        """
        urls = super().get_urls()
        protect = self.admin_site.admin_view
        custom_urls = [
            path('task-status/', protect(self.task_status_view),
                 name='task-status'),
            path('bulk-operations/', protect(self.bulk_operations_view),
                 name='bulk-operations'),
        ]
        # Custom routes go first so the stock admin patterns cannot shadow them.
        return custom_urls + urls

    def task_status_view(self, request):
        """Custom view for task status overview.

        Returns the counts as JSON when the ``Accept`` header mentions
        ``application/json``; otherwise renders ``admin/task_status.html``.
        """
        stats = {
            'total_tasks': PeriodicTask.objects.count(),
            'enabled_tasks': PeriodicTask.objects.filter(enabled=True).count(),
            'disabled_tasks': PeriodicTask.objects.filter(enabled=False).count(),
            'one_off_tasks': PeriodicTask.objects.filter(one_off=True).count(),
        }
        # Substring test: clients often send several media types in one header
        # (e.g. "application/json, text/plain, */*"), which an equality test
        # would miss.
        if 'application/json' in request.headers.get('Accept', ''):
            return JsonResponse(stats)
        return render(request, 'admin/task_status.html', {'stats': stats})

    def bulk_operations_view(self, request):
        """Custom view for bulk task operations.

        POST parameters: ``operation`` (``enable``, ``disable`` or ``delete``)
        and a list of ``task_ids``. Responds with JSON whose ``affected`` is the
        number of PeriodicTask rows actually changed, or with HTTP 400 for an
        unknown operation. Any other method renders the selection page.
        """
        if request.method == 'POST':
            # Local import: this name is only needed here.
            from django_celery_beat.models import PeriodicTasks

            operation = request.POST.get('operation')
            task_ids = request.POST.getlist('task_ids')
            queryset = PeriodicTask.objects.filter(id__in=task_ids)

            if operation in ('enable', 'disable'):
                affected = queryset.update(enabled=(operation == 'enable'))
                # QuerySet.update() skips save() and its signals, so tell the
                # scheduler explicitly that the task table has changed.
                PeriodicTasks.update_changed()
            elif operation == 'delete':
                # delete() returns (total, per-model counts); report only the
                # PeriodicTask rows, not cascaded deletions.
                _, deleted_by_model = queryset.delete()
                affected = deleted_by_model.get(PeriodicTask._meta.label, 0)
            else:
                return JsonResponse(
                    {'status': 'error',
                     'message': f'Unknown operation: {operation!r}'},
                    status=400,
                )
            return JsonResponse({'status': 'success', 'affected': affected})

        tasks = PeriodicTask.objects.all()
        return render(request, 'admin/bulk_operations.html', {'tasks': tasks})


from django_celery_beat.admin import PeriodicTaskForm
from django.core.exceptions import ValidationError
# Common validation scenarios
def handle_form_validation(schedule=None):
    """Print the validation errors for three common bad inputs.

    Scenarios: invalid JSON in ``args``, no schedule selected, and a task name
    that is not registered with Celery.

    Args:
        schedule: interval schedule whose ``id`` is submitted in the scenarios
            that need one. When omitted, a 10-second interval schedule is
            looked up or created, so the function is callable with no arguments.
    """
    if schedule is None:
        # Local import: the snippet's own imports do not include this model.
        from django_celery_beat.models import IntervalSchedule

        schedule, _ = IntervalSchedule.objects.get_or_create(
            every=10, period=IntervalSchedule.SECONDS,
        )

    # Invalid JSON in args
    form_data = {
        'name': 'Test Task',
        'task': 'myapp.tasks.test',
        'args': 'invalid json',  # Will cause validation error
        'interval': schedule.id,
    }
    form = PeriodicTaskForm(data=form_data)
    if not form.is_valid():
        # .get(): the form may be invalid for a reason other than `args`.
        print("Args validation error:", form.errors.get('args'))

    # Missing schedule
    form_data = {
        'name': 'Test Task',
        'task': 'myapp.tasks.test',
        # No schedule specified - will cause validation error
    }
    form = PeriodicTaskForm(data=form_data)
    if not form.is_valid():
        print("Schedule validation error:", form.non_field_errors())

    # Invalid task name
    form_data = {
        'name': 'Test Task',
        'task': 'nonexistent.task',  # Task not registered with Celery
        'interval': schedule.id,
    }
    form = PeriodicTaskForm(data=form_data)
    if not form.is_valid():
        print("Task validation error:", form.errors.get('task'))


from django.contrib import messages
from django.contrib.admin import ModelAdmin
class RobustPeriodicTaskAdmin(PeriodicTaskAdmin):
"""Admin with enhanced error handling."""
def run_tasks(self, request, queryset):
"""Run tasks with error handling."""
success_count = 0
error_count = 0
for task in queryset:
try:
# Send task to Celery
from celery import current_app
current_app.send_task(
task.task,
args=task.args,
kwargs=task.kwargs,
queue=task.queue,
routing_key=task.routing_key,
priority=task.priority
)
success_count += 1
except Exception as e:
error_count += 1
messages.error(request, f"Failed to run task {task.name}: {e}")
if success_count:
messages.success(request, f"Successfully queued {success_count} tasks")
if error_count:
messages.error(request, f"Failed to queue {error_count} tasks")
run_tasks.short_description = "Run selected periodic tasks (with error handling)"Install with Tessl CLI
npx tessl i tessl/pypi-django-celery-beat