0
# Semaphores and Resource Management
1
2
Bounded semaphores for limiting concurrent processes accessing shared resources, with support for named semaphores across process boundaries. These classes help coordinate multiple processes by allowing only a specified number to access a resource simultaneously.
3
4
## Capabilities
5
6
### NamedBoundedSemaphore Class
7
8
The recommended semaphore implementation that provides named bounded semaphores for cross-process resource coordination.
9
10
```python { .api }
11
class NamedBoundedSemaphore:
12
"""
13
Named bounded semaphore for limiting concurrent processes.
14
15
Parameters:
16
- maximum: Maximum number of concurrent processes allowed
17
- name: Semaphore name for cross-process coordination (auto-generated if None)
18
- filename_pattern: Pattern for lock filenames (default: '{name}.{number:02d}.lock')
19
- directory: Directory for lock files (default: system temp directory)
20
- timeout: Timeout when trying to acquire semaphore (default: 5.0)
21
- check_interval: Check interval while waiting (default: 0.25)
22
- fail_when_locked: Fail immediately if no slots available (default: True)
23
"""
24
25
def __init__(self, maximum: int, name: str | None = None,
26
filename_pattern: str = '{name}.{number:02d}.lock',
27
directory: str = tempfile.gettempdir(), timeout: float | None = 5.0,
28
check_interval: float | None = 0.25, fail_when_locked: bool | None = True) -> None: ...
29
30
def acquire(self, timeout: float | None = None, check_interval: float | None = None,
31
fail_when_locked: bool | None = None) -> Lock | None:
32
"""
33
Acquire a semaphore slot.
34
35
Returns:
36
- Lock object for the acquired slot, or None if acquisition failed
37
38
Raises:
39
- AlreadyLocked: If no slots available and fail_when_locked=True
40
"""
41
42
def release(self) -> None:
43
"""Release the currently held semaphore slot"""
44
45
def get_filenames(self) -> typing.Sequence[pathlib.Path]:
46
"""Get all possible lock filenames for this semaphore"""
47
48
def get_random_filenames(self) -> typing.Sequence[pathlib.Path]:
49
"""Get lock filenames in random order (for load balancing)"""
50
51
def __enter__(self) -> Lock:
52
"""Context manager entry - acquire semaphore slot"""
53
54
def __exit__(self, exc_type, exc_value, traceback) -> None:
55
"""Context manager exit - release semaphore slot"""
56
```
57
58
### BoundedSemaphore Class (Deprecated)
59
60
The original semaphore implementation, deprecated in favor of NamedBoundedSemaphore.
61
62
```python { .api }
63
class BoundedSemaphore:
64
"""
65
DEPRECATED: Use NamedBoundedSemaphore instead.
66
67
Bounded semaphore with potential naming conflicts between unrelated processes.
68
"""
69
70
def __init__(self, maximum: int, name: str = 'bounded_semaphore',
71
filename_pattern: str = '{name}.{number:02d}.lock',
72
directory: str = tempfile.gettempdir(), **kwargs) -> None: ...
73
```
74
75
### Usage Examples
76
77
Basic semaphore usage for limiting concurrent processes:
78
79
```python
80
import portalocker
81
82
# Allow maximum 3 concurrent processes
83
with portalocker.NamedBoundedSemaphore(3, name='database_workers') as lock:
84
# Only 3 processes can be in this block simultaneously
85
print("Processing database batch...")
86
process_database_batch()
87
print("Batch completed")
88
# Semaphore slot released automatically
89
```
90
91
Manual semaphore management:
92
93
```python
94
import portalocker
95
96
# Create semaphore for 2 concurrent downloaders
97
semaphore = portalocker.NamedBoundedSemaphore(2, name='file_downloaders')
98
99
try:
100
# Try to acquire a slot
101
lock = semaphore.acquire(timeout=10.0)
102
if lock:
103
print("Starting download...")
104
download_large_file()
105
print("Download completed")
106
else:
107
print("No download slots available")
108
finally:
109
if lock:
110
semaphore.release()
111
```
112
113
Resource pool management:
114
115
```python
116
import portalocker
117
import time
118
119
def worker_process(worker_id: int):
120
"""Worker that processes items with limited concurrency"""
121
122
# Limit to 5 concurrent workers across all processes
123
semaphore = portalocker.NamedBoundedSemaphore(
124
maximum=5,
125
name='worker_pool',
126
timeout=30.0 # Wait up to 30 seconds for a slot
127
)
128
129
try:
130
with semaphore:
131
print(f"Worker {worker_id} starting...")
132
133
# Simulate work that should be limited
134
process_cpu_intensive_task()
135
136
print(f"Worker {worker_id} completed")
137
138
except portalocker.AlreadyLocked:
139
print(f"Worker {worker_id} could not get a slot - too many workers running")
140
141
# Start multiple worker processes
142
for i in range(10):
143
worker_process(i)
144
```
145
146
Custom lock file locations and patterns:
147
148
```python
149
import portalocker
150
151
# Custom semaphore with specific lock file pattern
152
semaphore = portalocker.NamedBoundedSemaphore(
153
maximum=4,
154
name='video_processors',
155
filename_pattern='video_{name}_slot_{number:03d}.lock',
156
directory='/var/lock/myapp',
157
timeout=60.0
158
)
159
160
with semaphore:
161
# Lock files will be created like:
162
# /var/lock/myapp/video_video_processors_slot_000.lock
163
# /var/lock/myapp/video_video_processors_slot_001.lock
164
# etc.
165
process_video_file()
166
```
167
168
Cross-process coordination:
169
170
```python
171
import portalocker
172
import multiprocessing
173
import os
174
175
def worker_function(worker_id):
176
"""Function to run in separate processes"""
177
178
# Same semaphore name ensures coordination across processes
179
with portalocker.NamedBoundedSemaphore(2, name='shared_resource') as lock:
180
print(f"Process {os.getpid()} (worker {worker_id}) acquired resource")
181
182
# Simulate resource usage
183
time.sleep(2)
184
185
print(f"Process {os.getpid()} (worker {worker_id}) releasing resource")
186
187
# Start multiple processes - only 2 will run concurrently
188
processes = []
189
for i in range(6):
190
p = multiprocessing.Process(target=worker_function, args=(i,))
191
p.start()
192
processes.append(p)
193
194
# Wait for all processes to complete
195
for p in processes:
196
p.join()
197
```
198
199
Fail-fast behavior for non-blocking semaphores:
200
201
```python
202
import portalocker
203
204
# Create semaphore that fails immediately if no slots available
205
semaphore = portalocker.NamedBoundedSemaphore(
206
maximum=1,
207
name='singleton_process',
208
fail_when_locked=True,
209
timeout=0 # Don't wait
210
)
211
212
try:
213
with semaphore:
214
# This will only succeed if no other instance is running
215
print("Running singleton process...")
216
run_singleton_task()
217
except portalocker.AlreadyLocked:
218
print("Another instance is already running. Exiting.")
219
exit(1)
220
```
221
222
## Load Balancing
223
224
Semaphores automatically randomize lock file order to distribute load:
225
226
```python
227
import portalocker
228
229
# The semaphore will try lock files in random order
230
# to avoid all processes competing for the same slot
231
semaphore = portalocker.NamedBoundedSemaphore(10, name='load_balanced_workers')
232
233
with semaphore:
234
# Processes will naturally distribute across available slots
235
process_work_item()
236
```
237
238
## Error Handling
239
240
Semaphores use the same exception types as other portalocker components:
241
242
```python
243
import portalocker
244
245
try:
246
with portalocker.NamedBoundedSemaphore(3, name='workers') as lock:
247
do_work()
248
except portalocker.AlreadyLocked:
249
print("All semaphore slots are currently in use")
250
except portalocker.LockException as e:
251
print(f"Semaphore error: {e}")
252
except Exception as e:
253
print(f"Unexpected error: {e}")
254
```
255
256
## Migration from BoundedSemaphore
257
258
To migrate from deprecated BoundedSemaphore to NamedBoundedSemaphore:
259
260
```python
261
# Old (deprecated)
262
semaphore = portalocker.BoundedSemaphore(5, name='my_workers')
263
264
# New (recommended)
265
semaphore = portalocker.NamedBoundedSemaphore(5, name='my_workers')
266
267
# The interface is identical, just change the class name
268
```
269
270
## Type Definitions
271
272
```python { .api }
273
import pathlib
274
import tempfile
275
import typing
276
277
# Filename types
278
Filename = Union[str, pathlib.Path]
279
280
# Default values
281
DEFAULT_TIMEOUT = 5.0
282
DEFAULT_CHECK_INTERVAL = 0.25
283
284
# Semaphore uses Lock objects internally
285
from portalocker.utils import Lock
286
```