# Streaming and Throttling

Stream wrappers with timeout and throttling support for controlling data transfer rates and managing network connections. Includes basic streams with timeout support, throttled streams with configurable read/write limits, and async iteration capabilities for processing streaming data.

## Capabilities

### Basic Stream Operations

Fundamental stream wrapper providing timeout support for asyncio streams.

```python { .api }
class StreamIO:
    """Basic async stream wrapper with timeout support."""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                 timeout: float = None, read_timeout: float = None,
                 write_timeout: float = None):
        """
        Initialize stream wrapper.

        Parameters:
        - reader: Asyncio stream reader for input operations
        - writer: Asyncio stream writer for output operations
        - timeout: Default timeout for all operations (seconds)
        - read_timeout: Specific timeout for read operations (seconds)
        - write_timeout: Specific timeout for write operations (seconds)
        """

    async def read(self, count: int = -1) -> bytes:
        """
        Read data from stream.

        Parameters:
        - count: Number of bytes to read (-1 for all available)

        Returns:
        Bytes read from stream
        """

    async def readline(self) -> bytes:
        """
        Read a line from stream (up to newline).

        Returns:
        Line data as bytes including newline character
        """

    async def readexactly(self, count: int) -> bytes:
        """
        Read exactly the specified number of bytes.

        Parameters:
        - count: Exact number of bytes to read

        Returns:
        Exactly count bytes from stream

        Raises:
        IncompleteReadError if fewer bytes available
        """

    async def write(self, data: bytes) -> None:
        """
        Write data to stream.

        Parameters:
        - data: Bytes to write to stream
        """

    def close(self) -> None:
        """Close the stream writer immediately."""

    async def start_tls(self, sslcontext: ssl.SSLContext, server_hostname: str = None) -> None:
        """
        Upgrade connection to TLS.

        Parameters:
        - sslcontext: SSL context for encryption
        - server_hostname: Server hostname for certificate validation
        """
```
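
The `start_tls` upgrade is not covered by the usage examples later in this page. A minimal sketch, assuming a reachable FTP server with explicit TLS on `example.com` (hypothetical host) and default certificate verification; the `AUTH TLS` exchange is shown only to illustrate where the upgrade fits:

```python
import asyncio
import ssl

import aioftp

async def upgrade_to_tls():
    """Sketch: open a plain connection, then upgrade it to TLS."""
    reader, writer = await asyncio.open_connection("example.com", 21)
    stream = aioftp.StreamIO(reader, writer, timeout=30.0)
    try:
        # Read the plaintext greeting, request the upgrade (protocol-specific),
        # then wrap the connection with TLS.
        print(await stream.readline())
        await stream.write(b"AUTH TLS\r\n")
        print(await stream.readline())
        await stream.start_tls(ssl.create_default_context(), server_hostname="example.com")
        # From here on, reads and writes go over the encrypted channel.
    finally:
        stream.close()

asyncio.run(upgrade_to_tls())
```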

### Throttling Mechanisms

Speed limiting components for controlling data transfer rates.

```python { .api }
class Throttle:
    """Speed throttling mechanism for rate limiting."""

    def __init__(self, limit: int = None, reset_rate: int = 10):
        """
        Initialize throttle.

        Parameters:
        - limit: Speed limit in bytes per second (None for unlimited)
        - reset_rate: Interval, in seconds, over which the transfer counter is reset
        """

    async def wait(self) -> None:
        """Wait if throttling is needed based on current transfer rate."""

    def append(self, data: bytes, start: float) -> None:
        """
        Record data transfer for rate calculation.

        Parameters:
        - data: Data that was transferred
        - start: Transfer start time (from time.time())
        """

    def clone(self) -> "Throttle":
        """
        Create a copy of this throttle with same settings.

        Returns:
        New Throttle instance with identical configuration
        """

    @property
    def limit(self) -> Union[int, None]:
        """
        Current speed limit in bytes per second.

        Returns:
        Speed limit or None if unlimited
        """


class StreamThrottle(NamedTuple):
    """Named tuple combining read and write throttles."""

    read: Throttle
    """Throttle for read operations."""

    write: Throttle
    """Throttle for write operations."""

    def clone(self) -> "StreamThrottle":
        """
        Create a copy of this stream throttle.

        Returns:
        New StreamThrottle with cloned read/write throttles
        """

    @classmethod
    def from_limits(cls, read_speed_limit: int = None, write_speed_limit: int = None) -> "StreamThrottle":
        """
        Create StreamThrottle from speed limits.

        Parameters:
        - read_speed_limit: Read speed limit in bytes/second
        - write_speed_limit: Write speed limit in bytes/second

        Returns:
        StreamThrottle configured with specified limits
        """
```
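
A minimal sketch of driving a `Throttle` directly with the `wait`/`append` cycle described above; the payload size and the 128 KiB/s limit are arbitrary choices for illustration:

```python
import asyncio
import time

import aioftp

async def manual_throttle_demo():
    """Sketch: push chunks through a Throttle by hand to see it pace the loop."""
    throttle = aioftp.Throttle(limit=128 * 1024)  # 128 KiB/s
    chunk = b"x" * (64 * 1024)                    # 64 KiB per iteration

    started = time.time()
    for _ in range(8):
        await throttle.wait()            # sleeps if we are ahead of the limit
        start = time.time()
        # ... the real transfer would happen here ...
        throttle.append(chunk, start)    # record what was "transferred"
    # 8 x 64 KiB at 128 KiB/s should take roughly 4 seconds.
    print(f"took {time.time() - started:.1f}s at a 128 KiB/s limit")

asyncio.run(manual_throttle_demo())
```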

### Throttled Stream Operations

Stream wrapper combining basic I/O with throttling capabilities.

```python { .api }
class ThrottleStreamIO(StreamIO):
    """Stream with throttling support for rate-limited operations."""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                 throttles: dict[str, StreamThrottle] = {}, timeout: float = None,
                 read_timeout: float = None, write_timeout: float = None):
        """
        Initialize throttled stream.

        Parameters:
        - reader: Asyncio stream reader
        - writer: Asyncio stream writer
        - throttles: Dictionary mapping throttle names to StreamThrottle objects
        - timeout: Default timeout for operations
        - read_timeout: Specific read timeout
        - write_timeout: Specific write timeout
        """

    async def wait(self, name: str) -> None:
        """
        Wait for throttle if needed.

        Parameters:
        - name: Name of throttle to check
        """

    def append(self, name: str, data: bytes, start: float) -> None:
        """
        Record data transfer for throttle calculation.

        Parameters:
        - name: Name of throttle to update
        - data: Data that was transferred
        - start: Transfer start time
        """

    async def read(self, count: int = -1) -> bytes:
        """Read data with throttling applied."""

    async def readline(self) -> bytes:
        """Read line with throttling applied."""

    async def write(self, data: bytes) -> None:
        """Write data with throttling applied."""

    async def __aenter__(self) -> "ThrottleStreamIO":
        """Async context manager entry."""

    async def __aexit__(self, *args) -> None:
        """Async context manager exit."""

    def iter_by_line(self) -> AsyncStreamIterator:
        """
        Create async iterator for line-by-line processing.

        Returns:
        Async iterator yielding lines as bytes
        """

    def iter_by_block(self, count: int = 8192) -> AsyncStreamIterator:
        """
        Create async iterator for block-by-block processing.

        Parameters:
        - count: Block size in bytes

        Returns:
        Async iterator yielding data blocks as bytes
        """
```
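
The `throttles` dictionary can hold several named `StreamThrottle` entries, which makes it possible to combine, for example, a broad ceiling with a tighter per-connection cap on the same stream. A minimal sketch, assuming `ftp.example.com` is reachable; the keys `"global"` and `"connection"` are arbitrary names chosen for illustration:

```python
import asyncio

import aioftp

async def multi_throttle_stream():
    """Sketch: combine a broad cap with a tighter per-connection cap."""
    global_cap = aioftp.StreamThrottle.from_limits(
        read_speed_limit=4 * 1024 * 1024,   # 4 MiB/s ceiling
        write_speed_limit=4 * 1024 * 1024,
    )
    connection_cap = aioftp.StreamThrottle.from_limits(
        read_speed_limit=512 * 1024,        # 512 KiB/s for this connection
        write_speed_limit=512 * 1024,
    )

    reader, writer = await asyncio.open_connection("ftp.example.com", 21)
    stream = aioftp.ThrottleStreamIO(
        reader, writer,
        throttles={"global": global_cap, "connection": connection_cap},
        timeout=60.0,
    )
    async with stream:
        greeting = await stream.readline()  # paced by the configured throttles
        print(greeting.decode().strip())

asyncio.run(multi_throttle_stream())
```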

### Stream Iteration

Async iterators for processing streaming data.

```python { .api }
class AsyncStreamIterator:
    """Async iterator for stream data processing."""

    def __aiter__(self) -> "AsyncStreamIterator":
        """Return async iterator."""

    async def __anext__(self) -> bytes:
        """
        Get next data chunk.

        Returns:
        Next chunk of stream data

        Raises:
        StopAsyncIteration when stream ends
        """
```
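
In practice these iterators come from `iter_by_line` and `iter_by_block` and are consumed with `async for` (see the usage examples below). For completeness, a minimal sketch of driving one by hand, which is exactly what `async for` does internally; host and path are hypothetical:

```python
import asyncio

import aioftp

async def manual_iteration():
    """Sketch: consume an AsyncStreamIterator without `async for`."""
    async with aioftp.Client.context("ftp.example.com") as client:
        async with client.download_stream("remote.txt") as stream:
            iterator = stream.iter_by_block(count=4096)
            while True:
                try:
                    block = await iterator.__anext__()  # what `async for` awaits
                except StopAsyncIteration:
                    break                               # stream is exhausted
                print(f"got {len(block)} bytes")

asyncio.run(manual_iteration())
```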

### FTP-Specific Stream Wrapper

Data connection stream with FTP protocol integration.

```python { .api }
class DataConnectionThrottleStreamIO(ThrottleStreamIO):
    """Throttled stream for FTP data connections with protocol integration."""

    def __init__(self, client, reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                 throttles: dict[str, StreamThrottle], timeout: float,
                 read_timeout: float, write_timeout: float):
        """
        Initialize FTP data connection stream.

        Parameters:
        - client: FTP client instance for protocol communication
        - reader: Stream reader for data
        - writer: Stream writer for data
        - throttles: Throttling configuration
        - timeout: Default operation timeout
        - read_timeout: Read operation timeout
        - write_timeout: Write operation timeout
        """

    async def finish(self, expected_codes: str = "2xx", wait_codes: str = "1xx") -> None:
        """
        Finish data transfer and wait for server confirmation.

        Parameters:
        - expected_codes: FTP status codes expected on completion
        - wait_codes: FTP status codes to wait for during transfer
        """

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Async context manager exit with FTP protocol cleanup."""
```
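
Instances of this class are what `Client.download_stream` and `Client.upload_stream` hand back; leaving the `async with` block performs the protocol cleanup, and `finish` can be awaited explicitly when the stream is managed by hand. A minimal sketch of the manual pattern, assuming the stream object can be awaited directly as in the aioftp documentation; host and path are hypothetical:

```python
import asyncio

import aioftp

async def explicit_finish():
    """Sketch: manage an FTP data connection stream without `async with`."""
    async with aioftp.Client.context("ftp.example.com") as client:
        stream = await client.download_stream("remote.bin")
        try:
            async for block in stream.iter_by_block(count=32 * 1024):
                pass  # consume the payload
        finally:
            # Close the data connection and wait for the final 2xx reply.
            await stream.finish()

asyncio.run(explicit_finish())
```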

## Usage Examples

### Basic Stream Operations

```python
import aioftp
import asyncio

async def basic_streaming():
    """Example of basic stream operations with timeout."""

    reader, writer = await asyncio.open_connection("example.com", 80)

    # Create stream with timeout
    stream = aioftp.StreamIO(reader, writer, timeout=30.0)

    try:
        # Write HTTP request
        await stream.write(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")

        # Read response line by line
        status_line = await stream.readline()
        print(f"Status: {status_line.decode().strip()}")

        # Read specific amount of data
        data = await stream.read(1024)
        print(f"Received {len(data)} bytes")

    finally:
        stream.close()

asyncio.run(basic_streaming())
```

### Throttled File Transfer

```python
import aioftp
import asyncio

async def throttled_transfer():
    """Example of throttled stream operations."""

    # Create throttles for rate limiting
    read_throttle = aioftp.Throttle(limit=1024*1024)   # 1 MB/s read
    write_throttle = aioftp.Throttle(limit=512*1024)   # 512 KB/s write

    stream_throttle = aioftp.StreamThrottle(
        read=read_throttle,
        write=write_throttle
    )

    reader, writer = await asyncio.open_connection("ftp.example.com", 21)

    # Create throttled stream
    throttled_stream = aioftp.ThrottleStreamIO(
        reader, writer,
        throttles={"default": stream_throttle},
        timeout=60.0
    )

    try:
        async with throttled_stream:
            # Transfers are automatically rate-limited
            greeting = await throttled_stream.readline()
            print(f"Greeting: {greeting.decode().strip()}")

            await throttled_stream.write(b"USER anonymous\r\n")
            response = await throttled_stream.readline()
            print(f"Response: {response.decode().strip()}")

    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(throttled_transfer())
```

### Stream Iteration

```python
import aioftp
import asyncio

async def stream_iteration_example():
    """Example of iterating over stream data."""

    async with aioftp.Client.context("ftp.example.com") as client:
        # Download large file using stream iteration
        async with client.download_stream("large_file.txt") as stream:
            # Process file line by line
            async for line in stream.iter_by_line():
                # Process each line without loading entire file into memory
                processed_line = line.decode().strip().upper()
                print(f"Processed: {processed_line}")

        # Upload a local file in blocks
        async with client.upload_stream("output_file.txt") as stream:
            with open("local_file.txt", "rb") as f:
                # Read and write data in blocks
                while block := f.read(4096):
                    processed_block = block.upper()  # Example processing
                    await stream.write(processed_block)

asyncio.run(stream_iteration_example())
```

### Advanced Throttling Configuration

```python
import aioftp
import asyncio

async def advanced_throttling():
    """Example with multiple throttling configurations."""

    # Different throttles for different transfers
    fast_throttle = aioftp.StreamThrottle.from_limits(
        read_speed_limit=10*1024*1024,  # 10 MB/s
        write_speed_limit=10*1024*1024
    )

    slow_throttle = aioftp.StreamThrottle.from_limits(
        read_speed_limit=256*1024,      # 256 KB/s
        write_speed_limit=256*1024
    )

    async def upload_with_throttle(client, local_path, remote_path, throttle):
        """Upload a local file through a stream with a per-transfer throttle."""
        async with client.upload_stream(remote_path) as stream:
            # Adding an entry to the stream's throttles dict applies it to
            # subsequent writes on this data connection.
            stream.throttles["upload"] = throttle

            with open(local_path, "rb") as f:
                while chunk := f.read(8192):
                    await stream.write(chunk)

    async with aioftp.Client.context("ftp.example.com") as client:
        # Generous limit for a small file
        await upload_with_throttle(client, "small_file.txt", "small_remote.txt", fast_throttle)

        # Tight limit for a large file to cap bandwidth usage
        await upload_with_throttle(client, "large_file.txt", "large_remote.txt", slow_throttle)

asyncio.run(advanced_throttling())
```

### Custom Stream Processing

```python
import aioftp
import asyncio
import time

async def custom_stream_processing():
    """Example with custom stream processing and monitoring."""

    class MonitoredThrottle(aioftp.Throttle):
        """Custom throttle with transfer monitoring."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.total_bytes = 0
            self.logged_mb = 0
            self.start_time = time.time()

        def append(self, data: bytes, start: float):
            super().append(data, start)
            self.total_bytes += len(data)

            # Log progress each time another full megabyte has been transferred
            if self.total_bytes // (1024*1024) > self.logged_mb:
                self.logged_mb = self.total_bytes // (1024*1024)
                elapsed = time.time() - self.start_time
                rate = self.total_bytes / elapsed if elapsed > 0 else 0
                print(f"Transferred {self.total_bytes} bytes at {rate:.0f} bytes/sec")

    # Create monitored throttles
    monitored = aioftp.StreamThrottle(
        read=MonitoredThrottle(limit=2*1024*1024),
        write=MonitoredThrottle(limit=2*1024*1024)
    )

    async with aioftp.Client.context("ftp.example.com") as client:
        async with client.download_stream("large_file.zip") as stream:
            stream.throttles["monitored"] = monitored

            with open("downloaded_file.zip", "wb") as f:
                async for chunk in stream.iter_by_block(count=64*1024):
                    f.write(chunk)

asyncio.run(custom_stream_processing())
```

## Performance Considerations

1. **Block Size**: Larger blocks reduce per-call overhead but increase memory usage (see the sketch after this list)
2. **Throttle Limits**: Set limits based on available network capacity and how much bandwidth a transfer should be allowed to consume
3. **Timeout Values**: Balance responsiveness against expected network conditions
4. **Stream Iteration**: Use the iteration method (line vs. block) that matches the data format
5. **Memory Usage**: Stream processing keeps memory usage roughly constant regardless of file size
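
A minimal sketch for comparing block sizes on a single download, assuming a reachable test server at `ftp.example.com` and a remote file `big.bin` (both hypothetical); the measured times will vary with network conditions:

```python
import asyncio
import time

import aioftp

async def compare_block_sizes():
    """Sketch: time the same download with different iter_by_block sizes."""
    for block_size in (4 * 1024, 64 * 1024, 256 * 1024):
        async with aioftp.Client.context("ftp.example.com") as client:
            started = time.time()
            total = 0
            async with client.download_stream("big.bin") as stream:
                async for block in stream.iter_by_block(count=block_size):
                    total += len(block)
            elapsed = time.time() - started
            print(f"block={block_size:>7}: {total} bytes in {elapsed:.1f}s")

asyncio.run(compare_block_sizes())
```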

## Best Practices

1. **Always use context managers** (`async with`) for proper resource cleanup
2. **Set appropriate timeouts** to prevent hanging operations
3. **Use throttling** to be respectful of shared network resources
4. **Monitor transfer progress** for long-running operations
5. **Handle exceptions** properly, especially timeout and connection errors (see the sketch after this list)
6. **Choose appropriate block sizes** based on your use case and memory constraints
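
To tie several of these practices together, a minimal sketch of a download with an overall deadline, context-managed resources, and error handling; host, credentials, and paths are hypothetical:

```python
import asyncio

import aioftp

async def robust_download():
    """Sketch: context managers, an overall deadline, and error handling together."""
    async def transfer():
        async with aioftp.Client.context("ftp.example.com", user="anonymous", password="guest") as client:
            async with client.download_stream("reports/daily.csv") as stream:
                with open("daily.csv", "wb") as f:
                    async for block in stream.iter_by_block(count=64 * 1024):
                        f.write(block)

    try:
        # Overall deadline for the whole transfer, on top of per-operation timeouts.
        await asyncio.wait_for(transfer(), timeout=300)
    except asyncio.TimeoutError:
        print("Transfer timed out")
    except (ConnectionError, aioftp.StatusCodeError) as e:
        print(f"Transfer failed: {e}")

asyncio.run(robust_download())
```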