Asynchronous file operations with asyncio support.
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Async iterators for chunk-based reading and sequential writing operations. These classes provide efficient memory usage for large files and streaming data processing with configurable chunk sizes.
Async iterator for reading file chunks with configurable chunk size. Provides efficient streaming reads for large files.
class Reader:
    """Async iterator that reads a file in fixed-size chunks."""

    CHUNK_SIZE = 32768  # Default chunk size (32KB)

    def __init__(
        self,
        aio_file: AIOFile,
        offset: int = 0,
        chunk_size: int = CHUNK_SIZE
    ):
        """
        Initialize chunk reader.

        Args:
            aio_file: AIOFile instance to read from
            offset: Starting byte offset
            chunk_size: Size of each chunk in bytes
        """

    @property
    def file(self) -> AIOFile:
        """Associated AIOFile instance."""

    @property
    def encoding(self) -> str:
        """File encoding (same as underlying AIOFile)."""

    async def read_chunk(self) -> Union[str, bytes]:
        """
        Read next chunk from file.

        Returns:
            Next chunk as bytes (binary mode) or str (text mode)
            Empty bytes/string when end of file reached
        """

Sequential writer for file operations. Maintains internal offset and writes data sequentially.
class Writer:
    """Sequential writer that maintains an internal write offset."""

    def __init__(self, aio_file: AIOFile, offset: int = 0):
        """
        Initialize sequential writer.

        Args:
            aio_file: AIOFile instance to write to
            offset: Starting byte offset
        """

    async def __call__(self, data: Union[str, bytes]) -> None:
        """
        Write data sequentially to file.

        Args:
            data: Data to write (str or bytes, automatically handled)
        """

Async iterator for reading file lines with configurable line separator and buffer size.
class LineReader:
    """Async iterator that reads a file line-by-line."""

    CHUNK_SIZE = 4192  # Default chunk size for line reading (~4KB)

    def __init__(
        self,
        aio_file: AIOFile,
        offset: int = 0,
        chunk_size: int = CHUNK_SIZE,
        line_sep: str = "\n"
    ):
        """
        Initialize line reader.

        Args:
            aio_file: AIOFile instance to read from
            offset: Starting byte offset
            chunk_size: Size of internal read buffer
            line_sep: Line separator character/string
        """

    @property
    def linesep(self) -> Union[str, bytes]:
        """Line separator (str for text mode, bytes for binary mode)."""

    async def readline(self) -> Union[str, bytes]:
        """
        Read next line from file.

        Returns:
            Next line including separator, or remaining data at EOF
            Empty bytes/string when end of file reached
"""async def unicode_reader(
afp: AIOFile,
chunk_size: int,
offset: int,
encoding: str = "utf-8"
) -> Tuple[int, str]:
"""
Helper for reading Unicode data with proper encoding handling.
Handles partial Unicode characters at chunk boundaries by retrying
with larger chunks when decode errors occur.
Args:
afp: AIOFile instance to read from
chunk_size: Requested chunk size in bytes
offset: Byte offset to read from
encoding: Text encoding to use
Returns:
Tuple of (bytes_read, decoded_string)
Raises:
UnicodeDecodeError: If encoding fails after retries
"""import asyncio
from aiofile import AIOFile, Reader
async def chunked_reading():
    """Stream a large file in fixed-size chunks instead of reading it whole."""
    async with AIOFile('large_file.txt', 'r') as afile:
        # Async-iterator style: the Reader yields successive chunks.
        async for chunk in Reader(afile, chunk_size=8192):
            print(f"Chunk size: {len(chunk)}")
            # Each chunk is processed without loading the entire file.

        # Manual style: call read_chunk() until it returns an empty result.
        manual_reader = Reader(afile, offset=0, chunk_size=1024)
        while chunk := await manual_reader.read_chunk():
            print(f"Manual chunk: {len(chunk)} characters")
asyncio.run(chunked_reading())

import asyncio
from aiofile import AIOFile, Writer
async def sequential_writing():
    """Write lines one after another; the Writer tracks the offset itself."""
    async with AIOFile('output.txt', 'w') as afile:
        write = Writer(afile)
        # Each call appends at the writer's current internal offset.
        for text in ("First line\n", "Second line\n", "Third line\n"):
            await write(text)
        await afile.fdsync()  # Ensure data is written
asyncio.run(sequential_writing())

import asyncio
from aiofile import AIOFile, LineReader
async def line_processing():
    """Read a file line-by-line, then re-read it splitting rows on commas."""
    async with AIOFile('data.txt', 'r') as afile:
        # Default separator is "\n".
        async for line in LineReader(afile):
            print(f"Line: {line.rstrip()}")

        # Re-read from the start of the file with an explicit separator.
        csv_rows = LineReader(afile, offset=0, line_sep="\n")
        async for row in csv_rows:
            fields = row.strip().split(',')
            print(f"CSV fields: {fields}")
asyncio.run(line_processing())

import asyncio
from aiofile import AIOFile, Reader
async def binary_streaming():
    """Stream a binary file and tally the total number of bytes seen."""
    async with AIOFile('data.bin', 'rb') as afile:
        total_bytes = 0
        async for chunk in Reader(afile, chunk_size=4096):
            total_bytes += len(chunk)
            print(f"Processed {len(chunk)} bytes")
        print(f"Total bytes processed: {total_bytes}")
asyncio.run(binary_streaming())

import asyncio
from aiofile import AIOFile, LineReader
async def custom_separators():
    """Read files using CRLF (Windows) or bare CR (classic Mac) line endings."""
    async with AIOFile('windows_file.txt', 'r') as afile:
        crlf_reader = LineReader(afile, line_sep="\r\n")
        async for line in crlf_reader:
            print(f"Windows line: {line.rstrip()}")

    async with AIOFile('mac_file.txt', 'r') as afile:
        cr_reader = LineReader(afile, line_sep="\r")
        async for line in cr_reader:
            print(f"Mac line: {line.rstrip()}")
asyncio.run(custom_separators())

import asyncio
from aiofile import AIOFile, Reader
async def memory_efficient_processing():
    """Process huge file without loading into memory."""
    async with AIOFile('huge_file.txt', 'r') as afile:
        word_count = 0
        line_count = 0
        # 64KB chunks keep memory usage flat regardless of file size.
        async for chunk in Reader(afile, chunk_size=64 * 1024):
            # NOTE(review): a word spanning a chunk boundary is counted as
            # two words — acceptable for an estimate, verify if exactness matters.
            word_count += len(chunk.split())
            line_count += chunk.count('\n')
        print(f"Words: {word_count}, Lines: {line_count}")
asyncio.run(memory_efficient_processing())

import asyncio
from aiofile import AIOFile, Reader
async def parallel_processing():
    """Process different parts of file in parallel.

    Fix over the original example: a Reader iterates until EOF, so a
    reader started at ``offset`` reads everything from there to the end
    of the file.  Without a cap, all four "sections" overlap and the
    per-section counts are wrong.  Each task below stops after consuming
    exactly its section's length.
    """
    import os

    async with AIOFile('large_file.txt', 'r') as afile:
        file_size = os.path.getsize('large_file.txt')
        section_len = file_size // 4  # Split into 4 parts

    async def process_section(section_id):
        start = section_id * section_len
        # The last section absorbs any remainder of the file.
        end = file_size if section_id == 3 else start + section_len
        # NOTE(review): lengths are byte-based; in text mode multi-byte
        # characters make len(chunk) a character count — confirm if the
        # file may contain non-ASCII data.
        remaining = end - start
        reader = Reader(afile, offset=start, chunk_size=8192)
        char_count = 0
        async for chunk in reader:
            if remaining <= 0:
                break  # Section fully consumed; don't read into the next one.
            piece = chunk[:remaining]
            char_count += len(piece)
            remaining -= len(piece)
        print(f"Section {section_id}: {char_count} characters")

    # Process sections in parallel
    await asyncio.gather(*[process_section(i) for i in range(4)])
asyncio.run(parallel_processing())

import asyncio
from aiofile import AIOFile, Writer
async def multiple_writers():
    """Use multiple writers for different file sections."""
    async with AIOFile('output.txt', 'w') as afile:
        # Independent writers, each maintaining its own offset.
        header_writer = Writer(afile, offset=0)
        body_writer = Writer(afile, offset=100)  # Leave space for header

        # The body can be written before the header because offsets are explicit.
        await body_writer("This is the body content\n")
        await body_writer("More body content\n")

        await header_writer("HEADER: Important document\n")
        await header_writer("Created: 2024\n")

        await afile.fdsync()
asyncio.run(multiple_writers())

import asyncio
from aiofile import AIOFile, Reader, LineReader
async def robust_streaming():
    """Stream a file defensively, surviving per-chunk processing errors."""
    try:
        async with AIOFile('data.txt', 'r') as afile:
            async for chunk in Reader(afile, chunk_size=1024):
                try:
                    processed = chunk.upper()
                    print(f"Processed: {len(processed)} chars")
                except Exception as e:
                    # A bad chunk is reported but does not stop the stream.
                    print(f"Error processing chunk: {e}")
                    continue
    except FileNotFoundError:
        print("File not found")
    except PermissionError:
        print("Permission denied")
asyncio.run(robust_streaming())

Memory efficiency: the streaming classes are designed to process data in fixed-size chunks rather than loading entire files into memory.
Thread safety: all streaming classes use internal locks to serialize access to the shared file offset.
# Default chunk sizes optimized for different use cases
Reader.CHUNK_SIZE = 32768 # 32KB - general purpose reading
LineReader.CHUNK_SIZE = 4192 # ~4KB (4192 bytes) - line-oriented reading
# Encoding retry map for Unicode handling
ENCODING_MAP = {
"utf-8": 4, # Max 4-byte UTF-8 sequences
"utf-16": 8, # Max 8-byte UTF-16 sequences
"UTF-8": 4,
"UTF-16": 8,
}

Install with Tessl CLI
npx tessl i tessl/pypi-aiofile