# Streaming Operations

Async iterators for chunk- and line-based reading, plus a sequential writer. These classes provide efficient memory usage for large files and streaming data processing, with configurable chunk sizes.

## Capabilities

### Reader Class

Async iterator for reading file chunks with a configurable chunk size. Provides efficient streaming reads for large files.

```python { .api }
class Reader:
    CHUNK_SIZE = 32768  # Default chunk size (32 KB)

    def __init__(
        self,
        aio_file: AIOFile,
        offset: int = 0,
        chunk_size: int = CHUNK_SIZE
    ):
        """
        Initialize chunk reader.

        Args:
            aio_file: AIOFile instance to read from
            offset: Starting byte offset
            chunk_size: Size of each chunk in bytes
        """

    @property
    def file(self) -> AIOFile:
        """Associated AIOFile instance."""

    @property
    def encoding(self) -> str:
        """File encoding (same as the underlying AIOFile)."""

    async def read_chunk(self) -> Union[str, bytes]:
        """
        Read the next chunk from the file.

        Returns:
            Next chunk as bytes (binary mode) or str (text mode);
            empty bytes/string when the end of file is reached.
        """
```

### Writer Class

Sequential writer for file operations. Maintains an internal offset and writes data sequentially.

```python { .api }
class Writer:
    def __init__(self, aio_file: AIOFile, offset: int = 0):
        """
        Initialize sequential writer.

        Args:
            aio_file: AIOFile instance to write to
            offset: Starting byte offset
        """

    async def __call__(self, data: Union[str, bytes]) -> None:
        """
        Write data sequentially to the file.

        Args:
            data: Data to write (str or bytes, handled automatically)
        """
```

### LineReader Class

Async iterator for reading file lines with a configurable line separator and buffer size.

```python { .api }
class LineReader:
    CHUNK_SIZE = 4192  # Default chunk size for line reading

    def __init__(
        self,
        aio_file: AIOFile,
        offset: int = 0,
        chunk_size: int = CHUNK_SIZE,
        line_sep: str = "\n"
    ):
        """
        Initialize line reader.

        Args:
            aio_file: AIOFile instance to read from
            offset: Starting byte offset
            chunk_size: Size of the internal read buffer
            line_sep: Line separator character/string
        """

    @property
    def linesep(self) -> Union[str, bytes]:
        """Line separator (str for text mode, bytes for binary mode)."""

    async def readline(self) -> Union[str, bytes]:
        """
        Read the next line from the file.

        Returns:
            Next line including the separator, or the remaining data at EOF;
            empty bytes/string when the end of file is reached.
        """
```

### Helper Function

```python { .api }
async def unicode_reader(
    afp: AIOFile,
    chunk_size: int,
    offset: int,
    encoding: str = "utf-8"
) -> Tuple[int, str]:
    """
    Helper for reading Unicode data with proper encoding handling.

    Handles partial Unicode characters at chunk boundaries by retrying
    with larger chunks when decode errors occur.

    Args:
        afp: AIOFile instance to read from
        chunk_size: Requested chunk size in bytes
        offset: Byte offset to read from
        encoding: Text encoding to use

    Returns:
        Tuple of (bytes_read, decoded_string)

    Raises:
        UnicodeDecodeError: If decoding fails after retries
    """
```
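
A minimal sketch of driving `unicode_reader` directly, based only on the signature documented above; the `aiofile.utils` import path and the `(0, "")` return at EOF are assumptions:

```python
import asyncio
from aiofile import AIOFile
from aiofile.utils import unicode_reader  # import path is an assumption

async def decode_stream():
    async with AIOFile('unicode_data.txt', 'r') as afp:
        offset = 0
        while True:
            # Advance by the bytes actually consumed, not the requested size
            bytes_read, text = await unicode_reader(afp, 512, offset)
            if not bytes_read:  # assumes (0, "") is returned at EOF
                break
            offset += bytes_read
            print(f"Decoded {len(text)} characters from {bytes_read} bytes")

asyncio.run(decode_stream())
```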

## Usage Examples

### Chunked File Reading

```python
import asyncio
from aiofile import AIOFile, Reader

async def chunked_reading():
    async with AIOFile('large_file.txt', 'r') as afile:
        reader = Reader(afile, chunk_size=8192)

        async for chunk in reader:
            # Process chunk without loading the entire file into memory
            print(f"Chunk size: {len(chunk)}")

        # Alternative: manual chunk reading
        reader2 = Reader(afile, offset=0, chunk_size=1024)
        while True:
            chunk = await reader2.read_chunk()
            if not chunk:
                break
            print(f"Manual chunk: {len(chunk)} characters")

asyncio.run(chunked_reading())
```

### Sequential Writing

```python
import asyncio
from aiofile import AIOFile, Writer

async def sequential_writing():
    async with AIOFile('output.txt', 'w') as afile:
        writer = Writer(afile)

        # Write data sequentially; the writer maintains the offset
        await writer("First line\n")
        await writer("Second line\n")
        await writer("Third line\n")

        await afile.fsync()  # Ensure data is flushed to disk

asyncio.run(sequential_writing())
```

### Line-by-Line Processing

```python
import asyncio
from aiofile import AIOFile, LineReader

async def line_processing():
    async with AIOFile('data.txt', 'r') as afile:
        # Default line reader (newline separator)
        line_reader = LineReader(afile)

        async for line in line_reader:
            print(f"Line: {line.rstrip()}")

        # Re-read from the start, treating each line as a CSV row
        csv_reader = LineReader(afile, offset=0, line_sep="\n")
        async for row in csv_reader:
            fields = row.strip().split(',')
            print(f"CSV fields: {fields}")

asyncio.run(line_processing())
```

### Binary File Streaming

```python
import asyncio
from aiofile import AIOFile, Reader

async def binary_streaming():
    async with AIOFile('data.bin', 'rb') as afile:
        reader = Reader(afile, chunk_size=4096)

        total_bytes = 0
        async for chunk in reader:
            # Process binary chunk
            total_bytes += len(chunk)
            print(f"Processed {len(chunk)} bytes")

        print(f"Total bytes processed: {total_bytes}")

asyncio.run(binary_streaming())
```

### Custom Line Separators

```python
import asyncio
from aiofile import AIOFile, LineReader

async def custom_separators():
    async with AIOFile('windows_file.txt', 'r') as afile:
        # Windows line endings
        reader = LineReader(afile, line_sep="\r\n")

        async for line in reader:
            print(f"Windows line: {line.rstrip()}")

    async with AIOFile('mac_file.txt', 'r') as afile:
        # Classic Mac line endings
        reader = LineReader(afile, line_sep="\r")

        async for line in reader:
            print(f"Mac line: {line.rstrip()}")

asyncio.run(custom_separators())
```

### Processing Large Files with Limited Memory

```python
import asyncio
from aiofile import AIOFile, Reader

async def memory_efficient_processing():
    """Process a huge file without loading it into memory."""
    async with AIOFile('huge_file.txt', 'r') as afile:
        reader = Reader(afile, chunk_size=64 * 1024)  # 64 KB chunks

        word_count = 0
        line_count = 0

        async for chunk in reader:
            # Count words and lines per chunk (a word spanning a chunk
            # boundary is counted twice; fine for an estimate)
            word_count += len(chunk.split())
            line_count += chunk.count('\n')

        print(f"Words: {word_count}, Lines: {line_count}")

asyncio.run(memory_efficient_processing())
```

### Parallel Processing with Multiple Readers

```python
import asyncio
import os
from aiofile import AIOFile, Reader

async def parallel_processing():
    """Process different parts of a file in parallel."""
    file_size = os.path.getsize('large_file.txt')
    section_size = file_size // 4  # Split into 4 parts

    async with AIOFile('large_file.txt', 'r') as afile:
        # Create readers starting at different file sections
        readers = [
            Reader(afile, offset=i * section_size, chunk_size=8192)
            for i in range(4)
        ]

        async def process_section(reader, section_id):
            # A Reader runs from its offset to EOF, so stop once this
            # section's share has been consumed to avoid double-counting
            char_count = 0
            async for chunk in reader:
                char_count += len(chunk)
                if char_count >= section_size:
                    break
            print(f"Section {section_id}: {char_count} characters")

        # Process sections concurrently
        await asyncio.gather(*[
            process_section(reader, i)
            for i, reader in enumerate(readers)
        ])

asyncio.run(parallel_processing())
```

### Writing with Multiple Writers

```python
import asyncio
from aiofile import AIOFile, Writer

async def multiple_writers():
    """Use multiple writers for different file sections."""
    async with AIOFile('output.txt', 'w') as afile:
        # Writers for different file positions
        header_writer = Writer(afile, offset=0)
        body_writer = Writer(afile, offset=100)  # Leave space for the header

        # Write body first
        await body_writer("This is the body content\n")
        await body_writer("More body content\n")

        # Write header (any unwritten bytes between the end of the header
        # and offset 100 remain zero-filled)
        await header_writer("HEADER: Important document\n")
        await header_writer("Created: 2024\n")

        await afile.fsync()

asyncio.run(multiple_writers())
```

### Streaming with Error Handling

```python
import asyncio
from aiofile import AIOFile, Reader

async def robust_streaming():
    try:
        async with AIOFile('data.txt', 'r') as afile:
            reader = Reader(afile, chunk_size=1024)

            async for chunk in reader:
                try:
                    # Process chunk
                    processed = chunk.upper()
                    print(f"Processed: {len(processed)} chars")
                except Exception as e:
                    print(f"Error processing chunk: {e}")
                    continue

    except FileNotFoundError:
        print("File not found")
    except PermissionError:
        print("Permission denied")

asyncio.run(robust_streaming())
```

## Memory Efficiency

The streaming classes are designed for memory efficiency:

- **Reader**: Loads only one chunk at a time, suitable for files larger than available memory (see the checksum sketch below)
- **Writer**: Writes data immediately, with no internal buffering
- **LineReader**: Buffers internally only until a separator is found, so memory usage is proportional to the longest line rather than to the file size
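
To illustrate the first point, a sketch that hashes a file of arbitrary size while holding at most one 64 KB chunk in memory (the file name is a placeholder):

```python
import asyncio
import hashlib
from aiofile import AIOFile, Reader

async def checksum_streaming(path):
    # Peak memory stays at one chunk (64 KB), regardless of file size
    digest = hashlib.sha256()
    async with AIOFile(path, 'rb') as afile:
        async for chunk in Reader(afile, chunk_size=64 * 1024):
            digest.update(chunk)
    return digest.hexdigest()

print(asyncio.run(checksum_streaming('huge_file.bin')))
```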

## Thread Safety

All streaming classes use internal locks to ensure safe concurrent use:

- Multiple async tasks can safely share the same Reader/Writer/LineReader instance (see the sketch below)
- Operations are automatically serialized to maintain file position consistency
- No external locking is required when sharing instances across tasks
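
For example, a minimal sketch in which several tasks funnel lines through one shared Writer; the internal lock serializes the writes so calls do not interleave:

```python
import asyncio
from aiofile import AIOFile, Writer

async def produce(writer, worker_id):
    # Each task writes through the same shared Writer instance
    for i in range(3):
        await writer(f"worker {worker_id}, message {i}\n")

async def shared_writer():
    async with AIOFile('shared_output.txt', 'w') as afile:
        writer = Writer(afile)
        # Three concurrent tasks; each write call is serialized internally
        await asyncio.gather(*[produce(writer, w) for w in range(3)])
        await afile.fsync()

asyncio.run(shared_writer())
```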

## Constants

```python { .api }
# Default chunk sizes for different use cases
Reader.CHUNK_SIZE = 32768      # 32 KB - general-purpose reading
LineReader.CHUNK_SIZE = 4192   # ~4 KB - line-oriented reading

# Encoding retry map for Unicode handling
ENCODING_MAP = {
    "utf-8": 4,   # Max 4-byte UTF-8 sequences
    "utf-16": 8,  # Max 8-byte UTF-16 sequences
    "UTF-8": 4,
    "UTF-16": 8,
}
```
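
These defaults apply only when no explicit size is passed; overriding them per instance uses the constructor parameters documented above:

```python
import asyncio
from aiofile import AIOFile, Reader

async def custom_chunk_size():
    async with AIOFile('data.txt', 'r') as afile:
        # Override the 32 KB default for this reader only
        reader = Reader(afile, chunk_size=256 * 1024)
        async for chunk in reader:
            print(f"{len(chunk)} characters")

asyncio.run(custom_chunk_size())
```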