# Synchronization

Synchronization primitives for coordinating greenthreads: events, semaphores, queues, and timeouts.

## Capabilities

### Event Synchronization

An event lets one or more greenthreads block until another greenthread signals that an operation has completed, optionally handing over a result or an exception.

```python { .api }
class Event:
    """
    A synchronization primitive that allows greenthreads to wait for an event
    to occur and to signal that event to waiting greenthreads.
    """

    def __init__(self):
        """Create a new Event object."""

    def send(self, result=None):
        """
        Signal the event and wake all waiting greenthreads.

        Parameters:
        - result: value to send to waiting greenthreads

        Returns:
        None
        """

    def wait(self):
        """
        Block until another greenthread calls send().

        Returns:
        The result value passed to send(), if any
        """

    def ready(self):
        """
        Check if the event has been sent.

        Returns:
        bool: True if send() has been called, False otherwise
        """

    def reset(self):
        """
        Reset the event to unsent state.

        Returns:
        None
        """

    def has_exception(self):
        """
        Check if the event was sent with an exception.

        Returns:
        bool: True if event has an exception, False otherwise
        """

    def has_result(self):
        """
        Check if the event was sent with a result (not an exception).

        Returns:
        bool: True if event has a result value, False otherwise
        """

    def poll(self, notready=None):
        """
        Return the result if ready, otherwise return notready value.

        Parameters:
        - notready: value to return if event is not ready

        Returns:
        Event result if ready, notready value otherwise
        """

    def poll_exception(self, notready=None):
        """
        Return the exception if available, otherwise return notready value.

        Parameters:
        - notready: value to return if no exception is available

        Returns:
        Exception if available, notready value otherwise
        """

    def poll_result(self, notready=None):
        """
        Return the result if available (and not an exception), otherwise return notready value.

        Parameters:
        - notready: value to return if no result is available

        Returns:
        Result value if available, notready value otherwise
        """

    def send_exception(self, *args):
        """
        Signal the event with an exception to wake waiting greenthreads.

        Parameters:
        - *args: exception arguments (same as for raise statement)

        Returns:
        None
        """
```

### Semaphore Controls

Semaphores control access to resources by maintaining a counter that tracks available resources.

```python { .api }
class Semaphore:
    """
    A semaphore manages a counter representing the number of release() calls
    minus the number of acquire() calls, plus an initial value.
    """

    def __init__(self, value=1):
        """
        Create a semaphore.

        Parameters:
        - value: int, initial counter value (default: 1)
        """

    def acquire(self, blocking=True):
        """
        Acquire the semaphore, decrementing the counter.

        Parameters:
        - blocking: bool, whether to block if counter is zero

        Returns:
        bool: True if acquired, False if non-blocking and unavailable
        """

    def release(self):
        """
        Release the semaphore, incrementing the counter.

        Returns:
        None
        """

    def locked(self):
        """
        Check if the semaphore counter is zero.

        Returns:
        bool: True if counter is zero, False otherwise
        """

    @property
    def balance(self):
        """
        Get the current counter value.

        Returns:
        int: current semaphore counter value
        """

class BoundedSemaphore(Semaphore):
    """
    A semaphore that prevents release() from incrementing the counter above
    the initial value.
    """

    def __init__(self, value=1):
        """
        Create a bounded semaphore.

        Parameters:
        - value: int, initial and maximum counter value
        """

    def release(self):
        """
        Release the semaphore if counter is below initial value.

        Raises:
        ValueError: if counter would exceed initial value
        """

class CappedSemaphore:
    """
    A semaphore with configurable upper and lower bounds that blocks,
    rather than raises, when a bound is reached.
    """

    def __init__(self, balance, lower_bound=None, upper_bound=None):
        """
        Create a capped semaphore.

        Parameters:
        - balance: int, initial counter value
        - lower_bound: int, minimum counter value (default: 0)
        - upper_bound: int, maximum counter value (default: balance)
        """

    def acquire(self, blocking=True):
        """
        Acquire the semaphore, potentially blocking on lower bound.

        Parameters:
        - blocking: bool, whether to block if at lower bound

        Returns:
        bool: True if acquired, False if non-blocking and at bound
        """

    def release(self, blocking=True):
        """
        Release the semaphore, potentially blocking on upper bound.

        Parameters:
        - blocking: bool, whether to block if at upper bound

        Returns:
        bool: True if released, False if non-blocking and at bound
        """
```

### Queue Operations

Queues for passing data between greenthreads, with FIFO, priority, and LIFO ordering.

```python { .api }
class Queue:
    """
    Multi-producer multi-consumer queue for greenthreads with task tracking.
    """

    def __init__(self, maxsize=None):
        """
        Create a queue.

        Parameters:
        - maxsize: int or None, maximum queue size (None or negative = unlimited;
          unlike the standard library, 0 makes put() block until a consumer
          takes the item)
        """

    def put(self, item, block=True, timeout=None):
        """
        Put an item into the queue.

        Parameters:
        - item: object to put in queue
        - block: bool, whether to block if queue is full
        - timeout: float, maximum time to wait if blocking

        Raises:
        Full: if queue is full and non-blocking or timeout exceeded
        """

    def get(self, block=True, timeout=None):
        """
        Get an item from the queue.

        Parameters:
        - block: bool, whether to block if queue is empty
        - timeout: float, maximum time to wait if blocking

        Returns:
        Item from the queue

        Raises:
        Empty: if queue is empty and non-blocking or timeout exceeded
        """

    def put_nowait(self, item):
        """
        Put item without blocking.

        Raises:
        Full: if queue is full
        """

    def get_nowait(self):
        """
        Get item without blocking.

        Returns:
        Item from queue

        Raises:
        Empty: if queue is empty
        """

    def task_done(self):
        """
        Indicate that a formerly enqueued task is complete.
        """

    def join(self):
        """
        Block until task_done() has been called for every item put into the queue.
        """

    def qsize(self):
        """
        Return approximate queue size.

        Returns:
        int: number of items in queue
        """

    def empty(self):
        """
        Check if queue is empty.

        Returns:
        bool: True if empty, False otherwise
        """

    def full(self):
        """
        Check if queue is full.

        Returns:
        bool: True if full, False otherwise
        """

class PriorityQueue(Queue):
    """
    A queue where items are ordered by priority (lowest first).
    Items should be comparable or tuples of (priority, item).
    """

class LifoQueue(Queue):
    """
    A Last-In-First-Out (LIFO) queue implementation.
    """

class LightQueue:
    """
    A faster queue implementation without task_done() and join() support.
    """

    def __init__(self, maxsize=None):
        """
        Create a light queue.

        Parameters:
        - maxsize: int or None, maximum queue size (None or negative = unlimited)
        """

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue."""

    def get(self, block=True, timeout=None):
        """Get an item from the queue."""
```

### Timeout Management

Context managers and utilities for implementing timeouts in greenthread operations.

```python { .api }
class Timeout:
    """
    A timeout context manager that raises an exception after a delay.
    """

    def __init__(self, seconds=None, exception=None):
        """
        Create a timeout.

        Parameters:
        - seconds: float, timeout duration in seconds
        - exception: exception class or instance to raise (default: the Timeout
          itself); pass False to have the context manager suppress the timeout
        """

    def __enter__(self):
        """
        Start the timeout context.

        Returns:
        self
        """

    def __exit__(self, typ, value, tb):
        """
        Exit the timeout context and cancel timeout if active.

        Returns:
        bool: False to propagate exceptions, True to suppress timeout
        """

    def cancel(self):
        """
        Cancel the timeout if it hasn't triggered yet.
        """

def with_timeout(seconds, func, *args, **kwargs):
    """
    Call func with timeout protection.

    Parameters:
    - seconds: float, timeout duration
    - func: callable to invoke
    - *args: positional arguments for func
    - **kwargs: keyword arguments for func

    Returns:
    Return value of func

    Raises:
    Timeout: if func doesn't complete within timeout
    """

def is_timeout(obj):
    """
    Check if an object is a timeout exception.

    Parameters:
    - obj: object to check

    Returns:
    bool: True if obj is a timeout-related exception
    """

def wrap_is_timeout(base_exception):
    """
    Add is_timeout=True attribute to exceptions created by base function.

    Parameters:
    - base_exception: exception class or instance

    Returns:
    Wrapped exception with is_timeout attribute
    """
```

## Usage Examples

```python
import eventlet

# Event synchronization example
event = eventlet.Event()

def waiter():
    print("Waiting for event...")
    result = event.wait()
    print(f"Got result: {result}")

def sender():
    eventlet.sleep(2)  # Simulate work
    event.send("Hello from sender!")

waiter_gt = eventlet.spawn(waiter)
eventlet.spawn(sender)
waiter_gt.wait()  # Block the main greenthread until the waiter finishes

# Semaphore example
sem = eventlet.Semaphore(2)  # Allow 2 concurrent accesses

def worker(name):
    with sem:  # Acquire on enter, release on exit
        print(f"Worker {name} accessing resource")
        eventlet.sleep(1)
        print(f"Worker {name} done")

workers = [eventlet.spawn(worker, i) for i in range(5)]
for gt in workers:
    gt.wait()  # Spawned greenthreads only run once the main one yields

# Queue example
q = eventlet.Queue()

def producer():
    for i in range(5):
        q.put(f"item {i}")
        eventlet.sleep(0.1)

def consumer():
    while True:
        item = q.get()
        print(f"Got: {item}")
        q.task_done()

producer_gt = eventlet.spawn(producer)
consumer_gt = eventlet.spawn(consumer)
producer_gt.wait()  # Make sure every item is enqueued before joining
q.join()  # Wait for all tasks to complete
consumer_gt.kill()  # The consumer loops forever, so stop it explicitly

# Timeout example
try:
    with eventlet.Timeout(5.0):
        # This will time out after 5 seconds
        eventlet.sleep(10)
except eventlet.Timeout:
    print("Operation timed out!")

# Function with timeout
def slow_function(a, b):  # Placeholder slow call so the example runs
    eventlet.sleep(5)
    return a + b

try:
    result = eventlet.with_timeout(2.0, slow_function, 1, 2)
except eventlet.Timeout:
    print("Function call timed out!")
```

## Exception Types

```python { .api }
class Full(Exception):
    """Raised by put() when the queue is full and block=False, or when the timeout expires."""
    pass

class Empty(Exception):
    """Raised by get() when the queue is empty and block=False, or when the timeout expires."""
    pass
```