CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-aioboto3

Async boto3 wrapper providing asynchronous AWS SDK functionality

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/experimental.md

Experimental Features

Experimental integrations and features including Chalice framework support for serverless applications. These features are in development and may change in future versions.

Capabilities

AsyncChalice Framework Integration

Enhanced Chalice application class with async support and integrated aioboto3 session management for AWS Lambda functions.

class AsyncChalice:
    def __init__(
        self,
        *args,
        aioboto3_session: Session = None,
        **kwargs
    ):
        """
        Initialize AsyncChalice application with aioboto3 integration.

        Parameters:
        - *args: Standard Chalice initialization arguments, forwarded unchanged
        - aioboto3_session: Optional aioboto3 Session instance; presumably a
          default Session is created when omitted — confirm against the
          implementation
        - **kwargs: Standard Chalice initialization keyword arguments,
          forwarded unchanged
        """
    
    def __call__(self, event, context):
        """
        Lambda handler entry point.

        Invoked by AWS Lambda for each request routed to this application.

        Parameters:
        - event: AWS Lambda event object
        - context: AWS Lambda context object (also exposed to handlers as
          app.lambda_context)

        Returns:
        Response from the Chalice application
        """
    
    @property
    def aioboto3(self) -> Session:
        """
        Access to the integrated aioboto3 session.

        Returns:
        Session: The aioboto3 session instance — the one supplied via
        aioboto3_session at construction time, or the app's default;
        confirm default-creation behavior against the implementation
        """

Async REST API Event Handler

Enhanced REST API event handler that supports async view functions with automatic coroutine handling.

class AsyncRestAPIEventHandler:
    def _get_view_function_response(self, view_function, function_args):
        """
        Handle both sync and async view functions.

        Automatically detects coroutines and runs them in event loop.
        NOTE(review): presumably detects coroutines via inspection of the
        call result and drives them to completion on an event loop —
        confirm the detection/execution mechanism in the implementation.

        Parameters:
        - view_function: The view function to execute (plain callable or
          coroutine function)
        - function_args: Arguments to pass to the view function

        Returns:
        Response from the view function
        """

Usage Examples

Basic AsyncChalice Application

from chalice import Chalice
from aioboto3.experimental.async_chalice import AsyncChalice
from aioboto3 import Session
import asyncio

# Build the application with the bundled aioboto3 session support.
app = AsyncChalice(app_name='my-async-app')

@app.route('/')
async def index():
    """List S3 bucket names through the app-integrated aioboto3 session."""
    async with app.aioboto3.client('s3') as s3:
        listing = await s3.list_buckets()
        names = [entry['Name'] for entry in listing['Buckets']]

    return {
        'message': 'Hello from async Chalice!',
        'buckets': names
    }

@app.route('/dynamo/{table_name}', methods=['GET'])
async def get_table_info(table_name):
    """Describe a DynamoDB table and report selected attributes."""
    async with app.aioboto3.resource('dynamodb') as dynamodb:
        table = await dynamodb.Table(table_name)

        # The low-level client hangs off table.meta; use it for DescribeTable.
        description = await table.meta.client.describe_table(TableName=table_name)
        details = description['Table']

        return {
            'table_name': table_name,
            'item_count': details['ItemCount'],
            'status': details['TableStatus']
        }

Custom Session Configuration

from aioboto3.experimental.async_chalice import AsyncChalice
from aioboto3 import Session

# Configure a session explicitly instead of relying on defaults.
session = Session(
    region_name='us-west-2',
    profile_name='production'
)

# Hand the pre-built session to the application.
app = AsyncChalice(
    app_name='my-custom-app',
    aioboto3_session=session
)

@app.route('/upload', methods=['POST'])
async def upload_file():
    """Upload file to S3 using custom session."""
    payload = app.current_request.raw_body

    # All app.aioboto3 clients are created from the custom session above.
    async with app.aioboto3.client('s3') as s3:
        await s3.put_object(
            Bucket='my-app-uploads',
            Key='uploaded-file.bin',
            Body=payload
        )

    return {'status': 'uploaded', 'size': len(payload)}

Mixed Sync/Async Routes

from aioboto3.experimental.async_chalice import AsyncChalice

app = AsyncChalice(app_name='mixed-app')

@app.route('/sync')
def sync_route():
    """Plain blocking handler — no coroutine required."""
    return {'type': 'sync', 'message': 'This is a sync route'}

@app.route('/async')
async def async_route():
    """Coroutine handler that reads a value from SSM Parameter Store."""
    async with app.aioboto3.client('ssm') as ssm:
        param = await ssm.get_parameter(
            Name='/my-app/config/database-url'
        )

        return {
            'type': 'async',
            'parameter_value': param['Parameter']['Value']
        }

@app.route('/batch-operations', methods=['POST'])
async def batch_operations():
    """Write a batch of request items into DynamoDB."""
    payload = app.current_request.json_body
    items = payload.get('items', [])

    async with app.aioboto3.resource('dynamodb') as dynamodb:
        table = await dynamodb.Table('my-items')

        # batch_writer buffers puts and flushes them as grouped requests.
        async with table.batch_writer() as writer:
            for entry in items:
                await writer.put_item(Item=entry)

    return {'processed': len(items)}

Error Handling in Async Routes

from chalice import BadRequestError, InternalServerError
from botocore.exceptions import ClientError

app = AsyncChalice(app_name='error-handling-app')

@app.route('/safe-operation/{resource_id}', methods=['GET'])
async def safe_operation(resource_id):
    """Fetch one item from DynamoDB, translating AWS failures into HTTP errors.

    Returns a dict with 'resource_id' and 'data' on success.
    Raises BadRequestError (400) for a missing item or table, and
    InternalServerError (500) for access or unexpected failures.
    """
    try:
        async with app.aioboto3.client('dynamodb') as dynamodb:
            response = await dynamodb.get_item(
                TableName='my-resources',
                Key={'id': {'S': resource_id}}
            )

            if 'Item' not in response:
                raise BadRequestError(f"Resource {resource_id} not found")

            return {
                'resource_id': resource_id,
                'data': response['Item']
            }

    except ClientError as e:
        # Map well-known AWS error codes onto Chalice HTTP errors.
        error_code = e.response['Error']['Code']

        if error_code == 'ResourceNotFoundException':
            raise BadRequestError("Table does not exist") from e
        elif error_code == 'AccessDeniedException':
            raise InternalServerError("Access denied to DynamoDB") from e
        else:
            raise InternalServerError(f"AWS error: {error_code}") from e

    except BadRequestError:
        # BUG FIX: without this clause, the BadRequestError raised above for a
        # missing item would be swallowed by the broad handler below and
        # converted into a 500. Let deliberate HTTP errors pass through.
        raise

    except Exception as e:
        app.log.error(f"Unexpected error: {e}")
        raise InternalServerError("Internal server error") from e

Integration with Other AWS Services

import json
from datetime import datetime, timezone

app = AsyncChalice(app_name='multi-service-app')

@app.route('/process-data', methods=['POST'])
async def process_data():
    """Fan one request payload out to DynamoDB, SQS and S3.

    Returns a dict reporting the outcome of each service call.
    """
    data = app.current_request.json_body

    results = {}

    # Build the enriched record once, up front: it is written to DynamoDB
    # below and reused for the S3 upload at the end.
    # NOTE: datetime.utcnow() is deprecated (Python 3.12+) and naive;
    # use an explicitly UTC-aware timestamp instead.
    item = {
        'id': data.get('id'),
        'processed_at': datetime.now(timezone.utc).isoformat(),
        'data': data
    }

    # Store in DynamoDB
    async with app.aioboto3.resource('dynamodb') as dynamodb:
        table = await dynamodb.Table('processed-data')
        await table.put_item(Item=item)
        results['dynamodb'] = 'stored'

    # Send to SQS
    async with app.aioboto3.client('sqs') as sqs:
        queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'

        await sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(data)
        )
        results['sqs'] = 'queued'

    # Upload to S3
    async with app.aioboto3.client('s3') as s3:
        key = f"processed/{data.get('id')}.json"

        await s3.put_object(
            Bucket='my-processed-data',
            Key=key,
            Body=json.dumps(item),
            ContentType='application/json'
        )
        results['s3'] = key

    return results

Lambda Context Access

import asyncio  # BUG FIX: asyncio.gather is used below but was never imported

app = AsyncChalice(app_name='context-app')

@app.route('/lambda-info')
async def lambda_info():
    """Expose Lambda runtime metadata from the Chalice-provided context."""
    # Chalice populates app.lambda_context for the current invocation.
    context = app.lambda_context

    return {
        'function_name': context.function_name,
        'function_version': context.function_version,
        'memory_limit': context.memory_limit_in_mb,
        'request_id': context.aws_request_id,
        'remaining_time': context.get_remaining_time_in_millis()
    }

@app.route('/performance-test')
async def performance_test():
    """Time five concurrent S3 calls to demonstrate async fan-out."""
    import time

    start_time = time.time()

    # Create the coroutines first, then await them together so the HTTP
    # round-trips overlap instead of running sequentially.
    tasks = []
    async with app.aioboto3.client('s3') as s3:
        for _ in range(5):
            tasks.append(s3.list_objects_v2(Bucket='my-test-bucket', MaxKeys=1))

        # Wait for all operations to complete
        results = await asyncio.gather(*tasks)

    end_time = time.time()

    return {
        'operations': len(results),
        'duration_seconds': end_time - start_time,
        'remaining_lambda_time': app.lambda_context.get_remaining_time_in_millis()
    }

Install with Tessl CLI

npx tessl i tessl/pypi-aioboto3

docs

client-side-encryption.md

dynamodb.md

experimental.md

index.md

s3-operations.md

session-management.md

tile.json