CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-botocore

Low-level, data-driven core of boto 3 providing foundational AWS service access.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

response.mddocs/

Response Handling

Streaming response handling for large payloads with support for chunked reading, line iteration, and automatic resource cleanup. Botocore's response handling system efficiently manages HTTP response bodies, especially for large or streaming content from AWS services.

Capabilities

StreamingBody Class

Wrapper class for HTTP response bodies that provides convenient methods for handling streaming data with automatic validation and resource management.

class StreamingBody:
    def __init__(
        self,
        raw_stream,
        content_length: int
    ):
        """
        Initialize streaming body wrapper.
        
        Args:
            raw_stream: Underlying HTTP response stream
            content_length: Expected content length in bytes
        """

    def read(self, amt: int = None) -> bytes:
        """
        Read at most amt bytes from the stream.
        
        Args:
            amt: Maximum number of bytes to read. If None, read all data.
            
        Returns:
            bytes: Data read from stream
            
        Raises:
            ReadTimeoutError: If read operation times out
            ResponseStreamingError: If streaming protocol error occurs
            IncompleteReadError: If content length validation fails
        """

    def readinto(self, b) -> int:
        """
        Read bytes into a pre-allocated, writable bytes-like object.
        
        Args:
            b: Pre-allocated buffer to read data into
            
        Returns:
            int: Number of bytes read
            
        Raises:
            ReadTimeoutError: If read operation times out
            ResponseStreamingError: If streaming protocol error occurs
            IncompleteReadError: If content length validation fails
        """

    def iter_lines(
        self,
        chunk_size: int = 1024,
        keepends: bool = False
    ) -> Iterator[bytes]:
        """
        Return an iterator to yield lines from the raw stream.
        
        Args:
            chunk_size: Size of chunks to read while iterating
            keepends: Whether to preserve line ending characters
            
        Yields:
            bytes: Individual lines from the stream
        """

    def iter_chunks(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """
        Return an iterator to yield chunks from the raw stream.
        
        Args:
            chunk_size: Size of each chunk in bytes
            
        Yields:
            bytes: Data chunks from the stream
        """

    def close(self) -> None:
        """Close the underlying HTTP response stream."""

    def readable(self) -> bool:
        """Check if the stream is readable."""

    def tell(self) -> int:
        """Return current position in the stream."""

    def set_socket_timeout(self, timeout: float) -> None:
        """
        Set timeout on the underlying socket.
        
        Args:
            timeout: Timeout value in seconds
        """

Response Processing

Function for processing HTTP responses into parsed data structures based on service models.

def get_response(
    operation_model: OperationModel,
    http_response
) -> Tuple[HTTPResponse, dict]:
    """
    Process HTTP response into parsed response data.
    
    Args:
        operation_model: Service operation model
        http_response: Raw HTTP response object
        
    Returns:
        tuple: (http_response, parsed_response_data)
    """

Usage Examples

Basic Streaming Response Handling

from botocore.session import get_session

# Create session and S3 client
session = get_session()
s3_client = session.create_client('s3', region_name='us-east-1')

# Get streaming object
response = s3_client.get_object(Bucket='mybucket', Key='large-file.txt')
streaming_body = response['Body']

# Read entire content
content = streaming_body.read()
print(f"Read {len(content)} bytes")

# Always close when done
streaming_body.close()

Chunked Reading for Large Files

# Read file in chunks to manage memory usage
response = s3_client.get_object(Bucket='mybucket', Key='large-dataset.csv')
streaming_body = response['Body']

try:
    total_size = 0
    for chunk in streaming_body.iter_chunks(chunk_size=8192):
        # Process chunk (e.g., write to file, analyze data)
        total_size += len(chunk)
        print(f"Processed chunk of {len(chunk)} bytes")
    
    print(f"Total size: {total_size} bytes")
finally:
    streaming_body.close()

Line-by-Line Processing

# Process text files line by line
response = s3_client.get_object(Bucket='mybucket', Key='log-file.txt')
streaming_body = response['Body']

try:
    line_count = 0
    for line in streaming_body.iter_lines(chunk_size=4096):
        # Process each line
        decoded_line = line.decode('utf-8')
        if 'ERROR' in decoded_line:
            print(f"Error found: {decoded_line.strip()}")
        line_count += 1
    
    print(f"Processed {line_count} lines")
finally:
    streaming_body.close()

Context Manager Usage

# Use context manager for automatic cleanup
response = s3_client.get_object(Bucket='mybucket', Key='data.json')

with response['Body'] as streaming_body:
    # Stream will be automatically closed when exiting context
    data = streaming_body.read()
    parsed_data = json.loads(data.decode('utf-8'))

Streaming with Custom Buffer

# Read into pre-allocated buffer for memory efficiency
response = s3_client.get_object(Bucket='mybucket', Key='binary-data.bin')
streaming_body = response['Body']

try:
    buffer = bytearray(8192)  # 8KB buffer
    total_read = 0
    
    while True:
        bytes_read = streaming_body.readinto(buffer)
        if bytes_read == 0:
            break
        
        # Process buffer contents
        total_read += bytes_read
        print(f"Read {bytes_read} bytes, total: {total_read}")
        
        # Process the data in buffer[:bytes_read]
finally:
    streaming_body.close()

Lambda Function Response Streaming

# Handle streaming responses from Lambda invocations
lambda_client = session.create_client('lambda', region_name='us-east-1')

response = lambda_client.invoke(
    FunctionName='my-function',
    InvocationType='RequestResponse',
    Payload=json.dumps({'key': 'value'})
)

if 'Payload' in response:
    streaming_body = response['Payload']
    
    try:
        # Read Lambda response payload
        payload_data = streaming_body.read()
        result = json.loads(payload_data.decode('utf-8'))
        print(f"Lambda result: {result}")
    finally:
        streaming_body.close()

Kinesis Data Streams

# Process Kinesis data streams
kinesis_client = session.create_client('kinesis', region_name='us-east-1')

response = kinesis_client.get_records(ShardIterator='shard-iterator-value')

for record in response['Records']:
    # Kinesis record data is typically base64 encoded
    data = record['Data']
    
    # If data is a StreamingBody (for large records)
    if hasattr(data, 'read'):
        try:
            content = data.read()
            decoded_data = base64.b64decode(content)
            print(f"Record data: {decoded_data}")
        finally:
            data.close()
    else:
        # Direct bytes data
        decoded_data = base64.b64decode(data)
        print(f"Record data: {decoded_data}")

Response Metadata Handling

Accessing Response Headers and Status

# Get response with metadata
response = s3_client.get_object(Bucket='mybucket', Key='file.txt')

# Access response metadata
metadata = response['ResponseMetadata']
print(f"HTTP Status: {metadata['HTTPStatusCode']}")
print(f"Request ID: {metadata['RequestId']}")

# Access HTTP headers
http_headers = metadata['HTTPHeaders']
print(f"Content-Type: {http_headers.get('content-type')}")
print(f"Content-Length: {http_headers.get('content-length')}")
print(f"ETag: {http_headers.get('etag')}")

# Object-specific metadata
print(f"Last Modified: {response.get('LastModified')}")
print(f"ETag: {response.get('ETag')}")
print(f"Content Type: {response.get('ContentType')}")

Custom Metadata Processing

# Process custom metadata for S3 objects
response = s3_client.get_object(Bucket='mybucket', Key='file-with-metadata.txt')

# Custom metadata (user-defined)
metadata = response.get('Metadata', {})
for key, value in metadata.items():
    print(f"Custom metadata {key}: {value}")

# Standard S3 metadata
if 'CacheControl' in response:
    print(f"Cache Control: {response['CacheControl']}")
if 'ContentDisposition' in response:
    print(f"Content Disposition: {response['ContentDisposition']}")
if 'ContentEncoding' in response:
    print(f"Content Encoding: {response['ContentEncoding']}")

Error Handling for Streaming Operations

Complete Error Handling Pattern

from botocore.exceptions import (
    ClientError,
    ReadTimeoutError,
    ResponseStreamingError,
    IncompleteReadError,
    NoCredentialsError
)

def safe_stream_processing(bucket, key):
    """Safely process streaming response with comprehensive error handling."""
    
    try:
        # Create client
        s3_client = session.create_client('s3', region_name='us-east-1')
        
        # Get object
        response = s3_client.get_object(Bucket=bucket, Key=key)
        streaming_body = response['Body']
        
        try:
            # Set socket timeout for read operations
            streaming_body.set_socket_timeout(30.0)
            
            # Process data
            processed_bytes = 0
            for chunk in streaming_body.iter_chunks(chunk_size=8192):
                # Process chunk
                processed_bytes += len(chunk)
                
            return processed_bytes
            
        except ReadTimeoutError as e:
            print(f"Read timeout error: {e}")
            return None
            
        except ResponseStreamingError as e:
            print(f"Streaming protocol error: {e}")
            return None
            
        except IncompleteReadError as e:
            print(f"Incomplete read - expected {e.expected_bytes}, got {e.actual_bytes}")
            return None
            
        finally:
            # Always close the stream
            streaming_body.close()
            
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'NoSuchBucket':
            print(f"Bucket {bucket} does not exist")
        elif error_code == 'NoSuchKey':
            print(f"Key {key} does not exist in bucket {bucket}")
        elif error_code == 'AccessDenied':
            print(f"Access denied to {bucket}/{key}")
        else:
            print(f"Client error: {error_code} - {e.response['Error']['Message']}")
        return None
        
    except NoCredentialsError:
        print("AWS credentials not found or invalid")
        return None
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

# Usage
result = safe_stream_processing('mybucket', 'large-file.txt')
if result is not None:
    print(f"Successfully processed {result} bytes")

Timeout Configuration

# Configure timeouts for streaming operations
from botocore.config import Config

# Create config with custom timeouts
config = Config(
    connect_timeout=30,    # Connection timeout
    read_timeout=60,       # Read timeout
    retries={
        'max_attempts': 3,
        'mode': 'adaptive'
    }
)

# Create client with timeout configuration
s3_client = session.create_client('s3', region_name='us-east-1', config=config)

# Use client for streaming operations
response = s3_client.get_object(Bucket='mybucket', Key='large-file.bin')
streaming_body = response['Body']

try:
    # Additional socket-level timeout
    streaming_body.set_socket_timeout(45.0)
    
    # Process with configured timeouts
    data = streaming_body.read()
    
finally:
    streaming_body.close()

Best Practices for Memory-Efficient Streaming

Optimal Chunk Sizes

def process_large_file_optimally(bucket, key):
    """Process large files with memory-efficient streaming."""
    
    response = s3_client.get_object(Bucket=bucket, Key=key)
    streaming_body = response['Body']
    
    # Determine optimal chunk size based on content length
    content_length = int(response.get('ContentLength', 0))
    
    if content_length < 1024 * 1024:  # < 1MB
        chunk_size = 8192  # 8KB chunks
    elif content_length < 100 * 1024 * 1024:  # < 100MB
        chunk_size = 64 * 1024  # 64KB chunks
    else:  # > 100MB
        chunk_size = 1024 * 1024  # 1MB chunks
    
    try:
        processed_chunks = 0
        for chunk in streaming_body.iter_chunks(chunk_size=chunk_size):
            # Process chunk without storing entire file in memory
            process_chunk(chunk)
            processed_chunks += 1
            
            # Optional: Progress reporting
            if processed_chunks % 100 == 0:
                print(f"Processed {processed_chunks} chunks")
                
    finally:
        streaming_body.close()

def process_chunk(chunk):
    """Process individual data chunk."""
    # Implement chunk processing logic
    pass

Stream Processing Pipeline

def streaming_pipeline(bucket, key, processors):
    """Create a processing pipeline for streaming data."""
    
    response = s3_client.get_object(Bucket=bucket, Key=key)
    streaming_body = response['Body']
    
    try:
        for line in streaming_body.iter_lines(chunk_size=16384):
            data = line
            
            # Apply processing pipeline
            for processor in processors:
                data = processor(data)
                if data is None:  # Processor filtered out data
                    break
            
            if data is not None:
                yield data
                
    finally:
        streaming_body.close()

# Example processors
def decode_processor(data):
    """Decode bytes to string."""
    try:
        return data.decode('utf-8')
    except UnicodeDecodeError:
        return None  # Skip invalid lines

def json_processor(data):
    """Parse JSON from string."""
    try:
        return json.loads(data)
    except json.JSONDecodeError:
        return None  # Skip invalid JSON

def filter_processor(data):
    """Filter data based on criteria."""
    if isinstance(data, dict) and data.get('status') == 'active':
        return data
    return None

# Usage
processors = [decode_processor, json_processor, filter_processor]
for processed_item in streaming_pipeline('mybucket', 'data.jsonl', processors):
    print(f"Processed item: {processed_item}")

Resource Cleanup Patterns

class StreamProcessor:
    """Context manager for safe stream processing."""
    
    def __init__(self, client, bucket, key):
        self.client = client
        self.bucket = bucket
        self.key = key
        self.streaming_body = None
        
    def __enter__(self):
        response = self.client.get_object(Bucket=self.bucket, Key=self.key)
        self.streaming_body = response['Body']
        return self.streaming_body
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.streaming_body:
            self.streaming_body.close()
        # Return False to propagate exceptions

# Usage with automatic cleanup
with StreamProcessor(s3_client, 'mybucket', 'large-file.csv') as stream:
    for line in stream.iter_lines():
        # Process line
        decoded_line = line.decode('utf-8')
        # Stream is automatically closed when exiting context

Integration Examples

S3 Select Integration

# Use S3 Select with streaming responses
def query_s3_with_select(bucket, key, query):
    """Query S3 object content using S3 Select."""
    
    response = s3_client.select_object_content(
        Bucket=bucket,
        Key=key,
        Expression=query,
        ExpressionType='SQL',
        InputSerialization={
            'CSV': {'FileHeaderInfo': 'USE'},
            'CompressionType': 'NONE'
        },
        OutputSerialization={'CSV': {}}
    )
    
    # Process streaming results
    for event in response['Payload']:
        if 'Records' in event:
            # Records event contains streaming data
            records_data = event['Records']['Payload']
            if hasattr(records_data, 'read'):
                # Handle as streaming body
                try:
                    chunk = records_data.read()
                    yield chunk.decode('utf-8')
                finally:
                    records_data.close()
            else:
                yield records_data.decode('utf-8')

# Usage
query = "SELECT * FROM s3object[*] WHERE age > 25"
for result_chunk in query_s3_with_select('mybucket', 'data.csv', query):
    print(f"Query result: {result_chunk}")

CloudWatch Logs Streaming

# Stream CloudWatch Logs
logs_client = session.create_client('logs', region_name='us-east-1')

def stream_log_events(log_group, log_stream):
    """Stream log events from CloudWatch Logs."""
    
    response = logs_client.get_log_events(
        logGroupName=log_group,
        logStreamName=log_stream,
        startFromHead=True
    )
    
    for event in response['events']:
        # Process log event
        timestamp = event['timestamp']
        message = event['message']
        
        # Convert timestamp to readable format
        readable_time = datetime.fromtimestamp(timestamp / 1000)
        print(f"[{readable_time}] {message}")

# Usage
stream_log_events('/aws/lambda/my-function', '2024/01/01/stream-name')

This comprehensive response handling documentation provides developers with the knowledge and examples needed to effectively work with streaming responses in botocore, covering everything from basic usage to advanced streaming patterns and error handling strategies.

Install with Tessl CLI

npx tessl i tessl/pypi-botocore

docs

client.md

config.md

credentials.md

events.md

exceptions.md

index.md

models.md

pagination.md

response.md

session.md

testing.md

waiters.md

tile.json