CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-rq-scheduler

Provides job scheduling capabilities to RQ (Redis Queue)

Pending
Overview
Eval results
Files

utilities.mddocs/

Utilities

RQ Scheduler provides utility functions for time conversion, cron expression handling, and logging configuration. These functions support the scheduler's time-based operations and provide tools for working with scheduling data.

Capabilities

Time Conversion

Convert between datetime objects and Unix timestamps for Redis storage and time calculations.

def from_unix(string):
    """
    Convert Unix timestamp to UTC datetime object.
    
    Parameters:
    - string: str or numeric, Unix timestamp (seconds since epoch)
    
    Returns:
    datetime, UTC datetime object
    
    Note:
    - Input can be string or numeric type
    - Always returns UTC datetime regardless of local timezone
    - Used internally by scheduler for time deserialization
    """

def to_unix(dt):
    """
    Convert datetime object to Unix timestamp.
    
    Parameters:
    - dt: datetime, datetime object to convert
    
    Returns:
    int, Unix timestamp (seconds since epoch)
    
    Note:
    - Assumes input datetime is UTC if timezone-naive
    - Uses calendar.timegm for UTC conversion
    - Used internally by scheduler for time serialization
    """

Usage Examples:

from datetime import datetime
from rq_scheduler.utils import from_unix, to_unix

# Convert datetime to timestamp
dt = datetime(2025, 6, 15, 14, 30, 0)
timestamp = to_unix(dt)
print(f"Timestamp: {timestamp}")  # 1750262200

# Convert timestamp back to datetime  
restored_dt = from_unix(timestamp)
print(f"DateTime: {restored_dt}")  # 2025-06-15 14:30:00

# Working with scheduler internals
restored_dt2 = from_unix("1750262200")  # String input works too
assert restored_dt == restored_dt2

# Timezone handling
import pytz
utc_dt = datetime(2025, 6, 15, 14, 30, 0, tzinfo=pytz.UTC)
naive_dt = datetime(2025, 6, 15, 14, 30, 0)

# Both produce same timestamp (treated as UTC)
assert to_unix(utc_dt) == to_unix(naive_dt)

Cron Expression Processing

Calculate next execution times for cron-style job scheduling.

def get_next_scheduled_time(cron_string, use_local_timezone=False):
    """
    Calculate the next scheduled execution time from a cron expression.
    
    Parameters:
    - cron_string: str, cron expression (minute hour day month weekday)
    - use_local_timezone: bool, use local timezone instead of UTC
    
    Returns:
    datetime, next execution time as timezone-aware datetime
    
    Note:
    - Uses crontab library for cron parsing
    - Returns timezone-aware datetime object
    - UTC timezone used by default for consistency
    - Local timezone option for user-facing scheduling
    """

Usage Examples:

from rq_scheduler.utils import get_next_scheduled_time
from datetime import datetime

# Every day at 2:30 AM UTC
next_time = get_next_scheduled_time("30 2 * * *")
print(f"Next execution: {next_time}")

# Every weekday at 9 AM local time
next_local = get_next_scheduled_time("0 9 * * 1-5", use_local_timezone=True)
print(f"Next weekday 9 AM: {next_local}")

# Every 15 minutes
next_quarter = get_next_scheduled_time("*/15 * * * *")
print(f"Next 15-minute mark: {next_quarter}")

# Complex scheduling - first Monday of each month at noon
next_complex = get_next_scheduled_time("0 12 1-7 * 1")
print(f"Next first Monday: {next_complex}")

# Working with scheduler
from rq_scheduler import Scheduler
from redis import Redis

scheduler = Scheduler(connection=Redis())

# Schedule using calculated time
cron_expr = "0 0 * * 0"  # Every Sunday at midnight
next_sunday = get_next_scheduled_time(cron_expr)
scheduler.enqueue_at(next_sunday, weekly_cleanup)

Time Parameter Processing

Process and normalize time parameters for scheduler queries.

def rationalize_until(until=None):
    """
    Process 'until' parameter for time-based queries.
    
    Parameters:
    - until: None, datetime, timedelta, or numeric - time constraint
    
    Returns:
    str or numeric, normalized time value for Redis queries
    
    Behavior:
    - None -> "+inf" (no time limit)
    - datetime -> Unix timestamp
    - timedelta -> Unix timestamp (current time + delta)
    - numeric -> passed through unchanged
    
    Note:
    - Used internally by count() and get_jobs() methods
    - Handles various time input formats consistently
    """

Usage Examples:

from datetime import datetime, timedelta
from rq_scheduler.utils import rationalize_until

# Various input types
print(rationalize_until(None))  # "+inf"
print(rationalize_until(datetime(2025, 6, 15)))  # Unix timestamp
print(rationalize_until(timedelta(hours=1)))  # Current time + 1 hour timestamp
print(rationalize_until(1750262400))  # 1750262400 (unchanged)

# Usage in custom queries (internal scheduler pattern)
from rq_scheduler import Scheduler
from redis import Redis

scheduler = Scheduler(connection=Redis())

# These calls use rationalize_until internally:
scheduler.count(until=datetime(2025, 12, 31))
scheduler.count(until=timedelta(days=7))
scheduler.count(until=None)  # All jobs

# Manual usage for custom Redis queries
until_value = rationalize_until(timedelta(hours=2))
# Use until_value in custom Redis ZRANGEBYSCORE operations

Logging Configuration

Set up logging handlers optimized for scheduler operations.

def setup_loghandlers(level='INFO'):
    """
    Configure logging for RQ Scheduler with colorized output.
    
    Parameters:
    - level: str, logging level ('DEBUG', 'INFO', 'WARNING', 'ERROR')
    
    Returns:
    None
    
    Behavior:
    - Configures 'rq_scheduler.scheduler' logger
    - Uses ColorizingStreamHandler for colored console output
    - Sets timestamp format optimized for scheduler monitoring
    - Idempotent - safe to call multiple times
    """

Usage Examples:

from rq_scheduler.utils import setup_loghandlers

# Basic setup with INFO level
setup_loghandlers()

# Debug level for troubleshooting
setup_loghandlers('DEBUG')

# Warning level for production
setup_loghandlers('WARNING')

# Integration with scheduler
from rq_scheduler import Scheduler
from redis import Redis

# Setup logging before creating scheduler
setup_loghandlers('DEBUG')

scheduler = Scheduler(connection=Redis())
scheduler.run()  # Will output detailed logs

# Custom logging configuration
import logging
from rq_scheduler.utils import setup_loghandlers

# Setup scheduler logging first
setup_loghandlers('INFO')

# Add custom application logging
app_logger = logging.getLogger('myapp')
app_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(message)s'))
app_logger.addHandler(handler)

# Both loggers will work together
scheduler = Scheduler(connection=Redis())
app_logger.info("Starting scheduler")
scheduler.run()

Integration with Scheduler Internals

These utilities are used internally by the scheduler but can be helpful for custom scheduling logic:

from datetime import datetime, timedelta
from rq_scheduler import Scheduler
from rq_scheduler.utils import to_unix, from_unix, get_next_scheduled_time
from redis import Redis

# Custom scheduler with utility functions
class CustomScheduler(Scheduler):
    def schedule_with_validation(self, cron_expr, func, *args, **kwargs):
        """Schedule job with cron validation and logging."""
        
        # Validate cron expression
        try:
            next_time = get_next_scheduled_time(cron_expr)
            print(f"Job will first run at: {next_time}")
        except Exception as e:
            raise ValueError(f"Invalid cron expression: {e}")
        
        # Schedule the job
        return self.cron(cron_expr, func, *args, **kwargs)
    
    def get_job_timestamps(self):
        """Get all scheduled jobs with Unix timestamps."""
        jobs_with_times = self.get_jobs(with_times=True)
        return [(job, to_unix(scheduled_time)) for job, scheduled_time in jobs_with_times]

# Usage
scheduler = CustomScheduler(connection=Redis())

# Use custom methods with utilities
job = scheduler.schedule_with_validation("0 */6 * * *", cleanup_task)
timestamps = scheduler.get_job_timestamps()

for job, timestamp in timestamps:
    dt = from_unix(timestamp)
    print(f"Job {job.id} scheduled for {dt}")

Install with Tessl CLI

npx tessl i tessl/pypi-rq-scheduler

docs

index.md

job-management.md

job-scheduling.md

scheduler-control.md

utilities.md

tile.json