or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.mdindex.mdresult-backends.mdschedule-sources.md

index.mddocs/

0

# TaskIQ-Redis

1

2

Redis integration for TaskIQ, providing comprehensive Redis-based message brokers and result backends with support for different Redis architectures (single node, cluster, sentinel). TaskIQ-Redis offers three broker types: PubSub for broadcasting (no acknowledgements), ListQueue for simple queuing (no acknowledgements), and Stream for reliable message processing with acknowledgement support.

3

4

## Package Information

5

6

- **Package Name**: taskiq-redis

7

- **Language**: Python

8

- **Installation**: `pip install taskiq-redis`

9

- **Dependencies**: `taskiq>=0.11.12,<1`, `redis^6`

10

11

## Core Imports

12

13

```python

14

from taskiq_redis import (

15

# Basic Redis brokers

16

PubSubBroker,

17

ListQueueBroker,

18

RedisStreamBroker,

19

20

# Redis Cluster brokers

21

ListQueueClusterBroker,

22

RedisStreamClusterBroker,

23

24

# Redis Sentinel brokers

25

ListQueueSentinelBroker,

26

PubSubSentinelBroker,

27

RedisStreamSentinelBroker,

28

29

# Result backends

30

RedisAsyncResultBackend,

31

RedisAsyncClusterResultBackend,

32

RedisAsyncSentinelResultBackend,

33

34

# Schedule sources

35

ListRedisScheduleSource,

36

RedisScheduleSource,

37

RedisClusterScheduleSource,

38

RedisSentinelScheduleSource

39

)

40

```

41

42

## Basic Usage

43

44

```python

45

import asyncio

46

from taskiq import TaskiqResult, TaskiqMessage

47

from taskiq_redis import RedisStreamBroker, RedisAsyncResultBackend

48

49

# Create broker with result backend

50

broker = RedisStreamBroker(

51

url="redis://localhost:6379",

52

result_backend=RedisAsyncResultBackend("redis://localhost:6379")

53

)

54

55

# Define a task

56

@broker.task

57

async def add_numbers(a: int, b: int) -> int:

58

return a + b

59

60

async def main():

61

# Start the broker

62

await broker.startup()

63

64

# Send task

65

task = await add_numbers.kiq(10, 20)

66

67

# Get result

68

result = await task.wait_result()

69

print(f"Result: {result.return_value}") # Result: 30

70

71

# Shutdown

72

await broker.shutdown()

73

74

# Run the example

75

asyncio.run(main())

76

```

77

78

## Architecture

79

80

TaskIQ-Redis provides a comprehensive Redis integration with three main component categories:

81

82

- **Brokers**: Handle message distribution between producers and consumers

83

- **PubSub**: Broadcast messages to all connected workers (fire-and-forget)

84

- **ListQueue**: Distribute tasks between workers using Redis lists (simple queuing)

85

- **Stream**: Reliable message processing with acknowledgement using Redis streams

86

- **Result Backends**: Store and retrieve task execution results with configurable expiration

87

- **Schedule Sources**: Manage scheduled/recurring tasks with different storage strategies

88

89

Each component type supports three Redis deployment architectures:

90

- **Standard Redis**: Single Redis instance

91

- **Redis Cluster**: Distributed Redis deployment

92

- **Redis Sentinel**: High-availability Redis with automatic failover

93

94

## Capabilities

95

96

### Message Brokers

97

98

Core message brokers that handle task distribution between producers and consumers, supporting different message patterns and Redis deployment types.

99

100

```python { .api }

101

class PubSubBroker(BaseRedisBroker):

102

def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...

103

async def kick(self, message: BrokerMessage) -> None: ...

104

async def listen(self) -> AsyncGenerator[bytes, None]: ...

105

106

class ListQueueBroker(BaseRedisBroker):

107

def __init__(self, url: str, queue_name: str = "taskiq", **kwargs): ...

108

async def kick(self, message: BrokerMessage) -> None: ...

109

async def listen(self) -> AsyncGenerator[bytes, None]: ...

110

111

class RedisStreamBroker(BaseRedisBroker):

112

def __init__(

113

self,

114

url: str,

115

queue_name: str = "taskiq",

116

consumer_group_name: str = "taskiq",

117

consumer_name: Optional[str] = None,

118

**kwargs

119

): ...

120

async def startup(self) -> None: ...

121

async def kick(self, message: BrokerMessage) -> None: ...

122

async def listen(self) -> AsyncGenerator[AckableMessage, None]: ...

123

```

124

125

[Message Brokers](./brokers.md)

126

127

### Result Backends

128

129

Async result backends for storing and retrieving task execution results with configurable expiration times and different Redis deployment support.

130

131

```python { .api }

132

class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):

133

def __init__(

134

self,

135

redis_url: str,

136

keep_results: bool = True,

137

result_ex_time: Optional[int] = None,

138

result_px_time: Optional[int] = None,

139

**kwargs

140

): ...

141

async def set_result(self, task_id: str, result: TaskiqResult[_ReturnType]) -> None: ...

142

async def get_result(self, task_id: str, with_logs: bool = False) -> TaskiqResult[_ReturnType]: ...

143

async def is_result_ready(self, task_id: str) -> bool: ...

144

async def set_progress(self, task_id: str, progress: TaskProgress[_ReturnType]) -> None: ...

145

async def get_progress(self, task_id: str) -> Union[TaskProgress[_ReturnType], None]: ...

146

```

147

148

[Result Backends](./result-backends.md)

149

150

### Schedule Sources

151

152

Schedule sources for managing scheduled and recurring tasks with different storage strategies and Redis deployment support.

153

154

```python { .api }

155

class ListRedisScheduleSource(ScheduleSource):

156

def __init__(

157

self,

158

url: str,

159

prefix: str = "schedule",

160

buffer_size: int = 50,

161

skip_past_schedules: bool = False,

162

**kwargs

163

): ...

164

async def add_schedule(self, schedule: ScheduledTask) -> None: ...

165

async def get_schedules(self) -> List[ScheduledTask]: ...

166

async def delete_schedule(self, schedule_id: str) -> None: ...

167

def with_migrate_from(self, source: ScheduleSource, delete_schedules: bool = True) -> Self: ...

168

```

169

170

[Schedule Sources](./schedule-sources.md)

171

172

## Exception Classes

173

174

```python { .api }

175

class TaskIQRedisError(TaskiqError):

176

"""Base error for all taskiq-redis exceptions."""

177

178

class DuplicateExpireTimeSelectedError(ResultBackendError, TaskIQRedisError):

179

"""Error if two lifetimes are selected."""

180

181

class ExpireTimeMustBeMoreThanZeroError(ResultBackendError, TaskIQRedisError):

182

"""Error if lifetimes are less or equal zero."""

183

184

class ResultIsMissingError(TaskIQRedisError, ResultGetError):

185

"""Error if there is no result when trying to get it."""

186

```