tessl/pypi-rq-scheduler

Provides job scheduling capabilities to RQ (Redis Queue)

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/rq-scheduler@0.14.x

To install, run

npx @tessl/cli install tessl/pypi-rq-scheduler@0.14.0

RQ Scheduler

RQ Scheduler provides job scheduling capabilities to RQ (Redis Queue), enabling developers to schedule jobs to run at specific times, dates, or intervals. It supports one-time scheduled jobs, periodic jobs, and cron-style scheduling patterns with Redis-based persistence and distributed scheduler coordination.

Package Information

  • Package Name: rq-scheduler
  • Language: Python
  • Installation: pip install rq-scheduler

Core Imports

from rq_scheduler import Scheduler

Utility functions:

from rq_scheduler.utils import from_unix, to_unix, get_next_scheduled_time

Basic Usage

from datetime import datetime, timedelta
from redis import Redis
from rq_scheduler import Scheduler

# Create scheduler for default queue
redis_conn = Redis()
scheduler = Scheduler(connection=redis_conn)

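# Job function to schedule. RQ workers must be able to import it, so in a real
# project define it in an importable module rather than in the script you run.
def my_job(name):
    print(f"Hello, {name}!")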

# Schedule job to run on January 1st, 2025 at 3:00 AM (naive datetimes are treated as UTC)
scheduler.enqueue_at(datetime(2025, 1, 1, 3, 0), my_job, 'World')

# Schedule job to run in 10 minutes
scheduler.enqueue_in(timedelta(minutes=10), my_job, 'Future')

# Schedule periodic job every hour
scheduler.schedule(
    scheduled_time=datetime.utcnow() + timedelta(minutes=5),
    func=my_job,
    args=['Recurring'],
    interval=3600  # Every hour
)

# Run the scheduler loop in this process (blocks until interrupted)
scheduler.run()

Architecture

RQ Scheduler extends RQ's job queue system with time-based scheduling:

  • Scheduler: The main class; it manages scheduled jobs using Redis sorted sets
  • Redis Storage: Jobs are stored by execution time, so those due before a given time can be retrieved efficiently
  • Lock Coordination: A distributed lock prevents multiple scheduler instances from conflicting
  • Queue Integration: Scheduled jobs move to RQ queues when their execution time arrives
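
The storage idea above can be illustrated with a small in-memory model. This is only a sketch using the standard library: `InMemorySchedule` and its methods are hypothetical names invented for illustration, and a sorted Python list stands in for the Redis sorted set. It is not the library's implementation.

```python
import bisect
from datetime import datetime, timezone


class InMemorySchedule:
    """Toy stand-in for a sorted set: (unix_time, job_id) pairs kept in order."""

    def __init__(self):
        self._entries = []  # sorted list of (score, job_id)

    def add(self, job_id, run_at):
        # Score each job by its execution time as a Unix timestamp.
        # Naive datetimes are interpreted as UTC in this sketch.
        score = run_at.replace(tzinfo=timezone.utc).timestamp()
        bisect.insort(self._entries, (score, job_id))

    def pop_due(self, now):
        # Everything scored at or before `now` is ready to move to a queue.
        cutoff = now.replace(tzinfo=timezone.utc).timestamp()
        due = [job_id for score, job_id in self._entries if score <= cutoff]
        self._entries = [e for e in self._entries if e[0] > cutoff]
        return due


schedule = InMemorySchedule()
schedule.add("cleanup", datetime(2025, 1, 1, 4, 0))
schedule.add("report", datetime(2025, 1, 1, 3, 0))

due_jobs = schedule.pop_due(datetime(2025, 1, 1, 3, 30))
remaining = schedule.pop_due(datetime(2025, 1, 1, 5, 0))
```

Because entries are ordered by timestamp, finding the jobs that are due is a range query up to the current time, which is what makes a sorted set a natural fit.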

Capabilities

Job Scheduling

Core scheduling methods for one-time and recurring jobs: datetime-based scheduling, time-delta scheduling, and cron-style patterns.

def enqueue_at(self, scheduled_time, func, *args, **kwargs):
    """Schedule a job to run at a specific datetime."""

def enqueue_in(self, time_delta, func, *args, **kwargs):
    """Schedule a job to run after a time delay."""

def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None, **options):
    """Schedule a periodic or repeated job."""

def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, **options):
    """Schedule a job using cron syntax."""

Full reference: job-scheduling.md
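
As a rough sketch of how the four methods might be used together, the helper below registers one job of each kind. `send_report` and `register_jobs` are hypothetical names, the e-mail addresses are placeholders, and only parameters that appear in the signatures above are used. The `scheduler` argument is assumed to be a `Scheduler` instance connected to Redis.

```python
from datetime import datetime, timedelta, timezone


def send_report(recipient):
    """Example job (hypothetical); workers must be able to import it."""
    return f"report sent to {recipient}"


def register_jobs(scheduler, now=None):
    """Register one job of each kind and return whatever the scheduler returns."""
    if now is None:
        # Naive UTC timestamp, matching the style of the Basic Usage example.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    return [
        # One-time job at an absolute time.
        scheduler.enqueue_at(now + timedelta(days=1), send_report, "daily@example.com"),
        # One-time job after a delay.
        scheduler.enqueue_in(timedelta(minutes=10), send_report, "soon@example.com"),
        # Periodic job: first run in 5 minutes, then every hour.
        scheduler.schedule(
            scheduled_time=now + timedelta(minutes=5),
            func=send_report,
            args=["hourly@example.com"],
            interval=3600,  # seconds between runs
            repeat=24,      # repeat count; None repeats indefinitely
        ),
        # Cron-style job: 06:00 every Monday.
        scheduler.cron(
            "0 6 * * 1",
            func=send_report,
            args=["weekly@example.com"],
        ),
    ]
```

With a live Redis server, calling `register_jobs(Scheduler(connection=Redis()))` would schedule all four jobs.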

Job Management

Functions for managing scheduled jobs including querying, canceling, and modifying execution times.

def cancel(self, job):
    """Cancel a scheduled job by removing it from the scheduler queue."""

def get_jobs(self, until=None, with_times=False, offset=None, length=None):
    """Get scheduled jobs iterator with optional filtering and pagination."""

def count(self, until=None):
    """Count scheduled jobs, optionally filtered by time."""

def change_execution_time(self, job, date_time):
    """Change the execution time of a scheduled job."""

def __contains__(self, item):
    """Check if a job is currently scheduled."""

Full reference: job-management.md
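
The management methods compose naturally into small helpers. The sketch below is illustrative: `cancel_due_before` and `postpone` are hypothetical names, and it assumes that `get_jobs(..., with_times=True)` yields `(job, scheduled_datetime)` pairs.

```python
from datetime import datetime


def cancel_due_before(scheduler, cutoff):
    """Cancel every job scheduled at or before `cutoff`; return how many."""
    # Materialize the iterator first so cancelling cannot disturb iteration.
    # Assumption: with_times=True yields (job, scheduled_datetime) pairs.
    pending = list(scheduler.get_jobs(until=cutoff, with_times=True))
    for job, _run_at in pending:
        scheduler.cancel(job)
    return len(pending)


def postpone(scheduler, job, new_time):
    """Move `job` to `new_time` if it is still scheduled; report whether it was."""
    if job in scheduler:  # relies on Scheduler.__contains__
        scheduler.change_execution_time(job, new_time)
        return True
    return False
```

For example, `cancel_due_before(scheduler, datetime(2025, 3, 1))` would drop everything scheduled up to 1 March 2025, and `scheduler.count()` could be compared before and after to confirm the effect.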

Scheduler Control

Methods for running and controlling the scheduler daemon, covering locking, heartbeats, and job processing.

def run(self, burst=False):
    """Start the scheduler daemon to process scheduled jobs."""

def acquire_lock(self):
    """Acquire distributed lock for scheduler coordination."""

def remove_lock(self):
    """Release previously acquired distributed lock."""

def heartbeat(self):
    """Send heartbeat to maintain scheduler registration."""

def enqueue_jobs(self):
    """Move all ready scheduled jobs to their target execution queues."""

def enqueue_job(self, job):
    """Move a scheduled job to a queue and handle periodic job rescheduling."""

def get_queue_for_job(self, job):
    """Get the appropriate queue instance for a job."""

Full reference: scheduler-control.md
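
To show how the lock and enqueue methods relate, here is a minimal sketch of a single polling pass. `run_once` is a hypothetical helper, not part of the package, and it assumes `acquire_lock()` returns a truthy value when the lock was obtained. The real `run()` loop does more (for example heartbeats and sleeping between passes).

```python
def run_once(scheduler):
    """One polling pass: enqueue ready jobs only if this instance holds the lock."""
    # Assumption: acquire_lock() is truthy on success and falsy otherwise.
    if not scheduler.acquire_lock():
        return False  # another scheduler instance is active; skip this pass
    try:
        # Move every job whose execution time has arrived to its target queue.
        scheduler.enqueue_jobs()
    finally:
        # Always release the lock, even if enqueueing raised.
        scheduler.remove_lock()
    return True
```

In most cases calling `scheduler.run(burst=True)` is the simpler way to process ready jobs once and exit; the helper above only makes the coordination step explicit.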

Utility Functions

Time conversion and scheduling utility functions for working with timestamps and cron expressions.

def from_unix(string):
    """Convert Unix timestamp to UTC datetime object."""

def to_unix(dt):
    """Convert datetime object to Unix timestamp."""

def get_next_scheduled_time(cron_string, use_local_timezone=False):
    """Calculate the next scheduled execution time from a cron expression."""

def rationalize_until(until=None):
    """Process 'until' parameter for time-based queries."""

def setup_loghandlers(level='INFO'):
    """Configure logging for RQ Scheduler with colorized output."""

Full reference: utilities.md
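
The time conversions can be approximated with the standard library alone. The functions below are stand-ins written for illustration (`to_unix_sketch` and `from_unix_sketch` are hypothetical names); they assume naive datetimes represent UTC and are not the package's own code.

```python
import calendar
from datetime import datetime, timezone


def to_unix_sketch(dt):
    """Naive UTC datetime -> integer Unix timestamp."""
    return calendar.timegm(dt.utctimetuple())


def from_unix_sketch(value):
    """Unix timestamp (number or numeric string) -> naive UTC datetime."""
    return datetime.fromtimestamp(float(value), tz=timezone.utc).replace(tzinfo=None)


run_at = datetime(2025, 1, 1, 3, 0)
stamp = to_unix_sketch(run_at)
round_trip = from_unix_sketch(str(stamp))
```

The round trip returns the original datetime, which is the property that matters when timestamps are used as scores for stored jobs.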

Types

class Scheduler:
    """Main scheduler class for managing scheduled jobs."""
    
    def __init__(self, queue_name='default', queue=None, interval=60, 
                 connection=None, job_class=None, queue_class=None, name=None):
        """
        Initialize scheduler.
        
        Parameters:
        - queue_name: str, name of queue for scheduled jobs
        - queue: RQ Queue instance (optional, overrides queue_name)
        - interval: int, polling interval in seconds
        - connection: Redis connection (required)
        - job_class: Custom job class
        - queue_class: Custom queue class
        - name: Scheduler instance name for coordination
        """

VERSION: tuple = (0, 14, 0)

Console Scripts

The package provides a command-line interface for running scheduler daemons:

# Run scheduler with default settings
rqscheduler

# Redis connection options
rqscheduler --host redis.example.com --port 6380 --db 1
rqscheduler --password mypassword
rqscheduler --url redis://user:pass@host:port/db

# Execution modes
rqscheduler --burst                    # Process all jobs then exit
rqscheduler --interval 30              # Custom polling interval (seconds)

# Logging options
rqscheduler --verbose                  # Debug level logging
rqscheduler --quiet                    # Warning level logging only

# Advanced options
rqscheduler --path /custom/path        # Set import path
rqscheduler --pid /var/run/rq.pid     # Write PID to file
rqscheduler --job-class myapp.CustomJob    # Custom job class
rqscheduler --queue-class myapp.CustomQueue # Custom queue class

# Show all options
rqscheduler --help

Environment Variables:

  • RQ_REDIS_HOST - Redis host (default: localhost)
  • RQ_REDIS_PORT - Redis port (default: 6379)
  • RQ_REDIS_DB - Redis database (default: 0)
  • RQ_REDIS_PASSWORD - Redis password
  • RQ_REDIS_URL - Complete Redis URL (overrides other connection settings)
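
The precedence described above, where a complete URL overrides the individual settings, can be sketched as follows. `redis_settings_from_env` is a hypothetical helper written for illustration; it mirrors the documented defaults but is not the CLI's actual code.

```python
import os


def redis_settings_from_env(env=None):
    """Resolve Redis settings from RQ_REDIS_* variables (illustrative helper)."""
    env = os.environ if env is None else env
    url = env.get("RQ_REDIS_URL")
    if url:
        # A complete URL takes precedence over the individual settings.
        return {"url": url}
    return {
        "host": env.get("RQ_REDIS_HOST", "localhost"),
        "port": int(env.get("RQ_REDIS_PORT", 6379)),
        "db": int(env.get("RQ_REDIS_DB", 0)),
        "password": env.get("RQ_REDIS_PASSWORD"),
    }
```

In an application, the result could be passed to `redis.Redis.from_url(settings["url"])` when a URL is present, or to `redis.Redis(**settings)` otherwise.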