Provides job scheduling capabilities to RQ (Redis Queue)
—
RQ Scheduler provides utility functions for time conversion, cron expression handling, and logging configuration. These functions support the scheduler's time-based operations and provide tools for working with scheduling data.
Convert between datetime objects and Unix timestamps for Redis storage and time calculations.
def from_unix(string):
"""
Convert Unix timestamp to UTC datetime object.
Parameters:
- string: str or numeric, Unix timestamp (seconds since epoch)
Returns:
datetime, UTC datetime object
Note:
- Input can be string or numeric type
- Always returns UTC datetime regardless of local timezone
- Used internally by scheduler for time deserialization
"""
def to_unix(dt):
"""
Convert datetime object to Unix timestamp.
Parameters:
- dt: datetime, datetime object to convert
Returns:
int, Unix timestamp (seconds since epoch)
Note:
- Assumes input datetime is UTC if timezone-naive
- Uses calendar.timegm for UTC conversion
- Used internally by scheduler for time serialization
"""
Usage Examples:
from datetime import datetime
from rq_scheduler.utils import from_unix, to_unix
# Convert datetime to timestamp
dt = datetime(2025, 6, 15, 14, 30, 0)
timestamp = to_unix(dt)
print(f"Timestamp: {timestamp}") # 1749997800
# Convert timestamp back to datetime
restored_dt = from_unix(timestamp)
print(f"DateTime: {restored_dt}") # 2025-06-15 14:30:00
# Working with scheduler internals
restored_dt2 = from_unix("1749997800") # String input works too
assert restored_dt == restored_dt2
# Timezone handling
import pytz
utc_dt = datetime(2025, 6, 15, 14, 30, 0, tzinfo=pytz.UTC)
naive_dt = datetime(2025, 6, 15, 14, 30, 0)
# Both produce same timestamp (treated as UTC)
assert to_unix(utc_dt) == to_unix(naive_dt)
Calculate next execution times for cron-style job scheduling.
def get_next_scheduled_time(cron_string, use_local_timezone=False):
"""
Calculate the next scheduled execution time from a cron expression.
Parameters:
- cron_string: str, cron expression (minute hour day month weekday)
- use_local_timezone: bool, use local timezone instead of UTC
Returns:
datetime, next execution time as timezone-aware datetime
Note:
- Uses crontab library for cron parsing
- Returns timezone-aware datetime object
- UTC timezone used by default for consistency
- Local timezone option for user-facing scheduling
"""
Usage Examples:
from rq_scheduler.utils import get_next_scheduled_time
from datetime import datetime
# Every day at 2:30 AM UTC
next_time = get_next_scheduled_time("30 2 * * *")
print(f"Next execution: {next_time}")
# Every weekday at 9 AM local time
next_local = get_next_scheduled_time("0 9 * * 1-5", use_local_timezone=True)
print(f"Next weekday 9 AM: {next_local}")
# Every 15 minutes
next_quarter = get_next_scheduled_time("*/15 * * * *")
print(f"Next 15-minute mark: {next_quarter}")
# Complex scheduling - first Monday of each month at noon
next_complex = get_next_scheduled_time("0 12 1-7 * 1")
print(f"Next first Monday: {next_complex}")
# Working with scheduler
from rq_scheduler import Scheduler
from redis import Redis
scheduler = Scheduler(connection=Redis())
# Schedule using calculated time
cron_expr = "0 0 * * 0" # Every Sunday at midnight
next_sunday = get_next_scheduled_time(cron_expr)
scheduler.enqueue_at(next_sunday, weekly_cleanup)
Process and normalize time parameters for scheduler queries.
def rationalize_until(until=None):
"""
Process 'until' parameter for time-based queries.
Parameters:
- until: None, datetime, timedelta, or numeric - time constraint
Returns:
str or numeric, normalized time value for Redis queries
Behavior:
- None -> "+inf" (no time limit)
- datetime -> Unix timestamp
- timedelta -> Unix timestamp (current time + delta)
- numeric -> passed through unchanged
Note:
- Used internally by count() and get_jobs() methods
- Handles various time input formats consistently
"""
Usage Examples:
from datetime import datetime, timedelta
from rq_scheduler.utils import rationalize_until
# Various input types
print(rationalize_until(None)) # "+inf"
print(rationalize_until(datetime(2025, 6, 15))) # Unix timestamp
print(rationalize_until(timedelta(hours=1))) # Current time + 1 hour timestamp
print(rationalize_until(1750262400)) # 1750262400 (unchanged)
# Usage in custom queries (internal scheduler pattern)
from rq_scheduler import Scheduler
from redis import Redis
scheduler = Scheduler(connection=Redis())
# These calls use rationalize_until internally:
scheduler.count(until=datetime(2025, 12, 31))
scheduler.count(until=timedelta(days=7))
scheduler.count(until=None) # All jobs
# Manual usage for custom Redis queries
until_value = rationalize_until(timedelta(hours=2))
# Use until_value in custom Redis ZRANGEBYSCORE operations
Set up logging handlers optimized for scheduler operations.
def setup_loghandlers(level='INFO'):
"""
Configure logging for RQ Scheduler with colorized output.
Parameters:
- level: str, logging level ('DEBUG', 'INFO', 'WARNING', 'ERROR')
Returns:
None
Behavior:
- Configures 'rq_scheduler.scheduler' logger
- Uses ColorizingStreamHandler for colored console output
- Sets timestamp format optimized for scheduler monitoring
- Idempotent - safe to call multiple times
"""
Usage Examples:
from rq_scheduler.utils import setup_loghandlers
# Basic setup with INFO level
setup_loghandlers()
# Debug level for troubleshooting
setup_loghandlers('DEBUG')
# Warning level for production
setup_loghandlers('WARNING')
# Integration with scheduler
from rq_scheduler import Scheduler
from redis import Redis
# Setup logging before creating scheduler
setup_loghandlers('DEBUG')
scheduler = Scheduler(connection=Redis())
scheduler.run() # Will output detailed logs
# Custom logging configuration
import logging
from rq_scheduler.utils import setup_loghandlers
# Setup scheduler logging first
setup_loghandlers('INFO')
# Add custom application logging
app_logger = logging.getLogger('myapp')
app_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(message)s'))
app_logger.addHandler(handler)
# Both loggers will work together
scheduler = Scheduler(connection=Redis())
app_logger.info("Starting scheduler")
scheduler.run()
These utilities are used internally by the scheduler but can be helpful for custom scheduling logic:
from datetime import datetime, timedelta
from rq_scheduler import Scheduler
from rq_scheduler.utils import to_unix, from_unix, get_next_scheduled_time
from redis import Redis
# Custom scheduler with utility functions
class CustomScheduler(Scheduler):
def schedule_with_validation(self, cron_expr, func, *args, **kwargs):
"""Schedule job with cron validation and logging."""
# Validate cron expression
try:
next_time = get_next_scheduled_time(cron_expr)
print(f"Job will first run at: {next_time}")
except Exception as e:
raise ValueError(f"Invalid cron expression: {e}")
# Schedule the job
return self.cron(cron_expr, func, *args, **kwargs)
def get_job_timestamps(self):
"""Get all scheduled jobs with Unix timestamps."""
jobs_with_times = self.get_jobs(with_times=True)
return [(job, to_unix(scheduled_time)) for job, scheduled_time in jobs_with_times]
# Usage
scheduler = CustomScheduler(connection=Redis())
# Use custom methods with utilities
job = scheduler.schedule_with_validation("0 */6 * * *", cleanup_task)
timestamps = scheduler.get_job_timestamps()
for job, timestamp in timestamps:
dt = from_unix(timestamp)
print(f"Job {job.id} scheduled for {dt}")
Install with Tessl CLI
npx tessl i tessl/pypi-rq-scheduler