or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

brokers.mdevents-state.mdexceptions.mdindex.mdmiddleware.mdresult-backends.mdscheduling.mdtasks-results.md

tasks-results.mddocs/

0

# Tasks and Results

1

2

Task execution system that handles the lifecycle of tasks from creation to result retrieval. This includes decorated task wrappers, result containers, execution context, and utilities for working with task outcomes.

3

4

## Capabilities

5

6

### Task Decoration and Execution

7

8

Tasks are created by decorating functions with the broker's `@task` decorator, which converts them into distributed task objects that can be executed asynchronously.

9

10

```python { .api }

11

class AsyncTaskiqDecoratedTask:

12

"""

13

Decorated task wrapper that enables distributed execution.

14

15

Created automatically when using @broker.task decorator.

16

Provides methods for sending tasks to workers and calling locally.

17

"""

18

19

task_name: str

20

broker: AsyncBroker

21

labels: Dict[str, Any]

22

23

async def kiq(self, *args, **kwargs) -> TaskiqResult:

24

"""

25

Send task to broker for distributed execution.

26

27

Args:

28

*args: Positional arguments for the task function

29

**kwargs: Keyword arguments for the task function

30

31

Returns:

32

TaskiqResult object for retrieving the result

33

"""

34

35

async def __call__(self, *args, **kwargs) -> Any:

36

"""

37

Execute task locally in current process.

38

39

Args:

40

*args: Positional arguments for the task function

41

**kwargs: Keyword arguments for the task function

42

43

Returns:

44

Direct result of task function execution

45

"""

46

47

def kicker(self) -> AsyncKicker:

48

"""

49

Get kicker object for advanced task configuration.

50

51

Kicker allows modifying task parameters before sending.

52

53

Returns:

54

AsyncKicker instance for this task

55

"""

56

57

async def schedule_by_cron(

58

self,

59

source: ScheduleSource,

60

cron: Union[str, CronSpec],

61

*args,

62

**kwargs,

63

) -> CreatedSchedule:

64

"""

65

Schedule task to run on cron pattern.

66

67

Args:

68

source: Schedule source that supports dynamic scheduling

69

cron: Cron string or CronSpec instance

70

*args: Positional arguments for the task function

71

**kwargs: Keyword arguments for the task function

72

73

Returns:

74

CreatedSchedule object with schedule details

75

"""

76

77

async def schedule_by_time(

78

self,

79

source: ScheduleSource,

80

time: datetime,

81

*args,

82

**kwargs,

83

) -> CreatedSchedule:

84

"""

85

Schedule task to run at specific time.

86

87

Args:

88

source: Schedule source that supports dynamic scheduling

89

time: Specific datetime to run the task

90

*args: Positional arguments for the task function

91

**kwargs: Keyword arguments for the task function

92

93

Returns:

94

CreatedSchedule object with schedule details

95

"""

96

97

class AsyncTaskiqTask:

98

"""

99

Task execution wrapper for handling async task invocation.

100

"""

101

102

def __init__(

103

self,

104

task_name: str,

105

broker: AsyncBroker,

106

labels: Optional[Dict[str, Any]] = None,

107

) -> None: ...

108

109

async def kiq(self, *args, **kwargs) -> TaskiqResult:

110

"""Send task for execution and return result handle."""

111

112

async def is_ready(self) -> bool:

113

"""

114

Check if task result is ready.

115

116

Returns:

117

True if task is completed, False otherwise

118

119

Raises:

120

ResultIsReadyError: If unable to check task readiness

121

"""

122

123

async def get_result(self, with_logs: bool = False) -> TaskiqResult:

124

"""

125

Get task result from result backend.

126

127

Args:

128

with_logs: Whether to fetch execution logs

129

130

Returns:

131

TaskiqResult with execution outcome

132

133

Raises:

134

ResultGetError: If unable to retrieve result

135

"""

136

137

async def wait_result(

138

self,

139

check_interval: float = 0.2,

140

timeout: float = -1.0,

141

with_logs: bool = False,

142

) -> TaskiqResult:

143

"""

144

Wait for task completion and return result.

145

146

Args:

147

check_interval: How often to check for completion (seconds)

148

timeout: Maximum time to wait (-1 for no timeout)

149

with_logs: Whether to fetch execution logs

150

151

Returns:

152

TaskiqResult with execution outcome

153

154

Raises:

155

TaskiqResultTimeoutError: If timeout is exceeded

156

"""

157

158

async def get_progress(self) -> Optional[TaskProgress[Any]]:

159

"""

160

Get current task execution progress.

161

162

Returns:

163

TaskProgress object or None if no progress tracking

164

"""

165

```

166

167

### Result Management

168

169

Result objects provide access to task execution outcomes, including return values, errors, execution metadata, and status checking.

170

171

```python { .api }

172

class TaskiqResult:

173

"""

174

Container for task execution results and metadata.

175

176

Supports both successful results and error conditions,

177

along with execution timing and custom labels.

178

"""

179

180

is_err: bool

181

"""Whether the task execution resulted in an error."""

182

183

return_value: Any

184

"""The return value from successful task execution."""

185

186

execution_time: float

187

"""Task execution time in seconds."""

188

189

labels: Dict[str, Any]

190

"""Custom labels attached to the task result."""

191

192

error: Optional[BaseException]

193

"""Exception object if task execution failed."""

194

195

log: Optional[str]

196

"""Deprecated: Task execution logs (may be removed in future)."""

197

198

async def wait_result(

199

self,

200

timeout: Optional[float] = None,

201

check_interval: float = 0.5,

202

) -> Any:

203

"""

204

Wait for task completion and return the result.

205

206

Args:

207

timeout: Maximum time to wait in seconds

208

check_interval: How often to check for completion

209

210

Returns:

211

The task return value

212

213

Raises:

214

TaskiqResultTimeoutError: If timeout is exceeded

215

Exception: Any exception raised by the task

216

"""

217

218

async def is_ready(self) -> bool:

219

"""

220

Check if task result is available.

221

222

Returns:

223

True if result is ready, False otherwise

224

"""

225

226

def __await__(self):

227

"""Enable direct awaiting of TaskiqResult objects."""

228

229

def raise_for_error(self) -> TaskiqResult:

230

"""

231

Raise exception if task resulted in error.

232

233

Returns:

234

Self if no error occurred

235

236

Raises:

237

Exception: The original task exception if is_err is True

238

"""

239

```

240

241

### Execution Context

242

243

Context objects provide task execution environment information and control capabilities during task processing.

244

245

```python { .api }

246

class Context:

247

"""

248

Task execution context providing access to message data,

249

broker instance, and task control operations.

250

"""

251

252

message: TaskiqMessage

253

"""The original task message with metadata."""

254

255

broker: AsyncBroker

256

"""Broker instance executing this task."""

257

258

state: TaskiqState

259

"""Shared state object for the broker."""

260

261

def __init__(self, message: TaskiqMessage, broker: AsyncBroker) -> None: ...

262

263

async def requeue(self) -> None:

264

"""

265

Requeue the current task for later execution.

266

267

Increments requeue counter and sends task back to broker.

268

Always raises NoResultError to prevent result storage.

269

270

Raises:

271

NoResultError: Always raised to stop current execution

272

"""

273

274

def reject(self) -> None:

275

"""

276

Reject the current task and prevent reprocessing.

277

278

Always raises TaskRejectedError to mark task as rejected.

279

280

Raises:

281

TaskRejectedError: Always raised to reject the task

282

"""

283

```

284

285

### Task Gathering

286

287

Utility functions for working with multiple task results concurrently.

288

289

```python { .api }

290

async def gather(

291

*tasks: AsyncTaskiqTask[Any],

292

timeout: float = -1,

293

with_logs: bool = False,

294

periodicity: float = 0.1,

295

) -> Tuple[TaskiqResult[Any], ...]:

296

"""

297

Wait for multiple task results concurrently.

298

299

Similar to asyncio.gather but works with AsyncTaskiqTask objects.

300

301

Args:

302

*tasks: AsyncTaskiqTask objects to wait for

303

timeout: Maximum time to wait in seconds, -1 for no timeout

304

with_logs: Whether to fetch logs from result backend

305

periodicity: How often to check for task completion

306

307

Returns:

308

Tuple of TaskiqResult objects in the same order as input tasks

309

310

Raises:

311

TaskiqResultTimeoutError: If timeout is exceeded

312

"""

313

```

314

315

## Usage Examples

316

317

### Basic Task Definition and Execution

318

319

```python

320

from taskiq import InMemoryBroker

321

322

broker = InMemoryBroker()

323

324

@broker.task

325

async def calculate_sum(numbers: List[int]) -> int:

326

"""Calculate sum of numbers with artificial delay."""

327

await asyncio.sleep(1) # Simulate work

328

return sum(numbers)

329

330

# Execute task

331

async def main():

332

await broker.startup()

333

334

# Send task for execution

335

result = await calculate_sum.kiq([1, 2, 3, 4, 5])

336

337

# Wait for result

338

total = await result.wait_result(timeout=10.0)

339

print(f"Sum: {total}") # Sum: 15

340

341

await broker.shutdown()

342

```

343

344

### Error Handling and Result Inspection

345

346

```python

347

@broker.task

348

async def risky_operation(value: int) -> int:

349

if value < 0:

350

raise ValueError("Negative values not allowed")

351

return value * 2

352

353

async def handle_results():

354

result = await risky_operation.kiq(-5)

355

356

# Check if result is ready

357

if await result.is_ready():

358

try:

359

value = await result.wait_result()

360

print(f"Success: {value}")

361

except ValueError as e:

362

print(f"Task failed: {e}")

363

364

# Inspect result metadata

365

print(f"Execution time: {result.execution_time}s")

366

print(f"Had error: {result.is_err}")

367

if result.error:

368

print(f"Error type: {type(result.error).__name__}")

369

```

370

371

### Context Usage in Tasks

372

373

```python

374

from taskiq import Context, TaskiqDepends

375

376

@broker.task

377

async def context_aware_task(

378

data: str,

379

context: Context = TaskiqDepends(),

380

) -> str:

381

"""Task that uses execution context."""

382

383

# Access task metadata

384

task_id = context.message.task_id

385

requeue_count = context.message.labels.get("X-Taskiq-requeue", "0")

386

387

# Conditional requeue logic

388

if data == "retry_me" and int(requeue_count) < 2:

389

print(f"Requeuing task {task_id} (attempt {int(requeue_count) + 1})")

390

await context.requeue()

391

392

# Reject invalid data

393

if data == "invalid":

394

context.reject()

395

396

return f"Processed: {data} (ID: {task_id})"

397

```

398

399

### Multiple Task Coordination

400

401

```python

402

from taskiq import gather

403

404

@broker.task

405

async def fetch_data(url: str) -> dict:

406

# Simulate API call

407

await asyncio.sleep(random.uniform(0.5, 2.0))

408

return {"url": url, "data": f"content from {url}"}

409

410

async def fetch_multiple_sources():

411

# Start multiple tasks

412

tasks = [

413

fetch_data.kiq("https://api1.example.com"),

414

fetch_data.kiq("https://api2.example.com"),

415

fetch_data.kiq("https://api3.example.com"),

416

]

417

418

# Wait for all results

419

results = await gather(*tasks)

420

421

# Process combined results

422

all_data = {}

423

for result in results:

424

all_data[result["url"]] = result["data"]

425

426

return all_data

427

```

428

429

## Types

430

431

```python { .api }

432

class TaskiqMessage:

433

"""Message format for task data and metadata."""

434

435

task_id: str

436

task_name: str

437

labels: Dict[str, Any]

438

args: Tuple[Any, ...]

439

kwargs: Dict[str, Any]

440

441

class AsyncKicker:

442

"""Kicker object for advanced task parameter configuration."""

443

444

def __init__(

445

self,

446

task_name: str,

447

broker: AsyncBroker,

448

labels: Dict[str, Any],

449

return_type: Optional[Type[Any]] = None,

450

) -> None: ...

451

452

async def kiq(self, *args, **kwargs) -> AsyncTaskiqTask[Any]: ...

453

async def schedule_by_cron(

454

self,

455

source: ScheduleSource,

456

cron: Union[str, CronSpec],

457

*args,

458

**kwargs,

459

) -> CreatedSchedule[Any]: ...

460

async def schedule_by_time(

461

self,

462

source: ScheduleSource,

463

time: datetime,

464

*args,

465

**kwargs,

466

) -> CreatedSchedule[Any]: ...

467

468

class CreatedSchedule:

469

"""Container for created schedule information."""

470

471

schedule_id: str

472

source: ScheduleSource

473

474

CronSpec = str # Type alias for cron specification strings

475

476

class TaskProgress:

477

"""Progress tracking container for long-running tasks."""

478

479

def __init__(self, current: int, total: int, message: str = "") -> None: ...

480

481

current: int

482

"""Current progress value."""

483

484

total: int

485

"""Total expected value."""

486

487

message: str

488

"""Optional progress message."""

489

```