or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-tasks.mdcli-operations.mdcore-deployment.mdindex.mdpackage-utilities.mdrequest-processing.mdssl-management.mdutilities.mdwsgi-processing.md

async-tasks.mddocs/

0

# Asynchronous Task Execution

1

2

Background task processing using AWS Lambda and SNS for executing long-running operations asynchronously outside of the HTTP request/response cycle. This capability enables scalable background job processing in serverless applications.

3

4

## Capabilities

5

6

### Task Decorators

7

8

Convert regular functions into asynchronous tasks that can be executed in the background.

9

10

```python { .api }

11

def task(func):

12

"""

13

Decorator to convert function to async Lambda task.

14

15

Wraps function to enable asynchronous execution via Lambda

16

invocation with automatic serialization and error handling.

17

18

Parameters:

19

- func: callable, function to convert to async task

20

21

Returns:

22

callable: Decorated function with async capabilities

23

"""

24

```

25

26

```python { .api }

27

def task_sns(func):

28

"""

29

Decorator to convert function to SNS-based async task.

30

31

Wraps function for asynchronous execution via SNS messaging

32

with higher payload limits and retry capabilities.

33

34

Parameters:

35

- func: callable, function to convert to SNS async task

36

37

Returns:

38

callable: Decorated function with SNS async capabilities

39

"""

40

```

41

42

### Task Execution

43

44

Execute asynchronous tasks and manage task responses.

45

46

```python { .api }

47

def run(func, *args, **kwargs):

48

"""

49

Execute async task via Lambda or SNS.

50

51

Runs function asynchronously using appropriate backend

52

based on payload size and configuration.

53

54

Parameters:

55

- func: callable, function to execute asynchronously

56

- *args: positional arguments for function

57

- **kwargs: keyword arguments for function

58

59

Returns:

60

LambdaAsyncResponse or SnsAsyncResponse: Task response handler

61

"""

62

```

63

64

```python { .api }

65

def run_message(task_type, func_path, args, kwargs):

66

"""

67

Process async task message from queue.

68

69

Executes task function with provided arguments from

70

async message queue (Lambda or SNS).

71

72

Parameters:

73

- task_type: str, type of task ('lambda' or 'sns')

74

- func_path: str, importable path to function

75

- args: list, positional arguments

76

- kwargs: dict, keyword arguments

77

78

Returns:

79

any: Task function return value

80

"""

81

```

82

83

### Task Routing

84

85

Route asynchronous tasks to appropriate execution backends.

86

87

```python { .api }

88

def route_lambda_task(event, context):

89

"""

90

Route Lambda-based async task execution.

91

92

Processes Lambda events containing async task data

93

and executes the specified function.

94

95

Parameters:

96

- event: dict, Lambda event with task data

97

- context: LambdaContext, Lambda runtime context

98

99

Returns:

100

any: Task execution result

101

"""

102

```

103

104

```python { .api }

105

def route_sns_task(event, context):

106

"""

107

Route SNS-based async task execution.

108

109

Processes SNS events containing async task messages

110

and executes the specified function.

111

112

Parameters:

113

- event: dict, Lambda event from SNS trigger

114

- context: LambdaContext, Lambda runtime context

115

116

Returns:

117

any: Task execution result

118

"""

119

```

120

121

### Response Management

122

123

Retrieve and manage asynchronous task responses.

124

125

```python { .api }

126

def get_async_response(response_id):

127

"""

128

Retrieve async task response by task ID.

129

130

Fetches the response/result of an asynchronous task

131

using its unique response identifier.

132

133

Parameters:

134

- response_id: str, unique task response identifier

135

136

Returns:

137

any: Task response data or None if not found

138

"""

139

```

140

141

### Task Management Utilities

142

143

Import and manage task functions dynamically.

144

145

```python { .api }

146

def import_and_get_task(task_path):

147

"""

148

Import task function from module path.

149

150

Dynamically imports and returns function for async execution

151

from dotted module path.

152

153

Parameters:

154

- task_path: str, dotted path to function (module.function)

155

156

Returns:

157

callable: The imported task function

158

"""

159

```

160

161

```python { .api }

162

def get_func_task_path(func):

163

"""

164

Get importable module path for function.

165

166

Generates dotted module path string that can be used

167

to import and execute function in async context.

168

169

Parameters:

170

- func: callable, function to get path for

171

172

Returns:

173

str: Dotted module path (module.function)

174

"""

175

```

176

177

## Response Handler Classes

178

179

### Lambda Async Response

180

181

Handle responses from Lambda-based async task execution.

182

183

```python { .api }

184

class LambdaAsyncResponse:

185

"""

186

Response handler for async Lambda invocations.

187

188

Manages response data and status for asynchronous

189

Lambda function executions.

190

"""

191

192

def __init__(self, **kwargs):

193

"""

194

Initialize Lambda async response handler.

195

196

Parameters:

197

- **kwargs: Response configuration and metadata

198

"""

199

```

200

201

### SNS Async Response

202

203

Handle responses from SNS-based async task execution.

204

205

```python { .api }

206

class SnsAsyncResponse(LambdaAsyncResponse):

207

"""

208

Response handler for SNS async messaging.

209

210

Extends LambdaAsyncResponse with SNS-specific handling

211

for higher payload limits and message delivery.

212

"""

213

214

def __init__(self, **kwargs):

215

"""

216

Initialize SNS async response handler.

217

218

Parameters:

219

- **kwargs: Response configuration and metadata

220

"""

221

```

222

223

## Exception Classes

224

225

```python { .api }

226

class AsyncException(Exception):

227

"""

228

Exception raised for async operation failures.

229

230

Indicates errors in asynchronous task execution,

231

serialization, or response handling.

232

"""

233

```

234

235

## Constants

236

237

```python { .api }

238

# Payload size limits for async backends

239

LAMBDA_ASYNC_PAYLOAD_LIMIT = 256000 # 256KB for Lambda async

240

SNS_ASYNC_PAYLOAD_LIMIT = 256000 # 256KB for SNS messages

241

242

# Mapping of async response types to handler classes

243

ASYNC_CLASSES = {

244

'lambda': LambdaAsyncResponse,

245

'sns': SnsAsyncResponse

246

}

247

```

248

249

## Usage Examples

250

251

### Basic Async Task

252

253

```python

254

from zappa.asynchronous import task, run

255

256

@task

257

def process_data(data, multiplier=1):

258

"""Process data asynchronously."""

259

result = []

260

for item in data:

261

result.append(item * multiplier)

262

return result

263

264

# Execute task asynchronously

265

response = run(process_data, [1, 2, 3, 4], multiplier=2)

266

print(f"Task ID: {response.response_id}")

267

268

# Get result later

269

result = get_async_response(response.response_id)

270

print(f"Result: {result}")

271

```

272

273

### SNS-Based Async Task

274

275

```python

276

from zappa.asynchronous import task_sns, run

277

278

@task_sns

279

def send_email_batch(email_list, subject, body):

280

"""Send batch emails asynchronously via SNS."""

281

import boto3

282

ses = boto3.client('ses')

283

284

results = []

285

for email in email_list:

286

response = ses.send_email(

287

Source='noreply@example.com',

288

Destination={'ToAddresses': [email]},

289

Message={

290

'Subject': {'Data': subject},

291

'Body': {'Text': {'Data': body}}

292

}

293

)

294

results.append(response['MessageId'])

295

296

return results

297

298

# Execute large batch operation

299

emails = ['user1@example.com', 'user2@example.com']

300

response = run(send_email_batch, emails,

301

subject='Newsletter',

302

body='Welcome to our newsletter!')

303

```

304

305

### Custom Task Function

306

307

```python

308

from zappa.asynchronous import run, get_async_response

309

310

def expensive_computation(n):

311

"""Compute factorial asynchronously."""

312

import math

313

return math.factorial(n)

314

315

# Run without decorator (function must be importable)

316

response = run(expensive_computation, 1000)

317

318

# Check response later

319

result = get_async_response(response.response_id)

320

print(f"1000! = {result}")

321

```

322

323

### Task with Error Handling

324

325

```python

326

from zappa.asynchronous import task, run, AsyncException

327

328

@task

329

def risky_operation(data):

330

"""Task that might fail."""

331

if not data:

332

raise ValueError("Data cannot be empty")

333

334

# Simulate processing

335

import time

336

time.sleep(2)

337

return f"Processed {len(data)} items"

338

339

try:

340

response = run(risky_operation, [])

341

result = get_async_response(response.response_id)

342

except AsyncException as e:

343

print(f"Task failed: {e}")

344

```

345

346

### Multiple Task Execution

347

348

```python

349

from zappa.asynchronous import task, run

350

351

@task

352

def process_chunk(chunk_id, data_chunk):

353

"""Process data chunk."""

354

return {

355

'chunk_id': chunk_id,

356

'processed_count': len(data_chunk),

357

'sum': sum(data_chunk)

358

}

359

360

# Process large dataset in parallel chunks

361

large_dataset = list(range(10000))

362

chunk_size = 1000

363

responses = []

364

365

for i in range(0, len(large_dataset), chunk_size):

366

chunk = large_dataset[i:i+chunk_size]

367

response = run(process_chunk, i//chunk_size, chunk)

368

responses.append(response)

369

370

# Collect results

371

results = []

372

for response in responses:

373

result = get_async_response(response.response_id)

374

results.append(result)

375

376

print(f"Processed {len(results)} chunks")

377

```