huey, a little task queue - lightweight task queue library for Python with asynchronous execution and comprehensive task management features
—
Advanced scheduling capabilities including delayed execution, periodic tasks with cron-like syntax, and task pipeline management. These features enable sophisticated task orchestration and time-based automation.
Create tasks that run automatically on a schedule defined by cron-like expressions.
def periodic_task(self, validate_datetime, retries=0, retry_delay=0,
                  priority=None, context=False, name=None, expires=None, **kwargs):
    """
    Decorator to create a periodic task.

    Parameters:
    - validate_datetime: Function that is given a datetime and returns True
      when the task should run (for example the validator returned by
      crontab(), or a custom function)
    - retries (int): Number of retry attempts (default: 0)
    - retry_delay (int): Delay between retries in seconds (default: 0)
    - priority (int): Task priority (optional)
    - context (bool): Pass task instance as keyword argument (default: False)
    - name (str): Custom task name (default: function name)
    - expires (datetime/int): Task expiration (optional)
    - **kwargs: Additional task configuration

    Returns:
    TaskWrapper for periodic task
    """

Create cron-like schedule validation functions for precise timing control.
def crontab(minute='*', hour='*', day='*', month='*', day_of_week='*', strict=False):
    """
    Create a crontab-style schedule validator.

    Parameters:
    - minute (str): Minute specification (0-59, default: '*')
    - hour (str): Hour specification (0-23, default: '*')
    - day (str): Day of month specification (1-31, default: '*')
    - month (str): Month specification (1-12, default: '*')
    - day_of_week (str): Day of week specification (0-6, 0=Sunday, default: '*')
    - strict (bool): Raise ValueError for invalid formats (default: False)

    Supported formats:
    - '*': Every value
    - 'n': Specific value
    - 'm,n': Multiple values
    - 'm-n': Range of values
    - '*/n': Every nth value

    Returns:
    Function that validates datetime against cron pattern

    Examples:
    - crontab(minute='*/15'): every 15 minutes
    - crontab(minute='0', hour='9', day_of_week='1-5'): weekdays at 9 AM
    """

Schedule individual tasks for delayed execution.
def schedule(self, args=None, kwargs=None, eta=None, delay=None,
             priority=None, retries=None, retry_delay=None, expires=None, id=None):
    """
    Schedule a task for future execution.

    Parameters:
    - args (tuple): Task arguments (optional)
    - kwargs (dict): Task keyword arguments (optional)
    - eta (datetime): Exact execution time (optional)
    - delay (int/float/timedelta): Delay before execution (optional);
      a plain number is a count of seconds, e.g. delay=300 is 5 minutes
    - priority (int): Task priority (optional)
    - retries (int): Number of retries (optional)
    - retry_delay (int): Delay between retries (optional)
    - expires (datetime/int): Task expiration (optional)
    - id (str): Custom task ID (optional)

    Returns:
    Result instance
    """
def add_schedule(self, task):
    """
    Add a task to the schedule queue.

    Parameters:
    - task (Task): Task with eta set for future execution

    Returns:
    None
    """

Monitor and manage scheduled tasks.
def read_schedule(self, timestamp=None):
    """
    Read tasks ready for execution from schedule.

    Parameters:
    - timestamp (datetime): Current time (default: now)

    Returns:
    List of Task instances ready to run
    """
def read_periodic(self, timestamp):
    """
    Get periodic tasks that should run at given time.

    Parameters:
    - timestamp (datetime): Time to check periodic tasks against

    Returns:
    List of periodic Task instances
    """
def scheduled(self, limit=None):
    """
    Get list of scheduled tasks.

    Parameters:
    - limit (int): Maximum number of tasks to return (optional)

    Returns:
    List of scheduled Task instances
    """
def scheduled_count(self):
    """
    Get the number of scheduled tasks.

    Returns:
    int: Number of tasks in schedule
    """
def ready_to_run(self, task, timestamp=None):
    """
    Check if a task is ready for execution.

    Parameters:
    - task (Task): Task to check
    - timestamp (datetime): Current time (default: now)

    Returns:
    bool: True if task should run now
    """


from huey import RedisHuey, crontab
huey = RedisHuey('scheduler-app')
# Run every day at 2:30 AM
@huey.periodic_task(crontab(minute='30', hour='2'))
def daily_backup():
    """Call backup_database() and return a completion message."""
    backup_database()
    outcome = "Daily backup completed"
    return outcome
# Run every 15 minutes
@huey.periodic_task(crontab(minute='*/15'))
def health_check():
    """Call check_system_health() and return a completion message."""
    check_system_health()
    outcome = "Health check completed"
    return outcome
# Run on weekdays at 9 AM
@huey.periodic_task(crontab(minute='0', hour='9', day_of_week='1-5'))
def weekday_report():
    """Call generate_daily_report() and return a completion message."""
    generate_daily_report()
    outcome = "Daily report generated"
    return outcome
# Run on the first day of each month
@huey.periodic_task(crontab(minute='0', hour='0', day='1'))
def monthly_cleanup():
    """Call cleanup_old_data() and return a completion message."""
    cleanup_old_data()
    outcome = "Monthly cleanup completed"
    return outcome


import datetime
from huey import RedisHuey
huey = RedisHuey('delayed-tasks')
@huey.task()
def send_reminder(user_id, message):
    """Return a confirmation string naming the user and the reminder text.

    The actual delivery step is a placeholder in this example.
    """
    # Send reminder logic
    confirmation = f"Reminder sent to user {user_id}: {message}"
    return confirmation
# Schedule for specific datetime
eta = datetime.datetime(2024, 1, 15, 14, 30)
result = send_reminder.schedule(
    args=(123, "Your appointment is tomorrow"),
    eta=eta
)

# Schedule with delay (5 minutes from now)
result = send_reminder.schedule(
    args=(456, "Meeting starts in 10 minutes"),
    delay=300  # 5 minutes, expressed in seconds
)

# Schedule with timedelta
result = send_reminder.schedule(
    args=(789, "Weekly report due"),
    delay=datetime.timedelta(hours=2)
)

# Custom schedule validation function
def business_hours(dt):
    """Run only during business hours (9 AM - 5 PM, weekdays)"""
    # weekday(): Monday=0, Sunday=6, so values below 5 are Monday-Friday.
    is_weekday = dt.weekday() < 5
    # 9:00 inclusive up to, but not including, 17:00.
    in_office_hours = 9 <= dt.hour < 17
    return in_office_hours and is_weekday
@huey.periodic_task(business_hours)
def business_hours_task():
    """Call process_business_data() and return a completion message."""
    process_business_data()
    outcome = "Business hours processing complete"
    return outcome
# Multiple schedule patterns
@huey.periodic_task(crontab(minute='0', hour='*/3'))  # Every 3 hours
def frequent_sync():
    """Call sync_external_data() and return a completion message."""
    sync_external_data()
    outcome = "Data sync completed"
    return outcome
@huey.periodic_task(crontab(minute='0', hour='0', day_of_week='0'))  # Weekly on Sunday
def weekly_maintenance():
    """Call perform_maintenance() and return a completion message."""
    perform_maintenance()
    return "Weekly maintenance completed"


# process_file() below calls time.sleep(); without this import the example
# raises NameError when the task runs.
import time


@huey.task(context=True)
def process_file(filename, task=None):
    """Simulate processing a file and return a status string.

    Parameters:
    - filename (str): Name of the file to process
    - task: The task instance; context=True on the decorator asks huey to
      pass it as this keyword argument

    Returns:
    str: "Processed <filename>"
    """
    # Access task metadata
    print(f"Processing {filename} (Task ID: {task.id})")
    # Simulate file processing
    time.sleep(2)
    return f"Processed {filename}"
# Schedule with task context
result = process_file.schedule(
    args=("important_data.csv",),
    delay=60,  # Process in 1 minute (delay is given in seconds)
    priority=10,  # High priority
    retries=2,  # Retry twice if it fails
    retry_delay=30  # Wait 30 seconds between retries
)

# Check scheduled tasks
# Fetch at most 10 entries from the schedule for display.
scheduled_tasks = huey.scheduled(limit=10)
print(f"Next {len(scheduled_tasks)} scheduled tasks:")
for task in scheduled_tasks:
    print(f"- {task.name} at {task.eta}")
# Check schedule size
print(f"Total scheduled tasks: {huey.scheduled_count()}")
# Read tasks ready to run
# NOTE(review): read_schedule() is the call the consumer's scheduler uses to
# pull due tasks; it may remove the returned tasks from the schedule storage.
# Confirm against the huey API docs before calling it from monitoring code,
# otherwise due tasks could be dropped instead of executed.
ready_tasks = huey.read_schedule()
print(f"Tasks ready to run: {len(ready_tasks)}")

Install with Tessl CLI
npx tessl i tessl/pypi-huey