tessl/pypi-rq-scheduler

Provides job scheduling capabilities to RQ (Redis Queue)

Scheduler Control

RQ Scheduler provides methods for running and controlling the scheduler daemon process. These methods handle distributed coordination, job processing, and lifecycle management for scheduler instances.

Capabilities

Scheduler Daemon

Run the scheduler daemon to continuously monitor scheduled jobs and move them to execution queues when their time arrives.

def run(self, burst=False):
    """
    Start the scheduler daemon to process scheduled jobs.
    
    Parameters:
    - burst: bool, if True run once and exit, if False run continuously
    
    Returns:
    None (runs until interrupted or burst mode completes)
    
    Behavior:
    - Registers scheduler birth and installs signal handlers
    - Runs polling loop at configured interval
    - Acquires distributed lock before processing jobs
    - Automatically handles scheduler cleanup on exit
    - Supports graceful shutdown via SIGINT/SIGTERM
    """

Usage Examples:

from rq_scheduler import Scheduler
from redis import Redis

# Basic daemon mode (runs forever)
scheduler = Scheduler(connection=Redis())
scheduler.run()  # Blocks until interrupted

# Burst mode (process all ready jobs then exit)
scheduler = Scheduler(connection=Redis())
scheduler.run(burst=True)

# Custom polling interval
scheduler = Scheduler(connection=Redis(), interval=30)  # Check every 30 seconds
scheduler.run()

# With signal handling in a script
import signal
import sys

def signal_handler(sig, frame):
    print('Shutting down scheduler...')
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
# Note: run() installs its own SIGINT/SIGTERM handlers, replacing the one
# registered above; see the Signal Handling section below for details.
scheduler.run()

Distributed Locking

Coordinate multiple scheduler instances to prevent duplicate job processing using Redis-based distributed locking.

def acquire_lock(self):
    """
    Acquire distributed lock for scheduler coordination.
    
    Returns:
    bool, True if lock acquired successfully, False if another scheduler holds it
    
    Behavior:
    - Lock expires automatically after interval + 10 seconds
    - Only one scheduler instance can hold lock at a time
    - Lock prevents duplicate job processing across instances
    """

def remove_lock(self):
    """
    Release previously acquired distributed lock.
    
    Returns:
    None
    
    Behavior:
    - Only removes lock if this instance acquired it
    - Safe to call even if lock not held
    - Automatically called during scheduler shutdown
    """

Usage Examples:

# Manual lock management for custom scheduler logic
scheduler = Scheduler(connection=Redis())

if scheduler.acquire_lock():
    try:
        # Process jobs while holding lock
        jobs_processed = scheduler.enqueue_jobs()
        print(f"Processed {len(jobs_processed)} jobs")
    finally:
        scheduler.remove_lock()
else:
    print("Another scheduler is active")

# Check multiple schedulers coordination
scheduler1 = Scheduler(connection=Redis(), name="scheduler-1")
scheduler2 = Scheduler(connection=Redis(), name="scheduler-2")

print(f"Scheduler 1 lock: {scheduler1.acquire_lock()}")  # True
print(f"Scheduler 2 lock: {scheduler2.acquire_lock()}")  # False

scheduler1.remove_lock()
print(f"Scheduler 2 lock: {scheduler2.acquire_lock()}")  # True
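At its core, acquire_lock() is a single Redis `SET key value NX EX (interval + 10)` call: the write succeeds only if the key is absent, and the TTL guarantees a crashed scheduler cannot hold the lock forever. A pure-Python sketch of those semantics, with no Redis required; the key name and the explicit `now` parameter are illustrative, not part of the library's API:

```python
import time

class FakeLockStore:
    """Pure-Python stand-in for the Redis SET NX EX call behind acquire_lock()."""

    def __init__(self):
        self._data = {}  # key -> (owner, expires_at)

    def set_nx_ex(self, key, owner, ttl, now=None):
        """SET key owner NX EX ttl: succeed only if the key is absent or expired."""
        now = time.monotonic() if now is None else now
        entry = self._data.get(key)
        if entry is not None and entry[1] > now:
            return False  # held by another owner and not yet expired
        self._data[key] = (owner, now + ttl)
        return True

    def delete_if_owner(self, key, owner):
        """Release the lock only if this owner acquired it (remove_lock semantics)."""
        if self._data.get(key, (None,))[0] == owner:
            del self._data[key]

store = FakeLockStore()
LOCK = "rq:scheduler:lock"  # illustrative key name
assert store.set_nx_ex(LOCK, "scheduler-1", ttl=70, now=0)       # acquired
assert not store.set_nx_ex(LOCK, "scheduler-2", ttl=70, now=30)  # still held
assert store.set_nx_ex(LOCK, "scheduler-2", ttl=70, now=100)     # TTL lapsed, taken over
store.delete_if_owner(LOCK, "scheduler-2")                       # released explicitly
assert store.set_nx_ex(LOCK, "scheduler-1", ttl=70, now=100)     # free again
```

The TTL of interval + 10 seconds is what makes the takeover at `now=100` safe: a scheduler that dies without releasing the lock is locked out for at most one polling interval plus a grace period.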

Job Processing

Move scheduled jobs that are ready for execution into RQ queues.

def enqueue_jobs(self):
    """
    Move all ready scheduled jobs to their target execution queues.
    
    Returns:
    list, Job instances that were moved to queues
    
    Behavior:
    - Only processes jobs with scheduled time <= current time
    - Handles periodic job rescheduling automatically
    - Manages cron job next execution calculation
    - Decrements repeat counters for limited repetition jobs
    """

Usage Examples:

# Manual job processing (useful for testing or custom logic)
scheduler = Scheduler(connection=Redis())

# Process ready jobs once
processed_jobs = scheduler.enqueue_jobs()
print(f"Moved {len(processed_jobs)} jobs to execution queues")

for job in processed_jobs:
    print(f"Processed job: {job.description} -> queue: {job.origin}")

# Custom processing loop with additional logic
import time
from datetime import datetime

while True:
    if scheduler.acquire_lock():
        try:
            jobs = scheduler.enqueue_jobs()
            if jobs:
                print(f"{datetime.now()}: Processed {len(jobs)} jobs")
            
            # Custom business logic here
            check_scheduler_health()
            
        finally:
            scheduler.remove_lock()
    
    time.sleep(60)  # Wait 1 minute
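Internally, scheduled jobs live in a Redis sorted set scored by their UNIX timestamp, and enqueue_jobs() picks every job scored at or below the current time (a ZRANGEBYSCORE from -inf to now). A minimal sketch of that selection logic, with a plain dict standing in for the sorted set:

```python
import time

def jobs_ready(scheduled, now=None):
    """Return job ids whose scheduled timestamp is <= now (ZRANGEBYSCORE -inf now)."""
    now = time.time() if now is None else now
    return sorted(job_id for job_id, ts in scheduled.items() if ts <= now)

# job id -> scheduled UNIX timestamp (stand-in for the sorted set's scores)
scheduled = {"job-a": 100.0, "job-b": 200.0, "job-c": 150.0}
print(jobs_ready(scheduled, now=150.0))  # ['job-a', 'job-c']
```

Jobs scheduled exactly at `now` are included, which matches the "scheduled time <= current time" rule in the enqueue_jobs() description above.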

Individual Job Processing

Process individual scheduled jobs and determine the queue each job should be enqueued to.

def enqueue_job(self, job):
    """
    Move a scheduled job to a queue and handle periodic job rescheduling.
    
    Parameters:
    - job: Job instance to process
    
    Returns:
    None
    
    Behavior:
    - Moves job from scheduler queue to appropriate execution queue
    - Handles periodic job rescheduling automatically
    - Manages cron job next execution calculation
    - Decrements repeat counters for limited repetition jobs
    """

def get_queue_for_job(self, job):
    """
    Get the appropriate queue instance for a job.
    
    Parameters:
    - job: Job instance to get queue for
    
    Returns:
    Queue instance where job should be executed
    
    Behavior:
    - Uses job.origin to determine target queue
    - Respects custom queue_class from job metadata
    - Creates queue instance with proper connection and job_class
    """

Usage Examples:

# Manual job processing with custom logic
scheduler = Scheduler(connection=Redis())

# Get specific jobs and process individually
for job in scheduler.get_jobs_to_queue():
    # Custom validation before processing
    if validate_job_conditions(job):
        target_queue = scheduler.get_queue_for_job(job)
        print(f"Processing job {job.id} to queue {target_queue.name}")
        scheduler.enqueue_job(job)
    else:
        print(f"Skipping job {job.id} - conditions not met")

# Custom queue routing logic
def custom_job_processor(scheduler, job):
    """Process job with custom queue selection."""
    if job.meta.get('priority') == 'high':
        # High priority jobs go to fast queue
        job.origin = 'high_priority'
    
    queue = scheduler.get_queue_for_job(job)
    scheduler.enqueue_job(job)
    return queue

# Process jobs with custom routing
ready_jobs = scheduler.get_jobs_to_queue()
for job in ready_jobs:
    queue = custom_job_processor(scheduler, job)
    print(f"Job {job.id} sent to {queue.name}")
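The periodic-job bookkeeping described above (reschedule at the next interval, decrement the repeat counter, drop the job when repeats run out) can be sketched as a small helper. The `interval` and `repeat` fields mirror the metadata rq-scheduler keeps on periodic jobs, but this function is illustrative, not the library's code:

```python
def reschedule(meta, scheduled_at):
    """Return the next scheduled timestamp, or None when the job should stop.

    meta holds 'interval' (seconds) and an optional 'repeat' (remaining runs);
    repeat=None means repeat forever.
    """
    interval = meta.get("interval")
    if interval is None:
        return None  # one-shot job: do not reschedule
    repeat = meta.get("repeat")
    if repeat is not None:
        if repeat <= 1:
            return None  # the last allowed run just happened
        meta["repeat"] = repeat - 1  # decrement the remaining-runs counter
    return scheduled_at + interval

meta = {"interval": 60, "repeat": 2}
t1 = reschedule(meta, 1000)  # 1060, and meta["repeat"] drops to 1
t2 = reschedule(meta, 1060)  # None: repeats exhausted, job is removed
```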

Scheduler Heartbeat

Maintain scheduler registration and prevent timeout in distributed setups.

def heartbeat(self):
    """
    Send heartbeat to maintain scheduler registration.
    
    Returns:
    None
    
    Behavior:
    - Extends scheduler key expiration time
    - Prevents scheduler from appearing inactive
    - Called automatically during run() loop
    """

Usage Examples:

# Manual heartbeat for custom scheduler loops
import time

scheduler = Scheduler(connection=Redis())
scheduler.register_birth()

try:
    while True:
        scheduler.heartbeat()  # Keep scheduler registered
        
        # Custom processing logic
        if custom_condition():
            process_special_jobs()
        
        time.sleep(30)
        
finally:
    scheduler.register_death()
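heartbeat() works by pushing out the TTL on the scheduler's registration key: miss enough heartbeats and the key expires, making the instance appear inactive. A pure-Python sketch of those expiry semantics (the key name is illustrative, and an explicit `now` replaces the wall clock for determinism):

```python
class FakeKeyStore:
    """Simulates Redis key TTLs backing scheduler registration."""

    def __init__(self):
        self._expires = {}  # key -> expiry timestamp

    def set_with_ttl(self, key, ttl, now):
        """SETEX/EXPIRE: (re)set the key's expiry; heartbeat() does this."""
        self._expires[key] = now + ttl

    def alive(self, key, now):
        """True while the key has not expired (scheduler looks active)."""
        return self._expires.get(key, 0) > now

store = FakeKeyStore()
store.set_with_ttl("rq:scheduler:worker-1", ttl=70, now=0)
assert store.alive("rq:scheduler:worker-1", now=60)       # within the TTL
assert not store.alive("rq:scheduler:worker-1", now=80)   # no heartbeat: looks dead
store.set_with_ttl("rq:scheduler:worker-1", ttl=70, now=60)  # heartbeat in time
assert store.alive("rq:scheduler:worker-1", now=80)       # expiry extended
```

This is why custom loops that bypass run() must call heartbeat() periodically, as in the example above: otherwise the registration key silently lapses even though the process is still running.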

Instance Registration

Manage scheduler instance lifecycle for distributed coordination and monitoring.

def register_birth(self):
    """
    Register scheduler instance startup.
    
    Returns:
    None
    
    Raises:
    ValueError: if scheduler with same name already active
    
    Behavior:
    - Creates scheduler instance key in Redis
    - Sets automatic expiration based on polling interval
    - Prevents duplicate scheduler names
    """

def register_death(self):
    """
    Register scheduler instance shutdown.
    
    Returns:
    None
    
    Behavior:
    - Marks scheduler as inactive in Redis
    - Allows other schedulers to detect shutdown
    - Called automatically during graceful shutdown
    """

Usage Examples:

# Manual instance lifecycle management
scheduler = Scheduler(connection=Redis(), name="worker-1")

try:
    scheduler.register_birth()
    print("Scheduler registered successfully")
    
    # Run scheduler logic
    while running:
        process_jobs()
        
except ValueError as e:
    print(f"Registration failed: {e}")
    # Handle duplicate scheduler name
    
finally:
    scheduler.register_death()
    print("Scheduler shutdown registered")

# Avoid name conflicts by generating a unique scheduler name
import uuid

unique_name = f"scheduler-{uuid.uuid4().hex[:8]}"
scheduler = Scheduler(connection=Redis(), name=unique_name)

try:
    # run() calls register_birth() itself, so don't call it separately first
    scheduler.run()
except ValueError:
    print(f"A scheduler named {unique_name} is already registered")

Scheduler Properties

Access scheduler instance information and configuration:

@property
def key(self):
    """
    Returns the scheduler's Redis hash key.
    
    Returns:
    str, Redis key for this scheduler instance
    """

@property 
def pid(self):
    """
    Returns the current process ID.
    
    Returns:
    int, process ID of scheduler
    """

Usage Examples:

scheduler = Scheduler(connection=Redis(), name="main-scheduler")

print(f"Scheduler key: {scheduler.key}")
print(f"Process ID: {scheduler.pid}")

# Useful for monitoring and debugging
import os
print(f"Scheduler {scheduler.name} running as PID {scheduler.pid}")
assert scheduler.pid == os.getpid()

Signal Handling

The scheduler automatically installs signal handlers for graceful shutdown:

  • SIGINT (Ctrl+C): Triggers clean shutdown sequence
  • SIGTERM: Triggers clean shutdown sequence

Shutdown Sequence:

  1. Stop polling loop
  2. Release distributed lock
  3. Register scheduler death
  4. Exit process
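The sequence above can be sketched with a stub scheduler that records which steps ran and in what order. The stub class and its stop() method are for illustration only; remove_lock() and register_death() are the real calls documented earlier:

```python
class StubScheduler:
    """Stand-in that records which shutdown steps ran, in order."""

    def __init__(self):
        self.calls = []

    def stop(self):
        self.calls.append("stop_polling")   # step 1: stop the polling loop

    def remove_lock(self):
        self.calls.append("remove_lock")    # step 2: release the distributed lock

    def register_death(self):
        self.calls.append("register_death") # step 3: mark the instance inactive

def shutdown(scheduler):
    """Mirror the scheduler's shutdown sequence before the process exits."""
    scheduler.stop()
    scheduler.remove_lock()
    scheduler.register_death()

s = StubScheduler()
shutdown(s)
print(s.calls)  # ['stop_polling', 'remove_lock', 'register_death']
```

The ordering matters: releasing the lock before registering death lets another waiting scheduler instance take over with minimal delay.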

Custom Signal Handling:

import signal
import sys
import time

def custom_shutdown(signum, frame):
    print(f"Received signal {signum}")
    scheduler.remove_lock()
    scheduler.register_death()
    print("Custom cleanup completed")
    sys.exit(0)

# Note: run() installs its own SIGINT/SIGTERM handlers, replacing any
# registered beforehand, so custom handlers only take effect when you
# drive the scheduler loop yourself instead of calling run():
signal.signal(signal.SIGINT, custom_shutdown)
signal.signal(signal.SIGTERM, custom_shutdown)

while True:
    scheduler.enqueue_jobs()  # move ready jobs to their queues
    time.sleep(60)

Install with Tessl CLI

npx tessl i tessl/pypi-rq-scheduler
