# Asynchronous Task Execution

Background task processing using AWS Lambda and SNS for executing long-running operations asynchronously outside of the HTTP request/response cycle. This capability enables scalable background job processing in serverless applications.

## Capabilities

### Task Decorators

Convert regular functions into asynchronous tasks that can be executed in the background.

```python { .api }
def task(func):
    """
    Decorator to convert function to async Lambda task.

    Wraps function to enable asynchronous execution via Lambda
    invocation with automatic serialization and error handling.

    Parameters:
    - func: callable, function to convert to async task

    Returns:
    callable: Decorated function with async capabilities
    """
```

```python { .api }
def task_sns(func):
    """
    Decorator to convert function to SNS-based async task.

    Wraps function for asynchronous execution via SNS messaging
    with retry capabilities. Payloads are capped at
    SNS_ASYNC_PAYLOAD_LIMIT, the same size as the Lambda limit.

    Parameters:
    - func: callable, function to convert to SNS async task

    Returns:
    callable: Decorated function with SNS async capabilities
    """
```
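
To illustrate the decorator pattern described above, the sketch below wraps a function so that calling it hands a serialized message to a dispatcher instead of running the body. It is not Zappa's implementation: `sketch_task`, `dispatch`, `PENDING`, and the message keys are hypothetical names, and the dispatcher is an in-memory list standing in for a Lambda invocation or SNS publish.

```python
import functools
import json

# Hypothetical stand-in for a Lambda invoke or SNS publish: messages queue in memory.
PENDING = []


def dispatch(message: str) -> None:
    """Record a serialized task message (assumption: real code would call AWS here)."""
    PENDING.append(message)


def sketch_task(func):
    """Illustrative decorator: calling the function enqueues a message instead of running it."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        message = json.dumps(
            {
                "task_path": f"{func.__module__}.{func.__name__}",
                "args": list(args),
                "kwargs": kwargs,
            }
        )
        dispatch(message)
        return message

    # Keep a handle on the original so a worker can still run the body synchronously.
    wrapper.sync = func
    return wrapper


@sketch_task
def add(a, b=0):
    return a + b


queued = add(2, b=3)  # enqueues a message; the body does not run here
decoded = json.loads(queued)
```

Because the arguments travel as JSON in this sketch, only JSON-serializable values can be passed to a decorated function.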

### Task Execution

Execute asynchronous tasks and manage task responses.

```python { .api }
def run(func, *args, **kwargs):
    """
    Execute async task via Lambda or SNS.

    Runs function asynchronously using appropriate backend
    based on payload size and configuration.

    Parameters:
    - func: callable, function to execute asynchronously
    - *args: positional arguments for function
    - **kwargs: keyword arguments for function

    Returns:
    LambdaAsyncResponse or SnsAsyncResponse: Task response handler
    """
```

```python { .api }
def run_message(task_type, func_path, args, kwargs):
    """
    Process async task message from queue.

    Executes task function with provided arguments from
    async message queue (Lambda or SNS).

    Parameters:
    - task_type: str, type of task ('lambda' or 'sns')
    - func_path: str, importable path to function
    - args: list, positional arguments
    - kwargs: dict, keyword arguments

    Returns:
    any: Task function return value
    """
```
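
The sending and receiving sides described above can be pictured as a serialize, size-check, and execute round trip. The following is a minimal sketch under stated assumptions: `build_message`, `execute_message`, `PayloadTooLarge`, and the registry lookup are hypothetical helpers for illustration, and the 256000-byte limit simply mirrors the values listed under Constants.

```python
import json

PAYLOAD_LIMIT = 256000  # mirrors the 256000-byte limits listed under Constants


class PayloadTooLarge(Exception):
    """Hypothetical error for this sketch; the documented API exposes AsyncException."""


def build_message(task_path, args, kwargs, limit=PAYLOAD_LIMIT):
    """Serialize a task call and refuse payloads above the size limit."""
    payload = json.dumps({"task_path": task_path, "args": args, "kwargs": kwargs})
    size = len(payload.encode("utf-8"))
    if size > limit:
        raise PayloadTooLarge(f"payload is {size} bytes, limit is {limit}")
    return payload


def execute_message(payload, registry):
    """Decode a message and call the matching function from a name -> callable registry."""
    message = json.loads(payload)
    func = registry[message["task_path"]]
    return func(*message["args"], **message["kwargs"])


def scale(values, factor=1):
    return [v * factor for v in values]


registry = {"demo.scale": scale}
payload = build_message("demo.scale", [[1, 2, 3]], {"factor": 2})
result = execute_message(payload, registry)
```

Checking the size before sending keeps the failure on the caller's side, where it can be handled, rather than in the backend.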

### Task Routing

Route asynchronous tasks to appropriate execution backends.

```python { .api }
def route_lambda_task(event, context):
    """
    Route Lambda-based async task execution.

    Processes Lambda events containing async task data
    and executes the specified function.

    Parameters:
    - event: dict, Lambda event with task data
    - context: LambdaContext, Lambda runtime context

    Returns:
    any: Task execution result
    """
```

```python { .api }
def route_sns_task(event, context):
    """
    Route SNS-based async task execution.

    Processes SNS events containing async task messages
    and executes the specified function.

    Parameters:
    - event: dict, Lambda event from SNS trigger
    - context: LambdaContext, Lambda runtime context

    Returns:
    any: Task execution result
    """
```
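
For orientation, a Lambda function triggered by SNS receives the published string under `Records[*].Sns.Message`. The sketch below shows one way a router might unpack such an event and call the named task. The message keys (`task_path`, `args`, `kwargs`), the `handlers` registry, and the function names are assumptions made for illustration, not the library's wire format.

```python
import json


def extract_sns_messages(event):
    """Return the decoded JSON body of every SNS record in a Lambda event."""
    messages = []
    for record in event.get("Records", []):
        # SNS-triggered Lambda events carry the published string under Sns.Message.
        raw = record["Sns"]["Message"]
        messages.append(json.loads(raw))
    return messages


def route_sketch(event, handlers):
    """Hypothetical router: look up each message's task by name and call it."""
    results = []
    for message in extract_sns_messages(event):
        handler = handlers[message["task_path"]]
        results.append(handler(*message["args"], **message["kwargs"]))
    return results


def greet(name, punct="."):
    return f"Hello, {name}{punct}"


sample_event = {
    "Records": [
        {
            "EventSource": "aws:sns",
            "Sns": {
                "Message": json.dumps(
                    {"task_path": "demo.greet", "args": ["Ada"], "kwargs": {"punct": "!"}}
                )
            },
        }
    ]
}

results = route_sketch(sample_event, {"demo.greet": greet})
```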

### Response Management

Retrieve and manage asynchronous task responses.

```python { .api }
def get_async_response(response_id):
    """
    Retrieve async task response by response ID.

    Fetches the response/result of an asynchronous task
    using its unique response identifier.

    Parameters:
    - response_id: str, unique task response identifier

    Returns:
    any: Task response data or None if not found
    """
```
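
Since a lookup returns `None` when no response is found, a caller that needs the result typically polls until it appears or a deadline passes. The helper below is a minimal sketch of that loop. `wait_for_response` and `fake_fetch` are hypothetical names, and the fetch function is passed in so the example runs without AWS.

```python
import time


def wait_for_response(fetch, response_id, timeout=5.0, interval=0.1, sleep=time.sleep):
    """Poll fetch(response_id) until it returns a non-None value or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        result = fetch(response_id)
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None
        sleep(interval)


# Fake backend for illustration: the result "appears" on the third lookup.
calls = {"count": 0}


def fake_fetch(response_id):
    calls["count"] += 1
    return {"id": response_id, "value": 42} if calls["count"] >= 3 else None


outcome = wait_for_response(fake_fetch, "abc123", interval=0.01)
```

In application code the `fetch` argument would presumably be `get_async_response`, called with the `response_id` of the task being awaited.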

### Task Management Utilities

Import and manage task functions dynamically.

```python { .api }
def import_and_get_task(task_path):
    """
    Import task function from module path.

    Dynamically imports and returns function for async execution
    from dotted module path.

    Parameters:
    - task_path: str, dotted path to function (module.function)

    Returns:
    callable: The imported task function
    """
```

```python { .api }
def get_func_task_path(func):
    """
    Get importable module path for function.

    Generates dotted module path string that can be used
    to import and execute function in async context.

    Parameters:
    - func: callable, function to get path for

    Returns:
    str: Dotted module path (module.function)
    """
```
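
The two utilities above are inverses of each other: one turns a function into a dotted path, the other turns the path back into the function. A minimal sketch of that round trip with the standard library's `importlib` follows. `func_to_path` and `path_to_func` are hypothetical names, and the library's own implementation may differ.

```python
import importlib
import json


def func_to_path(func):
    """Build a dotted 'module.function' path for a module-level function."""
    return f"{func.__module__}.{func.__name__}"


def path_to_func(task_path):
    """Split the path at the last dot, import the module, and fetch the attribute."""
    module_path, _, func_name = task_path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, func_name)


path = func_to_path(json.dumps)  # a standard-library function used as the example task
loaded = path_to_func(path)
```

This approach only works for functions defined at module top level. Lambdas and nested functions have no importable path, which is why a task function must be importable by the worker.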

## Response Handler Classes

### Lambda Async Response

Handle responses from Lambda-based async task execution.

```python { .api }
class LambdaAsyncResponse:
    """
    Response handler for async Lambda invocations.

    Manages response data and status for asynchronous
    Lambda function executions.
    """

    def __init__(self, **kwargs):
        """
        Initialize Lambda async response handler.

        Parameters:
        - **kwargs: Response configuration and metadata
        """
```

### SNS Async Response

Handle responses from SNS-based async task execution.

```python { .api }
class SnsAsyncResponse(LambdaAsyncResponse):
    """
    Response handler for SNS async messaging.

    Extends LambdaAsyncResponse with SNS-specific handling
    for message delivery.
    """

    def __init__(self, **kwargs):
        """
        Initialize SNS async response handler.

        Parameters:
        - **kwargs: Response configuration and metadata
        """
```

## Exception Classes

```python { .api }
class AsyncException(Exception):
    """
    Exception raised for async operation failures.

    Indicates errors in asynchronous task execution,
    serialization, or response handling.
    """
```

## Constants

```python { .api }
# Payload size limits for async backends
LAMBDA_ASYNC_PAYLOAD_LIMIT = 256000  # 256KB for Lambda async
SNS_ASYNC_PAYLOAD_LIMIT = 256000  # 256KB for SNS messages

# Mapping of async response types to handler classes
ASYNC_CLASSES = {
    'lambda': LambdaAsyncResponse,
    'sns': SnsAsyncResponse
}
```
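
A mapping like `ASYNC_CLASSES` lets calling code pick a handler class from a service name instead of branching on it. The sketch below shows that lookup with stub classes so it runs on its own. `LambdaStub`, `SnsStub`, `HANDLERS`, `make_handler`, and the `capture_response` keyword are all hypothetical and used only for illustration.

```python
class LambdaStub:
    """Hypothetical stand-in for LambdaAsyncResponse."""

    service = "lambda"

    def __init__(self, **kwargs):
        self.options = kwargs


class SnsStub(LambdaStub):
    """Hypothetical stand-in for SnsAsyncResponse."""

    service = "sns"


# Same shape as ASYNC_CLASSES: service name -> handler class.
HANDLERS = {"lambda": LambdaStub, "sns": SnsStub}


def make_handler(service="lambda", **kwargs):
    """Look up the handler class for a service name and instantiate it."""
    try:
        handler_class = HANDLERS[service]
    except KeyError:
        raise ValueError(f"unknown async service: {service!r}") from None
    return handler_class(**kwargs)


handler = make_handler("sns", capture_response=True)
```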

## Usage Examples

### Basic Async Task

```python
from zappa.asynchronous import task, run, get_async_response

@task
def process_data(data, multiplier=1):
    """Process data asynchronously."""
    result = []
    for item in data:
        result.append(item * multiplier)
    return result

# Execute task asynchronously
response = run(process_data, [1, 2, 3, 4], multiplier=2)
print(f"Task ID: {response.response_id}")

# Get result later
result = get_async_response(response.response_id)
print(f"Result: {result}")
```

### SNS-Based Async Task

```python
from zappa.asynchronous import task_sns, run

@task_sns
def send_email_batch(email_list, subject, body):
    """Send batch emails asynchronously via SNS."""
    import boto3
    ses = boto3.client('ses')

    results = []
    for email in email_list:
        response = ses.send_email(
            Source='noreply@example.com',
            Destination={'ToAddresses': [email]},
            Message={
                'Subject': {'Data': subject},
                'Body': {'Text': {'Data': body}}
            }
        )
        results.append(response['MessageId'])

    return results

# Execute large batch operation
emails = ['user1@example.com', 'user2@example.com']
response = run(send_email_batch, emails,
               subject='Newsletter',
               body='Welcome to our newsletter!')
```

### Custom Task Function

```python
from zappa.asynchronous import run, get_async_response

def expensive_computation(n):
    """Compute factorial asynchronously."""
    import math
    return math.factorial(n)

# Run without decorator (function must be importable)
response = run(expensive_computation, 1000)

# Check response later
result = get_async_response(response.response_id)
print(f"1000! = {result}")
```

### Task with Error Handling

```python
from zappa.asynchronous import task, run, get_async_response, AsyncException

@task
def risky_operation(data):
    """Task that might fail."""
    if not data:
        raise ValueError("Data cannot be empty")

    # Simulate processing
    import time
    time.sleep(2)
    return f"Processed {len(data)} items"

# AsyncException signals failures in execution, serialization, or response handling
try:
    response = run(risky_operation, [])
    result = get_async_response(response.response_id)
except AsyncException as e:
    print(f"Task failed: {e}")
```

### Multiple Task Execution

```python
from zappa.asynchronous import task, run, get_async_response

@task
def process_chunk(chunk_id, data_chunk):
    """Process data chunk."""
    return {
        'chunk_id': chunk_id,
        'processed_count': len(data_chunk),
        'sum': sum(data_chunk)
    }

# Process large dataset in parallel chunks
large_dataset = list(range(10000))
chunk_size = 1000
responses = []

for i in range(0, len(large_dataset), chunk_size):
    chunk = large_dataset[i:i+chunk_size]
    response = run(process_chunk, i//chunk_size, chunk)
    responses.append(response)

# Collect results
results = []
for response in responses:
    result = get_async_response(response.response_id)
    results.append(result)

print(f"Processed {len(results)} chunks")
```