Asynchronous FTP client and server implementation for Python's asyncio framework
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Stream wrappers with timeout and throttling support for controlling data transfer rates and managing network connections. Includes basic streams with timeout support, throttled streams with configurable read/write limits, and async iteration capabilities for processing streaming data.
Fundamental stream wrapper providing timeout support for asyncio streams.
class StreamIO:
    """Basic async stream wrapper with timeout support."""

    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        timeout: Union[float, None] = None,
        read_timeout: Union[float, None] = None,
        write_timeout: Union[float, None] = None,
    ):
        """
        Initialize stream wrapper.

        Parameters:
        - reader: Asyncio stream reader for input operations
        - writer: Asyncio stream writer for output operations
        - timeout: Default timeout for all operations (seconds); None is
          the default value
        - read_timeout: Specific timeout for read operations (seconds)
        - write_timeout: Specific timeout for write operations (seconds)
        """

    async def read(self, count: int = -1) -> bytes:
        """
        Read data from stream.

        Parameters:
        - count: Number of bytes to read (-1 for all available)

        Returns:
        Bytes read from stream
        """

    async def readline(self) -> bytes:
        """
        Read a line from stream (up to newline).

        Returns:
        Line data as bytes including newline character
        """

    async def readexactly(self, count: int) -> bytes:
        """
        Read exactly the specified number of bytes.

        Parameters:
        - count: Exact number of bytes to read

        Returns:
        Exactly count bytes from stream

        Raises:
        IncompleteReadError if fewer bytes available
        """

    async def write(self, data: bytes) -> None:
        """
        Write data to stream.

        Parameters:
        - data: Bytes to write to stream
        """

    def close(self) -> None:
        """Close the stream writer immediately."""

    async def start_tls(
        self,
        sslcontext: ssl.SSLContext,
        server_hostname: Union[str, None] = None,
    ) -> None:
        """
        Upgrade connection to TLS.

        Parameters:
        - sslcontext: SSL context for encryption
        - server_hostname: Server hostname for certificate validation
        """


# Speed limiting components for controlling data transfer rates.
class Throttle:
    """Speed throttling mechanism for rate limiting."""

    def __init__(self, limit: Union[int, None] = None, reset_rate: int = 10):
        """
        Initialize throttle.

        Parameters:
        - limit: Speed limit in bytes per second (None for unlimited)
        - reset_rate: Rate statistics reset frequency
        """

    async def wait(self) -> None:
        """Wait if throttling is needed based on current transfer rate."""

    def append(self, data: bytes, start: float) -> None:
        """
        Record data transfer for rate calculation.

        Parameters:
        - data: Data that was transferred
        - start: Transfer start time (from time.time())
        """

    # The return annotation is quoted: the name ``Throttle`` is not bound
    # yet while the class body is being executed.
    def clone(self) -> "Throttle":
        """
        Create a copy of this throttle with same settings.

        Returns:
        New Throttle instance with identical configuration
        """

    @property
    def limit(self) -> Union[int, None]:
        """
        Current speed limit in bytes per second.

        Returns:
        Speed limit or None if unlimited
        """
class StreamThrottle(NamedTuple):
    """Named tuple combining read and write throttles."""

    # Throttle for read operations.
    read: Throttle
    # Throttle for write operations.
    write: Throttle

    # Return annotations are quoted: ``StreamThrottle`` is not bound yet
    # while the class body is being executed.
    def clone(self) -> "StreamThrottle":
        """
        Create a copy of this stream throttle.

        Returns:
        New StreamThrottle with cloned read/write throttles
        """

    @classmethod
    def from_limits(
        cls,
        read_speed_limit: Union[int, None] = None,
        write_speed_limit: Union[int, None] = None,
    ) -> "StreamThrottle":
        """
        Create StreamThrottle from speed limits.

        Parameters:
        - read_speed_limit: Read speed limit in bytes/second
        - write_speed_limit: Write speed limit in bytes/second

        Returns:
        StreamThrottle configured with specified limits
        """


# Stream wrapper combining basic I/O with throttling capabilities.
class ThrottleStreamIO(StreamIO):
    """Stream with throttling support for rate-limited operations."""

    # NOTE(review): ``throttles`` keeps its mutable ``{}` default so the
    # documented signature is unchanged; that single dict object is shared
    # by every call that omits the argument, so pass an explicit dict when
    # the mapping will be mutated.
    def __init__(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        throttles: dict[str, StreamThrottle] = {},
        timeout: Union[float, None] = None,
        read_timeout: Union[float, None] = None,
        write_timeout: Union[float, None] = None,
    ):
        """
        Initialize throttled stream.

        Parameters:
        - reader: Asyncio stream reader
        - writer: Asyncio stream writer
        - throttles: Dictionary mapping throttle names to StreamThrottle objects
        - timeout: Default timeout for operations
        - read_timeout: Specific read timeout
        - write_timeout: Specific write timeout
        """

    async def wait(self, name: str) -> None:
        """
        Wait for throttle if needed.

        Parameters:
        - name: Name of throttle to check
        """

    def append(self, name: str, data: bytes, start: float) -> None:
        """
        Record data transfer for throttle calculation.

        Parameters:
        - name: Name of throttle to update
        - data: Data that was transferred
        - start: Transfer start time
        """

    async def read(self, count: int = -1) -> bytes:
        """Read data with throttling applied."""

    async def readline(self) -> bytes:
        """Read line with throttling applied."""

    async def write(self, data: bytes) -> None:
        """Write data with throttling applied."""

    # Quoted annotation: the class name is not bound inside its own body.
    async def __aenter__(self) -> "ThrottleStreamIO":
        """Async context manager entry."""

    async def __aexit__(self, *args) -> None:
        """Async context manager exit."""

    # ``AsyncStreamIterator`` is declared after this class, so the return
    # annotations below are quoted forward references.
    def iter_by_line(self) -> "AsyncStreamIterator":
        """
        Create async iterator for line-by-line processing.

        Returns:
        Async iterator yielding lines as bytes
        """

    def iter_by_block(self, count: int = 8192) -> "AsyncStreamIterator":
        """
        Create async iterator for block-by-block processing.

        Parameters:
        - count: Block size in bytes

        Returns:
        Async iterator yielding data blocks as bytes
        """


# Async iterators for processing streaming data.
class AsyncStreamIterator:
    """Async iterator for stream data processing."""

    # Quoted annotation: the class name is not bound inside its own body.
    def __aiter__(self) -> "AsyncStreamIterator":
        """Return async iterator."""

    async def __anext__(self) -> bytes:
        """
        Get next data chunk.

        Returns:
        Next chunk of stream data

        Raises:
        StopAsyncIteration when stream ends
        """


# Data connection stream with FTP protocol integration.
class DataConnectionThrottleStreamIO(ThrottleStreamIO):
    """Throttled stream for FTP data connections with protocol integration."""

    def __init__(
        self,
        client,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        throttles: dict[str, StreamThrottle],
        timeout: float,
        read_timeout: float,
        write_timeout: float,
    ):
        """
        Initialize FTP data connection stream.

        Parameters:
        - client: FTP client instance for protocol communication
        - reader: Stream reader for data
        - writer: Stream writer for data
        - throttles: Throttling configuration
        - timeout: Default operation timeout
        - read_timeout: Read operation timeout
        - write_timeout: Write operation timeout
        """

    async def finish(self, expected_codes: str = "2xx", wait_codes: str = "1xx") -> None:
        """
        Finish data transfer and wait for server confirmation.

        Parameters:
        - expected_codes: FTP status codes expected on completion
        - wait_codes: FTP status codes to wait for during transfer
        """

    # ``self`` is explicit so the three exception arguments supplied by
    # ``async with`` bind to the names the signature advertises.
    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Async context manager exit with FTP protocol cleanup."""


import aioftp
import asyncio
async def basic_streaming():
    """Send one HTTP request through a StreamIO with a 30 second timeout.

    Prints the status line and the size of the first chunk of the response,
    and closes the stream whether or not the exchange succeeded.
    """
    request = b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
    reader, writer = await asyncio.open_connection("example.com", 80)

    # Wrap the raw asyncio pair so every operation shares one timeout.
    stream = aioftp.StreamIO(reader, writer, timeout=30.0)
    try:
        await stream.write(request)

        # First line of the response.
        status_line = await stream.readline()
        print(f"Status: {status_line.decode().strip()}")

        # Then at most 1024 further bytes.
        data = await stream.read(1024)
        print(f"Received {len(data)} bytes")
    finally:
        stream.close()
asyncio.run(basic_streaming())


import aioftp
import asyncio
async def throttled_transfer():
    """Exchange one FTP command over a rate-limited stream.

    Reads are capped at 1 MB/s and writes at 512 KB/s; a timeout is
    reported instead of being propagated.
    """
    # Build both directions of the limit in place.
    stream_throttle = aioftp.StreamThrottle(
        read=aioftp.Throttle(limit=1024*1024),   # 1MB/s read
        write=aioftp.Throttle(limit=512*1024),   # 512KB/s write
    )

    reader, writer = await asyncio.open_connection("ftp.example.com", 21)

    throttled_stream = aioftp.ThrottleStreamIO(
        reader,
        writer,
        throttles={"default": stream_throttle},
        timeout=60.0,
    )

    try:
        async with throttled_stream:
            # Every transfer inside this block goes through the throttles.
            await throttled_stream.write(b"USER anonymous\r\n")
            response = await throttled_stream.readline()
            print(f"Response: {response.decode().strip()}")
    except asyncio.TimeoutError:
        print("Operation timed out")
asyncio.run(throttled_transfer())


import aioftp
import asyncio
async def stream_iteration_example():
    """Example of iterating over stream data.

    Downloads a remote file line by line, then uploads a local file block
    by block. Neither direction loads the whole file into memory.
    """
    async with aioftp.Client.context("ftp.example.com") as client:
        # Download large file using stream iteration
        async with client.download_stream("large_file.txt") as stream:
            # Process file line by line
            async for line in stream.iter_by_line():
                # Process each line without loading entire file into memory
                processed_line = line.decode().strip().upper()
                print(f"Processed: {processed_line}")

        # Upload file block by block. The upload stream is the destination
        # of the transfer, so the blocks are read from the local file
        # rather than iterated out of the stream being written to.
        async with client.upload_stream("output_file.txt") as stream:
            with open("output_file.txt", "rb") as source:
                while block := source.read(4096):
                    processed_block = block.upper()  # Example processing
                    await stream.write(processed_block)
asyncio.run(stream_iteration_example())


import aioftp
import asyncio
async def advanced_throttling():
    """Show two throttle profiles and apply the slow one to a large upload."""
    # 10MB/s in both directions. Built for contrast only: this example
    # never attaches it to a stream.
    fast_throttle = aioftp.StreamThrottle.from_limits(
        read_speed_limit=10*1024*1024,
        write_speed_limit=10*1024*1024,
    )
    # 256KB/s in both directions.
    slow_throttle = aioftp.StreamThrottle.from_limits(
        read_speed_limit=256*1024,
        write_speed_limit=256*1024,
    )

    async with aioftp.Client.context("ftp.example.com") as client:
        # Small files go through the plain upload call.
        await client.upload("small_file.txt", "small_remote.txt")

        # Large files are streamed so the bandwidth cap can be attached.
        async with client.upload_stream("large_file.txt") as stream:
            stream.throttles["upload"] = slow_throttle

            with open("large_file.txt", "rb") as f:
                # An empty read marks end of file and ends the loop.
                while chunk := f.read(8192):
                    await stream.write(chunk)
asyncio.run(advanced_throttling())


import aioftp
import asyncio
import time
async def custom_stream_processing():
    """Example with custom stream processing and monitoring.

    Downloads a file in 64 KiB blocks through a throttle subclass that
    prints a progress line each time another megabyte has been recorded.
    """

    class MonitoredThrottle(aioftp.Throttle):
        """Custom throttle with transfer monitoring."""

        # Number of bytes between two progress reports.
        report_step = 1024*1024

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.total_bytes = 0
            self.start_time = time.time()
            # Byte count at which the next progress line is due.
            self.next_report = self.report_step

        def append(self, data: bytes, start: float):
            super().append(data, start)
            self.total_bytes += len(data)

            # Log progress every MB. Chunk sizes are arbitrary, so the
            # running total rarely lands exactly on a megabyte boundary;
            # compare against a moving threshold instead of testing for
            # an exact multiple.
            if self.total_bytes >= self.next_report:
                elapsed = time.time() - self.start_time
                rate = self.total_bytes / elapsed if elapsed > 0 else 0
                print(f"Transferred {self.total_bytes} bytes at {rate:.0f} bytes/sec")
                # Jump to the first boundary beyond the current total so a
                # large chunk yields one report, not several.
                steps_done = self.total_bytes // self.report_step
                self.next_report = (steps_done + 1) * self.report_step

    # Create monitored throttles
    monitored = aioftp.StreamThrottle(
        read=MonitoredThrottle(limit=2*1024*1024),
        write=MonitoredThrottle(limit=2*1024*1024),
    )

    async with aioftp.Client.context("ftp.example.com") as client:
        async with client.download_stream("large_file.zip") as stream:
            stream.throttles["monitored"] = monitored
            with open("downloaded_file.zip", "wb") as f:
                async for chunk in stream.iter_by_block(count=64*1024):
                    f.write(chunk)
asyncio.run(custom_stream_processing())

# NOTE(review): the page text is truncated here; the surviving fragment
# reads "async with) for proper resource cleanup" — presumably advice to
# use context managers (async with) for proper resource cleanup.