# Streaming Operations

Async iterators for chunk-based reading and sequential writing. These classes keep memory usage low when processing large files and streaming data, using configurable chunk sizes.

## Capabilities

### Reader Class

Async iterator for reading file chunks with a configurable chunk size. Provides efficient streaming reads for large files.

```python { .api }
class Reader:
    CHUNK_SIZE = 32768  # Default chunk size (32 KB)

    def __init__(
        self,
        aio_file: AIOFile,
        offset: int = 0,
        chunk_size: int = CHUNK_SIZE
    ):
        """
        Initialize chunk reader.

        Args:
            aio_file: AIOFile instance to read from
            offset: Starting byte offset
            chunk_size: Size of each chunk in bytes
        """

    @property
    def file(self) -> AIOFile:
        """Associated AIOFile instance."""

    @property
    def encoding(self) -> str:
        """File encoding (same as the underlying AIOFile)."""

    async def read_chunk(self) -> Union[str, bytes]:
        """
        Read the next chunk from the file.

        Returns:
            Next chunk as bytes (binary mode) or str (text mode);
            empty bytes/string when end of file is reached.
        """
```

### Writer Class

Sequential writer for file operations. Maintains an internal offset and writes data sequentially.

```python { .api }
class Writer:
    def __init__(self, aio_file: AIOFile, offset: int = 0):
        """
        Initialize sequential writer.

        Args:
            aio_file: AIOFile instance to write to
            offset: Starting byte offset
        """

    async def __call__(self, data: Union[str, bytes]) -> None:
        """
        Write data sequentially to the file.

        Args:
            data: Data to write (str or bytes, depending on the file mode)
        """
```

### LineReader Class

Async iterator for reading file lines with a configurable line separator and buffer size.

```python { .api }
class LineReader:
    CHUNK_SIZE = 4192  # Default chunk size for line reading

    def __init__(
        self,
        aio_file: AIOFile,
        offset: int = 0,
        chunk_size: int = CHUNK_SIZE,
        line_sep: str = "\n"
    ):
        """
        Initialize line reader.

        Args:
            aio_file: AIOFile instance to read from
            offset: Starting byte offset
            chunk_size: Size of the internal read buffer
            line_sep: Line separator character or string
        """

    @property
    def linesep(self) -> Union[str, bytes]:
        """Line separator (str for text mode, bytes for binary mode)."""

    async def readline(self) -> Union[str, bytes]:
        """
        Read the next line from the file.

        Returns:
            Next line including the separator, or the remaining data at EOF;
            empty bytes/string when end of file is reached.
        """
```

### Helper Function

Reads Unicode text while handling partial characters at chunk boundaries.

```python { .api }
async def unicode_reader(
    afp: AIOFile,
    chunk_size: int,
    offset: int,
    encoding: str = "utf-8"
) -> Tuple[int, str]:
    """
    Helper for reading Unicode data with proper encoding handling.

    Handles partial Unicode characters at chunk boundaries by retrying
    with larger chunks when decode errors occur.

    Args:
        afp: AIOFile instance to read from
        chunk_size: Requested chunk size in bytes
        offset: Byte offset to read from
        encoding: Text encoding to use

    Returns:
        Tuple of (bytes_read, decoded_string)

    Raises:
        UnicodeDecodeError: If decoding fails after retries
    """
```
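
A minimal sketch of driving `unicode_reader` in a loop. The import path and the EOF behavior (a result of `(0, "")`) are assumptions, and the file name is illustrative; the helper's retry logic is what keeps a multi-byte character split across a chunk boundary from raising mid-stream.

```python
import asyncio
from aiofile import AIOFile
from aiofile.utils import unicode_reader  # import path assumed

async def read_all_text(path: str, chunk_size: int = 512) -> str:
    parts = []
    offset = 0
    async with AIOFile(path, 'rb') as afp:  # binary mode assumed; the helper decodes
        while True:
            bytes_read, text = await unicode_reader(afp, chunk_size, offset)
            if not bytes_read:  # EOF assumed to report zero bytes read
                break
            # Advance by the bytes actually consumed, which can exceed
            # chunk_size when a partial character forced a larger read
            offset += bytes_read
            parts.append(text)
    return "".join(parts)

print(asyncio.run(read_all_text('unicode_data.txt')))
```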

## Usage Examples

### Chunked File Reading

```python
import asyncio
from aiofile import AIOFile, Reader

async def chunked_reading():
    async with AIOFile('large_file.txt', 'r') as afile:
        reader = Reader(afile, chunk_size=8192)

        async for chunk in reader:
            print(f"Chunk size: {len(chunk)}")
            # Process chunk without loading entire file into memory

        # Alternative: Manual chunk reading
        reader2 = Reader(afile, offset=0, chunk_size=1024)
        while True:
            chunk = await reader2.read_chunk()
            if not chunk:
                break
            print(f"Manual chunk: {len(chunk)} characters")

asyncio.run(chunked_reading())
```

### Sequential Writing

```python
import asyncio
from aiofile import AIOFile, Writer

async def sequential_writing():
    async with AIOFile('output.txt', 'w') as afile:
        writer = Writer(afile)

        # Write data sequentially
        await writer("First line\n")
        await writer("Second line\n")
        await writer("Third line\n")

        # Writer automatically maintains the offset
        await afile.fdsync()  # Ensure data is written

asyncio.run(sequential_writing())
```
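
### Streaming Copy

Reader and Writer compose into a bounded-memory file copy. A minimal sketch, assuming binary mode on both ends; the file names are illustrative.

```python
import asyncio
from aiofile import AIOFile, Reader, Writer

async def streaming_copy(src: str, dst: str):
    async with AIOFile(src, 'rb') as in_file, AIOFile(dst, 'wb') as out_file:
        reader = Reader(in_file, chunk_size=64 * 1024)
        writer = Writer(out_file)

        # Only one chunk is held in memory at any moment
        async for chunk in reader:
            await writer(chunk)

        await out_file.fdsync()

asyncio.run(streaming_copy('input.bin', 'output.bin'))
```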

### Line-by-Line Processing

```python
import asyncio
from aiofile import AIOFile, LineReader

async def line_processing():
    async with AIOFile('data.txt', 'r') as afile:
        # Default line reader (newline separator)
        line_reader = LineReader(afile)

        async for line in line_reader:
            print(f"Line: {line.rstrip()}")

        # Re-read from the start, treating each line as a CSV row
        csv_reader = LineReader(afile, offset=0, line_sep="\n")
        async for row in csv_reader:
            fields = row.strip().split(',')
            print(f"CSV fields: {fields}")

asyncio.run(line_processing())
```

### Binary File Streaming

```python
import asyncio
from aiofile import AIOFile, Reader

async def binary_streaming():
    async with AIOFile('data.bin', 'rb') as afile:
        reader = Reader(afile, chunk_size=4096)

        total_bytes = 0
        async for chunk in reader:
            total_bytes += len(chunk)
            # Process binary chunk
            print(f"Processed {len(chunk)} bytes")

        print(f"Total bytes processed: {total_bytes}")

asyncio.run(binary_streaming())
```

### Custom Line Separators

```python
import asyncio
from aiofile import AIOFile, LineReader

async def custom_separators():
    async with AIOFile('windows_file.txt', 'r') as afile:
        # Windows line endings
        reader = LineReader(afile, line_sep="\r\n")

        async for line in reader:
            print(f"Windows line: {line.rstrip()}")

    async with AIOFile('mac_file.txt', 'r') as afile:
        # Classic Mac line endings
        reader = LineReader(afile, line_sep="\r")

        async for line in reader:
            print(f"Mac line: {line.rstrip()}")

asyncio.run(custom_separators())
```

### Processing Large Files with Limited Memory

```python
import asyncio
from aiofile import AIOFile, Reader

async def memory_efficient_processing():
    """Process a huge file without loading it into memory."""
    async with AIOFile('huge_file.txt', 'r') as afile:
        reader = Reader(afile, chunk_size=64 * 1024)  # 64 KB chunks

        word_count = 0
        line_count = 0
        leftover = ""  # carries a word split across chunk boundaries

        async for chunk in reader:
            text = leftover + chunk
            words = text.split()
            # A word cut off at the end of this chunk is completed next round
            if words and not text[-1].isspace():
                leftover = words.pop()
            else:
                leftover = ""
            word_count += len(words)
            line_count += chunk.count('\n')

        if leftover:
            word_count += 1

        print(f"Words: {word_count}, Lines: {line_count}")

asyncio.run(memory_efficient_processing())
```

### Parallel Processing with Multiple Readers

```python
import asyncio
import os
from aiofile import AIOFile, Reader

async def parallel_processing():
    """Process different parts of a file in parallel."""
    path = 'large_file.txt'
    file_size = os.path.getsize(path)
    section_size = file_size // 4  # Split into 4 parts

    async with AIOFile(path, 'r') as afile:
        # Create readers starting at different file sections
        # (offsets are in bytes, so text-mode boundaries are approximate)
        readers = [
            Reader(afile, offset=i * section_size, chunk_size=8192)
            for i in range(4)
        ]

        async def process_section(reader, section_id):
            # A Reader runs to EOF, so stop once this section is consumed
            char_count = 0
            async for chunk in reader:
                char_count += len(chunk)
                if char_count >= section_size:
                    break
            print(f"Section {section_id}: {char_count} characters")

        # Process sections in parallel
        await asyncio.gather(*[
            process_section(reader, i)
            for i, reader in enumerate(readers)
        ])

asyncio.run(parallel_processing())
```

### Writing with Multiple Writers

```python
import asyncio
from aiofile import AIOFile, Writer

async def multiple_writers():
    """Use multiple writers for different file sections."""
    async with AIOFile('output.txt', 'w') as afile:
        # Writers for different file positions
        header_writer = Writer(afile, offset=0)
        body_writer = Writer(afile, offset=100)  # Leave space for the header

        # Write the body first; any unwritten bytes before offset 100
        # remain zero-filled
        await body_writer("This is the body content\n")
        await body_writer("More body content\n")

        # Write the header
        await header_writer("HEADER: Important document\n")
        await header_writer("Created: 2024\n")

        await afile.fdsync()

asyncio.run(multiple_writers())
```

### Streaming with Error Handling

```python
import asyncio
from aiofile import AIOFile, Reader

async def robust_streaming():
    try:
        async with AIOFile('data.txt', 'r') as afile:
            reader = Reader(afile, chunk_size=1024)

            async for chunk in reader:
                try:
                    # Process chunk
                    processed = chunk.upper()
                    print(f"Processed: {len(processed)} chars")
                except Exception as e:
                    print(f"Error processing chunk: {e}")
                    continue

    except FileNotFoundError:
        print("File not found")
    except PermissionError:
        print("Permission denied")

asyncio.run(robust_streaming())
```

## Memory Efficiency

The streaming classes are designed for memory efficiency:

- **Reader**: Holds only one chunk in memory at a time, suitable for files larger than available memory
- **Writer**: Writes data immediately, with no internal buffering
- **LineReader**: Buffers data internally until a separator is found, so memory usage grows with the longest line rather than the file size

## Thread Safety

All streaming classes use internal locks to ensure safe concurrent use:

- Multiple async tasks can safely use the same Reader/Writer/LineReader instance (see the sketch below)
- Operations are automatically serialized to maintain file position consistency
- No external locking is required when sharing instances across tasks
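
A minimal sketch of that guarantee, assuming the documented locking behavior: two tasks funnel output through one shared Writer, and each awaited call completes as a unit even though the ordering between tasks is up to the scheduler.

```python
import asyncio
from aiofile import AIOFile, Writer

async def produce(writer: Writer, label: str, count: int):
    for i in range(count):
        # Each awaited call is serialized by the Writer's internal lock,
        # so one task's line is never interleaved with another's
        await writer(f"{label}: message {i}\n")

async def shared_writer_demo():
    async with AIOFile('log.txt', 'w') as afile:
        writer = Writer(afile)
        await asyncio.gather(
            produce(writer, "task-a", 5),
            produce(writer, "task-b", 5),
        )
        await afile.fdsync()

asyncio.run(shared_writer_demo())
```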

## Constants

```python { .api }
# Default chunk sizes optimized for different use cases
Reader.CHUNK_SIZE = 32768     # 32 KB - general-purpose reading
LineReader.CHUNK_SIZE = 4192  # ~4 KB - line-oriented reading

# Encoding retry map: extra bytes to request when a decode
# fails on a partial character at a chunk boundary
ENCODING_MAP = {
    "utf-8": 4,   # UTF-8 code points span at most 4 bytes
    "utf-16": 8,  # retry window for multi-byte UTF-16 data
    "UTF-8": 4,
    "UTF-16": 8,
}
```