CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-taskiq

Distributed task queue with full async support for Python applications

Overview
Eval results
Files

scheduling.mddocs/

Scheduling

Task scheduling system for executing tasks at specific times, on recurring schedules, or based on custom triggers. Supports cron-like expressions, one-time scheduling, and extensible schedule sources.

Capabilities

Scheduler Management

Core scheduler class that coordinates between schedule sources and task brokers to execute scheduled tasks.

class TaskiqScheduler:
    """
    Main scheduler class that manages scheduled task execution.
    
    Coordinates between schedule sources and brokers to execute tasks
    at the appropriate times based on cron expressions, fixed times,
    or custom scheduling logic.
    """
    
    def __init__(
        self,
        broker: AsyncBroker,
        sources: List[ScheduleSource],
    ) -> None:
        """
        Initialize scheduler with broker and schedule sources.
        
        Args:
            broker: Broker instance for task execution
            sources: List of schedule sources providing tasks
        """
    
    async def startup(self) -> None:
        """
        Start the scheduler and initialize all components.
        
        Calls startup on the broker and prepares schedule sources
        for task execution.
        """
    
    async def shutdown(self) -> None:
        """
        Shutdown the scheduler and cleanup resources.
        
        Stops all scheduled tasks and shuts down the broker.
        """
    
    async def run_forever(self) -> None:
        """
        Run the scheduler continuously until shutdown.
        
        Main scheduler loop that checks schedule sources
        and executes tasks when they become ready.
        """
    
    async def on_ready(
        self,
        source: ScheduleSource,
        task: ScheduledTask,
    ) -> None:
        """
        Handler called when a scheduled task is ready for execution.
        
        Args:
            source: Schedule source that triggered the task
            task: Scheduled task to be executed
        """

Scheduled Tasks

Representation of tasks with scheduling metadata including timing, arguments, and execution context.

class ScheduledTask:
    """
    Represents a task scheduled for future execution.
    
    Contains task identification, scheduling information,
    and execution parameters.
    """
    
    task_name: str
    """Name of the task to execute."""
    
    cron: Optional[str]
    """Cron expression for recurring schedules (e.g., '0 0 * * *' for daily)."""
    
    time: Optional[datetime]
    """Specific datetime for one-time execution."""
    
    labels: Dict[str, Any]
    """Additional labels and metadata for the task."""
    
    args: Tuple[Any, ...]
    """Positional arguments to pass to the task function."""
    
    kwargs: Dict[str, Any]
    """Keyword arguments to pass to the task function."""
    
    def __init__(
        self,
        task_name: str,
        cron: Optional[str] = None,
        time: Optional[datetime] = None,
        labels: Optional[Dict[str, Any]] = None,
        args: Optional[Tuple[Any, ...]] = None,
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> None: ...

Schedule Sources

Abstract interface and implementations for providing scheduled tasks to the scheduler.

class ScheduleSource:
    """
    Abstract base class for schedule sources.
    
    Schedule sources provide scheduled tasks to the scheduler
    and can implement custom scheduling logic.
    """
    
    async def startup(self) -> None:
        """Initialize the schedule source."""
    
    async def shutdown(self) -> None:
        """Cleanup the schedule source."""
    
    async def get_schedules(self) -> List[ScheduledTask]:
        """
        Get list of scheduled tasks.
        
        Returns:
            List of scheduled tasks from this source
        """
    
    async def pre_send(self, task: ScheduledTask) -> None:
        """
        Pre-processing hook before task execution.
        
        Can modify task or raise ScheduledTaskCancelledError to cancel.
        
        Args:
            task: Scheduled task about to be executed
            
        Raises:
            ScheduledTaskCancelledError: To cancel task execution
        """
    
    async def post_send(self, task: ScheduledTask) -> None:
        """
        Post-processing hook after task is sent.
        
        Args:
            task: Scheduled task that was sent
        """

class LabelBasedScheduleSource(ScheduleSource):
    """
    Schedule source that discovers tasks based on labels.
    
    Automatically finds tasks with scheduling labels in the broker's
    task registry and creates appropriate scheduled tasks.
    """
    
    def __init__(
        self,
        broker: AsyncBroker,
        schedule_label: str = "schedule",
    ) -> None:
        """
        Initialize label-based schedule source.
        
        Args:
            broker: Broker containing tasks to schedule
            schedule_label: Label name containing schedule information
        """
    
    async def get_schedules(self) -> List[ScheduledTask]:
        """Extract scheduled tasks from broker task registry."""

Usage Examples

Basic Cron Scheduling

import asyncio
from datetime import datetime
from taskiq import InMemoryBroker
from taskiq.scheduler import TaskiqScheduler, ScheduledTask
from taskiq.schedule_sources import LabelBasedScheduleSource

broker = InMemoryBroker()

# Define scheduled tasks using labels
@broker.task(schedule="0 8 * * *")  # Daily at 8 AM
async def daily_report() -> None:
    print(f"Generating daily report at {datetime.now()}")
    # Generate and send report

@broker.task(schedule="*/15 * * * *")  # Every 15 minutes
async def health_check() -> None:
    print(f"Health check at {datetime.now()}")
    # Check system health

# Set up scheduler
schedule_source = LabelBasedScheduleSource(broker)
scheduler = TaskiqScheduler(broker, [schedule_source])

async def run_scheduler():
    await scheduler.startup()
    try:
        await scheduler.run_forever()
    finally:
        await scheduler.shutdown()

# Run scheduler
asyncio.run(run_scheduler())

One-time Scheduled Tasks

from datetime import datetime, timedelta
from taskiq.scheduler import ScheduledTask

# Schedule task for specific time
future_time = datetime.now() + timedelta(hours=2)
scheduled_task = ScheduledTask(
    task_name="my_module:delayed_task",
    time=future_time,
    args=("important_data",),
    kwargs={"priority": "high"},
    labels={"category": "one-time"},
)

# Custom schedule source for one-time tasks
class OneTimeScheduleSource(ScheduleSource):
    def __init__(self):
        self.tasks = []
    
    def add_task(self, task: ScheduledTask):
        self.tasks.append(task)
    
    async def get_schedules(self) -> List[ScheduledTask]:
        return self.tasks
    
    async def post_send(self, task: ScheduledTask) -> None:
        # Remove one-time tasks after execution
        if task.time and task in self.tasks:
            self.tasks.remove(task)

# Use custom source
one_time_source = OneTimeScheduleSource()
one_time_source.add_task(scheduled_task)

scheduler = TaskiqScheduler(
    broker,
    [LabelBasedScheduleSource(broker), one_time_source]
)

Advanced Scheduling with Custom Logic

class ConditionalScheduleSource(ScheduleSource):
    """Schedule source with custom conditions."""
    
    def __init__(self, broker: AsyncBroker):
        self.broker = broker
        self.last_execution = {}
    
    async def get_schedules(self) -> List[ScheduledTask]:
        schedules = []
        
        # Only schedule backup task if it's been more than 6 hours
        last_backup = self.last_execution.get("backup_task")
        if (not last_backup or 
            datetime.now() - last_backup > timedelta(hours=6)):
            schedules.append(ScheduledTask(
                task_name="my_module:backup_data",
                time=datetime.now() + timedelta(minutes=1),
                labels={"type": "maintenance"},
            ))
        
        return schedules
    
    async def pre_send(self, task: ScheduledTask) -> None:
        # Check system load before executing maintenance tasks
        if task.labels.get("type") == "maintenance":
            if await self._system_load_too_high():
                raise ScheduledTaskCancelledError("System load too high")
    
    async def post_send(self, task: ScheduledTask) -> None:
        # Track execution time
        self.last_execution[task.task_name.split(":")[-1]] = datetime.now()
    
    async def _system_load_too_high(self) -> bool:
        # Custom system load check
        import psutil
        return psutil.cpu_percent() > 80.0

Integration with Task Labels

# Define tasks with various scheduling options
@broker.task(
    schedule="0 2 * * 0",  # Weekly on Sunday at 2 AM
    priority="low",
    timeout=3600,  # 1 hour timeout
)
async def weekly_cleanup() -> None:
    """Weekly maintenance task."""
    print("Running weekly cleanup")
    # Cleanup old data

@broker.task(
    schedule="*/5 * * * *",  # Every 5 minutes
    max_retries=3,
    retry_delay=30,
)
async def monitoring_task() -> None:
    """Frequent monitoring task with retry logic."""
    # Monitor system metrics
    pass

# Schedule source automatically discovers these tasks
schedule_source = LabelBasedScheduleSource(
    broker,
    schedule_label="schedule"  # Look for 'schedule' label
)

Types

ScheduledTaskCancelledError = Exception
"""Exception raised to cancel scheduled task execution."""

Cron Expression Format

Taskiq uses standard cron expression format with five fields:

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday)
│ │ │ │ │
* * * * *

Common examples:

  • "0 0 * * *" - Daily at midnight
  • "30 8 * * 1-5" - Weekdays at 8:30 AM
  • "0 */4 * * *" - Every 4 hours
  • "15 2 1 * *" - First day of month at 2:15 AM
  • "0 9-17 * * 1-5" - Hourly during business hours

Install with Tessl CLI

npx tessl i tessl/pypi-taskiq

docs

brokers.md

events-state.md

exceptions.md

index.md

middleware.md

result-backends.md

scheduling.md

tasks-results.md

tile.json