Redis integration for taskiq with brokers and result backends supporting single node, cluster, and sentinel deployments
TaskIQ-Redis provides schedule sources for managing scheduled and recurring tasks with different storage strategies and Redis deployment support. Schedule sources handle task scheduling, execution timing, and cleanup operations.
List-based schedule source (backed by Redis lists) that provides efficient scheduling with migration support. This is the recommended replacement for the deprecated hash-based RedisScheduleSource.
class ListRedisScheduleSource(ScheduleSource):
def __init__(
self,
url: str,
prefix: str = "schedule",
max_connection_pool_size: Optional[int] = None,
serializer: Optional[TaskiqSerializer] = None,
buffer_size: int = 50,
skip_past_schedules: bool = False,
**connection_kwargs: Any,
) -> None:
"""
List-based schedule source for Redis.
Parameters:
- url: Redis connection URL
- prefix: Prefix for Redis keys (default: "schedule")
- max_connection_pool_size: Maximum connections in pool
- serializer: Custom serializer (default: PickleSerializer)
- buffer_size: Buffer size for retrieving schedules (default: 50)
- skip_past_schedules: Skip schedules in the past (default: False)
- connection_kwargs: Additional Redis connection arguments
"""
async def startup(self) -> None:
"""Initialize the schedule source."""
async def add_schedule(self, schedule: ScheduledTask) -> None:
"""
Add a scheduled task.
Parameters:
- schedule: Scheduled task to add
"""
async def get_schedules(self) -> List[ScheduledTask]:
"""
Retrieve all scheduled tasks.
Returns:
- List[ScheduledTask]: All currently scheduled tasks
"""
async def delete_schedule(self, schedule_id: str) -> None:
"""
Remove a scheduled task.
Parameters:
- schedule_id: Unique identifier of schedule to remove
"""
async def post_send(self, task: ScheduledTask) -> None:
"""
Clean up after task execution.
Parameters:
- task: Task that was executed
"""
def with_migrate_from(
self,
source: ScheduleSource,
delete_schedules: bool = True
) -> Self:
"""
Enable migration from another schedule source.
Parameters:
- source: Source schedule source to migrate from
- delete_schedules: Delete schedules from source after migration
Returns:
- Self: Schedule source with migration enabled
"""

Usage Example:
from taskiq_redis import ListRedisScheduleSource
from taskiq import ScheduledTask
from datetime import datetime, timedelta
# Create schedule source
schedule_source = ListRedisScheduleSource(
url="redis://localhost:6379",
prefix="my_schedules",
buffer_size=100
)
# Add a scheduled task
schedule = ScheduledTask(
task_id="daily-report",
task_name="generate_report",
schedule_time=datetime.now() + timedelta(hours=24),
args=["daily"],
kwargs={"format": "pdf"}
)
await schedule_source.add_schedule(schedule)
# Get all scheduled tasks
schedules = await schedule_source.get_schedules()
print(f"Found {len(schedules)} scheduled tasks")
# Delete a schedule
await schedule_source.delete_schedule("daily-report")

Migrate from the deprecated RedisScheduleSource to the recommended ListRedisScheduleSource:
from taskiq_redis import ListRedisScheduleSource, RedisScheduleSource
# Old deprecated schedule source
old_source = RedisScheduleSource("redis://localhost:6379")
# New recommended schedule source with migration
new_source = ListRedisScheduleSource(
url="redis://localhost:6379"
).with_migrate_from(old_source, delete_schedules=True)
# Migration will happen automatically during startup
await new_source.startup()

Hash-based schedule source (deprecated; use ListRedisScheduleSource instead).
class RedisScheduleSource(ScheduleSource):
def __init__(
self,
url: str,
prefix: str = "schedule",
buffer_size: int = 50,
max_connection_pool_size: Optional[int] = None,
serializer: Optional[TaskiqSerializer] = None,
**connection_kwargs: Any,
) -> None:
"""
Hash-based schedule source for Redis (DEPRECATED).
Use ListRedisScheduleSource instead.
"""
async def delete_schedule(self, schedule_id: str) -> None:
"""Remove a scheduled task."""
async def add_schedule(self, schedule: ScheduledTask) -> None:
"""Add a scheduled task."""
async def get_schedules(self) -> List[ScheduledTask]:
"""Retrieve all scheduled tasks."""
async def post_send(self, task: ScheduledTask) -> None:
"""Clean up after task execution."""
async def shutdown(self) -> None:
"""Shut down the schedule source."""

Schedule source for Redis Cluster deployments.
class RedisClusterScheduleSource(ScheduleSource):
def __init__(
self,
url: str,
prefix: str = "schedule",
serializer: Optional[TaskiqSerializer] = None,
**connection_kwargs: Any,
) -> None:
"""
Schedule source for Redis Cluster.
Parameters:
- url: Redis cluster connection URL
- prefix: Prefix for Redis keys (default: "schedule")
- serializer: Custom serializer (default: PickleSerializer)
- connection_kwargs: Additional Redis cluster connection arguments
"""
async def delete_schedule(self, schedule_id: str) -> None:
"""Remove a scheduled task from Redis cluster."""
async def add_schedule(self, schedule: ScheduledTask) -> None:
"""Add a scheduled task to Redis cluster."""
async def get_schedules(self) -> List[ScheduledTask]:
"""Retrieve all scheduled tasks from Redis cluster."""
async def post_send(self, task: ScheduledTask) -> None:
"""Clean up after task execution in Redis cluster."""
async def shutdown(self) -> None:
"""Shut down the cluster schedule source."""

Usage Example:
from taskiq_redis import RedisClusterScheduleSource
# Create cluster schedule source
schedule_source = RedisClusterScheduleSource(
url="redis://cluster-node1:6379",
prefix="cluster_schedules"
)
# Use same API as standard schedule source
await schedule_source.add_schedule(schedule)
schedules = await schedule_source.get_schedules()

Schedule source for Redis Sentinel deployments with high availability.
class RedisSentinelScheduleSource(ScheduleSource):
def __init__(
self,
sentinels: List[Tuple[str, int]],
master_name: str,
prefix: str = "schedule",
buffer_size: int = 50,
serializer: Optional[TaskiqSerializer] = None,
min_other_sentinels: int = 0,
sentinel_kwargs: Optional[Any] = None,
**connection_kwargs: Any,
) -> None:
"""
Schedule source for Redis Sentinel.
Parameters:
- sentinels: List of sentinel (host, port) pairs
- master_name: Sentinel master name
- prefix: Prefix for Redis keys (default: "schedule")
- buffer_size: Buffer size for retrieving schedules (default: 50)
- serializer: Custom serializer (default: PickleSerializer)
- min_other_sentinels: Minimum other sentinels required (default: 0)
- sentinel_kwargs: Additional sentinel configuration
- connection_kwargs: Additional Redis connection arguments
"""
async def delete_schedule(self, schedule_id: str) -> None:
"""Remove a scheduled task via Sentinel."""
async def add_schedule(self, schedule: ScheduledTask) -> None:
"""Add a scheduled task via Sentinel."""
async def get_schedules(self) -> List[ScheduledTask]:
"""Retrieve all scheduled tasks via Sentinel."""
async def post_send(self, task: ScheduledTask) -> None:
"""Clean up after task execution via Sentinel."""
async def shutdown(self) -> None:
"""Shut down the Sentinel schedule source."""

Usage Example:
from taskiq_redis import RedisSentinelScheduleSource
# Create high-availability schedule source
schedule_source = RedisSentinelScheduleSource(
sentinels=[
("sentinel1", 26379),
("sentinel2", 26379),
("sentinel3", 26379)
],
master_name="mymaster",
prefix="ha_schedules",
min_other_sentinels=1
)
# Use same API with automatic failover
await schedule_source.add_schedule(schedule)
schedules = await schedule_source.get_schedules()

Example: scheduling a one-time task.
from taskiq_redis import ListRedisScheduleSource
from taskiq import ScheduledTask
from datetime import datetime, timedelta
schedule_source = ListRedisScheduleSource("redis://localhost:6379")
# Schedule task to run in 1 hour
future_time = datetime.now() + timedelta(hours=1)
schedule = ScheduledTask(
task_id="one-time-report",
task_name="generate_report",
schedule_time=future_time,
args=["monthly"],
kwargs={"format": "excel"}
)
await schedule_source.add_schedule(schedule)

Example: scheduling a daily backup.
from datetime import datetime, timedelta
# Schedule daily backups
async def schedule_daily_backup():
tomorrow = datetime.now() + timedelta(days=1)
tomorrow = tomorrow.replace(hour=2, minute=0, second=0, microsecond=0)
schedule = ScheduledTask(
task_id=f"backup-{tomorrow.strftime('%Y%m%d')}",
task_name="create_backup",
schedule_time=tomorrow,
args=["full"],
kwargs={"compress": True}
)
await schedule_source.add_schedule(schedule)
# Add initial schedule
await schedule_daily_backup()

Example: listing, rescheduling, and cleaning up schedules.
from taskiq_redis import ListRedisScheduleSource
schedule_source = ListRedisScheduleSource("redis://localhost:6379")
# Get all pending schedules
all_schedules = await schedule_source.get_schedules()
print(f"Total schedules: {len(all_schedules)}")
# Find specific schedule
target_schedule = None
for schedule in all_schedules:
if schedule.task_name == "generate_report":
target_schedule = schedule
break
if target_schedule:
# Update schedule (delete and re-add with new time)
await schedule_source.delete_schedule(target_schedule.task_id)
# Reschedule for later
target_schedule.schedule_time = datetime.now() + timedelta(hours=2)
await schedule_source.add_schedule(target_schedule)
# Clean up old schedules
now = datetime.now()
for schedule in all_schedules:
if schedule.schedule_time < now - timedelta(days=7):
await schedule_source.delete_schedule(schedule.task_id)

Imports for the types used in the signatures in this reference:
from typing import List, Optional, Any, Tuple
from taskiq.abc.schedule_source import ScheduleSource
from taskiq.abc.serializer import TaskiqSerializer
from taskiq import ScheduledTask

Install with Tessl CLI
npx tessl i tessl/pypi-taskiq-redis