docs: brokers.md · index.md · result-backends.md · schedule-sources.md

docs/result-backends.md

# Result Backends

TaskIQ-Redis provides async result backends for storing and retrieving task execution results with configurable expiration times. Each backend supports different Redis deployment architectures while maintaining the same API interface.

## Capabilities

### Standard Redis Result Backend

Async result backend for single Redis instance deployments.

```python { .api }
class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
    def __init__(
        self,
        redis_url: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        max_connection_pool_size: Optional[int] = None,
        serializer: Optional[TaskiqSerializer] = None,
        prefix_str: Optional[str] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis async result backend.

        Parameters:
        - redis_url: Redis connection URL
        - keep_results: Don't remove results after reading (default: True)
        - result_ex_time: Result expiration time in seconds
        - result_px_time: Result expiration time in milliseconds
        - max_connection_pool_size: Maximum connections in pool
        - serializer: Custom serializer (default: PickleSerializer)
        - prefix_str: Prefix for Redis keys
        - connection_kwargs: Additional Redis connection arguments

        Raises:
        - DuplicateExpireTimeSelectedError: If both ex_time and px_time specified
        - ExpireTimeMustBeMoreThanZeroError: If expiration time <= 0
        """

    async def shutdown(self) -> None:
        """Close Redis connection pool."""

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult[_ReturnType]
    ) -> None:
        """
        Store task result in Redis.

        Parameters:
        - task_id: Unique task identifier
        - result: Task execution result to store
        """

    async def is_result_ready(self, task_id: str) -> bool:
        """
        Check if task result is available.

        Parameters:
        - task_id: Unique task identifier

        Returns:
        - bool: True if result is ready, False otherwise
        """

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = False
    ) -> TaskiqResult[_ReturnType]:
        """
        Retrieve task result from Redis.

        Parameters:
        - task_id: Unique task identifier
        - with_logs: Include execution logs in result (default: False)

        Returns:
        - TaskiqResult: Task execution result

        Raises:
        - ResultIsMissingError: If result not found
        """

    async def set_progress(
        self,
        task_id: str,
        progress: TaskProgress[_ReturnType]
    ) -> None:
        """
        Store task progress information.

        Parameters:
        - task_id: Unique task identifier
        - progress: Task progress information
        """

    async def get_progress(
        self,
        task_id: str
    ) -> Union[TaskProgress[_ReturnType], None]:
        """
        Retrieve task progress information.

        Parameters:
        - task_id: Unique task identifier

        Returns:
        - TaskProgress or None: Progress information if available
        """
```

**Usage Example:**

```python
from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker

# Create result backend with 1 hour expiration
backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_ex_time=3600,  # 1 hour in seconds
    keep_results=True
)

# Use with broker
broker = RedisStreamBroker(
    url="redis://localhost:6379",
    result_backend=backend
)

@broker.task
async def compute_task(x: int, y: int) -> int:
    return x * y

# Execute task and get result
task = await compute_task.kiq(10, 20)
result = await task.wait_result()
print(f"Result: {result.return_value}")  # Result: 200

# Check progress (if task supports it)
progress = await backend.get_progress(task.task_id)
if progress:
    print(f"Progress: {progress.progress}%")
```

### Redis Cluster Result Backend

Async result backend for Redis Cluster deployments.

```python { .api }
class RedisAsyncClusterResultBackend(AsyncResultBackend[_ReturnType]):
    def __init__(
        self,
        redis_url: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        serializer: Optional[TaskiqSerializer] = None,
        prefix_str: Optional[str] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis Cluster async result backend.

        Parameters similar to RedisAsyncResultBackend but without
        max_connection_pool_size (managed by Redis Cluster client).
        """

    async def shutdown(self) -> None:
        """Close Redis cluster connection."""

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult[_ReturnType]
    ) -> None:
        """Store task result in Redis cluster."""

    async def is_result_ready(self, task_id: str) -> bool:
        """Check if task result is available in Redis cluster."""

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = False
    ) -> TaskiqResult[_ReturnType]:
        """Retrieve task result from Redis cluster."""

    async def set_progress(
        self,
        task_id: str,
        progress: TaskProgress[_ReturnType]
    ) -> None:
        """Store task progress in Redis cluster."""

    async def get_progress(
        self,
        task_id: str
    ) -> Union[TaskProgress[_ReturnType], None]:
        """Retrieve task progress from Redis cluster."""
```

**Usage Example:**

```python
from taskiq_redis import RedisAsyncClusterResultBackend, RedisStreamClusterBroker

# Create cluster result backend
backend = RedisAsyncClusterResultBackend(
    redis_url="redis://cluster-node1:6379",
    result_ex_time=7200  # 2 hours
)

# Use with cluster broker
broker = RedisStreamClusterBroker(
    url="redis://cluster-node1:6379",
    result_backend=backend
)
```

### Redis Sentinel Result Backend

Async result backend for Redis Sentinel deployments with high availability.

```python { .api }
class RedisAsyncSentinelResultBackend(AsyncResultBackend[_ReturnType]):
    def __init__(
        self,
        sentinels: List[Tuple[str, int]],
        master_name: str,
        keep_results: bool = True,
        result_ex_time: Optional[int] = None,
        result_px_time: Optional[int] = None,
        min_other_sentinels: int = 0,
        sentinel_kwargs: Optional[Any] = None,
        serializer: Optional[TaskiqSerializer] = None,
        prefix_str: Optional[str] = None,
        **connection_kwargs: Any,
    ) -> None:
        """
        Redis Sentinel async result backend.

        Parameters:
        - sentinels: List of sentinel (host, port) pairs
        - master_name: Sentinel master name
        - keep_results: Don't remove results after reading (default: True)
        - result_ex_time: Result expiration time in seconds
        - result_px_time: Result expiration time in milliseconds
        - min_other_sentinels: Minimum other sentinels required (default: 0)
        - sentinel_kwargs: Additional sentinel configuration
        - serializer: Custom serializer (default: PickleSerializer)
        - prefix_str: Prefix for Redis keys
        - connection_kwargs: Additional Redis connection arguments
        """

    async def shutdown(self) -> None:
        """Close Redis sentinel connection."""

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult[_ReturnType]
    ) -> None:
        """Store task result in Redis via Sentinel."""

    async def is_result_ready(self, task_id: str) -> bool:
        """Check if task result is available via Sentinel."""

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = False
    ) -> TaskiqResult[_ReturnType]:
        """Retrieve task result from Redis via Sentinel."""

    async def set_progress(
        self,
        task_id: str,
        progress: TaskProgress[_ReturnType]
    ) -> None:
        """Store task progress via Sentinel."""

    async def get_progress(
        self,
        task_id: str
    ) -> Union[TaskProgress[_ReturnType], None]:
        """Retrieve task progress via Sentinel."""
```

**Usage Example:**

```python
from taskiq_redis import RedisAsyncSentinelResultBackend, RedisStreamSentinelBroker

# Create high-availability result backend
backend = RedisAsyncSentinelResultBackend(
    sentinels=[
        ("sentinel1", 26379),
        ("sentinel2", 26379),
        ("sentinel3", 26379)
    ],
    master_name="mymaster",
    result_ex_time=1800,  # 30 minutes
    min_other_sentinels=1
)

# Use with sentinel broker
broker = RedisStreamSentinelBroker(
    sentinels=[("sentinel1", 26379), ("sentinel2", 26379)],
    master_name="mymaster",
    result_backend=backend
)

@broker.task
async def important_task(data: dict) -> dict:
    # Critical task with HA storage
    return {"processed": data, "success": True}
```

## Result Management

### Expiration Strategies

Results can be configured to expire automatically:

```python
# Expire after 1 hour (3600 seconds)
backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_ex_time=3600
)

# Expire after 30 minutes (1800000 milliseconds)
backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    result_px_time=1800000
)

# Keep results indefinitely (no expiration time set; keep_results is
# already True by default)
backend = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
    keep_results=True
)
```

### Progress Tracking

Tasks can report progress during execution:

```python
from taskiq_redis import RedisAsyncResultBackend
from taskiq.depends.progress_tracker import TaskProgress

backend = RedisAsyncResultBackend("redis://localhost:6379")

@broker.task
async def long_running_task(items: List[str]) -> List[str]:
    results = []
    total = len(items)

    for i, item in enumerate(items):
        # Process item
        processed = await process_item(item)
        results.append(processed)

        # Update progress
        progress = TaskProgress(
            progress=int((i + 1) / total * 100),
            message=f"Processed {i + 1}/{total} items"
        )
        # NOTE(review): `task` is not defined inside the task body here;
        # in real code obtain the current task id via taskiq's Context /
        # ProgressTracker dependency — verify against the taskiq docs.
        await backend.set_progress(task.task_id, progress)

    return results

# Monitor progress
task = await long_running_task.kiq(["item1", "item2", "item3"])
while not await backend.is_result_ready(task.task_id):
    progress = await backend.get_progress(task.task_id)
    if progress:
        print(f"Progress: {progress.progress}% - {progress.message}")
    await asyncio.sleep(1)
```

## Types

```python { .api }
from typing import TypeVar, Optional, Any, List, Tuple, Union
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.abc.serializer import TaskiqSerializer
from taskiq.result import TaskiqResult
from taskiq.depends.progress_tracker import TaskProgress

_ReturnType = TypeVar("_ReturnType")

# Constants
PROGRESS_KEY_SUFFIX: str = "__progress"
```