# Streaming Operations

The StreamResponse class provides efficient streaming functionality for handling large files without loading their entire contents into memory. It wraps an HTTP response and enables chunk-by-chunk processing of download streams, making it well suited to large objects and memory-constrained workloads.
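
Because data is consumed chunk by chunk, memory use stays bounded by the chunk size regardless of object size. As a minimal sketch, assuming only the `Storage` client and `download_stream()` method documented below (bucket and object names are placeholders), a large object can be hashed incrementally while it streams:

```python
import hashlib

async def sha256_of_object(storage, bucket_name, object_name):
    """Compute a SHA-256 digest while streaming, without buffering the object."""
    digest = hashlib.sha256()

    async with storage.download_stream(bucket_name, object_name) as stream:
        while True:
            chunk = await stream.read(64 * 1024)  # 64KB chunks
            if not chunk:  # End of stream
                break
            digest.update(chunk)

    return digest.hexdigest()

# Usage (placeholder names)
async with Storage() as storage:
    checksum = await sha256_of_object(storage, 'my-bucket', 'large-file.dat')
    print(f"SHA-256: {checksum}")
```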

## Capabilities

### Stream Initialization

StreamResponse instances are created by the Storage client's `download_stream()` method and should not be instantiated directly.

```python { .api }
def __init__(self, response):
    """
    Initialize a StreamResponse wrapper.

    Parameters:
    - response (aiohttp.ClientResponse): HTTP response object

    Attributes:
    - response: Underlying HTTP response object
    """
```

**Usage Example:**
```python
async with Storage() as storage:
    # Get streaming response (don't instantiate directly)
    stream = storage.download_stream('my-bucket', 'large-file.dat')

    # StreamResponse is created internally by download_stream()
    async with stream as stream_reader:
        # Use stream_reader for processing
        pass
```

### Content Length Information

Access to the total content length of the streamed object.

```python { .api }
@property
def content_length(self):
    """
    Get the total content length of the stream.

    Returns:
    int: Content length in bytes, or None if not available
    """
```

**Usage Example:**
```python
async with Storage() as storage:
    async with storage.download_stream('my-bucket', 'video.mp4') as stream:
        total_size = stream.content_length
        if total_size:
            print(f"Downloading {total_size:,} bytes")

        bytes_read = 0
        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break
            bytes_read += len(chunk)

            # Show progress
            if total_size:
                progress = (bytes_read / total_size) * 100
                print(f"Progress: {progress:.1f}% ({bytes_read:,}/{total_size:,} bytes)")
```
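
Beyond progress reporting, `content_length` can be checked after the read loop to confirm that the expected number of bytes arrived. A minimal sketch, assuming only the documented `content_length` property and `read()` method (names are placeholders):

```python
async def download_and_verify(storage, bucket_name, object_name):
    """Read a full stream and check the byte count against content_length."""
    async with storage.download_stream(bucket_name, object_name) as stream:
        expected = stream.content_length
        bytes_read = 0

        while True:
            chunk = await stream.read(64 * 1024)
            if not chunk:
                break
            bytes_read += len(chunk)

        # content_length may be None, so only compare when it is known
        if expected is not None and bytes_read != expected:
            raise ValueError(
                f"Incomplete download: got {bytes_read:,} of {expected:,} bytes"
            )

        return bytes_read
```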

### Stream Reading

Read data from the stream in configurable chunk sizes.

```python { .api }
async def read(self, size=-1):
    """
    Read data from the stream.

    Parameters:
    - size (int): Number of bytes to read. -1 reads all remaining data

    Returns:
    bytes: Data chunk read from stream, empty bytes when stream is exhausted
    """
```

**Usage Example:**
```python
async with Storage() as storage:
    async with storage.download_stream('my-bucket', 'large-dataset.csv') as stream:
        # Read in 64KB chunks
        chunk_size = 65536

        while True:
            chunk = await stream.read(chunk_size)
            if not chunk:  # End of stream
                break

            # Process chunk (e.g., parse CSV rows)
            process_data_chunk(chunk)

        # Read all remaining data at once (not recommended for large files)
        # remaining_data = await stream.read(-1)
```
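
Depending on the underlying response, `read(size)` may return fewer than `size` bytes even before the stream ends, so code that needs an exact byte count (for example, to inspect a file header) should accumulate until enough data has arrived. The sketch below works under either behavior; `read_exact` and the 8-byte header length are illustrative, not part of the API:

```python
async def read_exact(stream, n):
    """Accumulate exactly n bytes from the stream, or fewer if it ends early."""
    data = b''
    while len(data) < n:
        chunk = await stream.read(n - len(data))
        if not chunk:  # Stream ended before n bytes arrived
            break
        data += chunk
    return data

# Usage: peek at the first 8 bytes (placeholder length) to identify the format
async with Storage() as storage:
    async with storage.download_stream('my-bucket', 'archive.bin') as stream:
        header = await read_exact(stream, 8)
        if header.startswith(b'\x50\x4b\x03\x04'):
            print("Looks like a ZIP archive")
```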

### Context Manager Support

StreamResponse supports the async context manager protocol for automatic resource cleanup.

```python { .api }
async def __aenter__(self):
    """
    Enter async context manager.

    Returns:
    StreamResponse: Self for use in context
    """

async def __aexit__(self, *exc_info):
    """
    Exit async context manager, cleaning up resources.

    Parameters:
    - exc_info: Exception information if context exited due to exception

    Returns:
    None
    """
```

**Usage Example:**
```python
async with Storage() as storage:
    # Recommended: automatic cleanup with context manager
    async with storage.download_stream('my-bucket', 'file.dat') as stream:
        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break
            # Process chunk
    # Stream automatically closed here

    # Creating the stream object separately is fine, but still enter it with
    # "async with": no explicit close method is documented, so the context
    # manager is what releases the underlying response
    stream = storage.download_stream('my-bucket', 'file.dat')
    async with stream as stream_reader:
        # Use stream_reader
        pass
```

## Common Usage Patterns

### Large File Download with Progress Tracking

```python
import asyncio
from pathlib import Path

async def download_with_progress(storage, bucket_name, object_name, local_path):
    """Download large file with progress tracking."""

    async with storage.download_stream(bucket_name, object_name) as stream:
        total_size = stream.content_length
        bytes_downloaded = 0

        with open(local_path, 'wb') as f:
            while True:
                chunk = await stream.read(1024 * 1024)  # 1MB chunks
                if not chunk:
                    break

                f.write(chunk)
                bytes_downloaded += len(chunk)

                if total_size:
                    progress = (bytes_downloaded / total_size) * 100
                    print(f"\rDownloading: {progress:.1f}% complete", end='', flush=True)

    print(f"\nDownload complete: {bytes_downloaded:,} bytes")

# Usage
async with Storage() as storage:
    await download_with_progress(
        storage,
        'my-bucket',
        'large-video.mp4',
        '/tmp/downloaded-video.mp4'
    )
```

### Streaming Data Processing

```python
import json
import asyncio

async def process_streaming_json_lines(storage, bucket_name, object_name):
    """Process JSON Lines file without loading it entirely into memory."""

    async with storage.download_stream(bucket_name, object_name) as stream:
        buffer = b''

        while True:
            chunk = await stream.read(8192)
            if not chunk:
                # Process any remaining data in buffer
                if buffer.strip():
                    try:
                        record = json.loads(buffer.decode('utf-8'))
                        await process_record(record)
                    except json.JSONDecodeError:
                        print(f"Warning: Could not parse final record: {buffer}")
                break

            buffer += chunk

            # Process complete lines
            while b'\n' in buffer:
                line, buffer = buffer.split(b'\n', 1)
                if line.strip():  # Skip empty lines
                    try:
                        record = json.loads(line.decode('utf-8'))
                        await process_record(record)
                    except json.JSONDecodeError as e:
                        print(f"Warning: Could not parse line: {line} ({e})")

async def process_record(record):
    """Process individual JSON record."""
    # Your processing logic here
    print(f"Processing: {record.get('id', 'unknown')}")
    await asyncio.sleep(0.01)  # Simulate processing time

# Usage
async with Storage() as storage:
    await process_streaming_json_lines(storage, 'data-bucket', 'large-dataset.jsonl')
```

### Concurrent Stream Processing

```python
import asyncio
from typing import List

async def parallel_stream_download(storage, bucket_name, object_names: List[str]):
    """Download multiple large files concurrently using streams."""

    async def download_single(object_name):
        local_path = f"/tmp/{object_name.replace('/', '_')}"

        async with storage.download_stream(bucket_name, object_name) as stream:
            with open(local_path, 'wb') as f:
                bytes_written = 0

                while True:
                    chunk = await stream.read(64 * 1024)  # 64KB chunks
                    if not chunk:
                        break
                    f.write(chunk)
                    bytes_written += len(chunk)

        print(f"Downloaded {object_name}: {bytes_written:,} bytes")
        return local_path

    # Download all files concurrently
    tasks = [download_single(name) for name in object_names]
    downloaded_paths = await asyncio.gather(*tasks)

    return downloaded_paths

# Usage
async with Storage() as storage:
    files_to_download = [
        'datasets/2023/data-01.csv',
        'datasets/2023/data-02.csv',
        'datasets/2023/data-03.csv'
    ]

    paths = await parallel_stream_download(storage, 'my-bucket', files_to_download)
    print(f"All files downloaded to: {paths}")
```
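
When many objects are downloaded at once, it can also help to cap how many streams are open at the same time. The sketch below is one way to do that with `asyncio.Semaphore`; `bounded_stream_download` and the default limit of 4 are illustrative, not part of the API:

```python
import asyncio
from typing import List

async def bounded_stream_download(storage, bucket_name, object_names: List[str],
                                  max_concurrent: int = 4):
    """Download objects concurrently while capping the number of open streams."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def download_single(object_name):
        local_path = f"/tmp/{object_name.replace('/', '_')}"

        async with semaphore:  # At most max_concurrent streams open at once
            async with storage.download_stream(bucket_name, object_name) as stream:
                with open(local_path, 'wb') as f:
                    while True:
                        chunk = await stream.read(64 * 1024)
                        if not chunk:
                            break
                        f.write(chunk)
        return local_path

    return await asyncio.gather(*(download_single(name) for name in object_names))
```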

### Memory-Efficient Data Transformation

```python
import gzip

async def transform_and_reupload(storage, source_bucket, source_object, dest_bucket, dest_object):
    """Transform data while streaming it from the source object."""

    # Download stream from source
    async with storage.download_stream(source_bucket, source_object) as source_stream:

        # Collect transformed chunks. Note: the transformed result is buffered
        # in memory before upload, so only the download side is fully streaming.
        # For very large files, upload partial results instead of buffering.
        transformed_chunks = []

        while True:
            chunk = await source_stream.read(32 * 1024)  # 32KB chunks
            if not chunk:
                break

            # Transform data (example: convert to uppercase)
            transformed_chunks.append(chunk.upper())

    # Combine all transformed data
    final_data = b''.join(transformed_chunks)

    # Compress before uploading
    compressed_data = gzip.compress(final_data)

    # Upload transformed result
    result = await storage.upload(
        dest_bucket,
        dest_object,
        compressed_data,
        content_type='application/gzip'
    )

    print(f"Transformed {len(final_data):,} bytes to {len(compressed_data):,} bytes")
    return result

# Usage
async with Storage() as storage:
    await transform_and_reupload(
        storage,
        'input-bucket', 'raw-data.txt',
        'output-bucket', 'processed-data.txt.gz'
    )
```

## Performance Considerations

### Optimal Chunk Sizes

```python
# Recommended chunk sizes for different scenarios:

# Network-limited environments (slower connections)
chunk_size = 8 * 1024  # 8KB

# Balanced performance (most common)
chunk_size = 64 * 1024  # 64KB

# High-bandwidth, processing-intensive tasks
chunk_size = 1024 * 1024  # 1MB

# Memory-constrained environments
chunk_size = 4 * 1024  # 4KB

async with storage.download_stream('bucket', 'file') as stream:
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:
            break
        # Process with appropriate chunk size
```

### Resource Management

```python
async with Storage() as storage:
    # Always use context managers for automatic cleanup
    async with storage.download_stream('bucket', 'large-file') as stream:
        # Stream resources are automatically managed
        while True:
            chunk = await stream.read(8192)
            if not chunk:
                break
            # Process chunk
    # Resources automatically released here
```
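
Cleanup also applies when the read loop fails: `__aexit__` runs even if an exception escapes the body. Because the wrapped response is documented as an `aiohttp.ClientResponse`, catching `aiohttp.ClientError` is a reasonable assumption for guarding the loop, but the exact exception types raised on a dropped connection depend on the underlying client, so treat this sketch as an assumption to verify. `read_stream_defensively` and `handle_chunk` are illustrative, not part of the API.

```python
import aiohttp  # Assumption: the wrapped response is an aiohttp.ClientResponse

async def read_stream_defensively(storage, bucket_name, object_name, handle_chunk):
    """Stream an object, passing each chunk to handle_chunk and guarding the read loop."""
    bytes_read = 0

    try:
        async with storage.download_stream(bucket_name, object_name) as stream:
            while True:
                chunk = await stream.read(64 * 1024)
                if not chunk:
                    break
                bytes_read += len(chunk)
                await handle_chunk(chunk)
    except aiohttp.ClientError as exc:
        # __aexit__ still cleans up the stream when the read loop fails
        raise RuntimeError(
            f"Streaming download of {object_name} failed after {bytes_read:,} bytes"
        ) from exc

    return bytes_read
```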