0
# Async Handlers
1
2
Pluggable concurrency models supporting different async frameworks. Handlers manage callback execution, timeouts, and async result objects, with implementations for the threading, gevent, and eventlet paradigms to enable scalable, concurrent ZooKeeper operations.
3
4
## Capabilities
5
6
### Handler Interface
7
8
Base interface and common functionality for all async handler implementations with standardized callback management and async result handling.
9
10
```python { .api }
11
class IHandler:
12
"""Interface for callback handlers."""
13
14
def start(self):
15
"""Start the handler."""
16
17
def stop(self):
18
"""Stop the handler."""
19
20
def select(self, sockets, timeout):
21
"""
22
Select on sockets for I/O readiness.
23
24
Parameters:
25
- sockets (list): List of socket file descriptors
26
- timeout (float): Timeout in seconds
27
28
Returns:
29
tuple: (read_ready, write_ready, error_ready) socket lists
30
"""
31
32
def callback_object(self):
33
"""
34
Create callback result object.
35
36
Returns:
37
IAsyncResult: Async result object
38
"""
39
40
def dispatch_callback(self, callback):
41
"""
42
Dispatch callback for execution.
43
44
Parameters:
45
- callback (Callback): Callback object to execute
46
"""
47
48
class IAsyncResult:
49
"""Interface for async result objects."""
50
51
def ready(self):
52
"""
53
Check if result is ready.
54
55
Returns:
56
bool: True if result is available
57
"""
58
59
def successful(self):
60
"""
61
Check if operation was successful.
62
63
Returns:
64
bool: True if successful, False if exception occurred
65
"""
66
67
def get(self, block=True, timeout=None):
68
"""
69
Get the result.
70
71
Parameters:
72
- block (bool): Block until result is available
73
- timeout (float): Maximum time to wait
74
75
Returns:
76
Result value or raises exception
77
"""
78
79
def get_nowait(self):
80
"""
81
Get result without blocking.
82
83
Returns:
84
Result value or raises exception if not ready
85
"""
86
87
def set(self, value):
88
"""
89
Set the result value.
90
91
Parameters:
92
- value: Result value to set
93
"""
94
95
def set_exception(self, exception):
96
"""
97
Set an exception as the result.
98
99
Parameters:
100
- exception (Exception): Exception to set
101
"""
102
```
103
104
### Threading Handler
105
106
Threading-based async handler using Python's threading module for traditional multi-threaded concurrency with thread-safe callback execution.
107
108
```python { .api }
109
class SequentialThreadingHandler:
110
def __init__(self):
111
"""
112
Threading-based handler for sequential callback execution.
113
114
Uses threading.Thread for concurrent operations and threading
115
primitives for synchronization and callback management.
116
"""
117
118
def start(self):
119
"""Start the threading handler."""
120
121
def stop(self):
122
"""Stop the threading handler and cleanup threads."""
123
124
def select(self, sockets, timeout):
125
"""
126
Socket selection using select.select().
127
128
Parameters:
129
- sockets (list): Socket file descriptors
130
- timeout (float): Selection timeout
131
132
Returns:
133
tuple: Ready socket lists (read, write, error)
134
"""
135
136
def callback_object(self):
137
"""
138
Create threading-based async result.
139
140
Returns:
141
AsyncResult: Threading async result object
142
"""
143
144
def dispatch_callback(self, callback):
145
"""
146
Dispatch callback in thread-safe manner.
147
148
Parameters:
149
- callback (Callback): Callback to execute
150
"""
151
152
def create_connection(self, *args, **kwargs):
153
"""
154
Create socket connection.
155
156
Returns:
157
socket: Connected socket object
158
"""
159
160
class AsyncResult:
161
"""Threading-based async result implementation."""
162
163
def __init__(self):
164
"""Initialize with threading.Event for synchronization."""
165
166
def ready(self):
167
"""Check if result is ready using threading.Event."""
168
169
def successful(self):
170
"""Check if operation succeeded."""
171
172
def get(self, block=True, timeout=None):
173
"""
174
Get result with optional blocking and timeout.
175
176
Uses threading.Event.wait() for blocking behavior.
177
"""
178
179
def get_nowait(self):
180
"""Get result immediately or raise exception."""
181
182
def set(self, value):
183
"""Set result value and notify waiters."""
184
185
def set_exception(self, exception):
186
"""Set exception and notify waiters."""
187
188
class KazooTimeoutError(Exception):
189
"""Timeout exception for threading handler operations."""
190
```
191
192
### Gevent Handler
193
194
Gevent-based async handler using greenlets for cooperative concurrency with gevent's async I/O capabilities and green threading model.
195
196
```python { .api }
197
class SequentialGeventHandler:
198
def __init__(self):
199
"""
200
Gevent-based handler for greenlet concurrency.
201
202
Requires gevent >= 1.2 for proper async I/O support.
203
Uses gevent.select.select() for socket operations.
204
"""
205
206
def start(self):
207
"""Start gevent handler."""
208
209
def stop(self):
210
"""Stop gevent handler."""
211
212
def select(self, sockets, timeout):
213
"""
214
Socket selection using gevent.select.select().
215
216
Parameters:
217
- sockets (list): Socket file descriptors
218
- timeout (float): Selection timeout
219
220
Returns:
221
tuple: Ready socket lists (read, write, error)
222
"""
223
224
def callback_object(self):
225
"""
226
Create gevent async result.
227
228
Returns:
229
AsyncResult: Gevent-compatible async result
230
"""
231
232
def dispatch_callback(self, callback):
233
"""
234
Dispatch callback using gevent.spawn().
235
236
Parameters:
237
- callback (Callback): Callback for greenlet execution
238
"""
239
240
def create_connection(self, *args, **kwargs):
241
"""
242
Create gevent-compatible socket connection.
243
244
Returns:
245
gevent.socket: Gevent socket object
246
"""
247
248
def create_socket_pair(self):
249
"""
250
Create gevent socket pair for communication.
251
252
Returns:
253
tuple: (socket1, socket2) gevent socket pair
254
"""
255
```
256
257
### Eventlet Handler
258
259
Eventlet-based async handler using green threads for cooperative concurrency with eventlet's async I/O and green threading capabilities.
260
261
```python { .api }
262
class SequentialEventletHandler:
263
def __init__(self):
264
"""
265
Eventlet-based handler for green thread concurrency.
266
267
Requires eventlet >= 0.17.1 for proper async support.
268
Uses eventlet.green.select.select() for socket operations.
269
"""
270
271
def start(self):
272
"""Start eventlet handler."""
273
274
def stop(self):
275
"""Stop eventlet handler."""
276
277
def select(self, sockets, timeout):
278
"""
279
Socket selection using eventlet.green.select.select().
280
281
Parameters:
282
- sockets (list): Socket file descriptors
283
- timeout (float): Selection timeout
284
285
Returns:
286
tuple: Ready socket lists (read, write, error)
287
"""
288
289
def callback_object(self):
290
"""
291
Create eventlet async result.
292
293
Returns:
294
AsyncResult: Eventlet-compatible async result
295
"""
296
297
def dispatch_callback(self, callback):
298
"""
299
Dispatch callback using eventlet.spawn().
300
301
Parameters:
302
- callback (Callback): Callback for green thread execution
303
"""
304
305
def create_connection(self, *args, **kwargs):
306
"""
307
Create eventlet-compatible socket connection.
308
309
Returns:
310
eventlet.green.socket: Eventlet socket object
311
"""
312
313
class AsyncResult:
314
"""Eventlet async result implementation."""
315
316
def __init__(self):
317
"""Initialize with eventlet.Event for coordination."""
318
319
def ready(self):
320
"""Check if result is ready using eventlet primitives."""
321
322
def successful(self):
323
"""Check operation success status."""
324
325
def get(self, block=True, timeout=None):
326
"""
327
Get result with eventlet timeout support.
328
329
Uses eventlet.timeout.Timeout for timeout handling.
330
"""
331
332
def get_nowait(self):
333
"""Get result without blocking in green thread."""
334
335
def set(self, value):
336
"""Set result and wake waiting green threads."""
337
338
def set_exception(self, exception):
339
"""Set exception and notify green threads."""
340
341
class TimeoutError(Exception):
342
"""Timeout exception for eventlet handler operations."""
343
```
344
345
### Handler Utilities
346
347
Utility functions and base classes supporting all handler implementations with common socket operations and async result patterns.
348
349
```python { .api }
350
class AsyncResult:
351
"""Base async result implementation."""
352
353
def __init__(self):
354
"""Initialize base async result."""
355
356
def ready(self):
357
"""Check if result is available."""
358
359
def successful(self):
360
"""Check if operation was successful."""
361
362
def get(self, block=True, timeout=None):
363
"""Get result value with optional timeout."""
364
365
def get_nowait(self):
366
"""Get result without blocking."""
367
368
def set(self, value):
369
"""Set the result value."""
370
371
def set_exception(self, exception):
372
"""Set an exception as result."""
373
374
def link(self, callback):
375
"""
376
Link callback to be called when result is ready.
377
378
Parameters:
379
- callback (callable): Function to call when ready
380
"""
381
382
def unlink(self, callback):
383
"""
384
Unlink previously linked callback.
385
386
Parameters:
387
- callback (callable): Callback to remove
388
"""
389
390
def create_socket_pair():
391
"""
392
Create connected socket pair for communication.
393
394
Returns:
395
tuple: (socket1, socket2) connected socket pair
396
"""
397
398
def create_tcp_socket(module):
399
"""
400
Create TCP socket using specified socket module.
401
402
Parameters:
403
- module: Socket module (socket, gevent.socket, etc.)
404
405
Returns:
406
socket: TCP socket object
407
"""
408
409
def create_tcp_connection(module, address, timeout=None):
410
"""
411
Create TCP connection to address.
412
413
Parameters:
414
- module: Socket module to use
415
- address (tuple): (host, port) tuple
416
- timeout (float): Connection timeout
417
418
Returns:
419
socket: Connected socket
420
"""
421
422
def capture_exceptions(async_object):
423
"""
424
Decorator to capture exceptions in async operations.
425
426
Parameters:
427
- async_object: Async result object
428
429
Returns:
430
Decorator function
431
"""
432
433
def wrap(async_object):
434
"""
435
Decorator to wrap function with async result.
436
437
Parameters:
438
- async_object: Async result object
439
440
Returns:
441
Decorator function
442
"""
443
444
def fileobj_to_fd(fileobj):
445
"""
446
Convert file object to file descriptor.
447
448
Parameters:
449
- fileobj: File-like object
450
451
Returns:
452
int: File descriptor
453
"""
454
455
def selector_select(selector, timeout):
456
"""
457
Select using selectors module.
458
459
Parameters:
460
- selector: Selector object
461
- timeout (float): Selection timeout
462
463
Returns:
464
list: Ready selectors
465
"""
466
```
467
468
## Usage Examples
469
470
### Threading Handler Example
471
472
```python
473
from kazoo.client import KazooClient
474
from kazoo.handlers.threading import SequentialThreadingHandler
475
import threading
476
import time
477
478
# Create client with threading handler (default)
479
handler = SequentialThreadingHandler()
480
zk = KazooClient(hosts='localhost:2181', handler=handler)
481
482
def connection_listener(state):
483
print(f"Connection state changed: {state}")
484
485
zk.add_listener(connection_listener)
486
487
try:
488
zk.start()
489
490
# Perform async operations
491
async_result = zk.create_async("/threading-test", b"test data", makepath=True)
492
493
# Wait for result with timeout
494
try:
495
path = async_result.get(timeout=5.0)
496
print(f"Created path: {path}")
497
except Exception as e:
498
print(f"Creation failed: {e}")
499
500
# Multiple async operations
501
get_result = zk.get_async("/threading-test")
502
exists_result = zk.exists_async("/threading-test")
503
504
# Wait for both results
505
data, stat = get_result.get()
506
exists_stat = exists_result.get()
507
508
print(f"Data: {data}, Exists: {exists_stat is not None}")
509
510
finally:
511
zk.stop()
512
```
513
514
### Gevent Handler Example
515
516
```python
517
from kazoo.client import KazooClient
518
from kazoo.handlers.gevent import SequentialGeventHandler
519
import gevent
520
from gevent import spawn
521
522
# Create client with gevent handler
523
handler = SequentialGeventHandler()
524
zk = KazooClient(hosts='localhost:2181', handler=handler)
525
526
def worker(worker_id, path_base):
527
"""Worker greenlet function."""
528
try:
529
# Each worker creates its own path
530
path = f"{path_base}/worker-{worker_id}"
531
zk.create(path, f"worker {worker_id} data".encode(), makepath=True)
532
533
# Simulate some work
534
gevent.sleep(1)
535
536
# Update data
537
zk.set(path, f"worker {worker_id} updated".encode())
538
539
print(f"Worker {worker_id} completed")
540
541
except Exception as e:
542
print(f"Worker {worker_id} failed: {e}")
543
544
try:
545
zk.start()
546
547
# Create base path
548
zk.create("/gevent-test", b"base", makepath=True)
549
550
# Spawn multiple worker greenlets
551
greenlets = []
552
for i in range(5):
553
g = spawn(worker, i, "/gevent-test")
554
greenlets.append(g)
555
556
# Wait for all workers to complete
557
gevent.joinall(greenlets)
558
559
# List all worker paths
560
children = zk.get_children("/gevent-test")
561
print(f"Created paths: {children}")
562
563
finally:
564
zk.stop()
565
```
566
567
### Eventlet Handler Example
568
569
```python
570
from kazoo.client import KazooClient
571
from kazoo.handlers.eventlet import SequentialEventletHandler
572
import eventlet
573
from eventlet import spawn
574
575
# Create client with eventlet handler
576
handler = SequentialEventletHandler()
577
zk = KazooClient(hosts='localhost:2181', handler=handler)
578
579
def async_worker(path, data):
580
"""Async worker using eventlet."""
581
try:
582
# Create node
583
actual_path = zk.create(path, data.encode(), sequence=True, makepath=True)
584
585
# Simulate async work
586
eventlet.sleep(0.5)
587
588
# Read back data
589
read_data, stat = zk.get(actual_path)
590
print(f"Created {actual_path}: {read_data.decode()}")
591
592
return actual_path
593
594
except Exception as e:
595
print(f"Worker failed for {path}: {e}")
596
return None
597
598
try:
599
zk.start()
600
601
# Create multiple async workers
602
pool = eventlet.GreenPool(10)
603
604
results = []
605
for i in range(10):
606
future = pool.spawn(async_worker, "/eventlet-test/item-", f"data-{i}")
607
results.append(future)
608
609
# Wait for all to complete
610
paths = [future.wait() for future in results]
611
successful_paths = [p for p in paths if p is not None]
612
613
print(f"Successfully created {len(successful_paths)} paths")
614
615
finally:
616
zk.stop()
617
```
618
619
### Custom Handler Configuration
620
621
```python
622
from kazoo.client import KazooClient
623
from kazoo.handlers.threading import SequentialThreadingHandler
624
from kazoo.retry import KazooRetry
625
626
# Create and start the handler explicitly (retry policies are configured on the client below)
627
handler = SequentialThreadingHandler()
628
handler.start()
629
630
# Configure retry policies
631
connection_retry = KazooRetry(max_tries=3, delay=1, backoff=2)
632
command_retry = KazooRetry(max_tries=5, delay=0.1, backoff=1.5)
633
634
# Create client with custom configuration
635
zk = KazooClient(
636
hosts='zk1:2181,zk2:2181,zk3:2181',
637
handler=handler,
638
connection_retry=connection_retry,
639
command_retry=command_retry,
640
timeout=30.0
641
)
642
643
def state_listener(state):
644
print(f"State changed to: {state}")
645
646
zk.add_listener(state_listener)
647
648
try:
649
zk.start(timeout=15)
650
651
# Perform operations with custom handler
652
result = zk.create("/custom-handler-test", b"handler data", makepath=True)
653
print(f"Created with custom handler: {result}")
654
655
finally:
656
zk.stop()
657
handler.stop()
658
```
659
660
### Async Result Patterns
661
662
```python
663
from kazoo.client import KazooClient
664
from kazoo.handlers.threading import AsyncResult
665
import threading
666
import time
667
668
zk = KazooClient()
669
zk.start()
670
671
def result_callback(async_result):
672
"""Callback function for async result."""
673
try:
674
result = async_result.get_nowait()
675
print(f"Callback received result: {result}")
676
except Exception as e:
677
print(f"Callback received exception: {e}")
678
679
try:
680
# Create async operation
681
async_result = zk.create_async("/async-test", b"async data", makepath=True)
682
683
# Link callback to be called when ready
684
async_result.link(result_callback)
685
686
# Wait for result in another thread
687
def wait_for_result():
688
try:
689
path = async_result.get(timeout=10)
690
print(f"Background thread got result: {path}")
691
except Exception as e:
692
print(f"Background thread got exception: {e}")
693
694
thread = threading.Thread(target=wait_for_result)
695
thread.start()
696
697
# Main thread continues other work
698
time.sleep(1)
699
print("Main thread continuing...")
700
701
# Wait for background thread
702
thread.join()
703
704
# Check if result is ready
705
if async_result.ready():
706
print(f"Result is ready: {async_result.successful()}")
707
708
finally:
709
zk.stop()
710
```