CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aiofile

Asynchronous file operations with asyncio support.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

streaming-operations.mddocs/

Streaming Operations

Async iterators for chunk-based reading and sequential writing operations. These classes provide efficient memory usage for large files and streaming data processing with configurable chunk sizes.

Capabilities

Reader Class

Async iterator for reading file chunks with configurable chunk size. Provides efficient streaming reads for large files.

class Reader:
    # Default read granularity: 32 KiB per chunk.
    CHUNK_SIZE = 32768

    def __init__(
        self, 
        aio_file: AIOFile, 
        offset: int = 0, 
        chunk_size: int = CHUNK_SIZE
    ):
        """
        Create a chunked reader over an open AIOFile.

        Args:
            aio_file: Open AIOFile to stream from
            offset: Byte position at which reading begins
            chunk_size: Number of bytes requested per chunk
        """

    @property
    def file(self) -> AIOFile:
        """The AIOFile this reader streams from."""

    @property 
    def encoding(self) -> str:
        """Text encoding, mirrored from the underlying AIOFile."""

    async def read_chunk(self) -> Union[str, bytes]:
        """
        Fetch the next chunk from the file.

        Returns:
            bytes in binary mode or str in text mode; an empty
            value signals that the end of file has been reached.
        """
Writer Class

Sequential writer for file operations. Maintains internal offset and writes data sequentially.

class Writer:
    def __init__(self, aio_file: AIOFile, offset: int = 0):
        """
        Create a sequential writer bound to an open AIOFile.

        Args:
            aio_file: Open AIOFile that receives the data
            offset: Byte position at which writing starts
        """

    async def __call__(self, data: Union[str, bytes]) -> None:
        """
        Write *data* at the current position, advancing the
        writer's internal offset.

        Args:
            data: Payload to write; both str and bytes are accepted
        """

LineReader Class

Async iterator for reading file lines with configurable line separator and buffer size.

class LineReader:
    # Default internal buffer size for line assembly (4192 bytes).
    CHUNK_SIZE = 4192

    def __init__(
        self, 
        aio_file: AIOFile, 
        offset: int = 0, 
        chunk_size: int = CHUNK_SIZE, 
        line_sep: str = "\n"
    ):
        """
        Create a line-oriented reader over an open AIOFile.

        Args:
            aio_file: Open AIOFile to stream from
            offset: Byte position at which reading begins
            chunk_size: Size of the internal read buffer
            line_sep: String that terminates each line
        """

    @property
    def linesep(self) -> Union[str, bytes]:
        """The separator in effect: str in text mode, bytes in binary mode."""

    async def readline(self) -> Union[str, bytes]:
        """
        Return the next line, separator included.

        At end of file any unterminated trailing data is returned
        first; after that an empty str/bytes is returned.
        """

Helper Function

async def unicode_reader(
    afp: AIOFile, 
    chunk_size: int, 
    offset: int, 
    encoding: str = "utf-8"
) -> Tuple[int, str]:
    """
    Read roughly *chunk_size* bytes from *afp* and decode them safely.

    A multi-byte character can straddle a chunk boundary; when that
    makes decoding fail, the read is retried with a larger chunk
    until a clean decode succeeds.

    Args:
        afp: AIOFile to read from
        chunk_size: Requested read size in bytes
        offset: Byte position to read at
        encoding: Codec used to decode the raw bytes

    Returns:
        Pair of (bytes_consumed, decoded_text)

    Raises:
        UnicodeDecodeError: If the data still cannot be decoded
            after the retries
    """

Usage Examples

Chunked File Reading

import asyncio
from aiofile import AIOFile, Reader

async def chunked_reading():
    """Stream a text file chunk by chunk instead of reading it whole."""
    async with AIOFile('large_file.txt', 'r') as afile:
        # Iterator style: the Reader yields one chunk per iteration.
        stream = Reader(afile, chunk_size=8192)
        async for piece in stream:
            print(f"Chunk size: {len(piece)}")
            # Process chunk without loading entire file into memory

        # Manual style: pull chunks until an empty one marks EOF.
        manual = Reader(afile, offset=0, chunk_size=1024)
        while True:
            piece = await manual.read_chunk()
            if not piece:
                break
            print(f"Manual chunk: {len(piece)} characters")

asyncio.run(chunked_reading())

Sequential Writing

import asyncio
from aiofile import AIOFile, Writer

async def sequential_writing():
    """Write several lines through a Writer that tracks its own offset."""
    async with AIOFile('output.txt', 'w') as afile:
        put = Writer(afile)

        # Each call appends at the position the previous one ended.
        for text in ("First line\n", "Second line\n", "Third line\n"):
            await put(text)

        # Flush kernel buffers so the data reaches the disk.
        await afile.fdsync()

asyncio.run(sequential_writing())

Line-by-Line Processing

import asyncio
from aiofile import AIOFile, LineReader

async def line_processing():
    """Read a file line by line, then re-read it and split rows as CSV."""
    async with AIOFile('data.txt', 'r') as afile:
        # Default line reader (newline separator)
        line_reader = LineReader(afile)
        
        async for line in line_reader:
            print(f"Line: {line.rstrip()}")
        
        # Second pass: rewind by creating a fresh reader at offset 0.
        # Rows remain newline-separated; the comma split happens on
        # each row after it is read (line_sep stays "\n" - a comma
        # separator would break rows at every field instead).
        csv_reader = LineReader(afile, offset=0, line_sep="\n")
        async for row in csv_reader:
            fields = row.strip().split(',')
            print(f"CSV fields: {fields}")

asyncio.run(line_processing())

Binary File Streaming

import asyncio
from aiofile import AIOFile, Reader

async def binary_streaming():
    """Accumulate the size of a binary file while streaming it."""
    async with AIOFile('data.bin', 'rb') as afile:
        stream = Reader(afile, chunk_size=4096)

        total_bytes = 0
        async for chunk in stream:
            total_bytes += len(chunk)
            # Process binary chunk
            print(f"Processed {len(chunk)} bytes")

        print(f"Total bytes processed: {total_bytes}")

asyncio.run(binary_streaming())

Custom Line Separators

import asyncio
from aiofile import AIOFile, LineReader

async def custom_separators():
    """Read files whose lines end in CRLF or bare CR."""
    async with AIOFile('windows_file.txt', 'r') as afile:
        # Windows line endings
        async for line in LineReader(afile, line_sep="\r\n"):
            print(f"Windows line: {line.rstrip()}")

    async with AIOFile('mac_file.txt', 'r') as afile:
        # Classic Mac line endings
        async for line in LineReader(afile, line_sep="\r"):
            print(f"Mac line: {line.rstrip()}")

asyncio.run(custom_separators())

Processing Large Files with Limited Memory

import asyncio
from aiofile import AIOFile, Reader

async def memory_efficient_processing():
    """Count words and lines in a huge file without loading it into memory."""
    async with AIOFile('huge_file.txt', 'r') as afile:
        reader = Reader(afile, chunk_size=64 * 1024)  # 64KB chunks
        
        word_count = 0
        line_count = 0
        carry = ''  # partial word held over from the previous chunk
        
        async for chunk in reader:
            text = carry + chunk
            line_count += text.count('\n')

            words = text.split()
            # A word may be cut at the chunk boundary: if the chunk does
            # not end in whitespace, hold the last token back so it can
            # be completed by the next chunk instead of counted twice.
            if words and not text[-1].isspace():
                carry = words.pop()
            else:
                carry = ''
            word_count += len(words)

        if carry:
            word_count += 1  # final word with no trailing whitespace
        
        print(f"Words: {word_count}, Lines: {line_count}")

asyncio.run(memory_efficient_processing())

Parallel Processing with Multiple Readers

import asyncio
from aiofile import AIOFile, Reader

async def parallel_processing():
    """Process four non-overlapping sections of a file concurrently."""
    import os

    # Byte offsets require binary mode; text mode would make char
    # counts and byte offsets disagree for multi-byte encodings.
    file_size = os.path.getsize('large_file.txt')
    section_size = file_size // 4  # Split into 4 parts

    async with AIOFile('large_file.txt', 'rb') as afile:

        async def process_section(section_id):
            """Count the bytes belonging to one quarter of the file."""
            offset = section_id * section_size
            # The last section also absorbs the division remainder.
            limit = file_size - offset if section_id == 3 else section_size

            reader = Reader(afile, offset=offset, chunk_size=8192)
            count = 0
            async for chunk in reader:
                # A Reader streams all the way to EOF, so each section
                # must be capped explicitly to avoid overlapping reads.
                count += min(len(chunk), limit - count)
                if count >= limit:
                    break
            print(f"Section {section_id}: {count} bytes")

        # Process sections in parallel
        await asyncio.gather(*(process_section(i) for i in range(4)))

asyncio.run(parallel_processing())

Writing with Multiple Writers

import asyncio
from aiofile import AIOFile, Writer

async def multiple_writers():
    """Use multiple writers for different file sections."""
    async with AIOFile('output.txt', 'w') as afile:
        # Independent writers, each positioned at its own offset.
        header_writer = Writer(afile, offset=0)
        body_writer = Writer(afile, offset=100)  # Leave space for header

        # The body goes in first, starting at byte 100.
        await body_writer("This is the body content\n")
        await body_writer("More body content\n")

        # The header is filled in afterwards at the top of the file.
        # NOTE(review): if the header is shorter than 100 bytes, the
        # region between it and the body presumably stays NUL-padded -
        # confirm that is acceptable for consumers of this file.
        await header_writer("HEADER: Important document\n")
        await header_writer("Created: 2024\n")

        await afile.fdsync()

asyncio.run(multiple_writers())

Streaming with Error Handling

import asyncio
from aiofile import AIOFile, Reader, LineReader

async def robust_streaming():
    """Stream a file while handling file-level and chunk-level errors."""
    try:
        async with AIOFile('data.txt', 'r') as afile:
            stream = Reader(afile, chunk_size=1024)

            async for chunk in stream:
                try:
                    processed = chunk.upper()
                    print(f"Processed: {len(processed)} chars")
                except Exception as exc:
                    # One bad chunk should not abort the whole stream.
                    print(f"Error processing chunk: {exc}")
                    continue

    except FileNotFoundError:
        print("File not found")
    except PermissionError:
        print("Permission denied")

asyncio.run(robust_streaming())

Memory Efficiency

The streaming classes are designed for memory efficiency:

  • Reader: Loads only one chunk at a time, suitable for files larger than available memory
  • Writer: Writes data immediately, no internal buffering
  • LineReader: Assembles each line in a small internal buffer; memory usage scales with the longest line rather than with file size

Concurrency Safety

All streaming classes use internal locks to serialize concurrent use:

  • Multiple async tasks can safely share the same Reader/Writer/LineReader instance within one event loop
  • Operations are automatically serialized to maintain file position consistency
  • No external locking required when sharing instances across tasks (note: this is asyncio task-level serialization, not OS thread safety)

Constants

# Default chunk sizes optimized for different use cases
Reader.CHUNK_SIZE = 32768      # 32KB - general purpose reading
LineReader.CHUNK_SIZE = 4192   # ~4KB (4192 bytes) - line-oriented reading

# Encoding retry map for Unicode handling: extra bytes to request when a
# character is cut at a chunk boundary.
ENCODING_MAP = {
    "utf-8": 4,    # UTF-8 sequences span at most 4 bytes
    "utf-16": 8,   # NOTE(review): UTF-16 code points need at most 4 bytes
                   # (surrogate pair); 8 presumably adds margin - confirm
    "UTF-8": 4,
    "UTF-16": 8,
}

Install with Tessl CLI

npx tessl i tessl/pypi-aiofile

docs

core-operations.md

high-level-interface.md

index.md

streaming-operations.md

tile.json