An app that provides Django integration for RQ (Redis Queue) —
Prometheus metrics collection, Sentry integration, and Django-specific monitoring features for comprehensive observability and error tracking.

Collect and expose RQ metrics for Prometheus monitoring.
Collect and expose RQ metrics for Prometheus monitoring.
from django_rq.contrib.prometheus import RQCollector
class RQCollector:
"""
Prometheus metrics collector for RQ statistics.
Metrics exposed:
- rq_workers: Number of workers by queue and state
- rq_jobs: Job counts by queue and status
- rq_job_successful_total: Total successful jobs by worker
- rq_job_failed_total: Total failed jobs by worker
- rq_working_seconds_total: Total working time by worker
"""
def collect(self):
"""
Collect current RQ metrics.
Returns:
Generator: Prometheus metric families
"""

Access Prometheus metrics via HTTP endpoint.
def prometheus_metrics(request):
"""
Prometheus metrics endpoint.
Authentication:
- Django staff user session
- Bearer token authentication
Returns:
HttpResponse: Prometheus format metrics
URL: /django-rq/metrics/
"""

Usage example:
# Install prometheus support
# pip install django-rq[prometheus]
# settings.py - Enable metrics collection
INSTALLED_APPS = [
'django_rq',
# ... other apps
]
# Access metrics
# GET /django-rq/metrics/
# Authorization: Bearer your-api-token

Configure Prometheus to scrape Django-RQ metrics:
# prometheus.yml
scrape_configs:
- job_name: 'django-rq'
static_configs:
- targets: ['localhost:8000']
metrics_path: '/django-rq/metrics/'
bearer_token: 'your-api-token'
scrape_interval: 30s

Configure Sentry for error tracking and monitoring.
def configure_sentry(sentry_dsn, **options):
"""
Configure Sentry client for RQ workers.
Args:
sentry_dsn: Sentry DSN URL
**options: Additional Sentry configuration options
Options:
sentry_debug: Enable debug mode
sentry_ca_certs: Path to CA certificates
Integrations:
- RedisIntegration
- RqIntegration
- DjangoIntegration
"""

Configure Sentry for RQ workers via the command line:
# Override Django Sentry configuration for workers
python manage.py rqworker --sentry-dsn=https://key@sentry.io/project
# With additional options
python manage.py rqworker \
--sentry-dsn=https://key@sentry.io/project \
--sentry-debug \
--sentry-ca-certs=/path/to/certs

Configure Sentry in Django settings for automatic integration:
# settings.py
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.rq import RqIntegration
from sentry_sdk.integrations.redis import RedisIntegration
sentry_sdk.init(
dsn="https://key@sentry.io/project",
integrations=[
DjangoIntegration(),
RqIntegration(),
RedisIntegration(),
],
traces_sample_rate=1.0,
send_default_pii=True
)

Comprehensive statistics collection for monitoring and analysis.
from django_rq.utils import get_statistics, get_scheduler_statistics
def get_statistics(run_maintenance_tasks=False):
"""
Get comprehensive queue and worker statistics.
Note: This function is in the utils module and must be imported directly.
Args:
run_maintenance_tasks: Whether to run cleanup tasks
Returns:
dict: Statistics including:
- queues: List of queue statistics
- workers: Worker counts and details
- jobs: Job counts by status
- connections: Redis connection info
"""
def get_scheduler_statistics():
"""
Get scheduler statistics across all Redis connections.
Note: This function is in the utils module and must be imported directly.
Returns:
dict: Scheduler statistics including:
- schedulers: Scheduler status by connection
- scheduled_jobs: Count of scheduled jobs
"""

Statistics are returned in a structured format:
{
"queues": [
{
"name": "default",
"jobs": 10,
"workers": 2,
"finished_jobs": 100,
"failed_jobs": 5,
"started_jobs": 1,
"deferred_jobs": 0,
"scheduled_jobs": 3,
"oldest_job_timestamp": "2024-01-01 12:00:00",
"connection_kwargs": {...},
"scheduler_pid": 1234
}
],
"schedulers": {
"localhost:6379/0": {
"count": 5,
"index": 0
}
}
}

Configure RQ logging to integrate with Django's logging system.
# settings.py
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'rq_console': {
'format': '%(asctime)s %(message)s',
'datefmt': '%H:%M:%S',
},
},
'handlers': {
'rq_console': {
'level': 'DEBUG',
'class': 'rq.logutils.ColorizingStreamHandler',
'formatter': 'rq_console',
'exclude': ['%(asctime)s'],
},
'file': {
'level': 'INFO',
'class': 'logging.FileHandler',
'filename': '/var/log/django-rq.log',
},
},
'loggers': {
'rq.worker': {
'handlers': ['rq_console', 'file'],
'level': 'DEBUG'
},
}
}

Configure custom exception handlers for specialized error handling.
def get_exception_handlers():
"""
Get custom exception handlers from settings.
Returns:
list: Exception handler functions
"""
# settings.py
RQ_EXCEPTION_HANDLERS = [
'myapp.handlers.custom_exception_handler',
'myapp.handlers.notification_handler',
]
# Custom handler example
def custom_exception_handler(job, exc_type, exc_value, traceback):
"""
Custom exception handler for RQ jobs.
Args:
job: Failed job instance
exc_type: Exception type
exc_value: Exception instance
traceback: Traceback object
"""
# Custom error processing
logger.error(f"Job {job.id} failed: {exc_value}")
# Send notifications, update databases, etc.

Implement health checks for monitoring system status.
# Health check utilities
from django_rq.utils import get_statistics
from django_rq import get_connection
def rq_health_check():
"""
Check RQ system health.
Returns:
dict: Health status information
"""
try:
# Check Redis connectivity
conn = get_connection('default')
conn.ping()
# Get queue statistics
stats = get_statistics()
# Check for stuck jobs
stuck_jobs = check_stuck_jobs()
return {
'status': 'healthy',
'redis_connected': True,
'active_workers': sum(q['workers'] for q in stats['queues']),
'total_jobs': sum(q['jobs'] for q in stats['queues']),
'stuck_jobs': stuck_jobs
}
except Exception as e:
return {
'status': 'unhealthy',
'error': str(e)
}
def check_stuck_jobs():
"""Check for jobs that may be stuck."""
# Implementation to detect stuck jobs
pass

Monitor RQ performance metrics and system resources.
# Performance monitoring utilities
def monitor_queue_performance():
"""
Monitor queue processing performance.
Metrics:
- Job processing rate
- Average job duration
- Queue depth trends
- Worker utilization
"""
pass
def monitor_redis_performance():
"""
Monitor Redis performance for RQ.
Metrics:
- Memory usage
- Connection count
- Command latency
- Key expiration rates
"""
pass

Common integration patterns for monitoring and observability.
Create Grafana dashboards using Prometheus metrics:
{
"dashboard": {
"title": "Django-RQ Monitoring",
"panels": [
{
"title": "Queue Length",
"type": "graph",
"targets": [
{
"expr": "rq_jobs{status=\"queued\"}"
}
]
},
{
"title": "Worker Count",
"type": "stat",
"targets": [
{
"expr": "sum(rq_workers)"
}
]
}
]
}
}

Configure alerting based on RQ metrics:
# alerting.yml
groups:
- name: django_rq
rules:
- alert: RQHighQueueDepth
expr: rq_jobs{status="queued"} > 100
for: 5m
labels:
severity: warning
annotations:
summary: "RQ queue depth is high"
- alert: RQNoWorkers
expr: sum(rq_workers) == 0
for: 1m
labels:
severity: critical
annotations:
summary: "No RQ workers are running"

Export custom metrics for specific monitoring needs:
from prometheus_client import Counter, Histogram, Gauge
# Custom metrics
job_duration = Histogram('rq_job_duration_seconds', 'Job execution time')
custom_jobs = Counter('rq_custom_jobs_total', 'Custom job counter')
queue_age = Gauge('rq_queue_age_seconds', 'Age of oldest job in queue')
# Use in job functions
@job_duration.time()
def monitored_job():
custom_jobs.inc()
# Job implementation

Configure monitoring and integration features:
# settings.py
# API token for metrics access
RQ_API_TOKEN = 'your-secure-token'
# Exception handlers
RQ_EXCEPTION_HANDLERS = [
'myapp.handlers.sentry_handler',
'myapp.handlers.custom_handler',
]
# Enable admin link
RQ_SHOW_ADMIN_LINK = True
# Prometheus collector (automatic if prometheus_client installed)
# pip install django-rq[prometheus]

Django template tags for displaying job information in templates.
from django import template
register = template.Library()
@register.filter
def to_localtime(time):
"""
Convert UTC datetime to local timezone.
Args:
time: UTC datetime object
Returns:
datetime: Localized datetime
"""
@register.filter
def show_func_name(job):
"""
Safely display job function name.
Args:
job: RQ Job instance
Returns:
str: Function name or error representation
"""
@register.filter
def force_escape(text):
"""
HTML escape text content.
Args:
text: Text to escape
Returns:
str: HTML-escaped text
"""
@register.filter
def items(dictionary):
"""
Access dictionary items in templates.
Args:
dictionary: Dictionary object
Returns:
dict_items: Dictionary items
"""

Usage in templates:
{% load django_rq %}
<!-- Display job function name safely -->
{{ job|show_func_name }}
<!-- Convert UTC time to local -->
{{ job.created_at|to_localtime }}
<!-- Escape user content -->
{{ user_input|force_escape }}
<!-- Iterate dictionary items -->
{% for key, value in stats|items %}
<p>{{ key }}: {{ value }}</p>
{% endfor %}

Install with Tessl CLI
npx tessl i tessl/pypi-django-rq