Async boto3 wrapper providing asynchronous AWS SDK functionality
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Enhanced async DynamoDB operations including batch writing capabilities and async context management for table resources. aioboto3 provides async versions of boto3's DynamoDB high-level interface with additional performance optimizations.
Enhanced table resources with async batch writing capabilities. The CustomTableResource extends the standard boto3 table interface with async-specific optimizations.
class CustomTableResource:
def batch_writer(
self,
overwrite_by_pkeys = None,
flush_amount: int = 25,
on_exit_loop_sleep: int = 0
):
"""
Create an async batch writer for efficient bulk operations.
Parameters:
- overwrite_by_pkeys: List of primary key names to use for overwrite detection
- flush_amount: Number of items to buffer before automatic flush
- on_exit_loop_sleep: Sleep time in seconds when exiting context manager
Returns:
BatchWriter: Async context manager for batch operations
"""Async batch writer for efficient bulk DynamoDB operations with automatic batching and retry logic.
class BatchWriter:
def __init__(
self,
table_name: str,
client,
flush_amount: int = 25,
overwrite_by_pkeys = None,
on_exit_loop_sleep: int = 0
):
"""
Initialize batch writer.
Parameters:
- table_name: Name of the DynamoDB table
- client: DynamoDB client instance
- flush_amount: Number of items to buffer before flushing
- overwrite_by_pkeys: Primary key names for overwrite detection
- on_exit_loop_sleep: Sleep time when exiting context
"""
async def __aenter__(self):
"""
Async context manager entry.
Returns:
self: The BatchWriter instance
"""
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
Async context manager exit. Flushes any remaining operations.
Parameters:
- exc_type: Exception type if an exception occurred
- exc_val: Exception value if an exception occurred
- exc_tb: Exception traceback if an exception occurred
"""
async def put_item(self, Item: dict, **kwargs):
"""
Add a put item operation to the batch.
Parameters:
- Item: Dictionary representing the item to put
- **kwargs: Additional parameters for the put operation
"""
async def delete_item(self, Key: dict, **kwargs):
"""
Add a delete item operation to the batch.
Parameters:
- Key: Dictionary representing the key of the item to delete
- **kwargs: Additional parameters for the delete operation
"""
async def _flush(self):
"""
Flush all pending batch operations to DynamoDB.
Internal method called automatically when buffer is full.
"""Standard async table operations inherited from boto3 with async/await support.
# These are the standard boto3 table methods, now with async support
async def put_item(self, Item: dict, **kwargs):
"""Put an item into the table."""
async def get_item(self, Key: dict, **kwargs):
"""Get an item from the table."""
async def update_item(self, Key: dict, **kwargs):
"""Update an item in the table."""
async def delete_item(self, Key: dict, **kwargs):
"""Delete an item from the table."""
async def query(self, **kwargs):
"""Query items from the table."""
async def scan(self, **kwargs):
"""Scan items from the table."""
async def batch_get_item(self, **kwargs):
"""Batch get multiple items."""import aioboto3
from boto3.dynamodb.conditions import Key
async def basic_table_operations():
session = aioboto3.Session()
async with session.resource('dynamodb', region_name='us-east-1') as dynamodb:
table = await dynamodb.Table('my-table')
# Put an item
await table.put_item(
Item={
'id': '123',
'name': 'John Doe',
'email': 'john@example.com'
}
)
# Get an item
response = await table.get_item(Key={'id': '123'})
item = response.get('Item')
# Query items
response = await table.query(
KeyConditionExpression=Key('id').eq('123')
)
items = response['Items']
# Update an item
await table.update_item(
Key={'id': '123'},
UpdateExpression='SET #name = :name',
ExpressionAttributeNames={'#name': 'name'},
ExpressionAttributeValues={':name': 'Jane Doe'}
)
# Delete an item
await table.delete_item(Key={'id': '123'})async def batch_operations():
session = aioboto3.Session()
async with session.resource('dynamodb', region_name='us-east-1') as dynamodb:
table = await dynamodb.Table('my-table')
# Use batch writer for efficient bulk operations
async with table.batch_writer() as batch:
# Add multiple items
for i in range(100):
await batch.put_item(
Item={
'id': str(i),
'data': f'item-{i}',
'timestamp': int(time.time())
}
)
# Mix puts and deletes
await batch.delete_item(Key={'id': '50'})
# Items are automatically flushed when buffer reaches flush_amount
# or when exiting the context managerasync def advanced_batch_writer():
session = aioboto3.Session()
async with session.resource('dynamodb', region_name='us-east-1') as dynamodb:
table = await dynamodb.Table('my-table')
# Configure batch writer with custom settings
async with table.batch_writer(
flush_amount=50, # Flush every 50 items instead of 25
overwrite_by_pkeys=['id'], # Detect overwrites by 'id' field
on_exit_loop_sleep=1 # Sleep 1 second when exiting
) as batch:
items = [
{'id': '1', 'name': 'Alice'},
{'id': '2', 'name': 'Bob'},
{'id': '1', 'name': 'Alice Updated'} # Will overwrite first item
]
for item in items:
await batch.put_item(Item=item)import botocore.exceptions
async def handle_dynamodb_errors():
session = aioboto3.Session()
try:
async with session.resource('dynamodb', region_name='us-east-1') as dynamodb:
table = await dynamodb.Table('my-table')
await table.put_item(Item={'id': '123', 'data': 'test'})
except botocore.exceptions.ClientError as e:
error_code = e.response['Error']['Code']
if error_code == 'ResourceNotFoundException':
print("Table does not exist")
elif error_code == 'ValidationException':
print("Invalid request parameters")
elif error_code == 'ProvisionedThroughputExceededException':
print("Request rate too high")
else:
print(f"Unexpected error: {error_code}")Install with Tessl CLI
npx tessl i tessl/pypi-aioboto3