Async boto3 wrapper providing asynchronous AWS SDK functionality
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Experimental integrations and features, including Chalice framework support for serverless applications. These features are in development and may change in future versions.
Enhanced Chalice application class with async support and integrated aioboto3 session management for AWS Lambda functions.
class AsyncChalice:
def __init__(
self,
*args,
aioboto3_session: Session = None,
**kwargs
):
"""
Initialize AsyncChalice application with aioboto3 integration.
Parameters:
- *args: Standard Chalice initialization arguments
- aioboto3_session: Optional aioboto3 Session instance
- **kwargs: Standard Chalice initialization keyword arguments
"""
def __call__(self, event, context):
"""
Lambda handler entry point.
Parameters:
- event: AWS Lambda event object
- context: AWS Lambda context object
Returns:
Response from the Chalice application
"""
@property
def aioboto3(self) -> Session:
"""
Access to the integrated aioboto3 session.
Returns:
Session: The aioboto3 session instance
"""Enhanced REST API event handler that supports async view functions with automatic coroutine handling.
class AsyncRestAPIEventHandler:
    """REST API event handler that supports both sync and async view functions."""

    def _get_view_function_response(self, view_function, function_args):
        """
        Handle both sync and async view functions.

        Automatically detects coroutines and runs them in event loop.

        Parameters:
        - view_function: The view function to execute
        - function_args: Arguments to pass to the view function

        Returns:
        Response from the view function
        """


from chalice import Chalice
from aioboto3.experimental.async_chalice import AsyncChalice
from aioboto3 import Session
import asyncio
# Create AsyncChalice app with aioboto3 integration
app = AsyncChalice(app_name='my-async-app')
@app.route('/')
async def index():
    """List the account's S3 bucket names through the app's aioboto3 session."""
    # The S3 client is opened from the session attached to the app.
    async with app.aioboto3.client('s3') as s3:
        listing = await s3.list_buckets()

    return {
        'message': 'Hello from async Chalice!',
        'buckets': [entry['Name'] for entry in listing['Buckets']],
    }
@app.route('/dynamo/{table_name}', methods=['GET'])
async def get_table_info(table_name):
    """Return the item count and status of the named DynamoDB table."""
    async with app.aioboto3.resource('dynamodb') as dynamodb:
        table = await dynamodb.Table(table_name)
        # describe_table is issued through the table resource's underlying client.
        description = await table.meta.client.describe_table(TableName=table_name)

    details = description['Table']
    return {
        'table_name': table_name,
        'item_count': details['ItemCount'],
        'status': details['TableStatus'],
    }


from aioboto3.experimental.async_chalice import AsyncChalice
from aioboto3 import Session
# Build an aioboto3 session pinned to an explicit region and credentials profile
custom_session = Session(region_name='us-west-2', profile_name='production')

# Pass the session to the app via the aioboto3_session keyword
app = AsyncChalice(app_name='my-custom-app', aioboto3_session=custom_session)
@app.route('/upload', methods=['POST'])
async def upload_file():
    """Store the raw request body in S3 using the app's aioboto3 session."""
    payload = app.current_request.raw_body

    # The client comes from the session the app was constructed with.
    async with app.aioboto3.client('s3') as s3:
        await s3.put_object(
            Bucket='my-app-uploads', Key='uploaded-file.bin', Body=payload
        )

    return {'status': 'uploaded', 'size': len(payload)}


from aioboto3.experimental.async_chalice import AsyncChalice
app = AsyncChalice(app_name='mixed-app')
@app.route('/sync')
def sync_route():
    """Plain synchronous handler registered on the same app as the async routes."""
    payload = {'type': 'sync'}
    payload['message'] = 'This is a sync route'
    return payload
@app.route('/async')
async def async_route():
    """Fetch one configuration value from SSM and return it."""
    async with app.aioboto3.client('ssm') as ssm:
        result = await ssm.get_parameter(Name='/my-app/config/database-url')

    value = result['Parameter']['Value']
    return {'type': 'async', 'parameter_value': value}
@app.route('/batch-operations', methods=['POST'])
async def batch_operations():
    """Write every entry of the request's ``items`` list to the ``my-items`` table.

    Returns:
        dict: ``{'processed': <number of items written>}``.
    """
    # json_body is None when the request carries no JSON payload; treat that
    # as an empty body instead of failing with AttributeError on .get().
    body = app.current_request.json_body or {}
    items = body.get('items', [])

    async with app.aioboto3.resource('dynamodb') as dynamodb:
        table = await dynamodb.Table('my-items')

        # Use batch writer for efficient operations
        async with table.batch_writer() as batch:
            for item in items:
                await batch.put_item(Item=item)

    return {'processed': len(items)}


from chalice import BadRequestError, InternalServerError
from botocore.exceptions import ClientError
app = AsyncChalice(app_name='error-handling-app')
@app.route('/safe-operation/{resource_id}', methods=['GET'])
async def safe_operation(resource_id):
    """Fetch one item from the ``my-resources`` table with error translation.

    Parameters:
    - resource_id: Value of the item's ``id`` key (string attribute)

    Returns:
    dict with ``resource_id`` and the raw DynamoDB ``Item`` under ``data``

    Raises:
    - BadRequestError: the item is missing or the table does not exist
    - InternalServerError: any other AWS or unexpected failure
    """
    try:
        async with app.aioboto3.client('dynamodb') as dynamodb:
            response = await dynamodb.get_item(
                TableName='my-resources',
                Key={'id': {'S': resource_id}}
            )
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'ResourceNotFoundException':
            raise BadRequestError("Table does not exist") from e
        if error_code == 'AccessDeniedException':
            raise InternalServerError("Access denied to DynamoDB") from e
        raise InternalServerError(f"AWS error: {error_code}") from e
    except Exception as e:
        app.log.error(f"Unexpected error: {e}")
        raise InternalServerError("Internal server error") from e

    # Checked outside the try block: raised inside it, this BadRequestError
    # would be caught by the generic handler above and reported as a 500.
    if 'Item' not in response:
        raise BadRequestError(f"Resource {resource_id} not found")

    return {
        'resource_id': resource_id,
        'data': response['Item']
    }


import json
from datetime import datetime
app = AsyncChalice(app_name='multi-service-app')
@app.route('/process-data', methods=['POST'])
async def process_data():
    """Store the request's JSON body in DynamoDB, queue it on SQS and upload it to S3.

    Returns:
    dict with one entry per service: ``dynamodb`` ('stored'), ``sqs`` ('queued')
    and ``s3`` (the object key that was written)
    """
    from datetime import timezone

    request = app.current_request
    data = request.json_body
    results = {}

    # Store in DynamoDB
    async with app.aioboto3.resource('dynamodb') as dynamodb:
        table = await dynamodb.Table('processed-data')
        item = {
            'id': data.get('id'),
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # and returns a naive value with no offset information.
            'processed_at': datetime.now(timezone.utc).isoformat(),
            'data': data
        }
        await table.put_item(Item=item)
        results['dynamodb'] = 'stored'

    # Send to SQS
    async with app.aioboto3.client('sqs') as sqs:
        queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'
        await sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(data)
        )
        results['sqs'] = 'queued'

    # Upload to S3
    async with app.aioboto3.client('s3') as s3:
        key = f"processed/{data.get('id')}.json"
        await s3.put_object(
            Bucket='my-processed-data',
            Key=key,
            Body=json.dumps(item),
            ContentType='application/json'
        )
        results['s3'] = key

    return results


app = AsyncChalice(app_name='context-app')
@app.route('/lambda-info')
async def lambda_info():
    """Report attributes of the Lambda context exposed as ``app.lambda_context``."""
    ctx = app.lambda_context
    info = {
        'function_name': ctx.function_name,
        'function_version': ctx.function_version,
        'memory_limit': ctx.memory_limit_in_mb,
        'request_id': ctx.aws_request_id,
        'remaining_time': ctx.get_remaining_time_in_millis(),
    }
    return info
@app.route('/performance-test')
async def performance_test():
"""Test async performance in Lambda."""
import time
start_time = time.time()
# Perform concurrent operations
tasks = []
async with app.aioboto3.client('s3') as s3:
# Create multiple concurrent S3 operations
for i in range(5):
task = s3.list_objects_v2(Bucket='my-test-bucket', MaxKeys=1)
tasks.append(task)
# Wait for all operations to complete
results = await asyncio.gather(*tasks)
end_time = time.time()
return {
'operations': len(results),
'duration_seconds': end_time - start_time,
'remaining_lambda_time': app.lambda_context.get_remaining_time_in_millis()
}Install with Tessl CLI
npx tessl i tessl/pypi-aioboto3