CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-huey

huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features

Pending
Overview
Eval results
Files

docs/scheduling.md

Task Scheduling and Periodic Tasks

Advanced scheduling capabilities including delayed execution, periodic tasks with cron-like syntax, and task pipeline management. These features enable sophisticated task orchestration and time-based automation.

Capabilities

Periodic Task Creation

Create tasks that run automatically on a schedule defined by cron-like expressions.

def periodic_task(self, validate_datetime, retries=0, retry_delay=0,
                  priority=None, context=False, name=None, expires=None, **kwargs):
    """
    Decorator to create a periodic task.
    
    Parameters:
    - validate_datetime: Callable that accepts a datetime and returns True when the task should run (e.g. the function returned by crontab())
    - retries (int): Number of retry attempts (default: 0)
    - retry_delay (int): Delay between retries in seconds (default: 0)
    - priority (int): Task priority (optional)
    - context (bool): Pass task instance as keyword argument (default: False)
    - name (str): Custom task name (default: function name)
    - expires (datetime/int): Task expiration (optional)
    - **kwargs: Additional task configuration
    
    Returns:
    TaskWrapper for periodic task
    """

Cron-style Scheduling

Create cron-like schedule validation functions for precise timing control.

def crontab(minute='*', hour='*', day='*', month='*', day_of_week='*', strict=False):
    """
    Create a crontab-style schedule validator.
    
    Parameters:
    - minute (str): Minute specification (0-59, default: '*')
    - hour (str): Hour specification (0-23, default: '*')
    - day (str): Day of month specification (1-31, default: '*')
    - month (str): Month specification (1-12, default: '*')
    - day_of_week (str): Day of week specification (0-6, 0=Sunday, default: '*')
    - strict (bool): Raise ValueError for invalid formats (default: False)
    
    Supported formats:
    - '*': Every value
    - 'n': Specific value
    - 'm,n': Multiple values
    - 'm-n': Range of values
    - '*/n': Every nth value
    
    Returns:
    Function that validates datetime against cron pattern
    """

Task Scheduling

Schedule individual tasks for delayed execution.

def schedule(self, args=None, kwargs=None, eta=None, delay=None,
             priority=None, retries=None, retry_delay=None, expires=None, id=None):
    """
    Schedule a task for future execution.
    
    Parameters:
    - args (tuple): Task arguments (optional)
    - kwargs (dict): Task keyword arguments (optional)
    - eta (datetime): Exact execution time (optional)
    - delay (int/float/timedelta): Delay before execution (optional)
    - priority (int): Task priority (optional)
    - retries (int): Number of retries (optional)
    - retry_delay (int): Delay between retries (optional)
    - expires (datetime/int): Task expiration (optional)
    - id (str): Custom task ID (optional)
    
    Returns:
    Result instance
    """

def add_schedule(self, task):
    """
    Add a task to the schedule queue.
    
    Parameters:
    - task (Task): Task with eta set for future execution
    
    Returns:
    None
    """

Schedule Management

Monitor and manage scheduled tasks.

def read_schedule(self, timestamp=None):
    """
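    Read tasks ready for execution from the schedule. The returned tasks
    are removed from the schedule, so the caller is responsible for
    enqueueing them.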
    
    Parameters:
    - timestamp (datetime): Current time (default: now)
    
    Returns:
    List of Task instances ready to run
    """

def read_periodic(self, timestamp):
    """
    Get periodic tasks that should run at given time.
    
    Parameters:
    - timestamp (datetime): Time to check periodic tasks against
    
    Returns:
    List of periodic Task instances
    """

def scheduled(self, limit=None):
    """
    Get list of scheduled tasks.
    
    Parameters:
    - limit (int): Maximum number of tasks to return (optional)
    
    Returns:
    List of scheduled Task instances
    """

def scheduled_count(self):
    """
    Get the number of scheduled tasks.
    
    Returns:
    int: Number of tasks in schedule
    """

def ready_to_run(self, task, timestamp=None):
    """
    Check if a task is ready for execution.
    
    Parameters:
    - task (Task): Task to check
    - timestamp (datetime): Current time (default: now)
    
    Returns:
    bool: True if task should run now
    """

Usage Examples

Periodic Tasks with Cron Schedules

from huey import RedisHuey, crontab

huey = RedisHuey('scheduler-app')

# Run every day at 2:30 AM
@huey.periodic_task(crontab(minute='30', hour='2'))
def daily_backup():
    backup_database()
    return "Daily backup completed"

# Run every 15 minutes
@huey.periodic_task(crontab(minute='*/15'))
def health_check():
    check_system_health()
    return "Health check completed"

# Run on weekdays at 9 AM
@huey.periodic_task(crontab(minute='0', hour='9', day_of_week='1-5'))
def weekday_report():
    generate_daily_report()
    return "Daily report generated"

# Run on the first day of each month
@huey.periodic_task(crontab(minute='0', hour='0', day='1'))
def monthly_cleanup():
    cleanup_old_data()
    return "Monthly cleanup completed"

Delayed Task Execution

import datetime
from huey import RedisHuey

huey = RedisHuey('delayed-tasks')

@huey.task()
def send_reminder(user_id, message):
    # Send reminder logic
    return f"Reminder sent to user {user_id}: {message}"

# Schedule for specific datetime
eta = datetime.datetime(2024, 1, 15, 14, 30)
result = send_reminder.schedule(
    args=(123, "Your appointment is tomorrow"),
    eta=eta
)

# Schedule with delay (5 minutes from now)
result = send_reminder.schedule(
    args=(456, "Meeting starts in 10 minutes"),
    delay=300  # 5 minutes
)

# Schedule with timedelta
result = send_reminder.schedule(
    args=(789, "Weekly report due"),
    delay=datetime.timedelta(hours=2)
)

Complex Scheduling Patterns

# Custom schedule validation function
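def business_hours(dt):
    """Match every minute during business hours (9 AM - 5 PM, weekdays).

    Periodic tasks are checked once per minute, so a task using this
    validator runs every minute within that window, not once per day.
    """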
    return (dt.hour >= 9 and dt.hour < 17 and 
            dt.weekday() < 5)  # Monday=0, Sunday=6

@huey.periodic_task(business_hours)
def business_hours_task():
    process_business_data()
    return "Business hours processing complete"

# Multiple schedule patterns
@huey.periodic_task(crontab(minute='0', hour='*/3'))  # Every 3 hours
def frequent_sync():
    sync_external_data()
    return "Data sync completed"

@huey.periodic_task(crontab(minute='0', hour='0', day_of_week='0'))  # Weekly on Sunday
def weekly_maintenance():
    perform_maintenance()
    return "Weekly maintenance completed"

Task Scheduling with Context

import time

@huey.task(context=True)
def process_file(filename, task=None):
    # Access task metadata
    print(f"Processing {filename} (Task ID: {task.id})")
    
    # Simulate file processing
    time.sleep(2)
    return f"Processed {filename}"

# Schedule with task context
result = process_file.schedule(
    args=("important_data.csv",),
    delay=60,  # Process in 1 minute
    priority=10,  # High priority
    retries=2,  # Retry twice if it fails
    retry_delay=30  # Wait 30 seconds between retries
)

Schedule Monitoring

# Check scheduled tasks
scheduled_tasks = huey.scheduled(limit=10)
print(f"Next {len(scheduled_tasks)} scheduled tasks:")
for task in scheduled_tasks:
    print(f"- {task.name} at {task.eta}")

# Check schedule size
print(f"Total scheduled tasks: {huey.scheduled_count()}")

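# Read tasks ready to run.
# WARNING: read_schedule() removes the returned tasks from the schedule.
# The consumer normally does this; for read-only inspection use
# huey.scheduled() instead, otherwise these tasks will never be enqueued.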
ready_tasks = huey.read_schedule()
print(f"Tasks ready to run: {len(ready_tasks)}")

Install with Tessl CLI

npx tessl i tessl/pypi-huey

docs

core-task-queue.md

exception-handling.md

index.md

locking-concurrency.md

result-management.md

scheduling.md

storage-backends.md

task-lifecycle.md

tile.json