CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-zappa

Server-less Python Web Services for AWS Lambda and API Gateway

Overview
Eval results
Files

async-tasks.mddocs/

Asynchronous Task Execution

Background task processing using AWS Lambda and SNS for executing long-running operations asynchronously outside of the HTTP request/response cycle. This capability enables scalable background job processing in serverless applications.

Capabilities

Task Decorators

Convert regular functions into asynchronous tasks that can be executed in the background.

def task(func):
    """
    Decorator to convert function to async Lambda task.
    
    Wraps function to enable asynchronous execution via Lambda
    invocation with automatic serialization and error handling.
    
    Parameters:
    - func: callable, function to convert to async task
    
    Returns:
    callable: Decorated function with async capabilities
    """
def task_sns(func):
    """
    Decorator to convert function to SNS-based async task.
    
    Wraps function for asynchronous execution via SNS messaging
    with higher payload limits and retry capabilities.
    
    Parameters:
    - func: callable, function to convert to SNS async task
    
    Returns:
    callable: Decorated function with SNS async capabilities
    """

Task Execution

Execute asynchronous tasks and manage task responses.

def run(func, *args, **kwargs):
    """
    Execute async task via Lambda or SNS.
    
    Runs function asynchronously using appropriate backend
    based on payload size and configuration.
    
    Parameters:
    - func: callable, function to execute asynchronously
    - *args: positional arguments for function
    - **kwargs: keyword arguments for function
    
    Returns:
    LambdaAsyncResponse or SnsAsyncResponse: Task response handler
    """
def run_message(task_type, func_path, args, kwargs):
    """
    Process async task message from queue.
    
    Executes task function with provided arguments from
    async message queue (Lambda or SNS).
    
    Parameters:
    - task_type: str, type of task ('lambda' or 'sns')
    - func_path: str, importable path to function
    - args: list, positional arguments
    - kwargs: dict, keyword arguments
    
    Returns:
    any: Task function return value
    """

Task Routing

Route asynchronous tasks to appropriate execution backends.

def route_lambda_task(event, context):
    """
    Route Lambda-based async task execution.
    
    Processes Lambda events containing async task data
    and executes the specified function.
    
    Parameters:
    - event: dict, Lambda event with task data
    - context: LambdaContext, Lambda runtime context
    
    Returns:
    any: Task execution result
    """
def route_sns_task(event, context):
    """
    Route SNS-based async task execution.
    
    Processes SNS events containing async task messages
    and executes the specified function.
    
    Parameters:
    - event: dict, Lambda event from SNS trigger
    - context: LambdaContext, Lambda runtime context
    
    Returns:
    any: Task execution result
    """

Response Management

Retrieve and manage asynchronous task responses.

def get_async_response(response_id):
    """
    Retrieve async task response by task ID.
    
    Fetches the response/result of an asynchronous task
    using its unique response identifier.
    
    Parameters:
    - response_id: str, unique task response identifier
    
    Returns:
    any: Task response data or None if not found
    """

Task Management Utilities

Import and manage task functions dynamically.

def import_and_get_task(task_path):
    """
    Import task function from module path.
    
    Dynamically imports and returns function for async execution
    from dotted module path.
    
    Parameters:
    - task_path: str, dotted path to function (module.function)
    
    Returns:
    callable: The imported task function
    """
def get_func_task_path(func):
    """
    Get importable module path for function.
    
    Generates dotted module path string that can be used
    to import and execute function in async context.
    
    Parameters:
    - func: callable, function to get path for
    
    Returns:
    str: Dotted module path (module.function)
    """

Response Handler Classes

Lambda Async Response

Handle responses from Lambda-based async task execution.

class LambdaAsyncResponse:
    """
    Response handler for async Lambda invocations.
    
    Manages response data and status for asynchronous
    Lambda function executions.
    """
    
    def __init__(self, **kwargs):
        """
        Initialize Lambda async response handler.
        
        Parameters:
        - **kwargs: Response configuration and metadata
        """

SNS Async Response

Handle responses from SNS-based async task execution.

class SnsAsyncResponse(LambdaAsyncResponse):
    """
    Response handler for SNS async messaging.
    
    Extends LambdaAsyncResponse with SNS-specific handling
    for higher payload limits and message delivery.
    """
    
    def __init__(self, **kwargs):
        """
        Initialize SNS async response handler.
        
        Parameters:
        - **kwargs: Response configuration and metadata
        """

Exception Classes

class AsyncException(Exception):
    """
    Exception raised for async operation failures.
    
    Indicates errors in asynchronous task execution,
    serialization, or response handling.
    """

Constants

# Payload size limits for async backends
LAMBDA_ASYNC_PAYLOAD_LIMIT = 256000  # 256KB for Lambda async
SNS_ASYNC_PAYLOAD_LIMIT = 256000     # 256KB for SNS messages

# Mapping of async response types to handler classes
ASYNC_CLASSES = {
    'lambda': LambdaAsyncResponse,
    'sns': SnsAsyncResponse
}

Usage Examples

Basic Async Task

from zappa.asynchronous import task, run

@task
def process_data(data, multiplier=1):
    """Process data asynchronously."""
    result = []
    for item in data:
        result.append(item * multiplier)
    return result

# Execute task asynchronously
response = run(process_data, [1, 2, 3, 4], multiplier=2)
print(f"Task ID: {response.response_id}")

# Get result later
result = get_async_response(response.response_id)
print(f"Result: {result}")

SNS-Based Async Task

from zappa.asynchronous import task_sns, run

@task_sns  
def send_email_batch(email_list, subject, body):
    """Send batch emails asynchronously via SNS."""
    import boto3
    ses = boto3.client('ses')
    
    results = []
    for email in email_list:
        response = ses.send_email(
            Source='noreply@example.com',
            Destination={'ToAddresses': [email]},
            Message={
                'Subject': {'Data': subject},
                'Body': {'Text': {'Data': body}}
            }
        )
        results.append(response['MessageId'])
    
    return results

# Execute large batch operation
emails = ['user1@example.com', 'user2@example.com']
response = run(send_email_batch, emails, 
               subject='Newsletter', 
               body='Welcome to our newsletter!')

Custom Task Function

from zappa.asynchronous import run, get_async_response

def expensive_computation(n):
    """Compute factorial asynchronously."""
    import math
    return math.factorial(n)

# Run without decorator (function must be importable)
response = run(expensive_computation, 1000)

# Check response later
result = get_async_response(response.response_id)
print(f"1000! = {result}")

Task with Error Handling

from zappa.asynchronous import task, run, AsyncException

@task
def risky_operation(data):
    """Task that might fail."""
    if not data:
        raise ValueError("Data cannot be empty")
    
    # Simulate processing
    import time
    time.sleep(2)
    return f"Processed {len(data)} items"

try:
    response = run(risky_operation, [])
    result = get_async_response(response.response_id)
except AsyncException as e:
    print(f"Task failed: {e}")

Multiple Task Execution

from zappa.asynchronous import task, run

@task
def process_chunk(chunk_id, data_chunk):
    """Process data chunk."""
    return {
        'chunk_id': chunk_id,
        'processed_count': len(data_chunk),
        'sum': sum(data_chunk)
    }

# Process large dataset in parallel chunks
large_dataset = list(range(10000))
chunk_size = 1000
responses = []

for i in range(0, len(large_dataset), chunk_size):
    chunk = large_dataset[i:i+chunk_size]
    response = run(process_chunk, i//chunk_size, chunk)
    responses.append(response)

# Collect results
results = []
for response in responses:
    result = get_async_response(response.response_id)
    results.append(result)

print(f"Processed {len(results)} chunks")

Install with Tessl CLI

npx tessl i tessl/pypi-zappa

docs

async-tasks.md

cli-operations.md

core-deployment.md

index.md

package-utilities.md

request-processing.md

ssl-management.md

utilities.md

wsgi-processing.md

tile.json