# Response Handling

Botocore's response handling system manages HTTP response bodies efficiently, especially large or streaming content returned by AWS services. It supports chunked reading, line iteration, and automatic resource cleanup.

## Capabilities

### StreamingBody Class

Wrapper class for HTTP response bodies that provides convenient methods for handling streaming data with automatic validation and resource management.

```python { .api }
class StreamingBody:
    def __init__(
        self,
        raw_stream,
        content_length: int
    ):
        """
        Initialize streaming body wrapper.

        Args:
            raw_stream: Underlying HTTP response stream
            content_length: Expected content length in bytes
        """

    def read(self, amt: int = None) -> bytes:
        """
        Read at most amt bytes from the stream.

        Args:
            amt: Maximum number of bytes to read. If None, read all data.

        Returns:
            bytes: Data read from stream

        Raises:
            ReadTimeoutError: If read operation times out
            ResponseStreamingError: If streaming protocol error occurs
            IncompleteReadError: If content length validation fails
        """

    def readinto(self, b) -> int:
        """
        Read bytes into a pre-allocated, writable bytes-like object.

        Args:
            b: Pre-allocated buffer to read data into

        Returns:
            int: Number of bytes read

        Raises:
            ReadTimeoutError: If read operation times out
            ResponseStreamingError: If streaming protocol error occurs
            IncompleteReadError: If content length validation fails
        """

    def iter_lines(
        self,
        chunk_size: int = 1024,
        keepends: bool = False
    ) -> Iterator[bytes]:
        """
        Return an iterator to yield lines from the raw stream.

        Args:
            chunk_size: Size of chunks to read while iterating
            keepends: Whether to preserve line ending characters

        Yields:
            bytes: Individual lines from the stream
        """

    def iter_chunks(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """
        Return an iterator to yield chunks from the raw stream.

        Args:
            chunk_size: Size of each chunk in bytes

        Yields:
            bytes: Data chunks from the stream
        """

    def close(self) -> None:
        """Close the underlying HTTP response stream."""

    def readable(self) -> bool:
        """Check if the stream is readable."""

    def tell(self) -> int:
        """Return current position in the stream."""

    def set_socket_timeout(self, timeout: float) -> None:
        """
        Set timeout on the underlying socket.

        Args:
            timeout: Timeout value in seconds
        """
```
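
Because `StreamingBody` only needs a file-like raw stream, it can also be constructed directly, which is handy in unit tests. A minimal sketch, assuming an in-memory payload (the values are illustrative):

```python
import io

from botocore.exceptions import IncompleteReadError
from botocore.response import StreamingBody

# Wrap an in-memory stream; any object exposing read() works as raw_stream.
payload = b'hello world'
body = StreamingBody(io.BytesIO(payload), content_length=len(payload))
print(body.read())  # b'hello world'
body.close()

# Content-length validation kicks in once the stream is exhausted:
# a mismatched length raises IncompleteReadError.
short_body = StreamingBody(io.BytesIO(payload), content_length=len(payload) + 5)
try:
    short_body.read()
except IncompleteReadError as error:
    print(f"Length validation failed: {error}")
finally:
    short_body.close()
```

This is the same validation that protects real responses when a connection drops before the full body arrives.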

### Response Processing

Function for processing HTTP responses into parsed data structures based on service models.

```python { .api }
def get_response(
    operation_model: OperationModel,
    http_response
) -> Tuple[HTTPResponse, dict]:
    """
    Process HTTP response into parsed response data.

    Args:
        operation_model: Service operation model
        http_response: Raw HTTP response object

    Returns:
        tuple: (http_response, parsed_response_data)
    """
```
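
`get_response` is mainly used internally after the HTTP layer returns, but it can be exercised directly against a canned response. A rough sketch, assuming a hand-built `AWSResponse` wrapping illustrative ListBuckets XML (the payload, bucket name, and use of urllib3's `HTTPResponse` here are assumptions for demonstration, not a prescribed pattern):

```python
import io

from botocore.awsrequest import AWSResponse
from botocore.response import get_response
from botocore.session import get_session
from urllib3.response import HTTPResponse

session = get_session()
s3_client = session.create_client('s3', region_name='us-east-1')
operation_model = s3_client.meta.service_model.operation_model('ListBuckets')

# Canned ListBuckets XML body (illustrative).
canned_xml = (
    b'<?xml version="1.0" encoding="UTF-8"?>'
    b'<ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">'
    b'<Owner><ID>1234</ID><DisplayName>owner</DisplayName></Owner>'
    b'<Buckets><Bucket><Name>mybucket</Name>'
    b'<CreationDate>2024-01-01T00:00:00.000Z</CreationDate></Bucket></Buckets>'
    b'</ListAllMyBucketsResult>'
)

# Wrap the payload so it looks like a raw HTTP response.
raw = HTTPResponse(
    body=io.BytesIO(canned_xml),
    status=200,
    headers={'Content-Type': 'application/xml'},
    preload_content=False,
)
http_response = AWSResponse(
    'https://s3.amazonaws.com/', 200, {'Content-Type': 'application/xml'}, raw
)

# get_response picks a parser from the operation's protocol metadata and
# returns both the raw response and the parsed data structure.
http, parsed = get_response(operation_model, http_response)
print(parsed.get('Buckets'))
```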

## Usage Examples

### Basic Streaming Response Handling

```python
from botocore.session import get_session

# Create session and S3 client
session = get_session()
s3_client = session.create_client('s3', region_name='us-east-1')

# Get streaming object
response = s3_client.get_object(Bucket='mybucket', Key='large-file.txt')
streaming_body = response['Body']

# Read entire content
content = streaming_body.read()
print(f"Read {len(content)} bytes")

# Always close when done
streaming_body.close()
```

### Chunked Reading for Large Files

```python
# Read file in chunks to manage memory usage
response = s3_client.get_object(Bucket='mybucket', Key='large-dataset.csv')
streaming_body = response['Body']

try:
    total_size = 0
    for chunk in streaming_body.iter_chunks(chunk_size=8192):
        # Process chunk (e.g., write to file, analyze data)
        total_size += len(chunk)
        print(f"Processed chunk of {len(chunk)} bytes")

    print(f"Total size: {total_size} bytes")
finally:
    streaming_body.close()
```
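
If the goal is simply to persist a large object to disk, the chunking loop can be delegated to `shutil.copyfileobj`, which repeatedly calls `read()` on the body. A brief sketch; the key and local path are placeholders:

```python
import shutil

# Download a large object straight to disk without buffering it in memory
response = s3_client.get_object(Bucket='mybucket', Key='large-dataset.csv')
streaming_body = response['Body']

try:
    with open('/tmp/large-dataset.csv', 'wb') as local_file:
        # copyfileobj pulls fixed-size chunks from the StreamingBody
        # and writes them directly to the local file object.
        shutil.copyfileobj(streaming_body, local_file, length=64 * 1024)
finally:
    streaming_body.close()
```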

### Line-by-Line Processing

```python
# Process text files line by line
response = s3_client.get_object(Bucket='mybucket', Key='log-file.txt')
streaming_body = response['Body']

try:
    line_count = 0
    for line in streaming_body.iter_lines(chunk_size=4096):
        # Process each line
        decoded_line = line.decode('utf-8')
        if 'ERROR' in decoded_line:
            print(f"Error found: {decoded_line.strip()}")
        line_count += 1

    print(f"Processed {line_count} lines")
finally:
    streaming_body.close()
```

### Context Manager Usage

```python
import json

# Use context manager for automatic cleanup
response = s3_client.get_object(Bucket='mybucket', Key='data.json')

with response['Body'] as streaming_body:
    # Stream will be automatically closed when exiting context
    data = streaming_body.read()
    parsed_data = json.loads(data.decode('utf-8'))
```

### Streaming with Custom Buffer

```python
# Read into pre-allocated buffer for memory efficiency
response = s3_client.get_object(Bucket='mybucket', Key='binary-data.bin')
streaming_body = response['Body']

try:
    buffer = bytearray(8192)  # 8KB buffer
    total_read = 0

    while True:
        bytes_read = streaming_body.readinto(buffer)
        if bytes_read == 0:
            break

        # Process the data in buffer[:bytes_read]
        total_read += bytes_read
        print(f"Read {bytes_read} bytes, total: {total_read}")
finally:
    streaming_body.close()
```

### Lambda Function Response Streaming

```python
import json

# Handle streaming responses from Lambda invocations
lambda_client = session.create_client('lambda', region_name='us-east-1')

response = lambda_client.invoke(
    FunctionName='my-function',
    InvocationType='RequestResponse',
    Payload=json.dumps({'key': 'value'})
)

if 'Payload' in response:
    streaming_body = response['Payload']

    try:
        # Read Lambda response payload
        payload_data = streaming_body.read()
        result = json.loads(payload_data.decode('utf-8'))
        print(f"Lambda result: {result}")
    finally:
        streaming_body.close()
```

### Kinesis Data Streams

```python
# Process Kinesis data stream records
kinesis_client = session.create_client('kinesis', region_name='us-east-1')

response = kinesis_client.get_records(ShardIterator='shard-iterator-value')

for record in response['Records']:
    # Kinesis record data is base64 encoded on the wire, but botocore
    # decodes blob fields automatically, so Data arrives as raw bytes.
    data = record['Data']
    print(f"Record data: {data}")

    # Records also carry ordering metadata
    print(f"Sequence number: {record['SequenceNumber']}")
    print(f"Partition key: {record['PartitionKey']}")
```

## Response Metadata Handling

### Accessing Response Headers and Status

```python
# Get response with metadata
response = s3_client.get_object(Bucket='mybucket', Key='file.txt')

# Access response metadata
metadata = response['ResponseMetadata']
print(f"HTTP Status: {metadata['HTTPStatusCode']}")
print(f"Request ID: {metadata['RequestId']}")

# Access HTTP headers
http_headers = metadata['HTTPHeaders']
print(f"Content-Type: {http_headers.get('content-type')}")
print(f"Content-Length: {http_headers.get('content-length')}")
print(f"ETag: {http_headers.get('etag')}")

# Object-specific metadata
print(f"Last Modified: {response.get('LastModified')}")
print(f"ETag: {response.get('ETag')}")
print(f"Content Type: {response.get('ContentType')}")
```

### Custom Metadata Processing

```python
# Process custom metadata for S3 objects
response = s3_client.get_object(Bucket='mybucket', Key='file-with-metadata.txt')

# Custom metadata (user-defined)
metadata = response.get('Metadata', {})
for key, value in metadata.items():
    print(f"Custom metadata {key}: {value}")

# Standard S3 metadata
if 'CacheControl' in response:
    print(f"Cache Control: {response['CacheControl']}")
if 'ContentDisposition' in response:
    print(f"Content Disposition: {response['ContentDisposition']}")
if 'ContentEncoding' in response:
    print(f"Content Encoding: {response['ContentEncoding']}")
```

## Error Handling for Streaming Operations

### Complete Error Handling Pattern

```python
from botocore.exceptions import (
    ClientError,
    ReadTimeoutError,
    ResponseStreamingError,
    IncompleteReadError,
    NoCredentialsError
)

def safe_stream_processing(bucket, key):
    """Safely process a streaming response with comprehensive error handling."""

    try:
        # Create client
        s3_client = session.create_client('s3', region_name='us-east-1')

        # Get object
        response = s3_client.get_object(Bucket=bucket, Key=key)
        streaming_body = response['Body']

        try:
            # Set socket timeout for read operations
            streaming_body.set_socket_timeout(30.0)

            # Process data
            processed_bytes = 0
            for chunk in streaming_body.iter_chunks(chunk_size=8192):
                # Process chunk
                processed_bytes += len(chunk)

            return processed_bytes

        except ReadTimeoutError as e:
            print(f"Read timeout error: {e}")
            return None

        except ResponseStreamingError as e:
            print(f"Streaming protocol error: {e}")
            return None

        except IncompleteReadError as e:
            # The error message includes the actual and expected byte counts
            print(f"Incomplete read: {e}")
            return None

        finally:
            # Always close the stream
            streaming_body.close()

    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'NoSuchBucket':
            print(f"Bucket {bucket} does not exist")
        elif error_code == 'NoSuchKey':
            print(f"Key {key} does not exist in bucket {bucket}")
        elif error_code == 'AccessDenied':
            print(f"Access denied to {bucket}/{key}")
        else:
            print(f"Client error: {error_code} - {e.response['Error']['Message']}")
        return None

    except NoCredentialsError:
        print("AWS credentials not found or invalid")
        return None

    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

# Usage
result = safe_stream_processing('mybucket', 'large-file.txt')
if result is not None:
    print(f"Successfully processed {result} bytes")
```

### Timeout Configuration

```python
# Configure timeouts for streaming operations
from botocore.config import Config

# Create config with custom timeouts
config = Config(
    connect_timeout=30,  # Connection timeout
    read_timeout=60,     # Read timeout
    retries={
        'max_attempts': 3,
        'mode': 'adaptive'
    }
)

# Create client with timeout configuration
s3_client = session.create_client('s3', region_name='us-east-1', config=config)

# Use client for streaming operations
response = s3_client.get_object(Bucket='mybucket', Key='large-file.bin')
streaming_body = response['Body']

try:
    # Additional socket-level timeout
    streaming_body.set_socket_timeout(45.0)

    # Process with configured timeouts
    data = streaming_body.read()
finally:
    streaming_body.close()
```

## Best Practices for Memory-Efficient Streaming

### Optimal Chunk Sizes

```python
def process_large_file_optimally(bucket, key):
    """Process large files with memory-efficient streaming."""

    response = s3_client.get_object(Bucket=bucket, Key=key)
    streaming_body = response['Body']

    # Determine optimal chunk size based on content length
    content_length = int(response.get('ContentLength', 0))

    if content_length < 1024 * 1024:          # < 1MB
        chunk_size = 8192                     # 8KB chunks
    elif content_length < 100 * 1024 * 1024:  # < 100MB
        chunk_size = 64 * 1024                # 64KB chunks
    else:                                     # >= 100MB
        chunk_size = 1024 * 1024              # 1MB chunks

    try:
        processed_chunks = 0
        for chunk in streaming_body.iter_chunks(chunk_size=chunk_size):
            # Process chunk without storing entire file in memory
            process_chunk(chunk)
            processed_chunks += 1

            # Optional: Progress reporting
            if processed_chunks % 100 == 0:
                print(f"Processed {processed_chunks} chunks")

    finally:
        streaming_body.close()

def process_chunk(chunk):
    """Process individual data chunk."""
    # Implement chunk processing logic
    pass
```

### Stream Processing Pipeline

```python
import json

def streaming_pipeline(bucket, key, processors):
    """Create a processing pipeline for streaming data."""

    response = s3_client.get_object(Bucket=bucket, Key=key)
    streaming_body = response['Body']

    try:
        for line in streaming_body.iter_lines(chunk_size=16384):
            data = line

            # Apply processing pipeline
            for processor in processors:
                data = processor(data)
                if data is None:  # Processor filtered out data
                    break

            if data is not None:
                yield data

    finally:
        streaming_body.close()

# Example processors
def decode_processor(data):
    """Decode bytes to string."""
    try:
        return data.decode('utf-8')
    except UnicodeDecodeError:
        return None  # Skip invalid lines

def json_processor(data):
    """Parse JSON from string."""
    try:
        return json.loads(data)
    except json.JSONDecodeError:
        return None  # Skip invalid JSON

def filter_processor(data):
    """Filter data based on criteria."""
    if isinstance(data, dict) and data.get('status') == 'active':
        return data
    return None

# Usage
processors = [decode_processor, json_processor, filter_processor]
for processed_item in streaming_pipeline('mybucket', 'data.jsonl', processors):
    print(f"Processed item: {processed_item}")
```

### Resource Cleanup Patterns

```python
class StreamProcessor:
    """Context manager for safe stream processing."""

    def __init__(self, client, bucket, key):
        self.client = client
        self.bucket = bucket
        self.key = key
        self.streaming_body = None

    def __enter__(self):
        response = self.client.get_object(Bucket=self.bucket, Key=self.key)
        self.streaming_body = response['Body']
        return self.streaming_body

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.streaming_body:
            self.streaming_body.close()
        # Return False to propagate exceptions
        return False

# Usage with automatic cleanup
with StreamProcessor(s3_client, 'mybucket', 'large-file.csv') as stream:
    for line in stream.iter_lines():
        # Process line; the stream is closed automatically on exit
        decoded_line = line.decode('utf-8')
```
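
For one-off cases, the standard library's `contextlib.closing` provides the same guarantee without a dedicated wrapper class. A small sketch under the same assumptions as above (an existing `s3_client` and illustrative bucket/key names):

```python
from contextlib import closing

# closing() ensures close() is called on the body even if processing fails
response = s3_client.get_object(Bucket='mybucket', Key='large-file.csv')

with closing(response['Body']) as stream:
    for line in stream.iter_lines():
        decoded_line = line.decode('utf-8')
        # Process the decoded line here
```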

## Integration Examples

### S3 Select Integration

```python
# Use S3 Select with streaming responses
def query_s3_with_select(bucket, key, query):
    """Query S3 object content using S3 Select."""

    response = s3_client.select_object_content(
        Bucket=bucket,
        Key=key,
        Expression=query,
        ExpressionType='SQL',
        InputSerialization={
            'CSV': {'FileHeaderInfo': 'USE'},
            'CompressionType': 'NONE'
        },
        OutputSerialization={'CSV': {}}
    )

    # Process streaming results
    for event in response['Payload']:
        if 'Records' in event:
            # Records events contain the streamed query output
            records_data = event['Records']['Payload']
            if hasattr(records_data, 'read'):
                # Handle as streaming body
                try:
                    chunk = records_data.read()
                    yield chunk.decode('utf-8')
                finally:
                    records_data.close()
            else:
                yield records_data.decode('utf-8')

# Usage
query = "SELECT * FROM s3object s WHERE CAST(s.age AS INT) > 25"
for result_chunk in query_s3_with_select('mybucket', 'data.csv', query):
    print(f"Query result: {result_chunk}")
```

### CloudWatch Logs Streaming

```python
from datetime import datetime

# Stream CloudWatch Logs
logs_client = session.create_client('logs', region_name='us-east-1')

def stream_log_events(log_group, log_stream):
    """Stream log events from CloudWatch Logs."""

    response = logs_client.get_log_events(
        logGroupName=log_group,
        logStreamName=log_stream,
        startFromHead=True
    )

    for event in response['events']:
        # Process log event
        timestamp = event['timestamp']
        message = event['message']

        # Convert millisecond timestamp to readable format
        readable_time = datetime.fromtimestamp(timestamp / 1000)
        print(f"[{readable_time}] {message}")

# Usage
stream_log_events('/aws/lambda/my-function', '2024/01/01/stream-name')
```

These patterns cover the core streaming workflows in botocore: basic reads, chunked and line-based processing, buffer reuse, error handling, timeout configuration, and integration with streaming-capable AWS services.