or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application.mdasync-io.mdauth.mddistributed.mddns.mdemail.mdhttp.mdindex.mdlogging.mdprotocols.mdssh.mdtesting.md

async-io.mddocs/

0

# Asynchronous I/O and Reactors

1

2

Core event loop and asynchronous programming primitives that form the foundation of Twisted's networking capabilities. The reactor provides the central event loop, while Deferreds manage asynchronous operations and callbacks.

3

4

## Capabilities

5

6

### Deferred Objects

7

8

Deferred objects are Twisted's implementation of promises/futures for managing asynchronous operations. They allow chaining callbacks and error handlers for non-blocking operations.

9

10

```python { .api }

11

class defer.Deferred:

12

"""

13

A Deferred object represents a value that may not be available yet.

14

15

Methods:

16

- addCallback(callback): Add success callback

17

- addErrback(errback): Add error callback

18

- addBoth(callback): Add callback for both success and error

19

- callback(result): Fire with success result

20

- errback(failure): Fire with error

21

- cancel(): Cancel the deferred

22

"""

23

def addCallback(self, callback, *args, **kwargs): ...

24

def addErrback(self, errback, *args, **kwargs): ...

25

def addBoth(self, callback, *args, **kwargs): ...

26

def callback(self, result): ...

27

def errback(self, failure): ...

28

def cancel(self): ...

29

30

def defer.succeed(result):

31

"""

32

Create a Deferred that has already been fired with a success result.

33

34

Args:

35

result: The result value

36

37

Returns:

38

Deferred: Already-fired deferred

39

"""

40

41

def defer.fail(failure):

42

"""

43

Create a Deferred that has already been fired with a failure.

44

45

Args:

46

failure: Failure object or exception

47

48

Returns:

49

Deferred: Already-failed deferred

50

"""

51

52

def defer.gatherResults(deferredList):

53

"""

54

Collect results from multiple deferreds into a list.

55

56

Args:

57

deferredList (list): List of Deferred objects

58

59

Returns:

60

Deferred: Fires with list of results when all complete

61

"""

62

63

def defer.maybeDeferred(f, *args, **kwargs):

64

"""

65

Ensure a function call returns a Deferred.

66

67

Args:

68

f: Function to call

69

*args, **kwargs: Arguments to pass to function

70

71

Returns:

72

Deferred: Either the returned Deferred or wrapped result

73

"""

74

75

def defer.inlineCallbacks(func):

76

"""

77

Decorator allowing generator-based async/await-like syntax.

78

79

Args:

80

func: Generator function using yield for async operations

81

82

Returns:

83

Function that returns a Deferred

84

"""

85

86

def defer.returnValue(value):

87

"""

88

Return a value from an inlineCallbacks generator.

89

90

Args:

91

value: Value to return

92

"""

93

```

94

95

**Usage Example**:

96

97

```python

98

from twisted.internet import defer

99

100

@defer.inlineCallbacks

101

def processData(data):

102

# Async operations using yield

103

result1 = yield asyncOperation1(data)

104

result2 = yield asyncOperation2(result1)

105

defer.returnValue(result2)

106

107

# Using callbacks

108

d = processData("input")

109

d.addCallback(lambda result: print(f"Final result: {result}"))

110

d.addErrback(lambda failure: print(f"Error: {failure.value}"))

111

```

112

113

### Deferred Collections and Coordination

114

115

Utilities for managing multiple Deferred objects and coordinating asynchronous operations.

116

117

```python { .api }

118

class defer.DeferredList(defer.Deferred):

119

"""

120

Collect results from multiple Deferreds into a single result.

121

122

Attributes:

123

- fireOnOneCallback: Fire when first Deferred succeeds

124

- fireOnOneErrback: Fire when first Deferred fails

125

"""

126

def __init__(self, deferredList, fireOnOneCallback=False, fireOnOneErrback=False, consumeErrors=False):

127

"""

128

Args:

129

deferredList (list): List of Deferred objects to track

130

fireOnOneCallback (bool): Fire on first success

131

fireOnOneErrback (bool): Fire on first failure

132

consumeErrors (bool): Consume errors from component Deferreds

133

"""

134

135

def defer.gatherResults(deferredList, consumeErrors=False):

136

"""

137

Collect results from multiple deferreds into a list (equivalent to DeferredList with consumeErrors=True).

138

139

Args:

140

deferredList (list): List of Deferred objects

141

consumeErrors (bool): Whether to consume errors

142

143

Returns:

144

Deferred: Fires with list of results when all complete

145

"""

146

```

147

148

### Concurrency Primitives

149

150

Synchronization primitives for coordinating asynchronous operations.

151

152

```python { .api }

153

class defer.DeferredLock:

154

"""

155

An event-driven lock for coordinating access to shared resources.

156

157

Attributes:

158

- locked: True when acquired, False otherwise

159

"""

160

locked = False

161

162

def acquire(self):

163

"""

164

Acquire the lock.

165

166

Returns:

167

Deferred[DeferredLock]: Fires with lock when acquired

168

"""

169

170

def release(self):

171

"""Release the lock."""

172

173

class defer.DeferredSemaphore:

174

"""

175

An event-driven semaphore for limiting concurrent operations.

176

177

Attributes:

178

- tokens: Available tokens

179

- limit: Maximum tokens

180

"""

181

def __init__(self, tokens):

182

"""

183

Args:

184

tokens (int): Number of tokens (must be >= 1)

185

"""

186

187

def acquire(self):

188

"""

189

Acquire a token.

190

191

Returns:

192

Deferred[DeferredSemaphore]: Fires when token acquired

193

"""

194

195

def release(self):

196

"""Release a token."""

197

198

class defer.DeferredQueue:

199

"""

200

An event-driven queue for producer-consumer scenarios.

201

202

Attributes:

203

- size: Maximum queue size (None for unlimited)

204

- backlog: Maximum waiting gets (None for unlimited)

205

"""

206

def __init__(self, size=None, backlog=None):

207

"""

208

Args:

209

size (int): Maximum queue size

210

backlog (int): Maximum pending gets

211

"""

212

213

def put(self, obj):

214

"""

215

Add an object to the queue.

216

217

Args:

218

obj: Object to add

219

220

Raises:

221

QueueOverflow: Queue is full

222

"""

223

224

def get(self):

225

"""

226

Get an object from the queue.

227

228

Returns:

229

Deferred: Fires with object when available

230

"""

231

```

232

233

**Usage Example**:

234

235

```python

236

from twisted.internet import defer

237

238

# Using DeferredList

239

d1 = someAsyncOperation()

240

d2 = anotherAsyncOperation()

241

dl = defer.DeferredList([d1, d2])

242

dl.addCallback(lambda results: print(f"All done: {results}"))

243

244

# Using DeferredLock for resource coordination

245

lock = defer.DeferredLock()

246

247

@defer.inlineCallbacks

248

def useResource():

249

yield lock.acquire()

250

try:

251

# Use shared resource

252

pass

253

finally:

254

lock.release()

255

256

# Using DeferredQueue for producer-consumer

257

queue = defer.DeferredQueue()

258

queue.put("item1")

259

d = queue.get()

260

d.addCallback(lambda item: print(f"Got: {item}"))

261

```

262

263

### Protocol and Factory

264

265

Network protocols and factories for managing connections and implementing network services.

266

267

```python { .api }

268

class protocol.Protocol:

269

"""

270

Base class for network protocols.

271

272

Key Methods:

273

- connectionMade(): Called when connection established

274

- dataReceived(data): Called when data arrives

275

- connectionLost(reason): Called when connection ends

276

"""

277

def connectionMade(self): ...

278

def dataReceived(self, data: bytes): ...

279

def connectionLost(self, reason): ...

280

281

transport = None # Set by framework to ITransport

282

283

class protocol.Factory:

284

"""

285

Base class for protocol factories.

286

"""

287

def buildProtocol(self, addr): ...

288

def startedConnecting(self, connector): ...

289

def clientConnectionFailed(self, connector, reason): ...

290

def clientConnectionLost(self, connector, reason): ...

291

292

class protocol.ClientFactory(protocol.Factory):

293

"""

294

Factory for client connections.

295

"""

296

297

class protocol.ServerFactory(protocol.Factory):

298

"""

299

Factory for server connections.

300

"""

301

302

class protocol.ReconnectingClientFactory(protocol.ClientFactory):

303

"""

304

Client factory that automatically reconnects on connection loss.

305

306

Attributes:

307

- initialDelay: Initial reconnection delay in seconds

308

- maxDelay: Maximum reconnection delay

309

- factor: Backoff multiplier

310

"""

311

initialDelay = 1.0

312

maxDelay = 3600.0

313

factor = 2.0

314

```

315

316

**Usage Example**:

317

318

```python

319

from twisted.internet import protocol, reactor, endpoints

320

321

class EchoProtocol(protocol.Protocol):

322

def connectionMade(self):

323

print(f"Connection from {self.transport.getPeer()}")

324

325

def dataReceived(self, data):

326

print(f"Received: {data.decode()}")

327

self.transport.write(data) # Echo back

328

329

def connectionLost(self, reason):

330

print(f"Connection lost: {reason.value}")

331

332

class EchoFactory(protocol.ServerFactory):

333

def buildProtocol(self, addr):

334

return EchoProtocol()

335

336

# Start server

337

endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)

338

endpoint.listen(EchoFactory())

339

```

340

341

### Endpoints

342

343

High-level abstractions for creating network connections without dealing directly with reactor APIs.

344

345

```python { .api }

346

def endpoints.clientFromString(reactor, description):

347

"""

348

Create a client endpoint from string description.

349

350

Args:

351

reactor: The reactor to use

352

description (str): Endpoint description (e.g., "tcp:host=example.com:port=80")

353

354

Returns:

355

IStreamClientEndpoint: Client endpoint

356

"""

357

358

def endpoints.serverFromString(reactor, description):

359

"""

360

Create a server endpoint from string description.

361

362

Args:

363

reactor: The reactor to use

364

description (str): Endpoint description (e.g., "tcp:port=8080")

365

366

Returns:

367

IStreamServerEndpoint: Server endpoint

368

"""

369

370

def endpoints.connectProtocol(endpoint, protocol):

371

"""

372

Connect a protocol instance to an endpoint.

373

374

Args:

375

endpoint: Client endpoint

376

protocol: Protocol instance

377

378

Returns:

379

Deferred: Fires with protocol when connected

380

"""

381

382

class endpoints.TCP4ServerEndpoint:

383

"""

384

IPv4 TCP server endpoint.

385

"""

386

def __init__(self, reactor, port, backlog=50, interface=''):

387

"""

388

Args:

389

reactor: The reactor

390

port (int): Port number to bind

391

backlog (int): Listen backlog size

392

interface (str): Interface to bind ('127.0.0.1', '', etc.)

393

"""

394

395

def listen(self, factory):

396

"""

397

Start listening with the given factory.

398

399

Args:

400

factory: Protocol factory

401

402

Returns:

403

Deferred: Fires with IListeningPort when listening

404

"""

405

406

class endpoints.TCP4ClientEndpoint:

407

"""

408

IPv4 TCP client endpoint.

409

"""

410

def __init__(self, reactor, host, port, timeout=30, bindAddress=None):

411

"""

412

Args:

413

reactor: The reactor

414

host (str): Hostname to connect to

415

port (int): Port number

416

timeout (int): Connection timeout in seconds

417

bindAddress: Local address to bind

418

"""

419

420

def connect(self, factory):

421

"""

422

Connect using the given factory.

423

424

Args:

425

factory: Protocol factory

426

427

Returns:

428

Deferred: Fires with protocol when connected

429

"""

430

431

class endpoints.UNIXServerEndpoint:

432

"""

433

Unix domain socket server endpoint.

434

"""

435

def __init__(self, reactor, address, backlog=50, mode=0o666, wantPID=0):

436

"""

437

Args:

438

reactor: The reactor

439

address (str): Socket file path

440

backlog (int): Listen backlog

441

mode (int): Socket file permissions

442

wantPID (bool): Include PID in address

443

"""

444

445

class endpoints.SSL4ServerEndpoint:

446

"""

447

SSL server endpoint.

448

"""

449

def __init__(self, reactor, port, sslContextFactory, backlog=50, interface=''):

450

"""

451

Args:

452

reactor: The reactor

453

port (int): Port to bind

454

sslContextFactory: SSL context factory

455

backlog (int): Listen backlog

456

interface (str): Interface to bind

457

"""

458

```

459

460

**Usage Example**:

461

462

```python

463

from twisted.internet import reactor, endpoints, protocol

464

465

class SimpleProtocol(protocol.Protocol):

466

def connectionMade(self):

467

self.transport.write(b"Hello, world!")

468

self.transport.loseConnection()

469

470

# Using string descriptions

471

server_endpoint = endpoints.serverFromString(reactor, "tcp:port=8080")

472

client_endpoint = endpoints.clientFromString(reactor, "tcp:host=localhost:port=8080")

473

474

# Direct endpoint creation

475

server_endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)

476

client_endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8080)

477

478

# Connect

479

d = client_endpoint.connect(protocol.ClientFactory())

480

d.addCallback(lambda proto: proto.transport.write(b"data"))

481

```

482

483

### Task Scheduling

484

485

Utilities for scheduling and managing recurring tasks and delayed operations.

486

487

```python { .api }

488

class task.LoopingCall:

489

"""

490

Repeatedly call a function at regular intervals.

491

"""

492

def __init__(self, f, *args, **kwargs):

493

"""

494

Args:

495

f: Function to call

496

*args, **kwargs: Arguments for function

497

"""

498

499

def start(self, interval, now=True):

500

"""

501

Start the looping call.

502

503

Args:

504

interval (float): Seconds between calls

505

now (bool): Whether to call immediately

506

507

Returns:

508

Deferred: Fires when stopped

509

"""

510

511

def stop(self):

512

"""Stop the looping call."""

513

514

def task.deferLater(reactor, delay, callable, *args, **kwargs):

515

"""

516

Schedule a function call after a delay.

517

518

Args:

519

reactor: The reactor

520

delay (float): Delay in seconds

521

callable: Function to call

522

*args, **kwargs: Arguments for function

523

524

Returns:

525

Deferred: Fires with function result after delay

526

"""

527

528

def task.react(main, argv=None, reactor=None):

529

"""

530

Run a main function with the reactor, handling startup and shutdown.

531

532

Args:

533

main: Main function that takes (reactor, *argv)

534

argv: Command line arguments

535

reactor: Reactor to use (default: global reactor)

536

"""

537

538

class task.Clock:

539

"""

540

Deterministic clock for testing time-based operations.

541

"""

542

def advance(self, amount):

543

"""

544

Advance the clock by the given amount.

545

546

Args:

547

amount (float): Seconds to advance

548

"""

549

550

def callLater(self, delay, callable, *args, **kwargs):

551

"""

552

Schedule a delayed call.

553

554

Args:

555

delay (float): Delay in seconds

556

callable: Function to call

557

*args, **kwargs: Arguments

558

559

Returns:

560

IDelayedCall: Scheduled call object

561

"""

562

```

563

564

**Usage Example**:

565

566

```python

567

from twisted.internet import task, reactor

568

569

# Looping call

570

def heartbeat():

571

print("Heartbeat")

572

573

lc = task.LoopingCall(heartbeat)

574

lc.start(1.0) # Call every second

575

576

# Delayed call

577

def delayed_action():

578

print("Delayed action executed")

579

580

d = task.deferLater(reactor, 5.0, delayed_action)

581

d.addCallback(lambda _: print("Delay completed"))

582

583

# Using react for main function

584

def main(reactor, name):

585

print(f"Hello, {name}!")

586

return task.deferLater(reactor, 2.0, reactor.stop)

587

588

task.react(main, ["World"])

589

```

590

591

### Reactor Operations

592

593

Direct reactor interface for low-level event loop control and network operations.

594

595

```python { .api }

596

# The reactor is typically imported as a singleton

597

from twisted.internet import reactor

598

599

# Key reactor methods (available on reactor instance):

600

def reactor.run():

601

"""Start the event loop (blocks until stopped)."""

602

603

def reactor.stop():

604

"""Stop the event loop."""

605

606

def reactor.callLater(delay, callable, *args, **kwargs):

607

"""

608

Schedule a function call after a delay.

609

610

Args:

611

delay (float): Delay in seconds

612

callable: Function to call

613

*args, **kwargs: Arguments

614

615

Returns:

616

IDelayedCall: Scheduled call object

617

"""

618

619

def reactor.callWhenRunning(callable, *args, **kwargs):

620

"""

621

Schedule a function call when reactor starts running.

622

623

Args:

624

callable: Function to call

625

*args, **kwargs: Arguments

626

"""

627

628

def reactor.listenTCP(port, factory, backlog=50, interface=''):

629

"""

630

Listen for TCP connections.

631

632

Args:

633

port (int): Port number

634

factory: Protocol factory

635

backlog (int): Listen backlog

636

interface (str): Interface to bind

637

638

Returns:

639

IListeningPort: Listening port object

640

"""

641

642

def reactor.connectTCP(host, port, factory, timeout=30, bindAddress=None):

643

"""

644

Make a TCP connection.

645

646

Args:

647

host (str): Hostname

648

port (int): Port number

649

factory: Client factory

650

timeout (int): Connection timeout

651

bindAddress: Local bind address

652

653

Returns:

654

IConnector: Connection object

655

"""

656

```

657

658

### Error Handling

659

660

Common exception types and error handling patterns in Twisted's asynchronous operations.

661

662

```python { .api }

663

# Common exceptions from twisted.internet.error

664

class error.ConnectionDone(Exception):

665

"""Connection was closed cleanly."""

666

667

class error.ConnectionLost(Exception):

668

"""Connection was lost unexpectedly."""

669

670

class error.ConnectionRefusedError(Exception):

671

"""Connection was refused by the remote host."""

672

673

class error.TimeoutError(Exception):

674

"""Operation timed out."""

675

676

class error.DNSLookupError(Exception):

677

"""DNS lookup failed."""

678

679

# From twisted.internet.defer

680

class defer.CancelledError(Exception):

681

"""Deferred was cancelled."""

682

683

class defer.AlreadyCalledError(Exception):

684

"""Deferred has already been fired."""

685

686

# From twisted.python.failure

687

class failure.Failure:

688

"""

689

Exception wrapper that preserves tracebacks across async boundaries.

690

691

Attributes:

692

- value: The wrapped exception

693

- type: Exception type

694

- tb: Traceback object

695

"""

696

def __init__(self, exc_value=None, exc_type=None, exc_tb=None): ...

697

def trap(self, *errorTypes): ...

698

def check(self, *errorTypes): ...

699

def getErrorMessage(self): ...

700

def printTraceback(self, file=None): ...

701

```

702

703

**Error Handling Example**:

704

705

```python

706

from twisted.internet import defer, error

707

from twisted.python import failure

708

709

def handleError(f):

710

"""Handle different types of failures."""

711

if f.check(error.ConnectionRefusedError):

712

print("Connection refused")

713

elif f.check(error.TimeoutError):

714

print("Operation timed out")

715

else:

716

print(f"Unexpected error: {f.value}")

717

f.printTraceback()

718

719

def riskyOperation():

720

# This might fail

721

d = defer.Deferred()

722

# ... async operation ...

723

return d

724

725

d = riskyOperation()

726

d.addErrback(handleError)

727

```

728

729

### Interface Types

730

731

Key interfaces used throughout Twisted's asynchronous I/O system.

732

733

```python { .api }

734

# Common interfaces from twisted.internet.interfaces

735

class IDelayedCall:

736

"""

737

Interface for scheduled calls that can be cancelled.

738

739

Methods:

740

- cancel(): Cancel the scheduled call

741

- active(): Check if call is still scheduled

742

- getTime(): Get scheduled time

743

"""

744

def cancel(): ...

745

def active(): ...

746

def getTime(): ...

747

748

class IListeningPort:

749

"""

750

Interface for listening network ports.

751

752

Methods:

753

- getHost(): Get local address

754

- startListening(): Begin listening

755

- stopListening(): Stop listening and return Deferred

756

"""

757

def getHost(): ...

758

def startListening(): ...

759

def stopListening(): ...

760

761

class ITransport:

762

"""

763

Interface for network transports.

764

765

Methods:

766

- write(data): Write data to transport

767

- writeSequence(data): Write sequence of data

768

- loseConnection(): Close connection cleanly

769

- abortConnection(): Close connection immediately

770

- getPeer(): Get remote address

771

- getHost(): Get local address

772

"""

773

def write(data): ...

774

def writeSequence(data): ...

775

def loseConnection(): ...

776

def abortConnection(): ...

777

def getPeer(): ...

778

def getHost(): ...

779

780

# From twisted.python.failure

781

class Failure:

782

"""

783

Exception wrapper preserving tracebacks across async boundaries.

784

785

Attributes:

786

- value: The wrapped exception

787

- type: Exception type

788

- tb: Traceback object

789

"""

790

def __init__(self, exc_value=None, exc_type=None, exc_tb=None): ...

791

def trap(self, *errorTypes): ...

792

def check(self, *errorTypes): ...

793

def getErrorMessage(self): ...

794

def printTraceback(self, file=None): ...

795

```

796

797

### Required Imports

798

799

Complete import statements for using the asynchronous I/O capabilities:

800

801

```python

802

# Core deferred and reactor functionality

803

from twisted.internet import defer, reactor, protocol, endpoints, task, error

804

from twisted.internet.interfaces import IDelayedCall, IListeningPort, ITransport

805

from twisted.python.failure import Failure

806

807

# Common import patterns

808

from twisted.internet.defer import (

809

Deferred, DeferredList, DeferredLock, DeferredSemaphore, DeferredQueue,

810

succeed, fail, gatherResults, maybeDeferred, inlineCallbacks, returnValue

811

)

812

```