# Async I/O Framework

pyftpdlib's I/O loop is an event-driven asynchronous I/O framework for network programming. It selects an efficient polling mechanism for the platform (epoll, kqueue, /dev/poll, poll, or select) and provides primitives for building scalable network services.

## Capabilities

### I/O Event Loop

The main event loop provides cross-platform asynchronous I/O and automatically selects the best polling mechanism available on each platform.

```python { .api }
class IOLoop:
    # Event flags
    READ: int = 1
    WRITE: int = 2

    @classmethod
    def instance(cls):
        """
        Get global IOLoop singleton instance.

        Returns:
        - Global IOLoop instance
        """

    @classmethod
    def factory(cls):
        """
        Create new IOLoop instance.

        Returns:
        - New IOLoop instance with best poller for platform
        """

    def register(self, fd, instance, events):
        """
        Register file descriptor for I/O events.

        Parameters:
        - fd: file descriptor number
        - instance: object to handle events (must have handle_read/handle_write methods)
        - events: combination of READ and/or WRITE flags
        """

    def unregister(self, fd):
        """
        Unregister file descriptor from event loop.

        Parameters:
        - fd: file descriptor to remove
        """

    def modify(self, fd, events):
        """
        Modify events for registered file descriptor.

        Parameters:
        - fd: file descriptor to modify
        - events: new event flags (READ, WRITE, or both)
        """

    def poll(self, timeout):
        """
        Poll for I/O events once and dispatch them to the registered handlers.

        Parameters:
        - timeout: polling timeout in seconds (None = block indefinitely)
        """

    def loop(self, timeout=None, blocking=True):
        """
        Main event loop.

        Parameters:
        - timeout: timeout in seconds passed to the underlying poller on each iteration
        - blocking: if True, poll repeatedly; if False, poll once and return
        """

    def call_later(self, seconds, target, *args, **kwargs):
        """
        Schedule function call after delay.

        Parameters:
        - seconds: delay in seconds
        - target: function to call
        - *args, **kwargs: arguments for function

        Returns:
        - _CallLater object (can be cancelled)
        """

    def call_every(self, seconds, target, *args, **kwargs):
        """
        Schedule periodic function calls.

        Parameters:
        - seconds: interval between calls
        - target: function to call
        - *args, **kwargs: arguments for function

        Returns:
        - _CallEvery object (can be cancelled)
        """

    def close(self):
        """Close I/O loop and cleanup resources."""
```
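The register / modify / unregister cycle and the READ / WRITE flags follow the usual readiness-based pattern. As a rough analogy only, the sketch below uses the standard library's `selectors` module rather than pyftpdlib itself; the socket pair and the `ready_events` helper are illustrative assumptions, not part of the API above.

```python
import selectors
import socket


def ready_events(sel, timeout=1.0):
    """Poll once and return {fileobj: event mask} for ready objects."""
    return {key.fileobj: mask for key, mask in sel.select(timeout)}


sel = selectors.DefaultSelector()
left, right = socket.socketpair()
left.setblocking(False)
right.setblocking(False)

# Register for readability only; nothing has been sent yet.
sel.register(right, selectors.EVENT_READ)
before = ready_events(sel, timeout=0)

# After the peer writes, the registered socket becomes readable.
left.send(b"ping")
after = ready_events(sel)

# Widen interest to read + write, the analogue of READ | WRITE.
sel.modify(right, selectors.EVENT_READ | selectors.EVENT_WRITE)
both = ready_events(sel)

# Always unregister before closing the descriptor.
sel.unregister(right)
sel.close()
left.close()
right.close()
```

Keeping a descriptor registered and switching its interest mask with `modify` is generally cheaper than unregistering and registering it again.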

### Network Base Classes

Base classes for implementing network protocols with asynchronous I/O support.

```python { .api }
class AsyncChat:
    def __init__(self, sock=None, ioloop=None):
        """
        Initialize async chat handler.

        Parameters:
        - sock: existing socket object (optional)
        - ioloop: IOLoop instance to use (None = use global)
        """

    # IOLoop integration
    def add_channel(self, map=None, events=None):
        """Register with IOLoop for event handling."""

    def del_channel(self, map=None, events=None):
        """Unregister from IOLoop."""

    def modify_ioloop_events(self, events, logdebug=False):
        """
        Change I/O events for this connection.

        Parameters:
        - events: new event flags (READ, WRITE, or both)
        - logdebug: enable debug logging
        """

    # Scheduling
    def call_later(self, seconds, target, *args, **kwargs):
        """Schedule delayed function call via IOLoop."""

    # Connection management
    def connect(self, addr):
        """
        Connect to remote address.

        Parameters:
        - addr: (host, port) tuple
        """

    def connect_af_unspecified(self, addr, source_address=None):
        """
        Connect with automatic address family detection.

        Parameters:
        - addr: (host, port) tuple
        - source_address: local (host, port) to bind to
        """

    # Data transfer
    def send(self, data):
        """
        Send data with error handling.

        Parameters:
        - data: bytes to send

        Returns:
        - Number of bytes sent
        """

    def recv(self, buffer_size):
        """
        Receive data with error handling.

        Parameters:
        - buffer_size: maximum bytes to receive

        Returns:
        - Received data bytes
        """

    # Connection shutdown
    def close_when_done(self):
        """Close connection after all data is sent."""

    def close(self):
        """Close connection immediately."""

class Acceptor(AsyncChat):
    def bind_af_unspecified(self, addr):
        """
        Bind socket with automatic address family detection.

        Parameters:
        - addr: (host, port) tuple to bind to
        """

    def listen(self, num):
        """
        Start listening for connections.

        Parameters:
        - num: maximum queued connections
        """

    def handle_accept(self):
        """Handle incoming connection (calls handle_accepted)."""

    def handle_accepted(self, sock, addr):
        """
        Process accepted connection.

        Parameters:
        - sock: new client socket
        - addr: client address tuple
        """

class Connector(AsyncChat):
    """Client connection handler for outgoing connections."""
```
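"Automatic address family detection" usually means resolving the address with `AF_UNSPEC` and trying each candidate until one works. The following is a minimal standard-library sketch of that idea, not pyftpdlib's actual code; `bind_any_family` is a hypothetical helper name.

```python
import socket


def bind_any_family(host, port):
    """Bind a listening TCP socket, trying each resolved address family in turn."""
    last_err = None
    infos = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                               socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
    for family, socktype, proto, _canonname, sockaddr in infos:
        sock = None
        try:
            sock = socket.socket(family, socktype, proto)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(sockaddr)
            sock.listen(5)
            return sock
        except OSError as err:
            # Remember the failure and try the next candidate address.
            last_err = err
            if sock is not None:
                sock.close()
    raise last_err or OSError("getaddrinfo returned no usable address")


listener = bind_any_family("127.0.0.1", 0)  # port 0 lets the OS pick a free port
bound_host, bound_port = listener.getsockname()[:2]
listener.close()
```

The same loop works for IPv6 hosts, because the socket is created from whatever family `getaddrinfo` reports rather than from a hard-coded `AF_INET`.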

### Platform-specific Pollers

Optimized polling implementations for different platforms, automatically selected by IOLoop.

```python { .api }
class Select:
    """select()-based poller (POSIX/Windows)."""

class Poll:
    """poll()-based poller (POSIX)."""

class Epoll:
    """epoll()-based poller (Linux)."""

class Kqueue:
    """kqueue()-based poller (BSD/macOS)."""

class DevPoll:
    """/dev/poll-based poller (Solaris)."""
```
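Automatic selection can be pictured as probing the `select` module for the most capable mechanism it exposes. The sketch below is illustrative only: the preference order and the `best_poller_name` helper are assumptions, and it returns the class names listed above as plain strings instead of touching pyftpdlib.

```python
import select

# Preference order is an assumption made for illustration.
_CANDIDATES = [
    ("epoll", "Epoll"),
    ("kqueue", "Kqueue"),
    ("devpoll", "DevPoll"),
    ("poll", "Poll"),
    ("select", "Select"),
]


def best_poller_name(module=select):
    """Return the name of the first poller whose syscall wrapper is available."""
    for attr, name in _CANDIDATES:
        if hasattr(module, attr):
            return name
    raise RuntimeError("no polling mechanism available")


chosen = best_poller_name()
print(f"This platform would use: {chosen}")
```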

### Scheduler Classes

Internal classes for managing timed events and callbacks.

```python { .api }
class _CallLater:
    """Container for delayed function calls."""

    def cancel(self):
        """Cancel scheduled call."""

class _CallEvery:
    """Container for periodic function calls."""

    def cancel(self):
        """Cancel periodic calls."""

class _Scheduler:
    """Internal scheduler for timed events."""
```
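A common way to implement this kind of scheduler is a min-heap ordered by due time, with cancellation handled lazily. The toy version below is a sketch under that assumption and is not pyftpdlib's `_Scheduler`; `SketchScheduler` is a hypothetical name, and the clock is passed in explicitly so the behaviour is deterministic (real code would read a monotonic timer).

```python
import heapq
import itertools


class SketchScheduler:
    """Toy heap-based scheduler with lazy cancellation."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for equal due times

    def call_later(self, now, delay, target, *args):
        entry = {"due": now + delay, "target": target,
                 "args": args, "cancelled": False}
        heapq.heappush(self._heap, (entry["due"], next(self._counter), entry))
        return entry

    @staticmethod
    def cancel(entry):
        entry["cancelled"] = True  # skipped later, when it reaches the heap top

    def poll(self, now):
        """Run every due call; return seconds until the next one, or None."""
        while self._heap and self._heap[0][0] <= now:
            _, _, entry = heapq.heappop(self._heap)
            if not entry["cancelled"]:
                entry["target"](*entry["args"])
        # Drop cancelled entries so they do not shorten the next timeout.
        while self._heap and self._heap[0][2]["cancelled"]:
            heapq.heappop(self._heap)
        return self._heap[0][0] - now if self._heap else None


fired = []
sched = SketchScheduler()
sched.call_later(0.0, 5.0, fired.append, "late")
doomed = sched.call_later(0.0, 1.0, fired.append, "cancelled")
sched.call_later(0.0, 2.0, fired.append, "early")
SketchScheduler.cancel(doomed)

next_timeout = sched.poll(now=3.0)
```

Returning the time until the next due call lets an event loop use it as the poll timeout, so the loop wakes up exactly when the next scheduled call is due.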

## Exception Classes

```python { .api }
class RetryError(Exception):
    """Indicates that an operation should be retried."""
```

## Constants

```python { .api }
timer: callable  # High-resolution timer function (time.monotonic or time.time)
_ERRNOS_DISCONNECTED: frozenset  # Error codes indicating connection closed
_ERRNOS_RETRY: frozenset  # Error codes indicating operation should be retried
```
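Error-code sets like these are typically used to sort a socket error into "try again", "peer is gone", or "real failure". The sketch below shows that classification with stand-in sets; `RETRY_CODES`, `DISCONNECTED_CODES`, and `classify` are hypothetical names, and the actual contents of the pyftpdlib constants may differ.

```python
import errno

# Hypothetical stand-ins; the real sets may contain different codes.
RETRY_CODES = frozenset({errno.EAGAIN, errno.EWOULDBLOCK})
DISCONNECTED_CODES = frozenset({
    errno.ECONNRESET, errno.ENOTCONN, errno.ECONNABORTED,
    errno.EPIPE, errno.EBADF,
})


def classify(err):
    """Map an OSError to 'retry', 'disconnected', or 'fatal'."""
    if err.errno in RETRY_CODES:
        return "retry"
    if err.errno in DISCONNECTED_CODES:
        return "disconnected"
    return "fatal"


outcomes = [
    classify(BlockingIOError(errno.EAGAIN, "would block")),
    classify(ConnectionResetError(errno.ECONNRESET, "reset by peer")),
    classify(OSError(errno.ENOSPC, "disk full")),
]
```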

## Usage Examples

### Basic Event Loop

```python
from pyftpdlib.ioloop import IOLoop

# Get global IOLoop instance
ioloop = IOLoop.instance()

# Schedule delayed execution
def delayed_task():
    print("Task executed after 5 seconds")

callback = ioloop.call_later(5.0, delayed_task)

# Schedule periodic execution
def periodic_task():
    print("Periodic task")

periodic = ioloop.call_every(10.0, periodic_task)

# Run event loop (this call blocks until the loop returns)
ioloop.loop()

# Cancel scheduled tasks once the loop has returned
callback.cancel()
periodic.cancel()
```

### Custom Network Handler

```python
from pyftpdlib.ioloop import Acceptor, AsyncChat, IOLoop

class EchoHandler(AsyncChat):
    def __init__(self, sock, ioloop=None):
        super().__init__(sock, ioloop)
        self.set_terminator(b'\n')
        self.buffer = []

    def collect_incoming_data(self, data):
        """Collect incoming data."""
        self.buffer.append(data)

    def found_terminator(self):
        """Process complete line."""
        line = b''.join(self.buffer)
        self.buffer = []

        # Echo the line back
        self.push(b"Echo: " + line + b'\n')

    def handle_close(self):
        """Handle connection close."""
        print(f"Client {self.addr} disconnected")
        self.close()  # release the socket and unregister from the IOLoop

class EchoServer(Acceptor):
    def __init__(self, host, port):
        super().__init__()
        self.create_socket()
        self.bind((host, port))
        self.listen(5)
        print(f"Echo server listening on {host}:{port}")

    def handle_accepted(self, sock, addr):
        """Handle new client connection."""
        print(f"New client connected from {addr}")
        EchoHandler(sock)

# Start echo server
server = EchoServer('localhost', 8888)
IOLoop.instance().loop()
```

### Client Connection

```python
from pyftpdlib.ioloop import Connector, IOLoop

class ClientHandler(Connector):
    def __init__(self, host, port):
        super().__init__()
        self.create_socket()
        self.connect((host, port))

    def handle_connect(self):
        """Called when connection is established."""
        print("Connected to server")
        self.send(b"Hello server\n")

    def handle_read(self):
        """Handle incoming data."""
        data = self.recv(1024)
        print(f"Received: {data.decode()}")

    def handle_close(self):
        """Handle connection close."""
        print("Connection closed")
        self.close()  # release the socket and unregister from the IOLoop

# Connect to server
client = ClientHandler('localhost', 8888)
IOLoop.instance().loop()
```

### Server with Multiple Ports

```python
from pyftpdlib.ioloop import IOLoop

# EchoServer is the Acceptor subclass from the Custom Network Handler example

class MultiPortServer:
    def __init__(self):
        self.ioloop = IOLoop.instance()
        self.servers = []

    def add_server(self, handler_class, host, port):
        """Add server on specific port."""
        server = handler_class(host, port)
        self.servers.append(server)
        return server

    def start(self):
        """Start all servers."""
        self.ioloop.loop()

    def stop(self):
        """Stop all servers."""
        for server in self.servers:
            server.close()

# Setup multiple servers
multi_server = MultiPortServer()
multi_server.add_server(EchoServer, 'localhost', 8888)
multi_server.add_server(EchoServer, 'localhost', 8889)
multi_server.start()
```

### Integration with FTP Components

```python
from pyftpdlib.authorizers import DummyAuthorizer
from pyftpdlib.handlers import FTPHandler
from pyftpdlib.ioloop import IOLoop
from pyftpdlib.servers import FTPServer

# Minimal handler setup; configure users on the authorizer as needed
handler = FTPHandler
handler.authorizer = DummyAuthorizer()

# Create FTP server with custom IOLoop
ioloop = IOLoop.factory()  # Create new IOLoop instance
server = FTPServer(("0.0.0.0", 21), handler, ioloop=ioloop)

# Schedule periodic maintenance
def cleanup_task():
    print("Performing cleanup...")

ioloop.call_every(3600, cleanup_task)  # Run every hour

# Start server
server.serve_forever()
```
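The difference between the shared loop from `IOLoop.instance()` and a fresh one from `IOLoop.factory()` can be pictured with a small stand-in class. This is a sketch of the general singleton-plus-factory pattern, assuming per-loop state lives on the instance; `SketchLoop` is hypothetical and does not reproduce pyftpdlib's implementation.

```python
import threading


class SketchLoop:
    """Stand-in class that only mirrors the instance()/factory() split."""

    _instance = None
    _lock = threading.Lock()

    def __init__(self):
        self.socket_map = {}  # per-loop state: fd -> handler

    @classmethod
    def instance(cls):
        """Return the process-wide loop, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check inside the lock so only one thread creates it.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    @classmethod
    def factory(cls):
        """Return a fresh, independent loop."""
        return cls()


shared_a = SketchLoop.instance()
shared_b = SketchLoop.instance()
private = SketchLoop.factory()

# Handlers registered on the shared loop are invisible to the private one.
shared_a.socket_map[3] = "handler"
```

A component that must not be affected by other handlers in the process, such as a server running in its own thread, is the typical case for a separate loop.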

### Error Handling

```python
import errno
import socket

from pyftpdlib.ioloop import AsyncChat, RetryError

class RobustHandler(AsyncChat):
    def process_data(self, data):
        """Application-specific processing (placeholder)."""

    def handle_read(self):
        try:
            data = self.recv(1024)
            self.process_data(data)
        except socket.error as err:
            if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                # Would block - normal for non-blocking sockets
                return
            elif err.errno in (errno.ECONNRESET, errno.EPIPE):
                # Connection closed by peer
                self.handle_close()
            else:
                # Other error - log and close
                print(f"Socket error: {err}")
                self.close()
        except RetryError:
            # Retry the operation later
            self.call_later(0.1, self.handle_read)
```

### Performance Monitoring

```python
import time

from pyftpdlib.handlers import FTPHandler
from pyftpdlib.ioloop import IOLoop
from pyftpdlib.servers import FTPServer

class MonitoredIOLoop(IOLoop):
    def __init__(self):
        super().__init__()
        self.start_time = time.time()
        self.poll_count = 0

    def poll(self, timeout):
        """Monitor polling performance."""
        start = time.time()
        result = super().poll(timeout)
        poll_time = time.time() - start

        self.poll_count += 1
        if self.poll_count % 1000 == 0:
            uptime = time.time() - self.start_time
            print(f"Uptime: {uptime:.1f}s, Polls: {self.poll_count}, "
                  f"Last poll: {poll_time*1000:.1f}ms")

        return result

# Use monitored IOLoop (FTPHandler still needs an authorizer configured)
ioloop = MonitoredIOLoop()
server = FTPServer(("0.0.0.0", 21), FTPHandler, ioloop=ioloop)
server.serve_forever()
```

## Platform Optimization

- **Linux**: Automatically uses epoll() for best performance with many connections
- **BSD/macOS**: Uses kqueue() for efficient event notification
- **Windows**: Falls back to select()
- **Solaris**: Uses /dev/poll for scalable I/O multiplexing

## Performance Considerations

- **IOLoop.instance()**: Returns the singleton; use it to share one event loop across components
- **IOLoop.factory()**: Creates a new instance; use it for isolated event loops
- **Event Registration**: Minimize register/unregister operations; prefer modifying the events of an already registered descriptor
- **Timeouts**: Choose poll timeouts that balance responsiveness against idle wakeups