# Response Handling

Streaming response handling for large payloads with support for chunked reading, line iteration, and automatic resource cleanup. Botocore's response handling system efficiently manages HTTP response bodies, especially for large or streaming content from AWS services.

## Capabilities

### StreamingBody Class

Wrapper class for HTTP response bodies that provides convenient methods for handling streaming data with automatic validation and resource management.

```python { .api }
class StreamingBody:
    def __init__(
        self,
        raw_stream,
        content_length: int
    ):
        """
        Initialize streaming body wrapper.

        Args:
            raw_stream: Underlying HTTP response stream
            content_length: Expected content length in bytes
        """

    def read(self, amt: int = None) -> bytes:
        """
        Read at most amt bytes from the stream.

        Args:
            amt: Maximum number of bytes to read. If None, read all data.

        Returns:
            bytes: Data read from stream

        Raises:
            ReadTimeoutError: If read operation times out
            ResponseStreamingError: If streaming protocol error occurs
            IncompleteReadError: If content length validation fails
        """

    def readinto(self, b) -> int:
        """
        Read bytes into a pre-allocated, writable bytes-like object.

        Args:
            b: Pre-allocated buffer to read data into

        Returns:
            int: Number of bytes read

        Raises:
            ReadTimeoutError: If read operation times out
            ResponseStreamingError: If streaming protocol error occurs
            IncompleteReadError: If content length validation fails
        """

    def iter_lines(
        self,
        chunk_size: int = 1024,
        keepends: bool = False
    ) -> Iterator[bytes]:
        """
        Return an iterator to yield lines from the raw stream.

        Args:
            chunk_size: Size of chunks to read while iterating
            keepends: Whether to preserve line ending characters

        Yields:
            bytes: Individual lines from the stream
        """

    def iter_chunks(self, chunk_size: int = 1024) -> Iterator[bytes]:
        """
        Return an iterator to yield chunks from the raw stream.

        Args:
            chunk_size: Size of each chunk in bytes

        Yields:
            bytes: Data chunks from the stream
        """

    def close(self) -> None:
        """Close the underlying HTTP response stream."""

    def readable(self) -> bool:
        """Check if the stream is readable."""

    def tell(self) -> int:
        """Return current position in the stream."""

    def set_socket_timeout(self, timeout: float) -> None:
        """
        Set timeout on the underlying socket.

        Args:
            timeout: Timeout value in seconds
        """
```
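
A `StreamingBody` can also be instantiated directly around any file-like object, which is a common pattern when stubbing service responses in tests. The following is a minimal sketch, assuming you only need the read/iteration API on an in-memory payload:

```python
import io

from botocore.response import StreamingBody

# In-memory payload standing in for an HTTP response body (illustrative only)
payload = b"first line\nsecond line\nthird line\n"
body = StreamingBody(io.BytesIO(payload), len(payload))

# Iterate lines exactly as you would on a real service response
for line in body.iter_lines():
    print(line.decode('utf-8'))

body.close()
```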

### Response Processing

Function for processing HTTP responses into parsed data structures based on service models.

```python { .api }
def get_response(
    operation_model: OperationModel,
    http_response
) -> Tuple[HTTPResponse, dict]:
    """
    Process HTTP response into parsed response data.

    Args:
        operation_model: Service operation model
        http_response: Raw HTTP response object

    Returns:
        tuple: (http_response, parsed_response_data)
    """
```
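
`get_response` is normally invoked for you inside the client call path, so you rarely call it directly. The sketch below is a contrived illustration under that assumption: it fabricates an `AWSResponse` for S3 `GetObject` (the URL, headers, and payload are made up) purely to show the `(http_response, parsed)` return shape.

```python
import io

from botocore.awsrequest import AWSResponse
from botocore.response import get_response
from botocore.session import get_session

session = get_session()
s3 = session.create_client(
    's3', region_name='us-east-1',
    aws_access_key_id='fake', aws_secret_access_key='fake',
)
operation_model = s3.meta.service_model.operation_model('GetObject')

# Fabricated HTTP response; in real use this comes from botocore's HTTP layer.
payload = b'hello world'
http_response = AWSResponse(
    url='https://examplebucket.s3.amazonaws.com/example-key',
    status_code=200,
    headers={'content-length': str(len(payload)), 'content-type': 'text/plain'},
    raw=io.BytesIO(payload),
)

http_response, parsed = get_response(operation_model, http_response)
print(parsed['ResponseMetadata']['HTTPStatusCode'])  # 200
print(parsed['Body'].read())                         # b'hello world'
```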

## Usage Examples

### Basic Streaming Response Handling

```python
from botocore.session import get_session

# Create session and S3 client
session = get_session()
s3_client = session.create_client('s3', region_name='us-east-1')

# Get streaming object
response = s3_client.get_object(Bucket='mybucket', Key='large-file.txt')
streaming_body = response['Body']

# Read entire content
content = streaming_body.read()
print(f"Read {len(content)} bytes")

# Always close when done
streaming_body.close()
```

### Chunked Reading for Large Files

```python
# Read file in chunks to manage memory usage
response = s3_client.get_object(Bucket='mybucket', Key='large-dataset.csv')
streaming_body = response['Body']

try:
    total_size = 0
    for chunk in streaming_body.iter_chunks(chunk_size=8192):
        # Process chunk (e.g., write to file, analyze data)
        total_size += len(chunk)
        print(f"Processed chunk of {len(chunk)} bytes")

    print(f"Total size: {total_size} bytes")
finally:
    streaming_body.close()
```

### Line-by-Line Processing

```python
# Process text files line by line
response = s3_client.get_object(Bucket='mybucket', Key='log-file.txt')
streaming_body = response['Body']

try:
    line_count = 0
    for line in streaming_body.iter_lines(chunk_size=4096):
        # Process each line
        decoded_line = line.decode('utf-8')
        if 'ERROR' in decoded_line:
            print(f"Error found: {decoded_line.strip()}")
        line_count += 1

    print(f"Processed {line_count} lines")
finally:
    streaming_body.close()
```

### Context Manager Usage

```python
import json

# Use context manager for automatic cleanup
response = s3_client.get_object(Bucket='mybucket', Key='data.json')

with response['Body'] as streaming_body:
    # Stream will be automatically closed when exiting context
    data = streaming_body.read()
    parsed_data = json.loads(data.decode('utf-8'))
```

### Streaming with Custom Buffer

```python
# Read into pre-allocated buffer for memory efficiency
response = s3_client.get_object(Bucket='mybucket', Key='binary-data.bin')
streaming_body = response['Body']

try:
    buffer = bytearray(8192)  # 8KB buffer
    total_read = 0

    while True:
        bytes_read = streaming_body.readinto(buffer)
        if bytes_read == 0:
            break

        # Process buffer contents
        total_read += bytes_read
        print(f"Read {bytes_read} bytes, total: {total_read}")

        # Process the data in buffer[:bytes_read]
finally:
    streaming_body.close()
```

### Lambda Function Response Streaming

```python
# Handle streaming responses from Lambda invocations
lambda_client = session.create_client('lambda', region_name='us-east-1')

response = lambda_client.invoke(
    FunctionName='my-function',
    InvocationType='RequestResponse',
    Payload=json.dumps({'key': 'value'})
)

if 'Payload' in response:
    streaming_body = response['Payload']

    try:
        # Read Lambda response payload
        payload_data = streaming_body.read()
        result = json.loads(payload_data.decode('utf-8'))
        print(f"Lambda result: {result}")
    finally:
        streaming_body.close()
```

### Kinesis Data Streams

```python
# Process Kinesis data streams
kinesis_client = session.create_client('kinesis', region_name='us-east-1')

response = kinesis_client.get_records(ShardIterator='shard-iterator-value')

for record in response['Records']:
    # botocore base64-decodes the record blob automatically,
    # so Data is returned as raw bytes
    data = record['Data']

    # Defensive check in case the payload is exposed as a stream
    if hasattr(data, 'read'):
        try:
            content = data.read()
            print(f"Record data: {content}")
        finally:
            data.close()
    else:
        # Direct bytes data
        print(f"Record data: {data}")
```

## Response Metadata Handling

### Accessing Response Headers and Status

```python
# Get response with metadata
response = s3_client.get_object(Bucket='mybucket', Key='file.txt')

# Access response metadata
metadata = response['ResponseMetadata']
print(f"HTTP Status: {metadata['HTTPStatusCode']}")
print(f"Request ID: {metadata['RequestId']}")

# Access HTTP headers
http_headers = metadata['HTTPHeaders']
print(f"Content-Type: {http_headers.get('content-type')}")
print(f"Content-Length: {http_headers.get('content-length')}")
print(f"ETag: {http_headers.get('etag')}")

# Object-specific metadata
print(f"Last Modified: {response.get('LastModified')}")
print(f"ETag: {response.get('ETag')}")
print(f"Content Type: {response.get('ContentType')}")
```

### Custom Metadata Processing

```python
# Process custom metadata for S3 objects
response = s3_client.get_object(Bucket='mybucket', Key='file-with-metadata.txt')

# Custom metadata (user-defined)
metadata = response.get('Metadata', {})
for key, value in metadata.items():
    print(f"Custom metadata {key}: {value}")

# Standard S3 metadata
if 'CacheControl' in response:
    print(f"Cache Control: {response['CacheControl']}")
if 'ContentDisposition' in response:
    print(f"Content Disposition: {response['ContentDisposition']}")
if 'ContentEncoding' in response:
    print(f"Content Encoding: {response['ContentEncoding']}")
```

## Error Handling for Streaming Operations

### Complete Error Handling Pattern

```python
from botocore.exceptions import (
    ClientError,
    ReadTimeoutError,
    ResponseStreamingError,
    IncompleteReadError,
    NoCredentialsError
)

def safe_stream_processing(bucket, key):
    """Safely process a streaming response with comprehensive error handling."""

    try:
        # Create client
        s3_client = session.create_client('s3', region_name='us-east-1')

        # Get object
        response = s3_client.get_object(Bucket=bucket, Key=key)
        streaming_body = response['Body']

        try:
            # Set socket timeout for read operations
            streaming_body.set_socket_timeout(30.0)

            # Process data
            processed_bytes = 0
            for chunk in streaming_body.iter_chunks(chunk_size=8192):
                # Process chunk
                processed_bytes += len(chunk)

            return processed_bytes

        except ReadTimeoutError as e:
            print(f"Read timeout error: {e}")
            return None

        except ResponseStreamingError as e:
            print(f"Streaming protocol error: {e}")
            return None

        except IncompleteReadError as e:
            # The exception message reports bytes read vs. bytes expected
            print(f"Incomplete read: {e}")
            return None

        finally:
            # Always close the stream
            streaming_body.close()

    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == 'NoSuchBucket':
            print(f"Bucket {bucket} does not exist")
        elif error_code == 'NoSuchKey':
            print(f"Key {key} does not exist in bucket {bucket}")
        elif error_code == 'AccessDenied':
            print(f"Access denied to {bucket}/{key}")
        else:
            print(f"Client error: {error_code} - {e.response['Error']['Message']}")
        return None

    except NoCredentialsError:
        print("AWS credentials not found or invalid")
        return None

    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

# Usage
result = safe_stream_processing('mybucket', 'large-file.txt')
if result is not None:
    print(f"Successfully processed {result} bytes")
```

### Timeout Configuration

```python
# Configure timeouts for streaming operations
from botocore.config import Config

# Create config with custom timeouts
config = Config(
    connect_timeout=30,  # Connection timeout
    read_timeout=60,     # Read timeout
    retries={
        'max_attempts': 3,
        'mode': 'adaptive'
    }
)

# Create client with timeout configuration
s3_client = session.create_client('s3', region_name='us-east-1', config=config)

# Use client for streaming operations
response = s3_client.get_object(Bucket='mybucket', Key='large-file.bin')
streaming_body = response['Body']

try:
    # Additional socket-level timeout
    streaming_body.set_socket_timeout(45.0)

    # Process with configured timeouts
    data = streaming_body.read()

finally:
    streaming_body.close()
```

## Best Practices for Memory-Efficient Streaming

### Optimal Chunk Sizes

```python
def process_large_file_optimally(bucket, key):
    """Process large files with memory-efficient streaming."""

    response = s3_client.get_object(Bucket=bucket, Key=key)
    streaming_body = response['Body']

    # Determine optimal chunk size based on content length
    content_length = int(response.get('ContentLength', 0))

    if content_length < 1024 * 1024:  # < 1MB
        chunk_size = 8192  # 8KB chunks
    elif content_length < 100 * 1024 * 1024:  # < 100MB
        chunk_size = 64 * 1024  # 64KB chunks
    else:  # >= 100MB
        chunk_size = 1024 * 1024  # 1MB chunks

    try:
        processed_chunks = 0
        for chunk in streaming_body.iter_chunks(chunk_size=chunk_size):
            # Process chunk without storing the entire file in memory
            process_chunk(chunk)
            processed_chunks += 1

            # Optional: progress reporting
            if processed_chunks % 100 == 0:
                print(f"Processed {processed_chunks} chunks")

    finally:
        streaming_body.close()

def process_chunk(chunk):
    """Process an individual data chunk."""
    # Implement chunk processing logic
    pass
```

### Stream Processing Pipeline

```python
def streaming_pipeline(bucket, key, processors):
    """Create a processing pipeline for streaming data."""

    response = s3_client.get_object(Bucket=bucket, Key=key)
    streaming_body = response['Body']

    try:
        for line in streaming_body.iter_lines(chunk_size=16384):
            data = line

            # Apply processing pipeline
            for processor in processors:
                data = processor(data)
                if data is None:  # Processor filtered out data
                    break

            if data is not None:
                yield data

    finally:
        streaming_body.close()

# Example processors
def decode_processor(data):
    """Decode bytes to string."""
    try:
        return data.decode('utf-8')
    except UnicodeDecodeError:
        return None  # Skip invalid lines

def json_processor(data):
    """Parse JSON from string."""
    try:
        return json.loads(data)
    except json.JSONDecodeError:
        return None  # Skip invalid JSON

def filter_processor(data):
    """Filter data based on criteria."""
    if isinstance(data, dict) and data.get('status') == 'active':
        return data
    return None

# Usage
processors = [decode_processor, json_processor, filter_processor]
for processed_item in streaming_pipeline('mybucket', 'data.jsonl', processors):
    print(f"Processed item: {processed_item}")
```

### Resource Cleanup Patterns

```python
class StreamProcessor:
    """Context manager for safe stream processing."""

    def __init__(self, client, bucket, key):
        self.client = client
        self.bucket = bucket
        self.key = key
        self.streaming_body = None

    def __enter__(self):
        response = self.client.get_object(Bucket=self.bucket, Key=self.key)
        self.streaming_body = response['Body']
        return self.streaming_body

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.streaming_body:
            self.streaming_body.close()
        # Return False to propagate exceptions

# Usage with automatic cleanup
with StreamProcessor(s3_client, 'mybucket', 'large-file.csv') as stream:
    for line in stream.iter_lines():
        # Process line
        decoded_line = line.decode('utf-8')
    # Stream is automatically closed when exiting context
```

552

553

## Integration Examples

554

555

### S3 Select Integration

556

557

```python

558

# Use S3 Select with streaming responses

559

def query_s3_with_select(bucket, key, query):

560

"""Query S3 object content using S3 Select."""

561

562

response = s3_client.select_object_content(

563

Bucket=bucket,

564

Key=key,

565

Expression=query,

566

ExpressionType='SQL',

567

InputSerialization={

568

'CSV': {'FileHeaderInfo': 'USE'},

569

'CompressionType': 'NONE'

570

},

571

OutputSerialization={'CSV': {}}

572

)

573

574

# Process streaming results

575

for event in response['Payload']:

576

if 'Records' in event:

577

# Records event contains streaming data

578

records_data = event['Records']['Payload']

579

if hasattr(records_data, 'read'):

580

# Handle as streaming body

581

try:

582

chunk = records_data.read()

583

yield chunk.decode('utf-8')

584

finally:

585

records_data.close()

586

else:

587

yield records_data.decode('utf-8')

588

589

# Usage

590

query = "SELECT * FROM s3object[*] WHERE age > 25"

591

for result_chunk in query_s3_with_select('mybucket', 'data.csv', query):

592

print(f"Query result: {result_chunk}")

593

```

### CloudWatch Logs Streaming

```python
from datetime import datetime

# Stream CloudWatch Logs
logs_client = session.create_client('logs', region_name='us-east-1')

def stream_log_events(log_group, log_stream):
    """Stream log events from CloudWatch Logs."""

    response = logs_client.get_log_events(
        logGroupName=log_group,
        logStreamName=log_stream,
        startFromHead=True
    )

    for event in response['events']:
        # Process log event
        timestamp = event['timestamp']
        message = event['message']

        # Convert millisecond timestamp to readable format
        readable_time = datetime.fromtimestamp(timestamp / 1000)
        print(f"[{readable_time}] {message}")

# Usage
stream_log_events('/aws/lambda/my-function', '2024/01/01/stream-name')
```

Together, these examples cover botocore's streaming response handling end to end: reading and iterating `StreamingBody` content, accessing response metadata, configuring timeouts, handling streaming errors, and applying memory-efficient processing patterns.