0
# Path I/O Abstractions
1
2
Filesystem abstraction layer supporting synchronous, asynchronous, and in-memory implementations. This system enables custom backends, testing with memory-based filesystems, and provides a unified interface for different I/O patterns while maintaining compatibility across various deployment scenarios.
3
4
## Capabilities
5
6
### Abstract Path I/O Interface
7
8
Base interface defining the contract for filesystem operations.
9
10
```python { .api }
11
class AbstractPathIO(Generic[PathType]):
12
"""
13
Abstract base class for filesystem operations.
14
15
This generic class defines the interface that all PathIO implementations
16
must follow. PathType can be Path, PurePosixPath, or other path types.
17
"""
18
19
async def exists(self, path: PathType) -> bool:
20
"""
21
Check if path exists.
22
23
Parameters:
24
- path: Path to check for existence
25
26
Returns:
27
True if path exists, False otherwise
28
"""
29
30
async def is_file(self, path: PathType) -> bool:
31
"""
32
Check if path is a file.
33
34
Parameters:
35
- path: Path to check
36
37
Returns:
38
True if path exists and is a file, False otherwise
39
"""
40
41
async def is_dir(self, path: PathType) -> bool:
42
"""
43
Check if path is a directory.
44
45
Parameters:
46
- path: Path to check
47
48
Returns:
49
True if path exists and is a directory, False otherwise
50
"""
51
52
async def mkdir(self, path: PathType, parents: bool = False, exist_ok: bool = False) -> None:
53
"""
54
Create directory.
55
56
Parameters:
57
- path: Directory path to create
58
- parents: Create parent directories if they don't exist
59
- exist_ok: Don't raise error if directory already exists
60
"""
61
62
async def rmdir(self, path: PathType) -> None:
63
"""
64
Remove directory.
65
66
Parameters:
67
- path: Directory path to remove (must be empty)
68
"""
69
70
async def unlink(self, path: PathType) -> None:
71
"""
72
Remove file.
73
74
Parameters:
75
- path: File path to remove
76
"""
77
78
def list(self, path: PathType) -> AsyncIterable[PathType]:
79
"""
80
List directory contents.
81
82
Parameters:
83
- path: Directory path to list
84
85
Returns:
86
Async iterator yielding paths in directory
87
"""
88
89
async def stat(self, path: PathType) -> os.stat_result:
90
"""
91
Get file statistics.
92
93
Parameters:
94
- path: Path to get statistics for
95
96
Returns:
97
Stat result with file information (size, timestamps, etc.)
98
"""
99
100
async def _open(self, path: PathType, mode: str) -> io.BytesIO:
101
"""
102
Open file for reading/writing (internal method).
103
104
Parameters:
105
- path: File path to open
106
- mode: Open mode ("rb", "wb", "ab", etc.)
107
108
Returns:
109
File-like object for I/O operations
110
"""
111
112
async def seek(self, file: io.BytesIO, offset: int, whence: int = io.SEEK_SET) -> int:
113
"""
114
Seek to position in file.
115
116
Parameters:
117
- file: File object to seek in
118
- offset: Byte offset to seek to
119
- whence: Seek reference point (SEEK_SET, SEEK_CUR, SEEK_END)
120
121
Returns:
122
New file position
123
"""
124
125
async def write(self, file: io.BytesIO, data: bytes) -> int:
126
"""
127
Write data to file.
128
129
Parameters:
130
- file: File object to write to
131
- data: Bytes to write
132
133
Returns:
134
Number of bytes written
135
"""
136
137
async def read(self, file: io.BytesIO, block_size: int) -> bytes:
138
"""
139
Read data from file.
140
141
Parameters:
142
- file: File object to read from
143
- block_size: Maximum bytes to read
144
145
Returns:
146
Bytes read from file
147
"""
148
149
async def close(self, file: io.BytesIO) -> None:
150
"""
151
Close file.
152
153
Parameters:
154
- file: File object to close
155
"""
156
157
async def rename(self, source: PathType, destination: PathType) -> PathType:
158
"""
159
Rename/move file or directory.
160
161
Parameters:
162
- source: Current path
163
- destination: New path
164
165
Returns:
166
Destination path
167
"""
168
169
def open(self, path: PathType, mode: str = "rb") -> AsyncPathIOContext:
170
"""
171
Open file with context manager support.
172
173
Parameters:
174
- path: File path to open
175
- mode: Open mode ("rb", "wb", "ab", etc.)
176
177
Returns:
178
Async context manager for file operations
179
"""
180
```
181
182
### Synchronous Path I/O
183
184
Direct filesystem operations using blocking I/O.
185
186
```python { .api }
187
class PathIO(AbstractPathIO[Path]):
188
"""
189
Synchronous filesystem operations using pathlib.Path.
190
191
This implementation uses blocking filesystem operations directly.
192
Best for simple scenarios where blocking I/O is acceptable.
193
"""
194
195
def __init__(self, timeout: float = None, connection=None, state=None):
196
"""
197
Initialize PathIO.
198
199
Parameters:
200
- timeout: Operation timeout (not used in sync implementation)
201
- connection: Connection context (not used in sync implementation)
202
- state: Shared state (not used in sync implementation)
203
"""
204
205
# All abstract methods implemented using synchronous pathlib operations
206
# Methods maintain same signatures as AbstractPathIO but use blocking calls
207
```
208
209
### Asynchronous Path I/O
210
211
Non-blocking filesystem operations via thread executor.
212
213
```python { .api }
214
class AsyncPathIO(AbstractPathIO[Path]):
215
"""
216
Asynchronous filesystem operations via executor.
217
218
This implementation uses run_in_executor to make blocking filesystem
219
operations non-blocking. Best for high-concurrency scenarios.
220
"""
221
222
executor: Union[Executor, None]
223
"""Thread executor for running blocking operations."""
224
225
def __init__(self, timeout: float = None, connection=None, state=None):
226
"""
227
Initialize AsyncPathIO.
228
229
Parameters:
230
- timeout: Operation timeout in seconds
231
- connection: Connection context for shared state
232
- state: Shared state dictionary
233
"""
234
235
# All abstract methods implemented using run_in_executor
236
# for non-blocking filesystem operations
237
```
238
239
### In-Memory Path I/O
240
241
Memory-based filesystem simulation for testing.
242
243
```python { .api }
244
class MemoryPathIO(AbstractPathIO[PurePosixPath]):
245
"""
246
In-memory filesystem for testing and development.
247
248
This implementation simulates a filesystem entirely in memory using
249
Node objects. Perfect for testing without touching real filesystem.
250
"""
251
252
def __init__(self, timeout: float = None, connection=None, state=None):
253
"""
254
Initialize MemoryPathIO.
255
256
Parameters:
257
- timeout: Operation timeout (not used in memory implementation)
258
- connection: Connection context for shared state
259
- state: Shared state dictionary containing filesystem tree
260
"""
261
262
def get_node(self, path: PurePosixPath) -> Union[Node, None]:
263
"""
264
Get internal node object for path.
265
266
Parameters:
267
- path: Path to get node for
268
269
Returns:
270
Node object or None if path doesn't exist
271
"""
272
273
# All abstract methods implemented using in-memory Node tree
274
# Simulates complete filesystem behavior without disk I/O
275
```
276
277
### Path I/O Factory
278
279
Factory class for creating PathIO instances with shared configuration.
280
281
```python { .api }
282
class PathIONursery(Generic[PathIOType]):
283
"""
284
Factory for creating PathIO instances with shared state.
285
286
Enables creating multiple PathIO instances that share configuration
287
and state, useful for server implementations with multiple connections.
288
"""
289
290
def __init__(self, factory: type[PathIOType]):
291
"""
292
Initialize factory.
293
294
Parameters:
295
- factory: PathIO class to instantiate (PathIO, AsyncPathIO, MemoryPathIO)
296
"""
297
298
def __call__(self, timeout: float = None, connection=None, state=None) -> PathIOType:
299
"""
300
Create new PathIO instance.
301
302
Parameters:
303
- timeout: Operation timeout for this instance
304
- connection: Connection context
305
- state: Shared state dictionary
306
307
Returns:
308
Configured PathIO instance
309
"""
310
```
311
312
### Context Manager Support
313
314
Async context manager for file operations.
315
316
```python { .api }
317
class AsyncPathIOContext:
318
"""Async context manager for file operations."""
319
320
async def __aenter__(self) -> io.BytesIO:
321
"""
322
Enter context and return file object.
323
324
Returns:
325
File-like object for I/O operations
326
"""
327
328
async def __aexit__(exc_type, exc_val, exc_tb) -> None:
329
"""Exit context and close file."""
330
```
331
332
## Usage Examples
333
334
### Basic File Operations
335
336
```python
337
import aioftp
338
import asyncio
339
from pathlib import Path
340
341
async def basic_file_ops():
342
"""Example of basic file operations with different PathIO types."""
343
344
# Synchronous PathIO
345
sync_pathio = aioftp.PathIO()
346
347
# Check if file exists
348
path = Path("test_file.txt")
349
if await sync_pathio.exists(path):
350
print("File exists")
351
352
# Get file stats
353
stats = await sync_pathio.stat(path)
354
print(f"File size: {stats.st_size} bytes")
355
356
# Read file content
357
async with sync_pathio.open(path, "rb") as f:
358
content = await sync_pathio.read(f, 1024)
359
print(f"Content: {content.decode()}")
360
361
# Create new file
362
async with sync_pathio.open(Path("new_file.txt"), "wb") as f:
363
await sync_pathio.write(f, b"Hello, World!")
364
365
asyncio.run(basic_file_ops())
366
```
367
368
### Asynchronous Operations
369
370
```python
371
import aioftp
372
import asyncio
373
from pathlib import Path
374
375
async def async_file_ops():
376
"""Example using AsyncPathIO for non-blocking operations."""
377
378
# Asynchronous PathIO
379
async_pathio = aioftp.AsyncPathIO(timeout=30.0)
380
381
# Multiple concurrent operations
382
tasks = []
383
384
for i in range(10):
385
path = Path(f"file_{i}.txt")
386
tasks.append(create_file(async_pathio, path, f"Content {i}"))
387
388
# Run all operations concurrently
389
await asyncio.gather(*tasks)
390
391
# List directory contents
392
async for item in async_pathio.list(Path(".")):
393
if await async_pathio.is_file(item):
394
stats = await async_pathio.stat(item)
395
print(f"File: {item} ({stats.st_size} bytes)")
396
397
async def create_file(pathio, path, content):
398
"""Helper to create file with content."""
399
async with pathio.open(path, "wb") as f:
400
await pathio.write(f, content.encode())
401
402
asyncio.run(async_file_ops())
403
```
404
405
### In-Memory Testing
406
407
```python
408
import aioftp
409
import asyncio
410
from pathlib import PurePosixPath
411
412
async def memory_filesystem_test():
413
"""Example using MemoryPathIO for testing."""
414
415
# Create in-memory filesystem
416
memory_pathio = aioftp.MemoryPathIO()
417
418
# Create directory structure
419
await memory_pathio.mkdir(PurePosixPath("/home"))
420
await memory_pathio.mkdir(PurePosixPath("/home/user"), parents=True)
421
422
# Create files
423
file_path = PurePosixPath("/home/user/test.txt")
424
async with memory_pathio.open(file_path, "wb") as f:
425
await memory_pathio.write(f, b"Test content")
426
427
# Verify operations
428
assert await memory_pathio.exists(file_path)
429
assert await memory_pathio.is_file(file_path)
430
431
# Read back content
432
async with memory_pathio.open(file_path, "rb") as f:
433
content = await memory_pathio.read(f, 1024)
434
assert content == b"Test content"
435
436
# List directory
437
items = []
438
async for item in memory_pathio.list(PurePosixPath("/home/user")):
439
items.append(item)
440
441
print(f"Directory contains: {items}")
442
443
asyncio.run(memory_filesystem_test())
444
```
445
446
### Custom PathIO Factory
447
448
```python
449
import aioftp
450
import asyncio
451
from pathlib import Path
452
453
async def factory_example():
454
"""Example using PathIONursery factory."""
455
456
# Create factory for AsyncPathIO
457
pathio_factory = aioftp.PathIONursery(aioftp.AsyncPathIO)
458
459
# Create multiple instances with shared configuration
460
pathio1 = pathio_factory(timeout=10.0)
461
pathio2 = pathio_factory(timeout=20.0)
462
463
# Use instances independently
464
await pathio1.mkdir(Path("temp1"), exist_ok=True)
465
await pathio2.mkdir(Path("temp2"), exist_ok=True)
466
467
# Both share the same base configuration but can have different timeouts
468
async with pathio1.open(Path("temp1/file1.txt"), "wb") as f:
469
await pathio1.write(f, b"File 1 content")
470
471
async with pathio2.open(Path("temp2/file2.txt"), "wb") as f:
472
await pathio2.write(f, b"File 2 content")
473
474
asyncio.run(factory_example())
475
```
476
477
### FTP Server Integration
478
479
```python
480
import aioftp
481
import asyncio
482
from pathlib import Path
483
484
async def server_with_custom_pathio():
485
"""Example FTP server using custom PathIO."""
486
487
# Use AsyncPathIO for non-blocking filesystem operations
488
pathio_factory = aioftp.PathIONursery(aioftp.AsyncPathIO)
489
490
# Create user with custom PathIO
491
user = aioftp.User(
492
login="testuser",
493
password="testpass",
494
base_path=Path("/srv/ftp"),
495
permissions=[
496
aioftp.Permission("/", readable=True, writable=True)
497
]
498
)
499
500
# Server with custom PathIO factory
501
server = aioftp.Server(
502
users=[user],
503
path_io_factory=pathio_factory,
504
path_timeout=30.0 # 30 second timeout for filesystem ops
505
)
506
507
await server.run(host="localhost", port=2121)
508
509
# Uncomment to run server
510
# asyncio.run(server_with_custom_pathio())
511
```
512
513
### Advanced Memory Testing
514
515
```python
516
import aioftp
517
import asyncio
518
from pathlib import PurePosixPath
519
520
class TestableMemoryPathIO(aioftp.MemoryPathIO):
521
"""Extended MemoryPathIO with testing utilities."""
522
523
def __init__(self, *args, **kwargs):
524
super().__init__(*args, **kwargs)
525
self.operation_count = 0
526
527
async def exists(self, path):
528
"""Track operation calls."""
529
self.operation_count += 1
530
return await super().exists(path)
531
532
def get_filesystem_tree(self):
533
"""Get current filesystem state for testing."""
534
return self.state if hasattr(self, 'state') else {}
535
536
async def advanced_memory_testing():
537
"""Advanced testing with custom MemoryPathIO."""
538
539
# Create testable filesystem
540
pathio = TestableMemoryPathIO()
541
542
# Set up test scenario
543
test_paths = [
544
PurePosixPath("/app"),
545
PurePosixPath("/app/data"),
546
PurePosixPath("/app/logs"),
547
PurePosixPath("/app/data/file1.txt"),
548
PurePosixPath("/app/data/file2.txt"),
549
]
550
551
# Create directory structure
552
for path in test_paths:
553
if path.suffix: # It's a file
554
async with pathio.open(path, "wb") as f:
555
await pathio.write(f, f"Content for {path.name}".encode())
556
else: # It's a directory
557
await pathio.mkdir(path, parents=True, exist_ok=True)
558
559
# Verify structure
560
for path in test_paths:
561
assert await pathio.exists(path)
562
if path.suffix:
563
assert await pathio.is_file(path)
564
else:
565
assert await pathio.is_dir(path)
566
567
print(f"Performed {pathio.operation_count} filesystem operations")
568
print("All tests passed!")
569
570
asyncio.run(advanced_memory_testing())
571
```
572
573
## Implementation Notes
574
575
### PathIO Types Comparison
576
577
| Feature | PathIO | AsyncPathIO | MemoryPathIO |
578
|---------|--------|-------------|--------------|
579
| Blocking I/O | Yes | No | N/A |
580
| Concurrency | Limited | High | High |
581
| Real Filesystem | Yes | Yes | No |
582
| Testing Suitable | No | Partial | Yes |
583
| Performance | Good | Better | Best |
584
| Memory Usage | Low | Low | Variable |
585
586
### Best Practices
587
588
1. **Use AsyncPathIO** for servers with multiple concurrent connections
589
2. **Use PathIO** for simple scripts or single-connection scenarios
590
3. **Use MemoryPathIO** for testing and development
591
4. **Set appropriate timeouts** for filesystem operations
592
5. **Handle PathIOError exceptions** for robust error handling
593
6. **Use PathIONursery** when you need multiple instances with shared config
594
595
### Performance Considerations
596
597
1. **AsyncPathIO**: Best for I/O-bound applications with high concurrency
598
2. **PathIO**: Lower overhead but blocks event loop
599
3. **MemoryPathIO**: Fastest but limited by available memory
600
4. **Factory pattern**: Minimal overhead for creating multiple instances
601
5. **Timeout settings**: Balance responsiveness with operation completion time