Async boto3 wrapper providing asynchronous AWS SDK functionality
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
aioboto3 offers async S3 operations, including file transfers, object operations, and enhanced functionality for working with Amazon S3. It provides async versions of boto3's S3 transfer methods with progress callbacks and advanced configuration options.
Async file upload operations with progress tracking and transfer configuration options.
async def upload_file(
    filename: str,
    bucket: str,
    key: str,
    callback=None,
    config=None,
):
    """
    Upload a local file to S3.

    Signature stub: it documents the call shape only and performs no work.

    Parameters:
    - filename: Local file path to upload
    - bucket: S3 bucket name
    - key: S3 object key (path within bucket)
    - callback: Optional progress callback function
    - config: S3TransferConfig for advanced transfer settings

    NOTE(review): the usage examples in this document pass the last two
    arguments as ``Callback=`` / ``Config=`` (capitalized keywords), so
    confirm the real keyword spelling before relying on the lowercase
    names shown here.
    """
async def upload_fileobj(
    fileobj,
    bucket: str,
    key: str,
    callback=None,
    config=None,
):
    """
    Upload a file-like object to S3.

    Signature stub: it documents the call shape only and performs no work.

    Parameters:
    - fileobj: File-like object to upload (must support read())
    - bucket: S3 bucket name
    - key: S3 object key
    - callback: Optional progress callback function
    - config: S3TransferConfig for advanced transfer settings

    NOTE(review): the usage examples in this document pass ``Callback=``
    and ``Config=`` with capitalized keywords, so confirm the real
    keyword spelling before relying on the lowercase names shown here.
    """


# Async file download operations with progress tracking and error handling.
async def download_file(
    bucket: str,
    key: str,
    filename: str,
    callback=None,
    config=None,
):
    """
    Download an S3 object to a local file.

    Signature stub: it documents the call shape only and performs no work.

    Parameters:
    - bucket: S3 bucket name
    - key: S3 object key to download
    - filename: Local file path to save to
    - callback: Optional progress callback function
    - config: S3TransferConfig for advanced transfer settings

    NOTE(review): the usage examples in this document pass ``Callback=``
    and ``Config=`` with capitalized keywords, so confirm the real
    keyword spelling before relying on the lowercase names shown here.
    """
async def download_fileobj(
    bucket: str,
    key: str,
    fileobj,
    callback=None,
    config=None,
):
    """
    Download an S3 object into a file-like object.

    Signature stub: it documents the call shape only and performs no work.

    Parameters:
    - bucket: S3 bucket name
    - key: S3 object key to download
    - fileobj: File-like object to write to (must support write())
    - callback: Optional progress callback function
    - config: S3TransferConfig for advanced transfer settings

    NOTE(review): the usage examples in this document pass ``Callback=``
    and ``Config=`` with capitalized keywords, so confirm the real
    keyword spelling before relying on the lowercase names shown here.
    """


# Async S3 object copy operations within and between buckets.
async def copy(
    copy_source: dict,
    bucket: str,
    key: str,
    callback=None,
    config=None,
):
    """
    Copy an S3 object from one location to another.

    Signature stub: it documents the call shape only and performs no work.

    Parameters:
    - copy_source: Dictionary specifying source bucket and key
      {'Bucket': 'source-bucket', 'Key': 'source-key'}
    - bucket: Destination bucket name
    - key: Destination object key
    - callback: Optional progress callback function
    - config: S3TransferConfig for advanced transfer settings

    NOTE(review): the usage examples in this document pass ``Callback=``
    and ``Config=`` with capitalized keywords, so confirm the real
    keyword spelling before relying on the lowercase names shown here.
    """


# Extended functionality for S3 bucket and object resources with async support.
class S3Bucket:
    """Interface stub listing the async transfer helpers on a bucket resource.

    Every method is a docstring-only placeholder; none performs any work.
    """

    async def upload_file(self, filename: str, key: str, **kwargs):
        """Upload file to this bucket."""

    async def download_file(self, key: str, filename: str, **kwargs):
        """Download file from this bucket."""

    async def upload_fileobj(self, fileobj, key: str, **kwargs):
        """Upload file object to this bucket."""

    async def download_fileobj(self, key: str, fileobj, **kwargs):
        """Download object to file object from this bucket."""

    async def copy(self, copy_source: dict, key: str, **kwargs):
        """Copy object to this bucket."""

    # The return annotation is a string because S3ObjectCollection is
    # declared further down; a bare name would raise NameError while this
    # class body is being executed.
    @property
    def objects(self) -> "S3ObjectCollection":
        """Collection of all objects in this bucket."""
class S3Object:
    """Interface stub listing the async transfer helpers on an object resource.

    Every method is a docstring-only placeholder; none performs any work.
    """

    async def upload_file(self, filename: str, **kwargs):
        """Upload file to this object."""

    async def download_file(self, filename: str, **kwargs):
        """Download this object to file."""

    async def upload_fileobj(self, fileobj, **kwargs):
        """Upload file object to this object."""

    async def download_fileobj(self, fileobj, **kwargs):
        """Download this object to file object."""
class S3ObjectSummary:
    """Interface stub for an object summary; the method is a placeholder."""

    async def load(self):
        """Load object metadata asynchronously."""
class S3ObjectCollection:
    """Async iterable collection of S3 objects.

    Interface stub: every method is a docstring-only placeholder.
    """

    def __aiter__(self):
        """Return async iterator for objects."""

    async def __anext__(self):
        """Get next object in collection."""

    async def pages(self):
        """Return async iterator of pages of objects."""

    def all(self):
        """Return iterator for all objects in collection."""

    def filter(self, **kwargs):
        """Return filtered collection based on parameters."""

    def limit(self, count: int):
        """Limit the number of objects returned."""

    def page_size(self, count: int):
        """Set the page size for pagination."""


# Configuration options for S3 transfer operations using boto3's S3TransferConfig.
# boto3's TransferConfig subclasses S3TransferConfig and is the class that
# accepts the max_concurrency / use_threads keywords used below.
from boto3.s3.transfer import TransferConfig

# Transfer configuration is used with all transfer methods
config = TransferConfig(
    multipart_threshold=25 * 1024 * 1024,  # 25MB
    max_concurrency=10,
    multipart_chunksize=25 * 1024 * 1024,  # 25MB
    use_threads=True,
)
import aioboto3
import asyncio
async def basic_s3_operations():
    """Upload a file, download it again, then list the bucket's objects."""
    session = aioboto3.Session()

    async with session.client('s3', region_name='us-east-1') as s3:
        # Upload a file
        await s3.upload_file(
            '/path/to/local/file.txt',
            'my-bucket',
            'uploads/file.txt'
        )

        # Download a file
        await s3.download_file(
            'my-bucket',
            'uploads/file.txt',
            '/path/to/downloaded/file.txt'
        )

        # List objects. A single list_objects_v2 call returns one page
        # (at most 1,000 keys); paginate for larger buckets.
        response = await s3.list_objects_v2(Bucket='my-bucket')
        # 'Contents' is absent when the listing is empty, hence the default.
        for obj in response.get('Contents', []):
            print(f"Object: {obj['Key']}, Size: {obj['Size']}")
from io import BytesIO
async def fileobj_operations():
    """Round-trip a small payload through S3 using in-memory buffers."""
    session = aioboto3.Session()

    async with session.client('s3', region_name='us-east-1') as s3:
        # Upload from memory
        data = b"Hello, World!"
        fileobj = BytesIO(data)

        await s3.upload_fileobj(
            fileobj,
            'my-bucket',
            'data/hello.txt'
        )

        # Download to memory
        download_fileobj = BytesIO()
        await s3.download_fileobj(
            'my-bucket',
            'data/hello.txt',
            download_fileobj
        )

        # Rewind first: the buffer position sits at the end after writing.
        download_fileobj.seek(0)
        content = download_fileobj.read()
        print(f"Downloaded content: {content}")


async def upload_with_progress():
    """Upload a file while reporting progress through a callback."""
    session = aioboto3.Session()

    # NOTE(review): boto3-style transfer callbacks are typically invoked
    # with the bytes sent since the previous call, not a running total.
    # Confirm this for aioboto3 before summing or displaying percentages.
    def progress_callback(bytes_transferred):
        print(f"Transferred: {bytes_transferred} bytes")

    async with session.client('s3', region_name='us-east-1') as s3:
        await s3.upload_file(
            '/path/to/large/file.zip',
            'my-bucket',
            'uploads/large-file.zip',
            Callback=progress_callback
        )
from boto3.s3.transfer import S3TransferConfig
async def configured_transfer():
    """Upload a large file using an explicit multipart transfer configuration."""
    # Imported locally: boto3's TransferConfig (a subclass of
    # S3TransferConfig) is the class that accepts the max_concurrency and
    # use_threads keywords; the s3transfer base class rejects them.
    from boto3.s3.transfer import TransferConfig

    session = aioboto3.Session()

    # Configure transfer for large files
    config = TransferConfig(
        multipart_threshold=25 * 1024 * 1024,  # 25MB threshold for multipart
        max_concurrency=10,                    # Up to 10 concurrent transfers
        multipart_chunksize=25 * 1024 * 1024,  # 25MB chunks
        use_threads=True                       # Use threading for concurrency
    )

    async with session.client('s3', region_name='us-east-1') as s3:
        await s3.upload_file(
            '/path/to/very/large/file.zip',
            'my-bucket',
            'uploads/large-file.zip',
            Config=config
        )


async def copy_objects():
    """Copy one object within a bucket and another between two buckets."""
    session = aioboto3.Session()

    async with session.client('s3', region_name='us-east-1') as s3:
        # Copy object within same bucket
        copy_source = {
            'Bucket': 'my-bucket',
            'Key': 'original/file.txt'
        }
        await s3.copy(
            copy_source,
            'my-bucket',
            'copies/file.txt'
        )

        # Copy object between buckets
        copy_source = {
            'Bucket': 'source-bucket',
            'Key': 'data/file.txt'
        }
        await s3.copy(
            copy_source,
            'destination-bucket',
            'imported/file.txt'
        )


async def s3_resource_operations():
    """Use the resource interface for upload, download and listing."""
    session = aioboto3.Session()

    async with session.resource('s3', region_name='us-east-1') as s3:
        # Resource factories are awaited here, unlike in synchronous boto3.
        bucket = await s3.Bucket('my-bucket')

        # Upload using bucket resource
        await bucket.upload_file(
            '/path/to/file.txt',
            'uploads/file.txt'
        )

        # Work with object resource
        obj = await s3.Object('my-bucket', 'uploads/file.txt')
        await obj.download_file('/path/to/downloaded/file.txt')

        # List objects in bucket
        async for obj_summary in bucket.objects.all():
            print(f"Object: {obj_summary.key}")
import botocore.exceptions
async def handle_s3_errors():
    """Attempt an upload and print a message for each kind of failure."""
    session = aioboto3.Session()

    # Messages for the S3 error codes this example tells apart; any other
    # code falls through to the generic "S3 error" message.
    known_errors = {
        'NoSuchBucket': "Bucket does not exist",
        'AccessDenied': "Access denied to bucket or object",
        'NoSuchKey': "Object key does not exist",
    }

    try:
        async with session.client('s3', region_name='us-east-1') as s3:
            await s3.upload_file(
                '/nonexistent/file.txt',
                'my-bucket',
                'uploads/file.txt'
            )
    except FileNotFoundError:
        print("Local file not found")
    except botocore.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        print(known_errors.get(error_code, f"S3 error: {error_code}"))
    except Exception as e:
        # Last-resort handler so the example reports instead of crashing.
        print(f"Unexpected error: {e}")


# Install with Tessl CLI
npx tessl i tessl/pypi-aioboto3