or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authorizers.mdfilesystems.mdhandlers.mdindex.mdioloop.mdservers.mdutilities.md

ioloop.mddocs/

0

# Async I/O Framework

1

2

High-performance asynchronous I/O framework providing event-driven network programming with platform-specific optimizations. pyftpdlib's I/O loop implements efficient polling mechanisms (epoll, kqueue, select) and provides network programming primitives for building scalable network services.

3

4

## Capabilities

5

6

### I/O Event Loop

7

8

Main event loop providing cross-platform asynchronous I/O with automatic selection of the best polling mechanism for each platform.

9

10

```python { .api }

11

class IOLoop:

12

# Event flags

13

READ: int = 1

14

WRITE: int = 2

15

16

@classmethod

17

def instance(cls):

18

"""

19

Get global IOLoop singleton instance.

20

21

Returns:

22

- Global IOLoop instance

23

"""

24

25

@classmethod

26

def factory(cls):

27

"""

28

Create new IOLoop instance.

29

30

Returns:

31

- New IOLoop instance with best poller for platform

32

"""

33

34

def register(self, fd, instance, events):

35

"""

36

Register file descriptor for I/O events.

37

38

Parameters:

39

- fd: file descriptor number

40

- instance: object to handle events (must have handle_read/handle_write methods)

41

- events: combination of READ and/or WRITE flags

42

"""

43

44

def unregister(self, fd):

45

"""

46

Unregister file descriptor from event loop.

47

48

Parameters:

49

- fd: file descriptor to remove

50

"""

51

52

def modify(self, fd, events):

53

"""

54

Modify events for registered file descriptor.

55

56

Parameters:

57

- fd: file descriptor to modify

58

- events: new event flags (READ, WRITE, or both)

59

"""

60

61

def poll(self, timeout):

62

"""

63

Poll for I/O events once.

64

65

Parameters:

66

- timeout: polling timeout in seconds (None = block indefinitely)

67

68

Returns:

69

- List of (fd, events) tuples for ready file descriptors

70

"""

71

72

def loop(self, timeout=None, blocking=True):

73

"""

74

Main event loop.

75

76

Parameters:

77

- timeout: overall loop timeout

78

- blocking: whether to block waiting for events

79

"""

80

81

def call_later(self, seconds, target, *args, **kwargs):

82

"""

83

Schedule function call after delay.

84

85

Parameters:

86

- seconds: delay in seconds

87

- target: function to call

88

- *args, **kwargs: arguments for function

89

90

Returns:

91

- _CallLater object (can be cancelled)

92

"""

93

94

def call_every(self, seconds, target, *args, **kwargs):

95

"""

96

Schedule periodic function calls.

97

98

Parameters:

99

- seconds: interval between calls

100

- target: function to call

101

- *args, **kwargs: arguments for function

102

103

Returns:

104

- _CallEvery object (can be cancelled)

105

"""

106

107

def close(self):

108

"""Close I/O loop and cleanup resources."""

109

```

110

111

### Network Base Classes

112

113

Base classes for implementing network protocols with asynchronous I/O support.

114

115

```python { .api }

116

class AsyncChat:

117

def __init__(self, sock=None, ioloop=None):

118

"""

119

Initialize async chat handler.

120

121

Parameters:

122

- sock: existing socket object (optional)

123

- ioloop: IOLoop instance to use (None = use global)

124

"""

125

126

# IOLoop integration

127

def add_channel(self, map=None, events=None):

128

"""Register with IOLoop for event handling."""

129

130

def del_channel(self, map=None, events=None):

131

"""Unregister from IOLoop."""

132

133

def modify_ioloop_events(self, events, logdebug=False):

134

"""

135

Change I/O events for this connection.

136

137

Parameters:

138

- events: new event flags (READ, WRITE, or both)

139

- logdebug: enable debug logging

140

"""

141

142

# Scheduling

143

def call_later(self, seconds, target, *args, **kwargs):

144

"""Schedule delayed function call via IOLoop."""

145

146

# Connection management

147

def connect(self, addr):

148

"""

149

Connect to remote address.

150

151

Parameters:

152

- addr: (host, port) tuple

153

"""

154

155

def connect_af_unspecified(self, addr, source_address=None):

156

"""

157

Connect with automatic address family detection.

158

159

Parameters:

160

- addr: (host, port) tuple

161

- source_address: local (host, port) to bind to

162

"""

163

164

# Data transfer

165

def send(self, data):

166

"""

167

Send data with error handling.

168

169

Parameters:

170

- data: bytes to send

171

172

Returns:

173

- Number of bytes sent

174

"""

175

176

def recv(self, buffer_size):

177

"""

178

Receive data with error handling.

179

180

Parameters:

181

- buffer_size: maximum bytes to receive

182

183

Returns:

184

- Received data bytes

185

"""

186

187

# Connection shutdown

188

def close_when_done(self):

189

"""Close connection after all data is sent."""

190

191

def close(self):

192

"""Close connection immediately."""

193

194

class Acceptor(AsyncChat):

195

def bind_af_unspecified(self, addr):

196

"""

197

Bind socket with automatic address family detection.

198

199

Parameters:

200

- addr: (host, port) tuple to bind to

201

"""

202

203

def listen(self, num):

204

"""

205

Start listening for connections.

206

207

Parameters:

208

- num: maximum queued connections

209

"""

210

211

def handle_accept(self):

212

"""Handle incoming connection (calls handle_accepted)."""

213

214

def handle_accepted(self, sock, addr):

215

"""

216

Process accepted connection.

217

218

Parameters:

219

- sock: new client socket

220

- addr: client address tuple

221

"""

222

223

class Connector(AsyncChat):

224

"""Client connection handler for outgoing connections."""

225

```

226

227

### Platform-specific Pollers

228

229

Optimized polling implementations for different platforms, automatically selected by IOLoop.

230

231

```python { .api }

232

class Select:

233

"""select()-based poller (POSIX/Windows)."""

234

235

class Poll:

236

"""poll()-based poller (POSIX)."""

237

238

class Epoll:

239

"""epoll()-based poller (Linux)."""

240

241

class Kqueue:

242

"""kqueue()-based poller (BSD/macOS)."""

243

244

class DevPoll:

245

"""/dev/poll-based poller (Solaris)."""

246

```

247

248

### Scheduler Classes

249

250

Internal classes for managing timed events and callbacks.

251

252

```python { .api }

253

class _CallLater:

254

"""Container for delayed function calls."""

255

256

def cancel(self):

257

"""Cancel scheduled call."""

258

259

class _CallEvery:

260

"""Container for periodic function calls."""

261

262

def cancel(self):

263

"""Cancel periodic calls."""

264

265

class _Scheduler:

266

"""Internal scheduler for timed events."""

267

```

268

269

## Exception Classes

270

271

```python { .api }

272

class RetryError(Exception):

273

"""Indicates that an operation should be retried."""

274

```

275

276

## Constants

277

278

```python { .api }

279

timer: callable # High-resolution timer function (time.monotonic or time.time)

280

_ERRNOS_DISCONNECTED: frozenset # Error codes indicating connection closed

281

_ERRNOS_RETRY: frozenset # Error codes indicating operation should be retried

282

```

283

284

## Usage Examples

285

286

### Basic Event Loop

287

288

```python

289

from pyftpdlib.ioloop import IOLoop

290

291

# Get global IOLoop instance

292

ioloop = IOLoop.instance()

293

294

# Schedule delayed execution

295

def delayed_task():

296

print("Task executed after 5 seconds")

297

298

callback = ioloop.call_later(5.0, delayed_task)

299

300

# Schedule periodic execution

301

def periodic_task():

302

print("Periodic task")

303

304

periodic = ioloop.call_every(10.0, periodic_task)

305

306

# Run event loop

307

ioloop.loop()

308

309

# Cancel scheduled tasks

310

callback.cancel()

311

periodic.cancel()

312

```

313

314

### Custom Network Handler

315

316

```python

317

from pyftpdlib.ioloop import AsyncChat, IOLoop

318

319

class EchoHandler(AsyncChat):

320

def __init__(self, sock, ioloop=None):

321

super().__init__(sock, ioloop)

322

self.set_terminator(b'\n')

323

self.buffer = []

324

325

def collect_incoming_data(self, data):

326

"""Collect incoming data."""

327

self.buffer.append(data)

328

329

def found_terminator(self):

330

"""Process complete line."""

331

line = b''.join(self.buffer)

332

self.buffer = []

333

334

# Echo the line back

335

self.push(b"Echo: " + line + b'\n')

336

337

def handle_close(self):

338

"""Handle connection close."""

339

print(f"Client {self.addr} disconnected")

340

341

class EchoServer(Acceptor):

342

def __init__(self, host, port):

343

super().__init__()

344

self.create_socket()

345

self.bind((host, port))

346

self.listen(5)

347

print(f"Echo server listening on {host}:{port}")

348

349

def handle_accepted(self, sock, addr):

350

"""Handle new client connection."""

351

print(f"New client connected from {addr}")

352

EchoHandler(sock)

353

354

# Start echo server

355

server = EchoServer('localhost', 8888)

356

IOLoop.instance().loop()

357

```

358

359

### Client Connection

360

361

```python

362

from pyftpdlib.ioloop import Connector

363

364

class ClientHandler(Connector):

365

def __init__(self, host, port):

366

super().__init__()

367

self.create_socket()

368

self.connect((host, port))

369

370

def handle_connect(self):

371

"""Called when connection is established."""

372

print("Connected to server")

373

self.send(b"Hello server\n")

374

375

def handle_read(self):

376

"""Handle incoming data."""

377

data = self.recv(1024)

378

print(f"Received: {data.decode()}")

379

380

def handle_close(self):

381

"""Handle connection close."""

382

print("Connection closed")

383

384

# Connect to server

385

client = ClientHandler('localhost', 8888)

386

IOLoop.instance().loop()

387

```

388

389

### Server with Multiple Ports

390

391

```python

392

class MultiPortServer:

393

def __init__(self):

394

self.ioloop = IOLoop.instance()

395

self.servers = []

396

397

def add_server(self, handler_class, host, port):

398

"""Add server on specific port."""

399

server = handler_class(host, port)

400

self.servers.append(server)

401

return server

402

403

def start(self):

404

"""Start all servers."""

405

self.ioloop.loop()

406

407

def stop(self):

408

"""Stop all servers."""

409

for server in self.servers:

410

server.close()

411

412

# Setup multiple servers

413

multi_server = MultiPortServer()

414

multi_server.add_server(EchoServer, 'localhost', 8888)

415

multi_server.add_server(EchoServer, 'localhost', 8889)

416

multi_server.start()

417

```

418

419

### Integration with FTP Components

420

421

```python

422

from pyftpdlib.servers import FTPServer

423

from pyftpdlib.ioloop import IOLoop

424

425

# Create FTP server with custom IOLoop

426

ioloop = IOLoop.factory() # Create new IOLoop instance

427

server = FTPServer(("0.0.0.0", 21), handler, ioloop=ioloop)

428

429

# Schedule periodic maintenance

430

def cleanup_task():

431

print("Performing cleanup...")

432

433

ioloop.call_every(3600, cleanup_task) # Run every hour

434

435

# Start server

436

server.serve_forever()

437

```

438

439

### Error Handling

440

441

```python

442

from pyftpdlib.ioloop import RetryError

443

import errno

444

445

class RobustHandler(AsyncChat):

446

def handle_read(self):

447

try:

448

data = self.recv(1024)

449

self.process_data(data)

450

except socket.error as err:

451

if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):

452

# Would block - normal for non-blocking sockets

453

return

454

elif err.errno in (errno.ECONNRESET, errno.EPIPE):

455

# Connection closed by peer

456

self.handle_close()

457

else:

458

# Other error - log and close

459

print(f"Socket error: {err}")

460

self.close()

461

except RetryError:

462

# Retry the operation later

463

self.call_later(0.1, self.handle_read)

464

```

465

466

### Performance Monitoring

467

468

```python

469

import time

470

471

class MonitoredIOLoop(IOLoop):

472

def __init__(self):

473

super().__init__()

474

self.start_time = time.time()

475

self.poll_count = 0

476

477

def poll(self, timeout):

478

"""Monitor polling performance."""

479

start = time.time()

480

result = super().poll(timeout)

481

poll_time = time.time() - start

482

483

self.poll_count += 1

484

if self.poll_count % 1000 == 0:

485

uptime = time.time() - self.start_time

486

print(f"Uptime: {uptime:.1f}s, Polls: {self.poll_count}, "

487

f"Last poll: {poll_time*1000:.1f}ms")

488

489

return result

490

491

# Use monitored IOLoop

492

ioloop = MonitoredIOLoop()

493

server = FTPServer(("0.0.0.0", 21), handler, ioloop=ioloop)

494

server.serve_forever()

495

```

496

497

## Platform Optimization

498

499

- **Linux**: Automatically uses epoll() for best performance with many connections

500

- **BSD/macOS**: Uses kqueue() for efficient event notification

501

- **Windows**: Falls back to select() with IOCP-style handling where possible

502

- **Solaris**: Uses /dev/poll for scalable I/O multiplexing

503

504

## Performance Considerations

505

506

- **IOLoop.instance()**: Returns singleton - use for shared event loop across components

507

- **IOLoop.factory()**: Creates new instance - use for isolated event loops

508

- **Event Registration**: Minimize register/unregister operations for better performance

509

- **Timeouts**: Use appropriate timeout values to balance responsiveness and efficiency