Low-level, data-driven core of boto3 providing foundational AWS service access.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Streaming response handling for large payloads with support for chunked reading, line iteration, and automatic resource cleanup. Botocore's response handling system efficiently manages HTTP response bodies, especially for large or streaming content from AWS services.
Wrapper class for HTTP response bodies that provides convenient methods for handling streaming data with automatic validation and resource management.
from typing import Iterator, Optional


class StreamingBody:
    """Wrapper for HTTP response bodies that provides convenient methods for
    handling streaming data with automatic validation and resource management.
    """

    def __init__(self, raw_stream, content_length: int):
        """Initialize streaming body wrapper.

        Args:
            raw_stream: Underlying HTTP response stream.
            content_length: Expected content length in bytes.
        """

    def read(self, amt: Optional[int] = None) -> bytes:
        """Read at most amt bytes from the stream.

        Args:
            amt: Maximum number of bytes to read. If None, read all data.

        Returns:
            bytes: Data read from the stream.

        Raises:
            ReadTimeoutError: If the read operation times out.
            ResponseStreamingError: If a streaming protocol error occurs.
            IncompleteReadError: If content-length validation fails.
        """

    def readinto(self, b) -> int:
        """Read bytes into a pre-allocated, writable bytes-like object.

        Args:
            b: Pre-allocated buffer to read data into.

        Returns:
            int: Number of bytes read.

        Raises:
            ReadTimeoutError: If the read operation times out.
            ResponseStreamingError: If a streaming protocol error occurs.
            IncompleteReadError: If content-length validation fails.
        """

    def iter_lines(
        self,
        chunk_size: int = 1024,
        keepends: bool = False,
    ) -> Iterator[bytes]:
        """Return an iterator to yield lines from the raw stream.

        Args:
            chunk_size: Size of chunks to read while iterating.
            keepends: Whether to preserve line-ending characters.

        Yields:
            bytes: Individual lines from the stream.
        """

    def iter_chunks(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """Return an iterator to yield chunks from the raw stream.

        Args:
            chunk_size: Size of each chunk in bytes.

        Yields:
            bytes: Data chunks from the stream.
        """

    def close(self) -> None:
        """Close the underlying HTTP response stream."""

    def readable(self) -> bool:
        """Check if the stream is readable."""

    def tell(self) -> int:
        """Return current position in the stream."""

    def set_socket_timeout(self, timeout: float) -> None:
        """Set timeout on the underlying socket.

        Args:
            timeout: Timeout value in seconds.
        """


# Function for processing HTTP responses into parsed data structures based on
# service models.
def get_response(
    operation_model: "OperationModel",
    http_response,
) -> "Tuple[HTTPResponse, dict]":
    """Process an HTTP response into parsed response data.

    Annotations are quoted forward references because OperationModel,
    HTTPResponse, and Tuple are not imported in this file.

    Args:
        operation_model: Service operation model describing the call.
        http_response: Raw HTTP response object.

    Returns:
        tuple: (http_response, parsed_response_data)
    """


from botocore.session import get_session
# Create session and S3 client
session = get_session()
s3_client = session.create_client('s3', region_name='us-east-1')

# Get streaming object
response = s3_client.get_object(Bucket='mybucket', Key='large-file.txt')
streaming_body = response['Body']
try:
    # Read entire content
    content = streaming_body.read()
    print(f"Read {len(content)} bytes")
finally:
    # Always close when done, even if reading raised
    streaming_body.close()
# Read file in chunks to manage memory usage
response = s3_client.get_object(Bucket='mybucket', Key='large-dataset.csv')
streaming_body = response['Body']
try:
    total_size = 0
    # iter_chunks reads fixed-size pieces so the whole object is never
    # held in memory at once
    for chunk in streaming_body.iter_chunks(chunk_size=8192):
        # Process chunk (e.g., write to file, analyze data)
        total_size += len(chunk)
        print(f"Processed chunk of {len(chunk)} bytes")
    print(f"Total size: {total_size} bytes")
finally:
    # Release the underlying connection even if processing fails
    streaming_body.close()
# Process text files line by line
response = s3_client.get_object(Bucket='mybucket', Key='log-file.txt')
streaming_body = response['Body']
try:
    line_count = 0
    # iter_lines buffers chunk_size bytes at a time and splits on newlines;
    # lines are yielded as bytes
    for line in streaming_body.iter_lines(chunk_size=4096):
        # Process each line
        decoded_line = line.decode('utf-8')
        if 'ERROR' in decoded_line:
            print(f"Error found: {decoded_line.strip()}")
        line_count += 1
    print(f"Processed {line_count} lines")
finally:
    streaming_body.close()
# Use context manager for automatic cleanup
import json

response = s3_client.get_object(Bucket='mybucket', Key='data.json')
with response['Body'] as streaming_body:
    # Stream will be automatically closed when exiting context
    data = streaming_body.read()
    parsed_data = json.loads(data.decode('utf-8'))
# Read into pre-allocated buffer for memory efficiency
response = s3_client.get_object(Bucket='mybucket', Key='binary-data.bin')
streaming_body = response['Body']
try:
    buffer = bytearray(8192)  # 8KB reusable buffer
    total_read = 0
    while True:
        bytes_read = streaming_body.readinto(buffer)
        if bytes_read == 0:
            # Zero bytes read signals end of stream
            break
        # Process buffer contents
        total_read += bytes_read
        print(f"Read {bytes_read} bytes, total: {total_read}")
        # Process the data in buffer[:bytes_read]
finally:
    streaming_body.close()
# Handle streaming responses from Lambda invocations
import json

lambda_client = session.create_client('lambda', region_name='us-east-1')
response = lambda_client.invoke(
    FunctionName='my-function',
    InvocationType='RequestResponse',
    Payload=json.dumps({'key': 'value'})
)
if 'Payload' in response:
    streaming_body = response['Payload']
    try:
        # Read Lambda response payload
        payload_data = streaming_body.read()
        result = json.loads(payload_data.decode('utf-8'))
        print(f"Lambda result: {result}")
    finally:
        streaming_body.close()
# Process Kinesis data streams
import base64

kinesis_client = session.create_client('kinesis', region_name='us-east-1')
response = kinesis_client.get_records(ShardIterator='shard-iterator-value')
for record in response['Records']:
    # Kinesis record data is typically base64 encoded
    data = record['Data']
    # If data is a StreamingBody (for large records)
    if hasattr(data, 'read'):
        try:
            content = data.read()
            decoded_data = base64.b64decode(content)
            print(f"Record data: {decoded_data}")
        finally:
            data.close()
    else:
        # Direct bytes data
        decoded_data = base64.b64decode(data)
        print(f"Record data: {decoded_data}")
# Get response with metadata
response = s3_client.get_object(Bucket='mybucket', Key='file.txt')
# Access response metadata
metadata = response['ResponseMetadata']
print(f"HTTP Status: {metadata['HTTPStatusCode']}")
print(f"Request ID: {metadata['RequestId']}")
# Access HTTP headers (keyed by lower-cased header name)
http_headers = metadata['HTTPHeaders']
print(f"Content-Type: {http_headers.get('content-type')}")
print(f"Content-Length: {http_headers.get('content-length')}")
print(f"ETag: {http_headers.get('etag')}")
# Object-specific metadata parsed into the top-level response dict
print(f"Last Modified: {response.get('LastModified')}")
print(f"ETag: {response.get('ETag')}")
print(f"Content Type: {response.get('ContentType')}")
# Process custom metadata for S3 objects
response = s3_client.get_object(Bucket='mybucket', Key='file-with-metadata.txt')
# Custom metadata (user-defined)
metadata = response.get('Metadata', {})
for key, value in metadata.items():
    print(f"Custom metadata {key}: {value}")
# Standard S3 metadata — only present when set on the object
if 'CacheControl' in response:
    print(f"Cache Control: {response['CacheControl']}")
if 'ContentDisposition' in response:
    print(f"Content Disposition: {response['ContentDisposition']}")
if 'ContentEncoding' in response:
    print(f"Content Encoding: {response['ContentEncoding']}")

from botocore.exceptions import (
    ClientError,
    ReadTimeoutError,
    ResponseStreamingError,
    IncompleteReadError,
    NoCredentialsError,
)
def safe_stream_processing(bucket, key):
    """Safely process streaming response with comprehensive error handling.

    Args:
        bucket: Name of the S3 bucket.
        key: Key of the object to stream.

    Returns:
        Number of bytes processed, or None on any handled error.

    Note:
        Relies on a module-level ``session`` created earlier in this file.
    """
    try:
        # Create client
        s3_client = session.create_client('s3', region_name='us-east-1')
        # Get object
        response = s3_client.get_object(Bucket=bucket, Key=key)
        streaming_body = response['Body']
        try:
            # Set socket timeout for read operations
            streaming_body.set_socket_timeout(30.0)
            # Process data
            processed_bytes = 0
            for chunk in streaming_body.iter_chunks(chunk_size=8192):
                # Process chunk
                processed_bytes += len(chunk)
            return processed_bytes
        except ReadTimeoutError as e:
            print(f"Read timeout error: {e}")
            return None
        except ResponseStreamingError as e:
            print(f"Streaming protocol error: {e}")
            return None
        except IncompleteReadError as e:
            # NOTE(review): assumes IncompleteReadError exposes
            # expected_bytes/actual_bytes as attributes — TODO confirm
            # against botocore.exceptions (they may only be format kwargs).
            print(f"Incomplete read - expected {e.expected_bytes}, got {e.actual_bytes}")
            return None
        finally:
            # Always close the stream
            streaming_body.close()
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'NoSuchBucket':
            print(f"Bucket {bucket} does not exist")
        elif error_code == 'NoSuchKey':
            print(f"Key {key} does not exist in bucket {bucket}")
        elif error_code == 'AccessDenied':
            print(f"Access denied to {bucket}/{key}")
        else:
            print(f"Client error: {error_code} - {e.response['Error']['Message']}")
        return None
    except NoCredentialsError:
        print("AWS credentials not found or invalid")
        return None
    except Exception as e:
        # Broad catch kept deliberately: this helper is best-effort and
        # reports errors rather than raising them to the caller.
        print(f"Unexpected error: {e}")
        return None
# Usage
result = safe_stream_processing('mybucket', 'large-file.txt')
if result is not None:
    print(f"Successfully processed {result} bytes")
# Configure timeouts for streaming operations
from botocore.config import Config

# Create config with custom timeouts
config = Config(
    connect_timeout=30,  # Connection timeout (seconds)
    read_timeout=60,     # Read timeout (seconds)
    retries={
        'max_attempts': 3,
        'mode': 'adaptive'
    }
)
# Create client with timeout configuration
s3_client = session.create_client('s3', region_name='us-east-1', config=config)
# Use client for streaming operations
response = s3_client.get_object(Bucket='mybucket', Key='large-file.bin')
streaming_body = response['Body']
try:
    # Additional socket-level timeout
    streaming_body.set_socket_timeout(45.0)
    # Process with configured timeouts
    data = streaming_body.read()
finally:
    streaming_body.close()


def process_large_file_optimally(bucket, key):
    """Process large files with memory-efficient streaming.

    Picks a chunk size proportional to the object's ContentLength so small
    objects use small buffers and large objects use fewer, larger reads.

    Args:
        bucket: Name of the S3 bucket.
        key: Key of the object to process.
    """
    response = s3_client.get_object(Bucket=bucket, Key=key)
    streaming_body = response['Body']
    # Determine optimal chunk size based on content length
    content_length = int(response.get('ContentLength', 0))
    if content_length < 1024 * 1024:  # < 1MB
        chunk_size = 8192  # 8KB chunks
    elif content_length < 100 * 1024 * 1024:  # < 100MB
        chunk_size = 64 * 1024  # 64KB chunks
    else:  # >= 100MB
        chunk_size = 1024 * 1024  # 1MB chunks
    try:
        processed_chunks = 0
        for chunk in streaming_body.iter_chunks(chunk_size=chunk_size):
            # Process chunk without storing entire file in memory
            process_chunk(chunk)
            processed_chunks += 1
            # Optional: progress reporting every 100 chunks
            if processed_chunks % 100 == 0:
                print(f"Processed {processed_chunks} chunks")
    finally:
        streaming_body.close()
def process_chunk(chunk):
    """Process an individual data chunk.

    Args:
        chunk: Raw bytes read from the stream.
    """
    # Implement chunk processing logic
    pass


def streaming_pipeline(bucket, key, processors):
    """Create a processing pipeline for streaming data.

    Args:
        bucket: Name of the S3 bucket.
        key: Key of the object to stream, read line by line.
        processors: Ordered callables; each receives the previous result
            and may return None to filter the line out of the pipeline.

    Yields:
        The fully processed value for each line that survives every
        processor.

    Note:
        Relies on the module-level ``s3_client`` created earlier in this
        file.
    """
    response = s3_client.get_object(Bucket=bucket, Key=key)
    streaming_body = response['Body']
    try:
        for line in streaming_body.iter_lines(chunk_size=16384):
            data = line
            # Apply processing pipeline
            for processor in processors:
                data = processor(data)
                if data is None:  # Processor filtered out data
                    break
            if data is not None:
                yield data
    finally:
        streaming_body.close()
# Example processors
import json


def decode_processor(data):
    """Decode bytes to a UTF-8 string; return None to skip invalid lines."""
    try:
        return data.decode('utf-8')
    except UnicodeDecodeError:
        return None  # Skip invalid lines


def json_processor(data):
    """Parse JSON from a string; return None to skip invalid JSON."""
    try:
        return json.loads(data)
    except json.JSONDecodeError:
        return None  # Skip invalid JSON


def filter_processor(data):
    """Keep only dicts whose 'status' field equals 'active'."""
    if isinstance(data, dict) and data.get('status') == 'active':
        return data
    return None


class StreamProcessor:
    """Context manager for safe stream processing.

    Opens the object on __enter__ and guarantees the streaming body is
    closed on __exit__, whether or not the body raised.
    """

    def __init__(self, client, bucket, key):
        self.client = client
        self.bucket = bucket
        self.key = key
        self.streaming_body = None

    def __enter__(self):
        response = self.client.get_object(Bucket=self.bucket, Key=self.key)
        self.streaming_body = response['Body']
        return self.streaming_body

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.streaming_body:
            self.streaming_body.close()
        # Implicit None (falsy) return propagates any exception.


if __name__ == "__main__":
    # Usage — guarded so importing this module does not trigger AWS calls
    processors = [decode_processor, json_processor, filter_processor]
    for processed_item in streaming_pipeline('mybucket', 'data.jsonl', processors):
        print(f"Processed item: {processed_item}")
# Usage with automatic cleanup
with StreamProcessor(s3_client, 'mybucket', 'large-file.csv') as stream:
    for line in stream.iter_lines():
        # Process line
        decoded_line = line.decode('utf-8')
# Stream is automatically closed when exiting context
# Use S3 Select with streaming responses
def query_s3_with_select(bucket, key, query):
    """Query S3 object content using S3 Select.

    Args:
        bucket: Name of the S3 bucket.
        key: Key of the CSV object to query.
        query: SQL expression to run against the object.

    Yields:
        str: Decoded CSV result chunks from the Select event stream.
    """
    response = s3_client.select_object_content(
        Bucket=bucket,
        Key=key,
        Expression=query,
        ExpressionType='SQL',
        InputSerialization={
            'CSV': {'FileHeaderInfo': 'USE'},
            'CompressionType': 'NONE'
        },
        OutputSerialization={'CSV': {}}
    )
    # Process streaming results: Payload is iterated as a series of events
    for event in response['Payload']:
        if 'Records' in event:
            # Records event contains streaming data
            records_data = event['Records']['Payload']
            if hasattr(records_data, 'read'):
                # Handle as streaming body
                try:
                    chunk = records_data.read()
                    yield chunk.decode('utf-8')
                finally:
                    records_data.close()
            else:
                # Direct bytes payload
                yield records_data.decode('utf-8')


# Usage
query = "SELECT * FROM s3object[*] WHERE age > 25"
for result_chunk in query_s3_with_select('mybucket', 'data.csv', query):
    print(f"Query result: {result_chunk}")
# Stream CloudWatch Logs
from datetime import datetime

logs_client = session.create_client('logs', region_name='us-east-1')


def stream_log_events(log_group, log_stream):
    """Print log events from a CloudWatch Logs stream.

    Args:
        log_group: Name of the CloudWatch Logs log group.
        log_stream: Name of the log stream within the group.
    """
    response = logs_client.get_log_events(
        logGroupName=log_group,
        logStreamName=log_stream,
        startFromHead=True
    )
    for event in response['events']:
        # Process log event
        timestamp = event['timestamp']
        message = event['message']
        # Timestamps are in milliseconds since the epoch; convert to seconds
        readable_time = datetime.fromtimestamp(timestamp / 1000)
        print(f"[{readable_time}] {message}")


# Usage
stream_log_events('/aws/lambda/my-function', '2024/01/01/stream-name')

# This comprehensive response handling documentation provides developers with
# the knowledge and examples needed to effectively work with streaming
# responses in botocore, covering everything from basic usage to advanced
# streaming patterns and error handling strategies.
Install with Tessl CLI
npx tessl i tessl/pypi-botocore