CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-celery

Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/scheduling-beat.md

Scheduling and Beat

Periodic task scheduling capabilities including cron-like scheduling, interval-based schedules, solar event timing, and beat scheduler management for running recurring tasks in distributed environments.

Capabilities

Crontab Scheduling

Unix cron-style scheduling with minute, hour, day, and month specifications for precise timing control.

class crontab:
    """Cron-style schedule built from minute/hour/day/month fields."""

    def __init__(
        self,
        minute='*',
        hour='*',
        day_of_week='*',
        day_of_month='*',
        month_of_year='*',
        nowfun=None,
        app=None
    ):
        """
        Create crontab schedule.

        Every field defaults to '*'. The usage examples on this page pass
        both ints (``hour=4``) and cron-style strings (``hour='9-17'``),
        so each field accepts either form.

        Args:
            minute (int | str): Minute field (0-59, '*', '*/5', '10,20,30')
            hour (int | str): Hour field (0-23, '*', '*/2', '9-17')
            day_of_week (int | str): Day of week (0-6, mon-sun, '*').
                The examples on this page use 1 for Monday and '6,0'
                for Saturday and Sunday.
            day_of_month (int | str): Day of month (1-31, '*', '1,15')
            month_of_year (int | str): Month (1-12, jan-dec, '*')
            nowfun (callable): Function returning current time
            app: Celery app instance
        """

    def is_due(self, last_run_at):
        """
        Check if schedule is due for execution.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            tuple: (is_due, next_time_to_run)
        """

    def remaining_estimate(self, last_run_at):
        """
        Estimate time until next execution.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            timedelta: Time remaining until next run
        """

def crontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*'):
    """
    Create cron-style schedule.

    Call-form summary of the ``crontab`` class constructor shown above,
    without its ``nowfun`` and ``app`` parameters. The examples on this
    page pass both ints and strings for every field.

    Args:
        minute (int | str): Minutes (0-59, *, */N, ranges, lists)
        hour (int | str): Hours (0-23, *, */N, ranges, lists)
        day_of_week (int | str): Days of week (0-6 or names, *, */N, ranges, lists)
        day_of_month (int | str): Days of month (1-31, *, */N, ranges, lists)
        month_of_year (int | str): Months (1-12 or names, *, */N, ranges, lists)

    Returns:
        crontab instance
    """

Interval Scheduling

Simple interval-based scheduling using timedelta objects for regular recurring execution.

class schedule:
    """Fixed-interval schedule driven by a ``timedelta``."""

    def __init__(self, run_every, relative=False, nowfun=None, app=None):
        """
        Create interval schedule.

        Args:
            run_every (timedelta): Interval between executions
            relative (bool): If True, schedule relative to last run
                rather than on absolute intervals
            nowfun (callable): Function returning current time
            app: Celery app instance
        """

    def is_due(self, last_run_at):
        """
        Check if schedule is due.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            tuple: (is_due, next_time_to_run)
        """

    def remaining_estimate(self, last_run_at):
        """
        Estimate time until next execution.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            timedelta: Time remaining
        """

def schedule(run_every, relative=False):
    """
    Create interval-based schedule.

    Call-form summary of the ``schedule`` class constructor shown above,
    without its ``nowfun`` and ``app`` parameters.

    Args:
        run_every (timedelta): Time between executions
        relative (bool): Schedule relative to last run vs absolute intervals

    Returns:
        schedule instance
    """

Solar Event Scheduling

Astronomical event-based scheduling for sunrise, sunset, and other solar events at specific geographic locations.

class solar:
    """Schedule tied to a solar event at a geographic location."""

    def __init__(self, event, lat, lon, nowfun=None, app=None):
        """
        Create solar event schedule.

        Args:
            event (str): Solar event ('sunrise', 'sunset', 'dawn_civil', 'dusk_civil')
            lat (float): Latitude coordinate (-90 to 90)
            lon (float): Longitude coordinate (-180 to 180)
            nowfun (callable): Function returning current time
            app: Celery app instance
        """

    def is_due(self, last_run_at):
        """
        Check if solar event schedule is due.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            tuple: (is_due, next_time_to_run)
        """

    def remaining_estimate(self, last_run_at):
        """
        Estimate time until next solar event.

        Args:
            last_run_at (datetime): Last execution time

        Returns:
            timedelta: Time remaining
        """

def solar(event, lat, lon):
    """
    Create solar event schedule.

    Call-form summary of the ``solar`` class constructor shown above,
    without its ``nowfun`` and ``app`` parameters.

    Args:
        event (str): Solar event type, e.g. 'sunrise' or 'sunset'
        lat (float): Latitude (-90 to 90)
        lon (float): Longitude (-180 to 180)

    Returns:
        solar instance
    """

Beat Scheduler Classes

Core beat scheduler components for managing and executing periodic tasks.

class Scheduler:
    """Base class that holds the schedule and dispatches due entries."""

    def __init__(self, app, max_interval=None, Producer=None, lazy=False, **kwargs):
        """
        Base scheduler class.

        NOTE(review): the upstream constructor is believed to accept further
        keyword arguments (e.g. ``schedule``) that this summary does not
        list - confirm against the Celery version in use.

        Args:
            app: Celery application instance
            max_interval (int): Maximum sleep interval between checks,
                in seconds
            Producer: Message producer class
            lazy (bool): Don't start immediately
            **kwargs: Additional keyword arguments (not described here)
        """

    def setup_schedule(self):
        """Initialize schedule from configuration."""

    def sync(self):
        """Sync schedule with storage backend."""

    def close(self):
        """Close scheduler and cleanup resources."""

    def add(self, **kwargs):
        """
        Add schedule entry.

        Args:
            **kwargs: Schedule entry parameters
        """

    def tick(self):
        """
        Check schedule and run due tasks.

        Returns:
            float: Seconds until next check needed
        """

    @property
    def schedule(self):
        """Current schedule dictionary."""

class PersistentScheduler(Scheduler):
    """Scheduler subclass that keeps its schedule on disk."""

    def __init__(self, *args, **kwargs):
        """Scheduler that persists schedule to disk.

        Args:
            *args: Positional arguments, presumably forwarded to Scheduler
            **kwargs: Keyword arguments, presumably forwarded to Scheduler
        """

    def setup_schedule(self):
        """Load schedule from persistent storage."""

    def sync(self):
        """Save schedule to persistent storage."""

class ScheduleEntry:
    """One named periodic task together with its schedule and call arguments."""

    def __init__(
        self,
        task=None,
        schedule=None,
        args=None,
        kwargs=None,
        options=None,
        name=None,
        **kw
    ):
        """
        Individual schedule entry.

        The parameters mirror the keys of a ``beat_schedule`` entry as used
        in the configuration examples on this page.

        Args:
            task (str): Task name to execute
            schedule: Schedule object (crontab, schedule, solar)
            args (tuple): Task arguments
            kwargs (dict): Task keyword arguments
            options (dict): Task execution options
            name (str): Entry name/identifier
            **kw: Additional keyword arguments (not described here)
        """

    def is_due(self):
        """
        Check if entry is due for execution.

        Returns:
            tuple: (is_due, next_time_to_run)
        """

    def __call__(self, sender=None):
        """Execute the scheduled task."""

    def update(self, other):
        """
        Update entry with values from another entry.

        Args:
            other (ScheduleEntry): Entry to copy from
        """

class Service:
    """Beat service: owns a scheduler and runs its tick loop."""

    def __init__(self, app, max_interval=None, schedule_filename=None, scheduler_cls=None):
        """
        Beat service for running periodic tasks.

        Args:
            app: Celery application instance
            max_interval (int): Maximum sleep between schedule checks,
                in seconds
            schedule_filename (str): Persistent schedule file path
            scheduler_cls: Custom scheduler class
        """

    def start(self, embedded_process=False):
        """
        Start beat service.

        This call blocks until the service is shut down.

        Args:
            embedded_process (bool): Run as embedded process
        """

    def sync(self):
        """Sync scheduler state."""

    def stop(self, wait=False):
        """
        Stop beat service.

        Args:
            wait (bool): Block until the service has finished shutting
                down. Defaults to False (signal shutdown and return).
        """

Beat Configuration

Configuration options and utilities for periodic task management.

# Configuration via app.conf.beat_schedule
# Each key is the entry name; each value describes the task to send.
BEAT_SCHEDULE = {
    'task-name': {
        'task': 'myapp.tasks.my_task',
        'schedule': crontab(minute=0, hour=4),  # Execute daily at 4:00 AM
        'args': (16, 16),
        'kwargs': {'verbose': True},
        'options': {'expires': 15.0}  # Discard the message if not run within 15 seconds
    }
}
# The dict takes effect only once it is assigned to the app configuration.
app.conf.beat_schedule = BEAT_SCHEDULE

# Add periodic task programmatically
app.add_periodic_task(
    crontab(hour=7, minute=30, day_of_week=1),  # Mondays at 7:30 AM
    send_weekly_report.s('Weekly Report'),
    name='weekly report'
)

Usage Examples

Crontab Scheduling

from celery import Celery
from celery.schedules import crontab

app = Celery('scheduler_example')

# Beat refers to tasks by their registered name. Without an explicit
# ``name=`` Celery derives the name from the defining module (for example
# ``myapp.tasks.cleanup_temp_files``), which would not match the bare
# strings used in the schedule entries on this page.

@app.task(name='cleanup_temp_files')
def cleanup_temp_files(deep_clean=False):
    """Remove temporary files.

    Args:
        deep_clean (bool): Request a more thorough cleanup. Passed by the
            weekend maintenance entry in the advanced example.
    """
    # Cleanup logic here
    return "Cleanup completed"

@app.task(name='generate_report')
def generate_report(report_type='daily'):
    """Generate a report.

    Args:
        report_type (str): Kind of report to build. The month-end entry in
            the advanced example passes 'monthly'.
    """
    return "Report generated"

@app.task(name='backup_database')
def backup_database():
    """Back up the database."""
    return "Database backed up"

# Configure periodic tasks
app.conf.beat_schedule = {
    # Run every day at 2:30 AM
    'daily-cleanup': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(hour=2, minute=30),
    },

    # Run every Monday at 7:30 AM
    'weekly-report': {
        'task': 'generate_report',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    },

    # Run every 15 minutes during business hours
    'frequent-check': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(minute='*/15', hour='9-17'),
    },

    # Run on specific days of month
    'monthly-backup': {
        'task': 'backup_database',
        'schedule': crontab(hour=1, minute=0, day_of_month='1,15'),
    },

    # Run multiple times per day
    'multiple-daily': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(hour='6,12,18', minute=0),
    }
}

# Crontab fields are interpreted in this timezone.
app.conf.timezone = 'UTC'

Interval Scheduling

from datetime import timedelta
from celery.schedules import schedule

# Explicit names keep the registered task names identical to the strings
# used in the schedule below, whatever module these tasks live in.

@app.task(name='periodic_health_check')
def periodic_health_check():
    """Run a health check."""
    return "Health check completed"

@app.task(name='process_queue')
def process_queue(queue_name='default'):
    """Process pending items.

    Args:
        queue_name (str): Which queue to drain. The 'process-queue' entry
            below passes 'high_priority' as a positional argument.
    """
    return "Queue processed"

# Interval-based scheduling
app.conf.beat_schedule = {
    # Run every 30 seconds
    'health-check': {
        'task': 'periodic_health_check',
        'schedule': schedule(run_every=timedelta(seconds=30)),
    },

    # Run every 5 minutes
    'process-queue': {
        'task': 'process_queue',
        'schedule': schedule(run_every=timedelta(minutes=5)),
        'args': ('high_priority',),
        'options': {'expires': 60}  # Task expires after 60 seconds
    },

    # Run every hour with relative timing
    'hourly-task': {
        'task': 'cleanup_temp_files',
        'schedule': schedule(run_every=timedelta(hours=1), relative=True),
    }
}

Solar Event Scheduling

from celery.schedules import solar

# Explicit names keep the registered task names identical to the strings
# used in the schedule below, whatever module these tasks live in.

@app.task(name='morning_activation')
def morning_activation():
    """Bring systems up for the day."""
    return "Morning systems activated"

@app.task(name='evening_shutdown')
def evening_shutdown():
    """Shut systems down for the night."""
    return "Evening systems shutdown"

# Solar event scheduling (San Francisco coordinates)
app.conf.beat_schedule = {
    # Run at sunrise
    'morning-startup': {
        'task': 'morning_activation',
        'schedule': solar('sunrise', 37.7749, -122.4194),
    },

    # Run at sunset
    'evening-shutdown': {
        'task': 'evening_shutdown',
        'schedule': solar('sunset', 37.7749, -122.4194),
    }
}

Programmatic Schedule Management

from celery.schedules import crontab
from datetime import timedelta

@app.task
def dynamic_task(message):
    """Return a confirmation string containing *message*."""
    return f"Executed: {message}"

# Add periodic tasks programmatically
def setup_dynamic_schedule():
    """Register the periodic entries for ``dynamic_task`` on the app."""
    # Add daily task
    app.add_periodic_task(
        crontab(hour=8, minute=0),
        dynamic_task.s('Daily morning task'),
        name='morning task'
    )

    # Add interval task
    app.add_periodic_task(
        timedelta(minutes=10),
        dynamic_task.s('Every 10 minutes'),
        name='frequent task'
    )

    # Add Monday-only task. The crontab itself limits runs to Mondays
    # (day_of_week=1). Registration must not depend on the weekday at
    # start-up: a process started on any other day would never get the entry.
    app.add_periodic_task(
        crontab(hour=9, minute=0, day_of_week=1),
        dynamic_task.s('Monday special task'),
        name='monday task'
    )

# Call during app initialization
setup_dynamic_schedule()

Custom Scheduler

from celery.beat import Scheduler, ScheduleEntry
import json
import os

class DatabaseScheduler(Scheduler):
    """Custom scheduler whose schedule is kept in a database."""

    def __init__(self, *args, **kwargs):
        # Take our own option out of kwargs before forwarding the rest
        # to the base class.
        db_url = kwargs.pop('database_url', 'sqlite:///schedule.db')
        self.database_url = db_url
        super().__init__(*args, **kwargs)

    def setup_schedule(self):
        """Populate the schedule from the database."""
        # Implementation would load from database
        entries = self.load_schedule_from_db()
        self.schedule = entries

    def sync(self):
        """Write the schedule back to the database."""
        # Implementation would save to database
        self.save_schedule_to_db()

    def load_schedule_from_db(self):
        """Return the schedule entries read from the database."""
        # Database loading logic
        return dict()

    def save_schedule_to_db(self):
        """Store the schedule entries in the database."""
        # Database saving logic
        return None

# Use custom scheduler
app.conf.beat_scheduler = 'myapp.schedulers:DatabaseScheduler'

Beat Service Management

from celery.beat import Service

def run_beat_service():
    """Start a beat service in the foreground and stop it on the way out."""

    # Create beat service
    service_options = {
        'max_interval': 60,  # Check schedule every 60 seconds max
        'schedule_filename': 'celerybeat-schedule',
    }
    service = Service(app=app, **service_options)

    try:
        # Blocks until the service shuts down
        service.start()
    except KeyboardInterrupt:
        print("Beat service interrupted")
    finally:
        # Always stop the service, whether start() returned or raised
        service.stop()

# Run beat as embedded process
def run_embedded_beat():
    """Run beat in a background thread of the current process.

    ``Service.start()`` blocks until the service shuts down, so calling it
    directly would never reach the code that follows it.
    ``EmbeddedService`` wraps the service in a thread (or a child process
    when ``thread`` is false), so the caller keeps control.
    """
    # Local import keeps this example self-contained.
    from celery.beat import EmbeddedService

    beat_service = EmbeddedService(app, thread=True)
    beat_service.start()

    # Continue with other work...

    # Stop when needed
    beat_service.stop()

Advanced Scheduling Patterns

from celery.schedules import crontab, schedule
from datetime import datetime, timedelta

# The explicit name keeps the registered task name identical to the
# 'conditional_task' string used in the schedule below.
@app.task(name='conditional_task')
def conditional_task():
    """Do the work only between 09:00 and 17:59, otherwise report a skip."""
    # Task that only runs under certain conditions
    # NOTE(review): datetime.now() is the worker's local time, which may
    # differ from the timezone beat uses for crontab fields - confirm.
    current_hour = datetime.now().hour
    if 9 <= current_hour <= 17:  # Business hours
        return "Task executed during business hours"
    else:
        return "Task skipped outside business hours"

@app.task
def escalating_task(attempt=1):
    """Re-queue itself with a growing delay until the operation succeeds.

    Args:
        attempt (int): 1-based attempt counter carried between runs.

    Returns:
        str | None: Success message, or None when a retry was queued.
    """
    # Task with escalating retry intervals
    try:
        # Some operation that might fail
        if attempt < 3:
            raise Exception(f"Simulated failure, attempt {attempt}")
        return f"Success on attempt {attempt}"
    except Exception:
        # Retry with increasing delay. Real code should catch the specific
        # exception the operation raises, and cap the number of attempts.
        escalating_task.apply_async(
            args=(attempt + 1,),
            countdown=attempt * 60  # 1 min after attempt 1, 2 min after attempt 2
        )

# Complex scheduling configuration
app.conf.beat_schedule = {
    # Task with complex cron expression
    'business-hours-only': {
        'task': 'conditional_task',
        'schedule': crontab(minute='*/30', hour='9-17', day_of_week='1-5'),
        'options': {
            'expires': 15 * 60,  # Expire after 15 minutes
            'retry': True,
            'retry_policy': {
                'max_retries': 3,
                'interval_start': 0,
                'interval_step': 0.2,
                'interval_max': 0.2,
            }
        }
    },

    # Weekend-only task
    'weekend-maintenance': {
        'task': 'cleanup_temp_files',
        'schedule': crontab(hour=3, minute=0, day_of_week='6,0'),  # Saturday and Sunday
        'kwargs': {'deep_clean': True}
    },

    # End of month reporting
    'month-end-report': {
        'task': 'generate_report',
        # Fires on every one of days 28-31 that exists in the month, so the
        # task runs more than once; it must itself detect the last day.
        'schedule': crontab(hour=23, minute=59, day_of_month='28-31'),
        'kwargs': {'report_type': 'monthly'}
    }
}

Monitoring and Debugging

# track_started is a task option, so it belongs on the decorator; the
# explicit name matches the string used in the schedule below.
@app.task(bind=True, name='monitored_periodic_task', track_started=True)
def monitored_periodic_task(self):
    """Periodic task with built-in monitoring.

    Publishes a custom PROGRESS state while running. The final SUCCESS or
    FAILURE state is left to Celery: returning stores the result, and an
    uncaught exception stores the failure. Writing those built-in states
    by hand is redundant and, for FAILURE, stores metadata in a shape the
    result backend does not expect.
    """
    # Report that work has begun
    self.update_state(
        state='PROGRESS',
        meta={'timestamp': datetime.now().isoformat()}
    )

    # Task logic here
    result = "Task completed successfully"

    return result

# Configure with detailed options
app.conf.beat_schedule = {
    'monitored-task': {
        'task': 'monitored_periodic_task',
        'schedule': crontab(minute='*/5'),
        # 'options' holds per-message execution options (like 'expires'
        # in the earlier examples), not configuration setting names.
        'options': {
            'queue': 'periodic',
            'serializer': 'json',
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/pypi-celery

docs

configuration.md

core-application.md

exceptions.md

index.md

results-state.md

scheduling-beat.md

signals-events.md

workflow-primitives.md

tile.json