Server-less Python Web Services for AWS Lambda and API Gateway
Background task processing using AWS Lambda and SNS for executing long-running operations asynchronously outside of the HTTP request/response cycle. This capability enables scalable background job processing in serverless applications.
Convert regular functions into asynchronous tasks that can be executed in the background.
def task(func):
"""
Decorator to convert function to async Lambda task.
Wraps function to enable asynchronous execution via Lambda
invocation with automatic serialization and error handling.
Parameters:
- func: callable, function to convert to async task
Returns:
callable: Decorated function with async capabilities
"""

def task_sns(func):
"""
Decorator to convert function to SNS-based async task.
Wraps function for asynchronous execution via SNS messaging
with retry capabilities. Note that the payload limit is the same
256 KB as for Lambda (see SNS_ASYNC_PAYLOAD_LIMIT).
Parameters:
- func: callable, function to convert to SNS async task
Returns:
callable: Decorated function with SNS async capabilities
"""

Execute asynchronous tasks and manage task responses.

def run(func, *args, **kwargs):
"""
Execute async task via Lambda or SNS.
Runs function asynchronously using appropriate backend
based on payload size and configuration.
Parameters:
- func: callable, function to execute asynchronously
- *args: positional arguments for function
- **kwargs: keyword arguments for function
Returns:
LambdaAsyncResponse or SnsAsyncResponse: Task response handler
"""

def run_message(task_type, func_path, args, kwargs):
"""
Process async task message from queue.
Executes task function with provided arguments from
async message queue (Lambda or SNS).
Parameters:
- task_type: str, type of task ('lambda' or 'sns')
- func_path: str, importable path to function
- args: list, positional arguments
- kwargs: dict, keyword arguments
Returns:
any: Task function return value
"""

Route asynchronous tasks to appropriate execution backends.

def route_lambda_task(event, context):
"""
Route Lambda-based async task execution.
Processes Lambda events containing async task data
and executes the specified function.
Parameters:
- event: dict, Lambda event with task data
- context: LambdaContext, Lambda runtime context
Returns:
any: Task execution result
"""

def route_sns_task(event, context):
"""
Route SNS-based async task execution.
Processes SNS events containing async task messages
and executes the specified function.
Parameters:
- event: dict, Lambda event from SNS trigger
- context: LambdaContext, Lambda runtime context
Returns:
any: Task execution result
"""

Retrieve and manage asynchronous task responses.

def get_async_response(response_id):
"""
Retrieve async task response by task ID.
Fetches the response/result of an asynchronous task
using its unique response identifier.
Parameters:
- response_id: str, unique task response identifier
Returns:
any: Task response data or None if not found
"""

Import and manage task functions dynamically.

def import_and_get_task(task_path):
"""
Import task function from module path.
Dynamically imports and returns function for async execution
from dotted module path.
Parameters:
- task_path: str, dotted path to function (module.function)
Returns:
callable: The imported task function
"""

def get_func_task_path(func):
"""
Get importable module path for function.
Generates dotted module path string that can be used
to import and execute function in async context.
Parameters:
- func: callable, function to get path for
Returns:
str: Dotted module path (module.function)
"""

Handle responses from Lambda-based async task execution.

class LambdaAsyncResponse:
"""
Response handler for async Lambda invocations.
Manages response data and status for asynchronous
Lambda function executions.
"""
def __init__(self, **kwargs):
"""
Initialize Lambda async response handler.
Parameters:
- **kwargs: Response configuration and metadata
"""

Handle responses from SNS-based async task execution.

class SnsAsyncResponse(LambdaAsyncResponse):
"""
Response handler for SNS async messaging.
Extends LambdaAsyncResponse with SNS-specific handling
for message delivery; the payload limit matches Lambda's
256 KB (see SNS_ASYNC_PAYLOAD_LIMIT).
"""
def __init__(self, **kwargs):
"""
Initialize SNS async response handler.
Parameters:
- **kwargs: Response configuration and metadata
"""

class AsyncException(Exception):
"""
Exception raised for async operation failures.
Indicates errors in asynchronous task execution,
serialization, or response handling.
"""

# Payload size limits for async backends
LAMBDA_ASYNC_PAYLOAD_LIMIT = 256000  # 256KB for Lambda async
SNS_ASYNC_PAYLOAD_LIMIT = 256000  # 256KB for SNS messages

# Mapping of async response types to handler classes
ASYNC_CLASSES = {
    'lambda': LambdaAsyncResponse,
    'sns': SnsAsyncResponse,
}


# get_async_response is imported as well because the example that follows
# calls it to fetch the task result.
from zappa.asynchronous import task, run, get_async_response
@task
def process_data(data, multiplier=1):
    """
    Process data asynchronously.
    Multiplies every item of the input by a constant factor.
    Parameters:
    - data: iterable, items that support multiplication
    - multiplier: factor applied to each item (default 1)
    Returns:
    list: each item of data multiplied by multiplier, in input order
    """
    return [item * multiplier for item in data]
# Execute task asynchronously
response = run(process_data, [1, 2, 3, 4], multiplier=2)
print(f"Task ID: {response.response_id}")

# Get result later
result = get_async_response(response.response_id)
print(f"Result: {result}")


from zappa.asynchronous import task_sns, run
@task_sns
def send_email_batch(email_list, subject, body):
    """
    Send batch emails asynchronously via SNS.
    Sends one SES message per recipient, all with the same subject
    and plain-text body.
    Parameters:
    - email_list: iterable of str, recipient addresses
    - subject: str, subject line used for every message
    - body: str, plain-text body used for every message
    Returns:
    list: the 'MessageId' of each send_email response, in input order
    """
    import boto3

    ses = boto3.client('ses')
    results = []
    # Kept as a plain loop: every iteration performs an API call.
    for email in email_list:
        response = ses.send_email(
            Source='noreply@example.com',
            Destination={'ToAddresses': [email]},
            Message={
                'Subject': {'Data': subject},
                'Body': {'Text': {'Data': body}},
            },
        )
        results.append(response['MessageId'])
    return results
# Execute large batch operation
emails = ['user1@example.com', 'user2@example.com']
response = run(
    send_email_batch,
    emails,
    subject='Newsletter',
    body='Welcome to our newsletter!',
)


from zappa.asynchronous import run, get_async_response
def expensive_computation(n):
    """
    Compute factorial asynchronously.
    The function itself is an ordinary synchronous call; it only runs
    in the background when dispatched through run().
    Parameters:
    - n: int, non-negative integer
    Returns:
    int: n factorial
    Raises:
    ValueError: if n is negative (raised by math.factorial)
    """
    import math

    return math.factorial(n)
# Run without decorator (function must be importable)
response = run(expensive_computation, 1000)

# Check response later
result = get_async_response(response.response_id)
print(f"1000! = {result}")


# get_async_response is imported as well because the error-handling
# example that follows calls it.
from zappa.asynchronous import task, run, AsyncException, get_async_response
@task
def risky_operation(data):
    """
    Task that might fail.
    Parameters:
    - data: sized collection of items to process
    Returns:
    str: summary message containing the number of items
    Raises:
    ValueError: if data is falsy (for example empty or None)
    """
    if not data:
        raise ValueError("Data cannot be empty")
    # Simulate processing
    import time

    time.sleep(2)
    return f"Processed {len(data)} items"
try:
    response = run(risky_operation, [])
    result = get_async_response(response.response_id)
except AsyncException as e:
    print(f"Task failed: {e}")


# get_async_response is imported as well because the chunk-processing
# example that follows calls it when collecting results.
from zappa.asynchronous import task, run, get_async_response
@task
def process_chunk(chunk_id, data_chunk):
    """
    Process data chunk.
    Parameters:
    - chunk_id: identifier of the chunk, echoed back in the result
    - data_chunk: sized collection of numbers
    Returns:
    dict: 'chunk_id', 'processed_count' (len of the chunk) and
    'sum' (sum of the chunk)
    """
    return {
        'chunk_id': chunk_id,
        'processed_count': len(data_chunk),
        'sum': sum(data_chunk),
    }
# Process large dataset in parallel chunks
large_dataset = list(range(10000))
chunk_size = 1000
responses = []
for i in range(0, len(large_dataset), chunk_size):
    chunk = large_dataset[i:i + chunk_size]
    # i // chunk_size turns the start offset into a 0-based chunk number.
    response = run(process_chunk, i // chunk_size, chunk)
    responses.append(response)

# Collect results
results = []
for response in responses:
    result = get_async_response(response.response_id)
    results.append(result)
print(f"Processed {len(results)} chunks")

Install with Tessl CLI

npx tessl i tessl/pypi-zappa