CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-taskiq-redis

Redis integration for taskiq with brokers and result backends supporting single node, cluster, and sentinel deployments

Overview
Eval results
Files

schedule-sources.mddocs/

Schedule Sources

TaskIQ-Redis provides schedule sources for managing scheduled and recurring tasks with different storage strategies and Redis deployment support. Schedule sources handle task scheduling, execution timing, and cleanup operations.

Capabilities

List Redis Schedule Source (Recommended)

Array-based schedule source that provides efficient scheduling with migration support. This is the recommended replacement for the deprecated hash-based RedisScheduleSource.

class ListRedisScheduleSource(ScheduleSource):
    def __init__(
        self,
        url: str,
        prefix: str = "schedule",
        max_connection_pool_size: Optional[int] = None,
        serializer: Optional[TaskiqSerializer] = None,
        buffer_size: int = 50,
        skip_past_schedules: bool = False,
        **connection_kwargs: Any,
    ) -> None:
        """
        Array-based schedule source for Redis.
        
        Parameters:
        - url: Redis connection URL
        - prefix: Prefix for Redis keys (default: "schedule")
        - max_connection_pool_size: Maximum connections in pool
        - serializer: Custom serializer (default: PickleSerializer)
        - buffer_size: Buffer size for retrieving schedules (default: 50)
        - skip_past_schedules: Skip schedules in the past (default: False)
        - connection_kwargs: Additional Redis connection arguments
        """

    async def startup(self) -> None:
        """Initialize the schedule source."""

    async def add_schedule(self, schedule: ScheduledTask) -> None:
        """
        Add a scheduled task.
        
        Parameters:
        - schedule: Scheduled task to add
        """

    async def get_schedules(self) -> List[ScheduledTask]:
        """
        Retrieve all scheduled tasks.
        
        Returns:
        - List[ScheduledTask]: All currently scheduled tasks
        """

    async def delete_schedule(self, schedule_id: str) -> None:
        """
        Remove a scheduled task.
        
        Parameters:
        - schedule_id: Unique identifier of schedule to remove
        """

    async def post_send(self, task: ScheduledTask) -> None:
        """
        Clean up after task execution.
        
        Parameters:
        - task: Task that was executed
        """

    def with_migrate_from(
        self, 
        source: ScheduleSource, 
        delete_schedules: bool = True
    ) -> Self:
        """
        Enable migration from another schedule source.
        
        Parameters:
        - source: Source schedule source to migrate from
        - delete_schedules: Delete schedules from source after migration
        
        Returns:
        - Self: Schedule source with migration enabled
        """

Usage Example:

from taskiq_redis import ListRedisScheduleSource
from taskiq import ScheduledTask
from datetime import datetime, timedelta

# Create schedule source
schedule_source = ListRedisScheduleSource(
    url="redis://localhost:6379",
    prefix="my_schedules",
    buffer_size=100
)

# Add a scheduled task
schedule = ScheduledTask(
    task_id="daily-report",
    task_name="generate_report",
    schedule_time=datetime.now() + timedelta(hours=24),
    args=["daily"],
    kwargs={"format": "pdf"}
)

await schedule_source.add_schedule(schedule)

# Get all scheduled tasks
schedules = await schedule_source.get_schedules()
print(f"Found {len(schedules)} scheduled tasks")

# Delete a schedule
await schedule_source.delete_schedule("daily-report")

Migration from Deprecated Schedule Source

Migrate from the deprecated RedisScheduleSource to the recommended ListRedisScheduleSource:

from taskiq_redis import ListRedisScheduleSource, RedisScheduleSource

# Old deprecated schedule source
old_source = RedisScheduleSource("redis://localhost:6379")

# New recommended schedule source with migration
new_source = ListRedisScheduleSource(
    url="redis://localhost:6379"
).with_migrate_from(old_source, delete_schedules=True)

# Migration will happen automatically during startup
await new_source.startup()

Redis Schedule Source (Deprecated)

Hash-based schedule source (deprecated, use ListRedisScheduleSource instead).

class RedisScheduleSource(ScheduleSource):
    def __init__(
        self,
        url: str,
        prefix: str = "schedule",
        buffer_size: int = 50,
        max_connection_pool_size: Optional[int] = None,
        serializer: Optional[TaskiqSerializer] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Hash-based schedule source for Redis (DEPRECATED).
        
        Use ListRedisScheduleSource instead.
        """

    async def delete_schedule(self, schedule_id: str) -> None:
        """Remove a scheduled task."""

    async def add_schedule(self, schedule: ScheduledTask) -> None:
        """Add a scheduled task."""

    async def get_schedules(self) -> List[ScheduledTask]:
        """Retrieve all scheduled tasks."""

    async def post_send(self, task: ScheduledTask) -> None:
        """Clean up after task execution."""

    async def shutdown(self) -> None:
        """Shut down the schedule source."""

Redis Cluster Schedule Source

Schedule source for Redis Cluster deployments.

class RedisClusterScheduleSource(ScheduleSource):
    def __init__(
        self,
        url: str,
        prefix: str = "schedule",
        serializer: Optional[TaskiqSerializer] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Schedule source for Redis Cluster.
        
        Parameters:
        - url: Redis cluster connection URL
        - prefix: Prefix for Redis keys (default: "schedule")
        - serializer: Custom serializer (default: PickleSerializer)
        - connection_kwargs: Additional Redis cluster connection arguments
        """

    async def delete_schedule(self, schedule_id: str) -> None:
        """Remove a scheduled task from Redis cluster."""

    async def add_schedule(self, schedule: ScheduledTask) -> None:
        """Add a scheduled task to Redis cluster."""

    async def get_schedules(self) -> List[ScheduledTask]:
        """Retrieve all scheduled tasks from Redis cluster."""

    async def post_send(self, task: ScheduledTask) -> None:
        """Clean up after task execution in Redis cluster."""

    async def shutdown(self) -> None:
        """Shut down the cluster schedule source."""

Usage Example:

from taskiq_redis import RedisClusterScheduleSource

# Create cluster schedule source
schedule_source = RedisClusterScheduleSource(
    url="redis://cluster-node1:6379",
    prefix="cluster_schedules"
)

# Use same API as standard schedule source
await schedule_source.add_schedule(schedule)
schedules = await schedule_source.get_schedules()

Redis Sentinel Schedule Source

Schedule source for Redis Sentinel deployments with high availability.

class RedisSentinelScheduleSource(ScheduleSource):
    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        master_name: str,
        prefix: str = "schedule",
        buffer_size: int = 50,
        serializer: Optional[TaskiqSerializer] = None,
        min_other_sentinels: int = 0,
        sentinel_kwargs: Optional[Any] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Schedule source for Redis Sentinel.
        
        Parameters:
        - sentinels: List of sentinel (host, port) pairs
        - master_name: Sentinel master name
        - prefix: Prefix for Redis keys (default: "schedule")
        - buffer_size: Buffer size for retrieving schedules (default: 50)
        - serializer: Custom serializer (default: PickleSerializer)
        - min_other_sentinels: Minimum other sentinels required (default: 0)
        - sentinel_kwargs: Additional sentinel configuration
        - connection_kwargs: Additional Redis connection arguments
        """

    async def delete_schedule(self, schedule_id: str) -> None:
        """Remove a scheduled task via Sentinel."""

    async def add_schedule(self, schedule: ScheduledTask) -> None:
        """Add a scheduled task via Sentinel."""

    async def get_schedules(self) -> List[ScheduledTask]:
        """Retrieve all scheduled tasks via Sentinel."""

    async def post_send(self, task: ScheduledTask) -> None:
        """Clean up after task execution via Sentinel."""

    async def shutdown(self) -> None:
        """Shut down the Sentinel schedule source."""

Usage Example:

from taskiq_redis import RedisSentinelScheduleSource

# Create high-availability schedule source
schedule_source = RedisSentinelScheduleSource(
    sentinels=[
        ("sentinel1", 26379),
        ("sentinel2", 26379),
        ("sentinel3", 26379)
    ],
    master_name="mymaster",
    prefix="ha_schedules",
    min_other_sentinels=1
)

# Use same API with automatic failover
await schedule_source.add_schedule(schedule)
schedules = await schedule_source.get_schedules()

Scheduling Patterns

One-time Scheduled Tasks

from taskiq_redis import ListRedisScheduleSource
from taskiq import ScheduledTask
from datetime import datetime, timedelta

schedule_source = ListRedisScheduleSource("redis://localhost:6379")

# Schedule task to run in 1 hour
future_time = datetime.now() + timedelta(hours=1)
schedule = ScheduledTask(
    task_id="one-time-report",
    task_name="generate_report",
    schedule_time=future_time,
    args=["monthly"],
    kwargs={"format": "excel"}
)

await schedule_source.add_schedule(schedule)

Recurring Tasks with Custom Logic

from datetime import datetime, timedelta

# Schedule daily backups
async def schedule_daily_backup():
    tomorrow = datetime.now() + timedelta(days=1)
    tomorrow = tomorrow.replace(hour=2, minute=0, second=0, microsecond=0)
    
    schedule = ScheduledTask(
        task_id=f"backup-{tomorrow.strftime('%Y%m%d')}",
        task_name="create_backup",
        schedule_time=tomorrow,
        args=["full"],
        kwargs={"compress": True}
    )
    
    await schedule_source.add_schedule(schedule)

# Add initial schedule
await schedule_daily_backup()

Schedule Management

from taskiq_redis import ListRedisScheduleSource

schedule_source = ListRedisScheduleSource("redis://localhost:6379")

# Get all pending schedules
all_schedules = await schedule_source.get_schedules()
print(f"Total schedules: {len(all_schedules)}")

# Find specific schedule
target_schedule = None
for schedule in all_schedules:
    if schedule.task_name == "generate_report":
        target_schedule = schedule
        break

if target_schedule:
    # Update schedule (delete and re-add with new time)
    await schedule_source.delete_schedule(target_schedule.task_id)
    
    # Reschedule for later
    target_schedule.schedule_time = datetime.now() + timedelta(hours=2)
    await schedule_source.add_schedule(target_schedule)

# Clean up old schedules
now = datetime.now()
for schedule in all_schedules:
    if schedule.schedule_time < now - timedelta(days=7):
        await schedule_source.delete_schedule(schedule.task_id)

Types

from typing import List, Optional, Any, Tuple
from taskiq.abc.schedule_source import ScheduleSource
from taskiq.abc.serializer import TaskiqSerializer
from taskiq import ScheduledTask

Install with Tessl CLI

npx tessl i tessl/pypi-taskiq-redis

docs

brokers.md

index.md

result-backends.md

schedule-sources.md

tile.json