tessl/pypi-rq-scheduler

Provides job scheduling capabilities to RQ (Redis Queue)

RQ Scheduler

RQ Scheduler provides job scheduling capabilities to RQ (Redis Queue), enabling developers to schedule jobs to run at specific times, dates, or intervals. It supports one-time scheduled jobs, periodic jobs, and cron-style scheduling patterns with Redis-based persistence and distributed scheduler coordination.

Package Information

  • Package Name: rq-scheduler
  • Language: Python
  • Installation: pip install rq-scheduler

Core Imports

from rq_scheduler import Scheduler

Utility functions:

from rq_scheduler.utils import from_unix, to_unix, get_next_scheduled_time

Basic Usage

from datetime import datetime, timedelta
from redis import Redis
from rq_scheduler import Scheduler

# Create scheduler for default queue
redis_conn = Redis()
scheduler = Scheduler(connection=redis_conn)

# Define the function that will run as a job
def my_job(name):
    print(f"Hello, {name}!")

# Schedule job to run on January 1st, 2025 at 3:00 AM UTC
scheduler.enqueue_at(datetime(2025, 1, 1, 3, 0), my_job, 'World')

# Schedule job to run in 10 minutes
scheduler.enqueue_in(timedelta(minutes=10), my_job, 'Future')

# Schedule periodic job every hour
scheduler.schedule(
    scheduled_time=datetime.utcnow() + timedelta(minutes=5),
    func=my_job,
    args=['Recurring'],
    interval=3600  # Every hour
)

# Run the scheduler loop in this process; the call blocks while it polls for due jobs
scheduler.run()

Architecture

RQ Scheduler extends RQ's job queue system with time-based scheduling:

  • Scheduler: Main class managing scheduled jobs using Redis sorted sets
  • Redis Storage: Jobs stored by execution time for efficient time-based retrieval
  • Lock Coordination: Distributed locking prevents multiple scheduler instances from conflicting
  • Queue Integration: Scheduled jobs move to RQ queues when execution time arrives
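The storage model above can be illustrated with a small in-memory stand-in. This is only a sketch of the idea (jobs ordered by run time, due jobs taken off the front). The class `InMemorySchedule` and its methods are hypothetical and are not part of rq-scheduler, which keeps this data in a Redis sorted set.

```python
import heapq
from datetime import datetime, timedelta, timezone


class InMemorySchedule:
    """Hypothetical stand-in for a Redis sorted set scored by run time."""

    def __init__(self):
        # Min-heap of (unix_timestamp, job_id); the earliest job is at index 0.
        self._heap = []

    def add(self, job_id, run_at):
        # The timestamp plays the role of the sorted-set score.
        heapq.heappush(self._heap, (run_at.timestamp(), job_id))

    def pop_due(self, now):
        # Remove and return every job whose run time is at or before `now`.
        cutoff = now.timestamp()
        due = []
        while self._heap and self._heap[0][0] <= cutoff:
            due.append(heapq.heappop(self._heap)[1])
        return due


now = datetime(2025, 1, 1, 3, 0, tzinfo=timezone.utc)
schedule = InMemorySchedule()
schedule.add("report", now - timedelta(minutes=1))
schedule.add("cleanup", now + timedelta(hours=1))
ready = schedule.pop_due(now)  # only the job scheduled in the past is due
```

In the real package, the jobs returned by such a lookup are then pushed onto their RQ queues.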

Capabilities

Job Scheduling

Core scheduling methods for one-time and recurring jobs, covering datetime-based scheduling, time-delta scheduling, and cron-style patterns.

def enqueue_at(self, scheduled_time, func, *args, **kwargs):
    """Schedule a job to run at a specific datetime."""

def enqueue_in(self, time_delta, func, *args, **kwargs):
    """Schedule a job to run after a time delay."""

def schedule(self, scheduled_time, func, args=None, kwargs=None, interval=None, repeat=None, **options):
    """Schedule a periodic or repeated job."""

def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, **options):
    """Schedule a job using cron syntax."""
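As a rough illustration of the recurring-job methods, the helper below registers one interval job and one cron job on whatever scheduler it is given. `register_recurring_jobs` and `send_report` are hypothetical names, and the keyword arguments are limited to those listed in the signatures above.

```python
from datetime import datetime, timedelta, timezone


def send_report(recipient):
    """Hypothetical job function; replace with real work."""
    print(f"Sending report to {recipient}")


def register_recurring_jobs(scheduler):
    """Register an interval job and a cron job on a Scheduler instance."""
    # Naive UTC start time, five minutes from now.
    start = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=5)

    # Run every 10 minutes; repeat=6 limits the number of runs
    # (repeat=None would keep the job recurring indefinitely).
    interval_job = scheduler.schedule(
        scheduled_time=start,
        func=send_report,
        args=["ops@example.com"],
        interval=600,
        repeat=6,
    )

    # Run at minute 0 of every hour, using five-field cron syntax.
    cron_job = scheduler.cron(
        "0 * * * *",
        func=send_report,
        args=["ops@example.com"],
        repeat=None,
    )
    return interval_job, cron_job
```

Calling `register_recurring_jobs(scheduler)` with the `scheduler` from Basic Usage would store both jobs in Redis; nothing runs until a scheduler process and an RQ worker are active.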

Job Management

Methods for querying scheduled jobs, canceling them, and changing their execution times.

def cancel(self, job):
    """Cancel a scheduled job by removing it from the scheduler queue."""

def get_jobs(self, until=None, with_times=False, offset=None, length=None):
    """Get scheduled jobs iterator with optional filtering and pagination."""

def count(self, until=None):
    """Count scheduled jobs, optionally filtered by time."""

def change_execution_time(self, job, date_time):
    """Change the execution time of a scheduled job."""

def __contains__(self, item):
    """Check if a job is currently scheduled."""
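A short sketch of how the query and cancel methods might be combined follows. The helper name `cancel_jobs_due_before` is hypothetical; it assumes that `get_jobs(..., with_times=True)` yields `(job, run_time)` pairs.

```python
def cancel_jobs_due_before(scheduler, cutoff):
    """Cancel every scheduled job due at or before `cutoff`.

    Returns the run times of the cancelled jobs.
    """
    cancelled_times = []
    # list() takes a snapshot so that cancelling does not disturb iteration.
    for job, run_at in list(scheduler.get_jobs(until=cutoff, with_times=True)):
        scheduler.cancel(job)
        cancelled_times.append(run_at)
    return cancelled_times
```

For a single job, `job in scheduler` checks whether it is still scheduled, and `scheduler.change_execution_time(job, new_time)` moves it instead of cancelling it.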

Scheduler Control

Methods for running and controlling the scheduler daemon including locking, heartbeats, and job processing.

def run(self, burst=False):
    """Start the scheduler daemon to process scheduled jobs."""

def acquire_lock(self):
    """Acquire distributed lock for scheduler coordination."""

def remove_lock(self):
    """Release previously acquired distributed lock."""

def heartbeat(self):
    """Send heartbeat to maintain scheduler registration."""

def enqueue_jobs(self):
    """Move all ready scheduled jobs to their target execution queues."""

def enqueue_job(self, job):
    """Move a scheduled job to a queue and handle periodic job rescheduling."""

def get_queue_for_job(self, job):
    """Get the appropriate queue instance for a job."""
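To show how these methods relate, here is a simplified sketch of one lock-protected pass over the due jobs. It is not the library's `run()` implementation. `run_single_pass` is a hypothetical helper, and it assumes `acquire_lock()` returns a truthy value only when the lock was obtained.

```python
def run_single_pass(scheduler):
    """Hypothetical helper: enqueue due jobs once, guarded by the lock."""
    # Signal that this scheduler instance is alive.
    scheduler.heartbeat()
    if not scheduler.acquire_lock():
        # Another scheduler instance holds the lock; skip this pass.
        return False
    try:
        # Move every job whose time has come onto its RQ queue.
        scheduler.enqueue_jobs()
    finally:
        # Always release the lock so other instances can proceed.
        scheduler.remove_lock()
    return True
```

In practice `scheduler.run(burst=True)` is the documented way to process the ready jobs once and exit; the sketch only makes the locking order explicit.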

Utility Functions

Time conversion and scheduling utility functions for working with timestamps and cron expressions.

def from_unix(string):
    """Convert Unix timestamp to UTC datetime object."""

def to_unix(dt):
    """Convert datetime object to Unix timestamp."""

def get_next_scheduled_time(cron_string, use_local_timezone=False):
    """Calculate the next scheduled execution time from a cron expression."""

def rationalize_until(until=None):
    """Process 'until' parameter for time-based queries."""

def setup_loghandlers(level='INFO'):
    """Configure logging for RQ Scheduler with colorized output."""
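The timestamp conversions can be approximated with the standard library alone. The functions below are hypothetical stand-ins (note the `_sketch` suffix) meant to show the round trip between a naive UTC datetime and a Unix timestamp; they are not the package's own code.

```python
import calendar
from datetime import datetime, timezone


def to_unix_sketch(dt):
    """Seconds since the epoch for a datetime treated as UTC."""
    return calendar.timegm(dt.utctimetuple())


def from_unix_sketch(value):
    """Naive UTC datetime for a Unix timestamp given as a number or string."""
    aware = datetime.fromtimestamp(float(value), tz=timezone.utc)
    return aware.replace(tzinfo=None)


run_at = datetime(2025, 1, 1, 3, 0)
stamp = to_unix_sketch(run_at)
round_trip = from_unix_sketch(str(stamp))  # strings are accepted, as Redis returns text
```

The integer timestamp is the kind of value that can serve as a sorted-set score, which is why jobs can be retrieved efficiently by execution time.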

Types

class Scheduler:
    """Main scheduler class for managing scheduled jobs."""
    
    def __init__(self, queue_name='default', queue=None, interval=60, 
                 connection=None, job_class=None, queue_class=None, name=None):
        """
        Initialize scheduler.
        
        Parameters:
        - queue_name: str, name of queue for scheduled jobs
        - queue: RQ Queue instance (optional, overrides queue_name)
        - interval: int, polling interval in seconds
        - connection: Redis connection (required)
        - job_class: Custom job class
        - queue_class: Custom queue class
        - name: Scheduler instance name for coordination
        """

VERSION: tuple = (0, 14, 0)

Console Scripts

The package provides a command-line interface for running scheduler daemons:

# Run scheduler with default settings
rqscheduler

# Redis connection options
rqscheduler --host redis.example.com --port 6380 --db 1
rqscheduler --password mypassword
rqscheduler --url redis://user:pass@host:port/db

# Execution modes
rqscheduler --burst                    # Process all jobs then exit
rqscheduler --interval 30              # Custom polling interval (seconds)

# Logging options
rqscheduler --verbose                  # Debug level logging
rqscheduler --quiet                    # Warning level logging only

# Advanced options
rqscheduler --path /custom/path        # Set import path
rqscheduler --pid /var/run/rq.pid     # Write PID to file
rqscheduler --job-class myapp.CustomJob    # Custom job class
rqscheduler --queue-class myapp.CustomQueue # Custom queue class

# Show all options
rqscheduler --help

Environment Variables:

  • RQ_REDIS_HOST - Redis host (default: localhost)
  • RQ_REDIS_PORT - Redis port (default: 6379)
  • RQ_REDIS_DB - Redis database (default: 0)
  • RQ_REDIS_PASSWORD - Redis password
  • RQ_REDIS_URL - Complete Redis URL (overrides other connection settings)
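The precedence described in the list above can be sketched as a small resolver. `redis_settings_from_env` is a hypothetical helper, not a function provided by rq-scheduler; it only mirrors the documented defaults and the rule that `RQ_REDIS_URL` overrides the other settings.

```python
import os


def redis_settings_from_env(env=None):
    """Hypothetical helper mirroring the documented variable precedence."""
    env = os.environ if env is None else env
    url = env.get("RQ_REDIS_URL")
    if url:
        # A complete URL takes precedence over the individual settings.
        return {"url": url}
    return {
        "host": env.get("RQ_REDIS_HOST", "localhost"),
        "port": int(env.get("RQ_REDIS_PORT", 6379)),
        "db": int(env.get("RQ_REDIS_DB", 0)),
        "password": env.get("RQ_REDIS_PASSWORD"),
    }


defaults = redis_settings_from_env({})
custom = redis_settings_from_env({"RQ_REDIS_HOST": "redis.example.com", "RQ_REDIS_PORT": "6380"})
```

Passing a plain dictionary instead of `os.environ` keeps the helper easy to exercise without touching the process environment.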

Install with Tessl CLI

npx tessl i tessl/pypi-rq-scheduler
Describes
pypipkg:pypi/rq-scheduler@0.14.x