CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aioftp

Asynchronous FTP client and server implementation for Python's asyncio framework

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

streaming.mddocs/

Streaming and Throttling

Stream wrappers with timeout and throttling support for controlling data transfer rates and managing network connections. Includes basic streams with timeout support, throttled streams with configurable read/write limits, and async iteration capabilities for processing streaming data.

Capabilities

Basic Stream Operations

Fundamental stream wrapper providing timeout support for asyncio streams.

class StreamIO:
    """Basic async stream wrapper with timeout support."""
    
    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                timeout: float = None, read_timeout: float = None, 
                write_timeout: float = None):
        """
        Initialize stream wrapper.
        
        Parameters:
        - reader: Asyncio stream reader for input operations
        - writer: Asyncio stream writer for output operations  
        - timeout: Default timeout for all operations (seconds)
        - read_timeout: Specific timeout for read operations (seconds)
        - write_timeout: Specific timeout for write operations (seconds)
        """
    
    async def read(self, count: int = -1) -> bytes:
        """
        Read data from stream.
        
        Parameters:
        - count: Number of bytes to read (-1 for all available)
        
        Returns:
        Bytes read from stream
        """
    
    async def readline(self) -> bytes:
        """
        Read a line from stream (up to newline).
        
        Returns:
        Line data as bytes including newline character
        """
    
    async def readexactly(self, count: int) -> bytes:
        """
        Read exactly the specified number of bytes.
        
        Parameters:
        - count: Exact number of bytes to read
        
        Returns:
        Exactly count bytes from stream
        
        Raises:
        IncompleteReadError if fewer bytes available
        """
    
    async def write(self, data: bytes) -> None:
        """
        Write data to stream.
        
        Parameters:
        - data: Bytes to write to stream
        """
    
    def close(self) -> None:
        """Close the stream writer immediately."""
    
    async def start_tls(self, sslcontext: ssl.SSLContext, server_hostname: str = None) -> None:
        """
        Upgrade connection to TLS.
        
        Parameters:
        - sslcontext: SSL context for encryption
        - server_hostname: Server hostname for certificate validation
        """

Throttling Mechanisms

Speed limiting components for controlling data transfer rates.

class Throttle:
    """Speed throttling mechanism for rate limiting."""
    
    def __init__(self, limit: int = None, reset_rate: int = 10):
        """
        Initialize throttle.
        
        Parameters:
        - limit: Speed limit in bytes per second (None for unlimited)
        - reset_rate: Rate statistics reset frequency
        """
    
    async def wait(self) -> None:
        """Wait if throttling is needed based on current transfer rate."""
    
    def append(self, data: bytes, start: float) -> None:
        """
        Record data transfer for rate calculation.
        
        Parameters:
        - data: Data that was transferred
        - start: Transfer start time (from time.time())
        """
    
    def clone(self) -> Throttle:
        """
        Create a copy of this throttle with same settings.
        
        Returns:
        New Throttle instance with identical configuration
        """
    
    @property
    def limit(self) -> Union[int, None]:
        """
        Current speed limit in bytes per second.
        
        Returns:
        Speed limit or None if unlimited
        """

class StreamThrottle(NamedTuple):
    """Named tuple combining read and write throttles."""
    
    read: Throttle
    """Throttle for read operations."""
    
    write: Throttle  
    """Throttle for write operations."""
    
    def clone(self) -> StreamThrottle:
        """
        Create a copy of this stream throttle.
        
        Returns:
        New StreamThrottle with cloned read/write throttles
        """
    
    @classmethod
    def from_limits(read_speed_limit: int = None, write_speed_limit: int = None) -> StreamThrottle:
        """
        Create StreamThrottle from speed limits.
        
        Parameters:
        - read_speed_limit: Read speed limit in bytes/second
        - write_speed_limit: Write speed limit in bytes/second
        
        Returns:
        StreamThrottle configured with specified limits
        """

Throttled Stream Operations

Stream wrapper combining basic I/O with throttling capabilities.

class ThrottleStreamIO(StreamIO):
    """Stream with throttling support for rate-limited operations."""
    
    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter,
                throttles: dict[str, StreamThrottle] = {}, timeout: float = None,
                read_timeout: float = None, write_timeout: float = None):
        """
        Initialize throttled stream.
        
        Parameters:
        - reader: Asyncio stream reader
        - writer: Asyncio stream writer
        - throttles: Dictionary mapping throttle names to StreamThrottle objects
        - timeout: Default timeout for operations
        - read_timeout: Specific read timeout
        - write_timeout: Specific write timeout
        """
    
    async def wait(self, name: str) -> None:
        """
        Wait for throttle if needed.
        
        Parameters:
        - name: Name of throttle to check
        """
    
    def append(self, name: str, data: bytes, start: float) -> None:
        """
        Record data transfer for throttle calculation.
        
        Parameters:
        - name: Name of throttle to update
        - data: Data that was transferred
        - start: Transfer start time
        """
    
    async def read(self, count: int = -1) -> bytes:
        """Read data with throttling applied."""
    
    async def readline(self) -> bytes:
        """Read line with throttling applied."""
    
    async def write(self, data: bytes) -> None:
        """Write data with throttling applied."""
    
    async def __aenter__(self) -> ThrottleStreamIO:
        """Async context manager entry."""
    
    async def __aexit__(*args) -> None:
        """Async context manager exit."""
    
    def iter_by_line(self) -> AsyncStreamIterator:
        """
        Create async iterator for line-by-line processing.
        
        Returns:
        Async iterator yielding lines as bytes
        """
    
    def iter_by_block(self, count: int = 8192) -> AsyncStreamIterator:
        """
        Create async iterator for block-by-block processing.
        
        Parameters:
        - count: Block size in bytes
        
        Returns:
        Async iterator yielding data blocks as bytes
        """

Stream Iteration

Async iterators for processing streaming data.

class AsyncStreamIterator:
    """Async iterator for stream data processing."""
    
    def __aiter__(self) -> AsyncStreamIterator:
        """Return async iterator."""
    
    async def __anext__(self) -> bytes:
        """
        Get next data chunk.
        
        Returns:
        Next chunk of stream data
        
        Raises:
        StopAsyncIteration when stream ends
        """

FTP-Specific Stream Wrapper

Data connection stream with FTP protocol integration.

class DataConnectionThrottleStreamIO(ThrottleStreamIO):
    """Throttled stream for FTP data connections with protocol integration."""
    
    def __init__(self, client, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, 
                throttles: dict[str, StreamThrottle], timeout: float,
                read_timeout: float, write_timeout: float):
        """
        Initialize FTP data connection stream.
        
        Parameters:
        - client: FTP client instance for protocol communication
        - reader: Stream reader for data
        - writer: Stream writer for data
        - throttles: Throttling configuration
        - timeout: Default operation timeout
        - read_timeout: Read operation timeout  
        - write_timeout: Write operation timeout
        """
    
    async def finish(self, expected_codes: str = "2xx", wait_codes: str = "1xx") -> None:
        """
        Finish data transfer and wait for server confirmation.
        
        Parameters:
        - expected_codes: FTP status codes expected on completion
        - wait_codes: FTP status codes to wait for during transfer
        """
    
    async def __aexit__(exc_type, exc, tb) -> None:
        """Async context manager exit with FTP protocol cleanup."""

Usage Examples

Basic Stream Operations

import aioftp
import asyncio

async def basic_streaming():
    """Example of basic stream operations with timeout."""
    
    reader, writer = await asyncio.open_connection("example.com", 80)
    
    # Create stream with timeout
    stream = aioftp.StreamIO(reader, writer, timeout=30.0)
    
    try:
        # Write HTTP request
        await stream.write(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
        
        # Read response line by line
        status_line = await stream.readline()
        print(f"Status: {status_line.decode().strip()}")
        
        # Read specific amount of data
        data = await stream.read(1024)
        print(f"Received {len(data)} bytes")
        
    finally:
        stream.close()

asyncio.run(basic_streaming())

Throttled File Transfer

import aioftp
import asyncio

async def throttled_transfer():
    """Example of throttled stream operations."""
    
    # Create throttles for rate limiting
    read_throttle = aioftp.Throttle(limit=1024*1024)  # 1MB/s read
    write_throttle = aioftp.Throttle(limit=512*1024)  # 512KB/s write
    
    stream_throttle = aioftp.StreamThrottle(
        read=read_throttle,
        write=write_throttle
    )
    
    reader, writer = await asyncio.open_connection("ftp.example.com", 21)
    
    # Create throttled stream
    throttled_stream = aioftp.ThrottleStreamIO(
        reader, writer,
        throttles={"default": stream_throttle},
        timeout=60.0
    )
    
    try:
        async with throttled_stream:
            # Transfers will be automatically rate-limited
            await throttled_stream.write(b"USER anonymous\r\n")
            response = await throttled_stream.readline()
            print(f"Response: {response.decode().strip()}")
            
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(throttled_transfer())

Stream Iteration

import aioftp
import asyncio

async def stream_iteration_example():
    """Example of iterating over stream data."""
    
    async with aioftp.Client.context("ftp.example.com") as client:
        # Download large file using stream iteration
        async with client.download_stream("large_file.txt") as stream:
            # Process file line by line
            async for line in stream.iter_by_line():
                # Process each line without loading entire file into memory
                processed_line = line.decode().strip().upper()
                print(f"Processed: {processed_line}")
        
        # Upload file using block iteration  
        async with client.upload_stream("output_file.txt") as stream:
            # Write data in blocks
            async for block in stream.iter_by_block(count=4096):
                # Process and write blocks
                processed_block = block.upper()  # Example processing
                await stream.write(processed_block)

asyncio.run(stream_iteration_example())

Advanced Throttling Configuration

import aioftp
import asyncio

async def advanced_throttling():
    """Example with multiple throttling configurations."""
    
    # Different throttles for different operations
    fast_throttle = aioftp.StreamThrottle.from_limits(
        read_speed_limit=10*1024*1024,  # 10MB/s
        write_speed_limit=10*1024*1024
    )
    
    slow_throttle = aioftp.StreamThrottle.from_limits(
        read_speed_limit=256*1024,  # 256KB/s
        write_speed_limit=256*1024
    )
    
    async with aioftp.Client.context("ftp.example.com") as client:
        # Fast upload for small files
        await client.upload("small_file.txt", "small_remote.txt")
        
        # Throttled upload for large files to limit bandwidth usage
        async with client.upload_stream("large_file.txt") as stream:
            # Apply custom throttling
            stream.throttles["upload"] = slow_throttle
            
            with open("large_file.txt", "rb") as f:
                while True:
                    chunk = f.read(8192)
                    if not chunk:
                        break
                    await stream.write(chunk)

asyncio.run(advanced_throttling())

Custom Stream Processing

import aioftp
import asyncio
import time

async def custom_stream_processing():
    """Example with custom stream processing and monitoring."""
    
    class MonitoredThrottle(aioftp.Throttle):
        """Custom throttle with transfer monitoring."""
        
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.total_bytes = 0
            self.start_time = time.time()
        
        def append(self, data: bytes, start: float):
            super().append(data, start)
            self.total_bytes += len(data)
            
            # Log progress every MB
            if self.total_bytes % (1024*1024) == 0:
                elapsed = time.time() - self.start_time
                rate = self.total_bytes / elapsed if elapsed > 0 else 0
                print(f"Transferred {self.total_bytes} bytes at {rate:.0f} bytes/sec")
    
    # Create monitored throttles
    monitored = aioftp.StreamThrottle(
        read=MonitoredThrottle(limit=2*1024*1024),
        write=MonitoredThrottle(limit=2*1024*1024)
    )
    
    async with aioftp.Client.context("ftp.example.com") as client:
        async with client.download_stream("large_file.zip") as stream:
            stream.throttles["monitored"] = monitored
            
            with open("downloaded_file.zip", "wb") as f:
                async for chunk in stream.iter_by_block(count=64*1024):
                    f.write(chunk)

asyncio.run(custom_stream_processing())

Performance Considerations

  1. Block Size: Larger blocks reduce overhead but increase memory usage
  2. Throttle Limits: Set appropriate limits based on network capacity and requirements
  3. Timeout Values: Balance responsiveness with network conditions
  4. Stream Iteration: Use appropriate iteration method (line vs block) for data type
  5. Memory Usage: Stream processing keeps memory usage constant regardless of file size

Best Practices

  1. Always use context managers (async with) for proper resource cleanup
  2. Set appropriate timeouts to prevent hanging operations
  3. Use throttling to be respectful of network resources
  4. Monitor transfer progress for long-running operations
  5. Handle exceptions properly, especially timeout and connection errors
  6. Choose appropriate block sizes based on your use case and memory constraints

Install with Tessl CLI

npx tessl i tessl/pypi-aioftp

docs

client.md

errors.md

index.md

pathio.md

server.md

streaming.md

tile.json