or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.mdauthentication.mdhttp-client-server.mdindex.mdnetworking.mdtemplates.mdtesting.mdutilities.mdweb-framework.mdwebsocket.md

async-io.mddocs/

0

# Asynchronous I/O

1

2

Core asynchronous I/O primitives including event loops, streams, locks, queues, and futures. These components enable non-blocking operations and concurrent programming patterns.

3

4

## Capabilities

5

6

### Event Loop

7

8

Core event loop managing all asynchronous operations, timers, callbacks, and I/O events in Tornado applications.

9

10

```python { .api }

11

class IOLoop:

12

"""Asynchronous I/O event loop."""

13

14

@classmethod

15

def configure(cls, impl, **kwargs):

16

"""Configure IOLoop implementation."""

17

18

@classmethod

19

def instance(cls):

20

"""Get global IOLoop instance."""

21

22

@classmethod

23

def current(cls, instance: bool = True):

24

"""

25

Get current thread's IOLoop.

26

27

Args:

28

instance: Whether to create instance if none exists

29

30

Returns:

31

Current IOLoop instance

32

"""

33

34

@classmethod

35

def install(cls):

36

"""Install this IOLoop as global instance."""

37

38

def start(self):

39

"""Start the event loop."""

40

41

def stop(self):

42

"""Stop the event loop."""

43

44

def run_sync(self, func, timeout: float = None):

45

"""

46

Run coroutine synchronously with timeout.

47

48

Args:

49

func: Coroutine function to run

50

timeout: Optional timeout in seconds

51

52

Returns:

53

Result of coroutine

54

"""

55

56

def add_handler(self, fd, handler, events):

57

"""

58

Add I/O event handler for file descriptor.

59

60

Args:

61

fd: File descriptor

62

handler: Handler function

63

events: Event mask (IOLoop.READ, IOLoop.WRITE, IOLoop.ERROR)

64

"""

65

66

def update_handler(self, fd, events):

67

"""Update events for file descriptor."""

68

69

def remove_handler(self, fd):

70

"""Remove handler for file descriptor."""

71

72

def add_timeout(self, deadline, callback, *args, **kwargs):

73

"""

74

Add timeout callback.

75

76

Args:

77

deadline: Absolute time or timedelta

78

callback: Callback function

79

*args: Callback arguments

80

**kwargs: Callback keyword arguments

81

82

Returns:

83

Timeout handle

84

"""

85

86

def call_later(self, delay: float, callback, *args, **kwargs):

87

"""

88

Call function after delay.

89

90

Args:

91

delay: Delay in seconds

92

callback: Callback function

93

*args: Callback arguments

94

**kwargs: Callback keyword arguments

95

96

Returns:

97

Timeout handle

98

"""

99

100

def call_at(self, when: float, callback, *args, **kwargs):

101

"""

102

Call function at specific time.

103

104

Args:

105

when: Absolute time

106

callback: Callback function

107

*args: Callback arguments

108

**kwargs: Callback keyword arguments

109

110

Returns:

111

Timeout handle

112

"""

113

114

def remove_timeout(self, timeout):

115

"""Remove timeout callback."""

116

117

def add_callback(self, callback, *args, **kwargs):

118

"""

119

Add callback to be executed on next iteration.

120

121

Args:

122

callback: Callback function

123

*args: Callback arguments

124

**kwargs: Callback keyword arguments

125

"""

126

127

def add_callback_from_signal(self, callback, *args, **kwargs):

128

"""Add callback from signal handler context."""

129

130

def spawn_callback(self, callback, *args, **kwargs):

131

"""

132

Spawn callback as separate task.

133

134

Args:

135

callback: Callback function

136

*args: Callback arguments

137

**kwargs: Callback keyword arguments

138

"""

139

140

def add_future(self, future, callback):

141

"""

142

Add callback to be called when future completes.

143

144

Args:

145

future: Future object

146

callback: Callback function

147

"""

148

149

def run_in_executor(self, executor, func, *args):

150

"""

151

Run function in executor.

152

153

Args:

154

executor: Executor instance

155

func: Function to execute

156

*args: Function arguments

157

158

Returns:

159

Future resolving to function result

160

"""

161

162

def set_default_executor(self, executor):

163

"""Set default executor for run_in_executor."""

164

165

def time(self) -> float:

166

"""Get current time."""

167

168

class PeriodicCallback:

169

"""Periodic callback scheduler."""

170

171

def __init__(self, callback, callback_time: float, io_loop=None):

172

"""

173

Initialize periodic callback.

174

175

Args:

176

callback: Function to call periodically

177

callback_time: Interval in milliseconds

178

io_loop: IOLoop instance (uses current if None)

179

"""

180

181

def start(self):

182

"""Start periodic callbacks."""

183

184

def stop(self):

185

"""Stop periodic callbacks."""

186

187

def is_running(self) -> bool:

188

"""Check if callback is running."""

189

```

190

191

### Asynchronous Streams

192

193

Stream abstractions for non-blocking I/O operations on sockets, pipes, and other file descriptors.

194

195

```python { .api }

196

class BaseIOStream:

197

"""Base class for asynchronous streams."""

198

199

def __init__(self, io_loop=None, max_buffer_size: int = None, read_chunk_size: int = None, max_write_buffer_size: int = None):

200

"""

201

Initialize stream.

202

203

Args:

204

io_loop: IOLoop instance

205

max_buffer_size: Maximum read buffer size

206

read_chunk_size: Read chunk size

207

max_write_buffer_size: Maximum write buffer size

208

"""

209

210

def read_bytes(self, num_bytes: int, callback=None, streaming_callback=None, partial: bool = False) -> Future:

211

"""

212

Read specified number of bytes.

213

214

Args:

215

num_bytes: Number of bytes to read

216

callback: Callback function (if not using async/await)

217

streaming_callback: Callback for data chunks

218

partial: Whether to allow partial reads

219

220

Returns:

221

Future resolving to bytes

222

"""

223

224

def read_until_regex(self, regex, callback=None, max_bytes: int = None) -> Future:

225

"""

226

Read until regex pattern matches.

227

228

Args:

229

regex: Regular expression pattern

230

callback: Callback function (if not using async/await)

231

max_bytes: Maximum bytes to read

232

233

Returns:

234

Future resolving to bytes

235

"""

236

237

def read_until(self, delimiter: bytes, callback=None, max_bytes: int = None) -> Future:

238

"""

239

Read until delimiter found.

240

241

Args:

242

delimiter: Delimiter bytes

243

callback: Callback function (if not using async/await)

244

max_bytes: Maximum bytes to read

245

246

Returns:

247

Future resolving to bytes

248

"""

249

250

def read_into(self, buf, callback=None, partial: bool = False) -> Future:

251

"""

252

Read data into existing buffer.

253

254

Args:

255

buf: Buffer to read into

256

callback: Callback function (if not using async/await)

257

partial: Whether to allow partial reads

258

259

Returns:

260

Future resolving to number of bytes read

261

"""

262

263

def read_until_close(self, callback=None, streaming_callback=None) -> Future:

264

"""

265

Read all data until stream closes.

266

267

Args:

268

callback: Callback function (if not using async/await)

269

streaming_callback: Callback for data chunks

270

271

Returns:

272

Future resolving to all bytes

273

"""

274

275

def write(self, data: bytes, callback=None) -> Future:

276

"""

277

Write data to stream.

278

279

Args:

280

data: Data to write

281

callback: Callback function (if not using async/await)

282

283

Returns:

284

Future resolving when write completes

285

"""

286

287

def close(self, exc_info: bool = False):

288

"""

289

Close stream.

290

291

Args:

292

exc_info: Whether to log exception info

293

"""

294

295

def set_close_callback(self, callback):

296

"""

297

Set callback to be called when stream closes.

298

299

Args:

300

callback: Close callback function

301

"""

302

303

def closed(self) -> bool:

304

"""Check if stream is closed."""

305

306

def reading(self) -> bool:

307

"""Check if stream is reading."""

308

309

def writing(self) -> bool:

310

"""Check if stream is writing."""

311

312

def set_nodelay(self, value: bool):

313

"""Enable/disable Nagle's algorithm."""

314

315

class IOStream(BaseIOStream):

316

"""Socket-based stream implementation."""

317

318

def __init__(self, socket, io_loop=None, **kwargs):

319

"""

320

Initialize socket stream.

321

322

Args:

323

socket: Socket object

324

io_loop: IOLoop instance

325

**kwargs: Additional stream options

326

"""

327

328

async def connect(self, address, callback=None, server_hostname: str = None):

329

"""

330

Connect to remote address.

331

332

Args:

333

address: Remote address tuple (host, port)

334

callback: Callback function (if not using async/await)

335

server_hostname: Server hostname for SNI

336

"""

337

338

def start_tls(self, server_side: bool, ssl_options=None, server_hostname: str = None) -> Future:

339

"""

340

Start TLS/SSL on connection.

341

342

Args:

343

server_side: Whether this is server side

344

ssl_options: SSL configuration options

345

server_hostname: Server hostname for SNI

346

347

Returns:

348

Future resolving when TLS handshake completes

349

"""

350

351

class SSLIOStream(IOStream):

352

"""SSL/TLS socket stream."""

353

354

def __init__(self, *args, **kwargs):

355

"""Initialize SSL stream."""

356

357

def wait_for_handshake(self, callback=None) -> Future:

358

"""

359

Wait for SSL handshake to complete.

360

361

Args:

362

callback: Callback function (if not using async/await)

363

364

Returns:

365

Future resolving when handshake completes

366

"""

367

368

class PipeIOStream(BaseIOStream):

369

"""Pipe-based stream for subprocess communication."""

370

371

def __init__(self, fd, io_loop=None, **kwargs):

372

"""

373

Initialize pipe stream.

374

375

Args:

376

fd: File descriptor

377

io_loop: IOLoop instance

378

**kwargs: Additional stream options

379

"""

380

```

381

382

### Synchronization Primitives

383

384

Asynchronous versions of threading primitives like locks, conditions, events, and semaphores for coordinating coroutines.

385

386

```python { .api }

387

class Lock:

388

"""Asynchronous lock (mutex)."""

389

390

def __init__(self):

391

"""Initialize lock."""

392

393

async def __aenter__(self):

394

"""Async context manager entry."""

395

await self.acquire()

396

return self

397

398

async def __aexit__(self, exc_type, exc_val, exc_tb):

399

"""Async context manager exit."""

400

self.release()

401

402

async def acquire(self):

403

"""

404

Acquire lock.

405

406

Blocks until lock is available.

407

"""

408

409

def release(self):

410

"""

411

Release lock.

412

413

Raises:

414

RuntimeError: If lock is not currently held

415

"""

416

417

class Condition:

418

"""Asynchronous condition variable."""

419

420

def __init__(self, lock: Lock = None):

421

"""

422

Initialize condition.

423

424

Args:

425

lock: Optional lock to use (creates new if None)

426

"""

427

428

async def __aenter__(self):

429

"""Async context manager entry."""

430

await self.acquire()

431

return self

432

433

async def __aexit__(self, exc_type, exc_val, exc_tb):

434

"""Async context manager exit."""

435

self.release()

436

437

async def acquire(self):

438

"""Acquire underlying lock."""

439

440

def release(self):

441

"""Release underlying lock."""

442

443

async def wait(self, timeout: float = None) -> bool:

444

"""

445

Wait for condition to be notified.

446

447

Args:

448

timeout: Optional timeout in seconds

449

450

Returns:

451

True if notified, False if timeout

452

"""

453

454

def notify(self, n: int = 1):

455

"""

456

Notify waiting coroutines.

457

458

Args:

459

n: Number of coroutines to notify

460

"""

461

462

def notify_all(self):

463

"""Notify all waiting coroutines."""

464

465

class Event:

466

"""Asynchronous event flag."""

467

468

def __init__(self):

469

"""Initialize event (starts unset)."""

470

471

def is_set(self) -> bool:

472

"""Check if event is set."""

473

474

def set(self):

475

"""Set event flag and notify waiters."""

476

477

def clear(self):

478

"""Clear event flag."""

479

480

async def wait(self, timeout: float = None) -> bool:

481

"""

482

Wait for event to be set.

483

484

Args:

485

timeout: Optional timeout in seconds

486

487

Returns:

488

True if event was set, False if timeout

489

"""

490

491

class Semaphore:

492

"""Asynchronous semaphore."""

493

494

def __init__(self, value: int = 1):

495

"""

496

Initialize semaphore.

497

498

Args:

499

value: Initial semaphore value

500

"""

501

502

async def __aenter__(self):

503

"""Async context manager entry."""

504

await self.acquire()

505

return self

506

507

async def __aexit__(self, exc_type, exc_val, exc_tb):

508

"""Async context manager exit."""

509

self.release()

510

511

async def acquire(self):

512

"""Acquire semaphore (decrements counter)."""

513

514

def release(self):

515

"""Release semaphore (increments counter)."""

516

517

class BoundedSemaphore(Semaphore):

518

"""Semaphore with bounded release operation."""

519

520

def release(self):

521

"""

522

Release semaphore with bounds checking.

523

524

Raises:

525

ValueError: If releasing would exceed initial value

526

"""

527

```

528

529

### Asynchronous Queues

530

531

Queue implementations for passing data between coroutines with different ordering strategies and flow control.

532

533

```python { .api }

534

class Queue:

535

"""Asynchronous FIFO queue."""

536

537

def __init__(self, maxsize: int = 0):

538

"""

539

Initialize queue.

540

541

Args:

542

maxsize: Maximum queue size (0 for unlimited)

543

"""

544

545

def qsize(self) -> int:

546

"""Get current queue size."""

547

548

def empty(self) -> bool:

549

"""Check if queue is empty."""

550

551

def full(self) -> bool:

552

"""Check if queue is full."""

553

554

async def put(self, item):

555

"""

556

Put item in queue.

557

558

Args:

559

item: Item to add to queue

560

561

Blocks if queue is full.

562

"""

563

564

def put_nowait(self, item):

565

"""

566

Put item in queue without blocking.

567

568

Args:

569

item: Item to add to queue

570

571

Raises:

572

QueueFull: If queue is full

573

"""

574

575

async def get(self):

576

"""

577

Get item from queue.

578

579

Returns:

580

Item from queue

581

582

Blocks if queue is empty.

583

"""

584

585

def get_nowait(self):

586

"""

587

Get item from queue without blocking.

588

589

Returns:

590

Item from queue

591

592

Raises:

593

QueueEmpty: If queue is empty

594

"""

595

596

def task_done(self):

597

"""Indicate that queued task is complete."""

598

599

async def join(self):

600

"""Wait until all tasks are done."""

601

602

class PriorityQueue(Queue):

603

"""Asynchronous priority queue (lowest priority first)."""

604

605

def __init__(self, maxsize: int = 0):

606

"""Initialize priority queue."""

607

608

class LifoQueue(Queue):

609

"""Asynchronous LIFO queue (stack)."""

610

611

def __init__(self, maxsize: int = 0):

612

"""Initialize LIFO queue."""

613

```

614

615

### Future Utilities

616

617

Utilities for working with Future objects, including conversion, timeouts, and execution control.

618

619

```python { .api }

620

Future = asyncio.Future

621

622

def is_future(obj) -> bool:

623

"""

624

Check if object is a Future.

625

626

Args:

627

obj: Object to check

628

629

Returns:

630

True if object is a Future

631

"""

632

633

def run_on_executor(executor=None, io_loop=None):

634

"""

635

Decorator to run function on executor.

636

637

Args:

638

executor: Executor to use

639

io_loop: IOLoop instance

640

641

Returns:

642

Decorator function

643

"""

644

645

def chain_future(a: Future, b: Future):

646

"""

647

Chain two futures together.

648

649

Args:

650

a: Source future

651

b: Target future

652

"""

653

654

def future_set_result_unless_cancelled(future: Future, value):

655

"""

656

Set future result unless cancelled.

657

658

Args:

659

future: Future to set

660

value: Result value

661

"""

662

663

def future_set_exception_unless_cancelled(future: Future, exc):

664

"""

665

Set future exception unless cancelled.

666

667

Args:

668

future: Future to set

669

exc: Exception to set

670

"""

671

672

async def with_timeout(timeout: float, future: Future):

673

"""

674

Wrap future with timeout.

675

676

Args:

677

timeout: Timeout in seconds

678

future: Future to wrap

679

680

Returns:

681

Future result

682

683

Raises:

684

asyncio.TimeoutError: If timeout expires

685

"""

686

687

async def sleep(duration: float):

688

"""

689

Sleep for specified duration.

690

691

Args:

692

duration: Sleep duration in seconds

693

"""

694

695

class DummyExecutor:

696

"""Executor that runs functions synchronously."""

697

698

def submit(self, fn, *args, **kwargs):

699

"""Submit function for execution."""

700

```

701

702

## Types

703

704

```python { .api }

705

# Event mask constants for IOLoop

706

READ = 0x001

707

WRITE = 0x004

708

ERROR = 0x008

709

710

# Timeout handle type

711

TimeoutHandle = object

712

713

# File descriptor type

714

FileDescriptor = Union[int, socket.socket]

715

716

# Event callback type

717

EventCallback = Callable[[int, int], None]

718

719

# Timeout callback type

720

TimeoutCallback = Callable[[], None]

721

722

# Stream callback types

723

StreamCallback = Callable[[bytes], None]

724

CloseCallback = Callable[[], None]

725

```

726

727

## Exceptions

728

729

```python { .api }

730

class StreamClosedError(Exception):

731

"""Exception when stream operation attempted on closed stream."""

732

733

def __init__(self, real_error=None):

734

"""

735

Initialize stream closed error.

736

737

Args:

738

real_error: Underlying error that caused closure

739

"""

740

741

class UnsatisfiableReadError(Exception):

742

"""Exception when read cannot be satisfied."""

743

744

class StreamBufferFullError(Exception):

745

"""Exception when stream buffer is full."""

746

747

class QueueEmpty(Exception):

748

"""Exception when queue is empty."""

749

750

class QueueFull(Exception):

751

"""Exception when queue is full."""

752

```