or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-client.mdexceptions.mdhandlers.mdindex.mdrecipes.mdsecurity.mdtesting.md

handlers.mddocs/

0

# Async Handlers

1

2

Pluggable concurrency models supporting different async frameworks. Handlers manage callback execution, timeouts, and async result objects with support for threading, gevent, and eventlet paradigms for scalable concurrent Zookeeper operations.

3

4

## Capabilities

5

6

### Handler Interface

7

8

Base interface and common functionality for all async handler implementations with standardized callback management and async result handling.

9

10

```python { .api }

11

class IHandler:

12

"""Interface for callback handlers."""

13

14

def start(self):

15

"""Start the handler."""

16

17

def stop(self):

18

"""Stop the handler."""

19

20

def select(self, sockets, timeout):

21

"""

22

Select on sockets for I/O readiness.

23

24

Parameters:

25

- sockets (list): List of socket file descriptors

26

- timeout (float): Timeout in seconds

27

28

Returns:

29

tuple: (read_ready, write_ready, error_ready) socket lists

30

"""

31

32

def callback_object(self):

33

"""

34

Create callback result object.

35

36

Returns:

37

IAsyncResult: Async result object

38

"""

39

40

def dispatch_callback(self, callback):

41

"""

42

Dispatch callback for execution.

43

44

Parameters:

45

- callback (Callback): Callback object to execute

46

"""

47

48

class IAsyncResult:

49

"""Interface for async result objects."""

50

51

def ready(self):

52

"""

53

Check if result is ready.

54

55

Returns:

56

bool: True if result is available

57

"""

58

59

def successful(self):

60

"""

61

Check if operation was successful.

62

63

Returns:

64

bool: True if successful, False if exception occurred

65

"""

66

67

def get(self, block=True, timeout=None):

68

"""

69

Get the result.

70

71

Parameters:

72

- block (bool): Block until result is available

73

- timeout (float): Maximum time to wait

74

75

Returns:

76

Result value or raises exception

77

"""

78

79

def get_nowait(self):

80

"""

81

Get result without blocking.

82

83

Returns:

84

Result value or raises exception if not ready

85

"""

86

87

def set(self, value):

88

"""

89

Set the result value.

90

91

Parameters:

92

- value: Result value to set

93

"""

94

95

def set_exception(self, exception):

96

"""

97

Set an exception as the result.

98

99

Parameters:

100

- exception (Exception): Exception to set

101

"""

102

```

103

104

### Threading Handler

105

106

Threading-based async handler using Python's threading module for traditional multi-threaded concurrency with thread-safe callback execution.

107

108

```python { .api }

109

class SequentialThreadingHandler:

110

def __init__(self):

111

"""

112

Threading-based handler for sequential callback execution.

113

114

Uses threading.Thread for concurrent operations and threading

115

primitives for synchronization and callback management.

116

"""

117

118

def start(self):

119

"""Start the threading handler."""

120

121

def stop(self):

122

"""Stop the threading handler and cleanup threads."""

123

124

def select(self, sockets, timeout):

125

"""

126

Socket selection using select.select().

127

128

Parameters:

129

- sockets (list): Socket file descriptors

130

- timeout (float): Selection timeout

131

132

Returns:

133

tuple: Ready socket lists (read, write, error)

134

"""

135

136

def callback_object(self):

137

"""

138

Create threading-based async result.

139

140

Returns:

141

AsyncResult: Threading async result object

142

"""

143

144

def dispatch_callback(self, callback):

145

"""

146

Dispatch callback in thread-safe manner.

147

148

Parameters:

149

- callback (Callback): Callback to execute

150

"""

151

152

def create_connection(self, *args, **kwargs):

153

"""

154

Create socket connection.

155

156

Returns:

157

socket: Connected socket object

158

"""

159

160

class AsyncResult:

161

"""Threading-based async result implementation."""

162

163

def __init__(self):

164

"""Initialize with threading.Event for synchronization."""

165

166

def ready(self):

167

"""Check if result is ready using threading.Event."""

168

169

def successful(self):

170

"""Check if operation succeeded."""

171

172

def get(self, block=True, timeout=None):

173

"""

174

Get result with optional blocking and timeout.

175

176

Uses threading.Event.wait() for blocking behavior.

177

"""

178

179

def get_nowait(self):

180

"""Get result immediately or raise exception."""

181

182

def set(self, value):

183

"""Set result value and notify waiters."""

184

185

def set_exception(self, exception):

186

"""Set exception and notify waiters."""

187

188

class KazooTimeoutError(Exception):

189

"""Timeout exception for threading handler operations."""

190

```

191

192

### Gevent Handler

193

194

Gevent-based async handler using greenlets for cooperative concurrency with gevent's async I/O capabilities and green threading model.

195

196

```python { .api }

197

class SequentialGeventHandler:

198

def __init__(self):

199

"""

200

Gevent-based handler for greenlet concurrency.

201

202

Requires gevent >= 1.2 for proper async I/O support.

203

Uses gevent.select() for socket operations.

204

"""

205

206

def start(self):

207

"""Start gevent handler."""

208

209

def stop(self):

210

"""Stop gevent handler."""

211

212

def select(self, sockets, timeout):

213

"""

214

Socket selection using gevent.select.select().

215

216

Parameters:

217

- sockets (list): Socket file descriptors

218

- timeout (float): Selection timeout

219

220

Returns:

221

tuple: Ready socket lists optimized for gevent

222

"""

223

224

def callback_object(self):

225

"""

226

Create gevent async result.

227

228

Returns:

229

AsyncResult: Gevent-compatible async result

230

"""

231

232

def dispatch_callback(self, callback):

233

"""

234

Dispatch callback using gevent.spawn().

235

236

Parameters:

237

- callback (Callback): Callback for greenlet execution

238

"""

239

240

def create_connection(self, *args, **kwargs):

241

"""

242

Create gevent-compatible socket connection.

243

244

Returns:

245

gevent.socket: Gevent socket object

246

"""

247

248

def create_socket_pair(self):

249

"""

250

Create gevent socket pair for communication.

251

252

Returns:

253

tuple: (socket1, socket2) gevent socket pair

254

"""

255

```

256

257

### Eventlet Handler

258

259

Eventlet-based async handler using green threads for cooperative concurrency with eventlet's async I/O and green threading capabilities.

260

261

```python { .api }

262

class SequentialEventletHandler:

263

def __init__(self):

264

"""

265

Eventlet-based handler for green thread concurrency.

266

267

Requires eventlet >= 0.17.1 for proper async support.

268

Uses eventlet.select() for socket operations.

269

"""

270

271

def start(self):

272

"""Start eventlet handler."""

273

274

def stop(self):

275

"""Stop eventlet handler."""

276

277

def select(self, sockets, timeout):

278

"""

279

Socket selection using eventlet.select.select().

280

281

Parameters:

282

- sockets (list): Socket file descriptors

283

- timeout (float): Selection timeout

284

285

Returns:

286

tuple: Ready socket lists for eventlet

287

"""

288

289

def callback_object(self):

290

"""

291

Create eventlet async result.

292

293

Returns:

294

AsyncResult: Eventlet-compatible async result

295

"""

296

297

def dispatch_callback(self, callback):

298

"""

299

Dispatch callback using eventlet.spawn().

300

301

Parameters:

302

- callback (Callback): Callback for green thread execution

303

"""

304

305

def create_connection(self, *args, **kwargs):

306

"""

307

Create eventlet-compatible socket connection.

308

309

Returns:

310

eventlet.green.socket: Eventlet socket object

311

"""

312

313

class AsyncResult:

314

"""Eventlet async result implementation."""

315

316

def __init__(self):

317

"""Initialize with eventlet.Event for coordination."""

318

319

def ready(self):

320

"""Check if result is ready using eventlet primitives."""

321

322

def successful(self):

323

"""Check operation success status."""

324

325

def get(self, block=True, timeout=None):

326

"""

327

Get result with eventlet timeout support.

328

329

Uses eventlet.timeout.Timeout for timeout handling.

330

"""

331

332

def get_nowait(self):

333

"""Get result without blocking in green thread."""

334

335

def set(self, value):

336

"""Set result and wake waiting green threads."""

337

338

def set_exception(self, exception):

339

"""Set exception and notify green threads."""

340

341

class TimeoutError(Exception):

342

"""Timeout exception for eventlet handler operations."""

343

```

344

345

### Handler Utilities

346

347

Utility functions and base classes supporting all handler implementations with common socket operations and async result patterns.

348

349

```python { .api }

350

class AsyncResult:

351

"""Base async result implementation."""

352

353

def __init__(self):

354

"""Initialize base async result."""

355

356

def ready(self):

357

"""Check if result is available."""

358

359

def successful(self):

360

"""Check if operation was successful."""

361

362

def get(self, block=True, timeout=None):

363

"""Get result value with optional timeout."""

364

365

def get_nowait(self):

366

"""Get result without blocking."""

367

368

def set(self, value):

369

"""Set the result value."""

370

371

def set_exception(self, exception):

372

"""Set an exception as result."""

373

374

def link(self, callback):

375

"""

376

Link callback to be called when result is ready.

377

378

Parameters:

379

- callback (callable): Function to call when ready

380

"""

381

382

def unlink(self, callback):

383

"""

384

Unlink previously linked callback.

385

386

Parameters:

387

- callback (callable): Callback to remove

388

"""

389

390

def create_socket_pair():

391

"""

392

Create connected socket pair for communication.

393

394

Returns:

395

tuple: (socket1, socket2) connected socket pair

396

"""

397

398

def create_tcp_socket(module):

399

"""

400

Create TCP socket using specified socket module.

401

402

Parameters:

403

- module: Socket module (socket, gevent.socket, etc.)

404

405

Returns:

406

socket: TCP socket object

407

"""

408

409

def create_tcp_connection(module, address, timeout=None):

410

"""

411

Create TCP connection to address.

412

413

Parameters:

414

- module: Socket module to use

415

- address (tuple): (host, port) tuple

416

- timeout (float): Connection timeout

417

418

Returns:

419

socket: Connected socket

420

"""

421

422

def capture_exceptions(async_object):

423

"""

424

Decorator to capture exceptions in async operations.

425

426

Parameters:

427

- async_object: Async result object

428

429

Returns:

430

Decorator function

431

"""

432

433

def wrap(async_object):

434

"""

435

Decorator to wrap function with async result.

436

437

Parameters:

438

- async_object: Async result object

439

440

Returns:

441

Decorator function

442

"""

443

444

def fileobj_to_fd(fileobj):

445

"""

446

Convert file object to file descriptor.

447

448

Parameters:

449

- fileobj: File-like object

450

451

Returns:

452

int: File descriptor

453

"""

454

455

def selector_select(selector, timeout):

456

"""

457

Select using selectors module.

458

459

Parameters:

460

- selector: Selector object

461

- timeout (float): Selection timeout

462

463

Returns:

464

list: Ready selectors

465

"""

466

```

467

468

## Usage Examples

469

470

### Threading Handler Example

471

472

```python

473

from kazoo.client import KazooClient

474

from kazoo.handlers.threading import SequentialThreadingHandler

475

import threading

476

import time

477

478

# Create client with threading handler (default)

479

handler = SequentialThreadingHandler()

480

zk = KazooClient(hosts='localhost:2181', handler=handler)

481

482

def connection_listener(state):

483

print(f"Connection state changed: {state}")

484

485

zk.add_listener(connection_listener)

486

487

try:

488

zk.start()

489

490

# Perform async operations

491

async_result = zk.create_async("/threading-test", b"test data", makepath=True)

492

493

# Wait for result with timeout

494

try:

495

path = async_result.get(timeout=5.0)

496

print(f"Created path: {path}")

497

except Exception as e:

498

print(f"Creation failed: {e}")

499

500

# Multiple async operations

501

get_result = zk.get_async("/threading-test")

502

exists_result = zk.exists_async("/threading-test")

503

504

# Wait for both results

505

data, stat = get_result.get()

506

exists_stat = exists_result.get()

507

508

print(f"Data: {data}, Exists: {exists_stat is not None}")

509

510

finally:

511

zk.stop()

512

```

513

514

### Gevent Handler Example

515

516

```python

517

from kazoo.client import KazooClient

518

from kazoo.handlers.gevent import SequentialGeventHandler

519

import gevent

520

from gevent import spawn

521

522

# Create client with gevent handler

523

handler = SequentialGeventHandler()

524

zk = KazooClient(hosts='localhost:2181', handler=handler)

525

526

def worker(worker_id, path_base):

527

"""Worker greenlet function."""

528

try:

529

# Each worker creates its own path

530

path = f"{path_base}/worker-{worker_id}"

531

zk.create(path, f"worker {worker_id} data".encode(), makepath=True)

532

533

# Simulate some work

534

gevent.sleep(1)

535

536

# Update data

537

zk.set(path, f"worker {worker_id} updated".encode())

538

539

print(f"Worker {worker_id} completed")

540

541

except Exception as e:

542

print(f"Worker {worker_id} failed: {e}")

543

544

try:

545

zk.start()

546

547

# Create base path

548

zk.create("/gevent-test", b"base", makepath=True)

549

550

# Spawn multiple worker greenlets

551

greenlets = []

552

for i in range(5):

553

g = spawn(worker, i, "/gevent-test")

554

greenlets.append(g)

555

556

# Wait for all workers to complete

557

gevent.joinall(greenlets)

558

559

# List all worker paths

560

children = zk.get_children("/gevent-test")

561

print(f"Created paths: {children}")

562

563

finally:

564

zk.stop()

565

```

566

567

### Eventlet Handler Example

568

569

```python

570

from kazoo.client import KazooClient

571

from kazoo.handlers.eventlet import SequentialEventletHandler

572

import eventlet

573

from eventlet import spawn

574

575

# Create client with eventlet handler

576

handler = SequentialEventletHandler()

577

zk = KazooClient(hosts='localhost:2181', handler=handler)

578

579

def async_worker(path, data):

580

"""Async worker using eventlet."""

581

try:

582

# Create node

583

actual_path = zk.create(path, data.encode(), sequence=True, makepath=True)

584

585

# Simulate async work

586

eventlet.sleep(0.5)

587

588

# Read back data

589

read_data, stat = zk.get(actual_path)

590

print(f"Created {actual_path}: {read_data.decode()}")

591

592

return actual_path

593

594

except Exception as e:

595

print(f"Worker failed for {path}: {e}")

596

return None

597

598

try:

599

zk.start()

600

601

# Create multiple async workers

602

pool = eventlet.GreenPool(10)

603

604

results = []

605

for i in range(10):

606

future = pool.spawn(async_worker, "/eventlet-test/item-", f"data-{i}")

607

results.append(future)

608

609

# Wait for all to complete

610

paths = [future.wait() for future in results]

611

successful_paths = [p for p in paths if p is not None]

612

613

print(f"Successfully created {len(successful_paths)} paths")

614

615

finally:

616

zk.stop()

617

```

618

619

### Custom Handler Configuration

620

621

```python

622

from kazoo.client import KazooClient

623

from kazoo.handlers.threading import SequentialThreadingHandler

624

from kazoo.retry import KazooRetry

625

626

# Configure custom handler with retry policy

627

handler = SequentialThreadingHandler()

628

handler.start()

629

630

# Configure retry policies

631

connection_retry = KazooRetry(max_tries=3, delay=1, backoff=2)

632

command_retry = KazooRetry(max_tries=5, delay=0.1, backoff=1.5)

633

634

# Create client with custom configuration

635

zk = KazooClient(

636

hosts='zk1:2181,zk2:2181,zk3:2181',

637

handler=handler,

638

connection_retry=connection_retry,

639

command_retry=command_retry,

640

timeout=30.0

641

)

642

643

def state_listener(state):

644

print(f"State changed to: {state}")

645

646

zk.add_listener(state_listener)

647

648

try:

649

zk.start(timeout=15)

650

651

# Perform operations with custom handler

652

result = zk.create("/custom-handler-test", b"handler data", makepath=True)

653

print(f"Created with custom handler: {result}")

654

655

finally:

656

zk.stop()

657

handler.stop()

658

```

659

660

### Async Result Patterns

661

662

```python

663

from kazoo.client import KazooClient

664

from kazoo.handlers.threading import AsyncResult

665

import threading

666

import time

667

668

zk = KazooClient()

669

zk.start()

670

671

def result_callback(async_result):

672

"""Callback function for async result."""

673

try:

674

result = async_result.get_nowait()

675

print(f"Callback received result: {result}")

676

except Exception as e:

677

print(f"Callback received exception: {e}")

678

679

try:

680

# Create async operation

681

async_result = zk.create_async("/async-test", b"async data", makepath=True)

682

683

# Link callback to be called when ready

684

async_result.link(result_callback)

685

686

# Wait for result in another thread

687

def wait_for_result():

688

try:

689

path = async_result.get(timeout=10)

690

print(f"Background thread got result: {path}")

691

except Exception as e:

692

print(f"Background thread got exception: {e}")

693

694

thread = threading.Thread(target=wait_for_result)

695

thread.start()

696

697

# Main thread continues other work

698

time.sleep(1)

699

print("Main thread continuing...")

700

701

# Wait for background thread

702

thread.join()

703

704

# Check if result is ready

705

if async_result.ready():

706

print(f"Result is ready: {async_result.successful()}")

707

708

finally:

709

zk.stop()

710

```