# Semaphores and Resource Management

Bounded semaphores for limiting concurrent processes accessing shared resources, with support for named semaphores across process boundaries. These classes help coordinate multiple processes by allowing only a specified number to access a resource simultaneously.
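
The idea of a file-backed bounded semaphore can be sketched with the standard library alone. The following is a minimal, POSIX-only illustration and not portalocker's implementation: it assumes one lock file per slot and uses `fcntl.flock` as a stand-in for the library's cross-platform `Lock` class. The helper names `try_acquire_slot` and `release_slot` are hypothetical.

```python
import fcntl
import tempfile
from pathlib import Path


def try_acquire_slot(directory, name, maximum):
    """Return an open file holding one slot's lock, or None if all are taken."""
    for number in range(maximum):
        path = Path(directory) / f"{name}.{number:02d}.lock"
        handle = open(path, "a")
        try:
            # Non-blocking exclusive lock: raises OSError if the slot is held.
            fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            handle.close()
            continue
        return handle
    return None


def release_slot(handle):
    """Unlock the slot file and close it."""
    fcntl.flock(handle, fcntl.LOCK_UN)
    handle.close()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # Ask for three slots while only two exist.
        held = [try_acquire_slot(tmp, "demo", maximum=2) for _ in range(3)]
        print([h is not None for h in held])
        for h in held:
            if h is not None:
                release_slot(h)
```

Because each slot is an ordinary file lock, the operating system releases it when the holding process exits, so a crashed worker does not permanently consume a slot.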

## Capabilities

### NamedBoundedSemaphore Class

The recommended semaphore implementation that provides named bounded semaphores for cross-process resource coordination.

```python { .api }
class NamedBoundedSemaphore:
    """
    Named bounded semaphore for limiting concurrent processes.

    Parameters:
    - maximum: Maximum number of concurrent processes allowed
    - name: Semaphore name for cross-process coordination (auto-generated if None)
    - filename_pattern: Pattern for lock filenames (default: '{name}.{number:02d}.lock')
    - directory: Directory for lock files (default: system temp directory)
    - timeout: Timeout when trying to acquire semaphore (default: 5.0)
    - check_interval: Check interval while waiting (default: 0.25)
    - fail_when_locked: Fail immediately if no slots available (default: True)
    """

    def __init__(self, maximum: int, name: str | None = None,
                 filename_pattern: str = '{name}.{number:02d}.lock',
                 directory: str = tempfile.gettempdir(), timeout: float | None = 5.0,
                 check_interval: float | None = 0.25, fail_when_locked: bool | None = True) -> None: ...

    def acquire(self, timeout: float | None = None, check_interval: float | None = None,
                fail_when_locked: bool | None = None) -> Lock | None:
        """
        Acquire a semaphore slot.

        Returns:
        - Lock object for the acquired slot, or None if acquisition failed

        Raises:
        - AlreadyLocked: If no slots available and fail_when_locked=True
        """

    def release(self) -> None:
        """Release the currently held semaphore slot"""

    def get_filenames(self) -> typing.Sequence[pathlib.Path]:
        """Get all possible lock filenames for this semaphore"""

    def get_random_filenames(self) -> typing.Sequence[pathlib.Path]:
        """Get lock filenames in random order (for load balancing)"""

    def __enter__(self) -> Lock:
        """Context manager entry - acquire semaphore slot"""

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit - release semaphore slot"""
```
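
To make the `filename_pattern` and `directory` parameters concrete, here is a small sketch of how one lock file path per slot could be derived from them. It assumes the pattern is expanded with `str.format(name=..., number=...)`, which is what the documented default `'{name}.{number:02d}.lock'` suggests. The function `slot_filenames` is a hypothetical helper for illustration, not part of portalocker.

```python
import tempfile
from pathlib import Path


def slot_filenames(maximum, name,
                   filename_pattern="{name}.{number:02d}.lock",
                   directory=None):
    """Build one lock file path per slot, numbered from 0 to maximum - 1."""
    base = Path(directory if directory is not None else tempfile.gettempdir())
    return [
        base / filename_pattern.format(name=name, number=number)
        for number in range(maximum)
    ]


for path in slot_filenames(3, "database_workers", directory="/tmp/locks"):
    print(path)
```

Two semaphores coordinate only if they resolve to the same set of paths, so processes that should share a limit need the same `name`, `filename_pattern`, and `directory`.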

### BoundedSemaphore Class (Deprecated)

The original semaphore implementation, deprecated in favor of NamedBoundedSemaphore.

```python { .api }
class BoundedSemaphore:
    """
    DEPRECATED: Use NamedBoundedSemaphore instead.

    Bounded semaphore with potential naming conflicts between unrelated processes.
    """

    def __init__(self, maximum: int, name: str = 'bounded_semaphore',
                 filename_pattern: str = '{name}.{number:02d}.lock',
                 directory: str = tempfile.gettempdir(), **kwargs) -> None: ...
```

### Usage Examples

Basic semaphore usage for limiting concurrent processes:

```python
import portalocker

# Allow maximum 3 concurrent processes
with portalocker.NamedBoundedSemaphore(3, name='database_workers') as lock:
    # Only 3 processes can be in this block simultaneously
    print("Processing database batch...")
    process_database_batch()
    print("Batch completed")
# Semaphore slot released automatically
```

Manual semaphore management:

```python
import portalocker

# Create semaphore for 2 concurrent downloaders
semaphore = portalocker.NamedBoundedSemaphore(2, name='file_downloaders')

lock = None  # Defined up front so the finally block is safe if acquire() raises
try:
    # Try to acquire a slot; fail_when_locked=False makes acquire() return
    # None on timeout instead of raising AlreadyLocked (the default)
    lock = semaphore.acquire(timeout=10.0, fail_when_locked=False)
    if lock:
        print("Starting download...")
        download_large_file()
        print("Download completed")
    else:
        print("No download slots available")
finally:
    if lock:
        semaphore.release()
```
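
How `timeout`, `check_interval`, and `fail_when_locked` interact during `acquire()` can be pictured as a polling loop. The sketch below is an approximation under stated assumptions, not the library's code: `try_once` is a hypothetical callable that returns a slot or `None`, and the local `AlreadyLocked` class is a stand-in for `portalocker.AlreadyLocked` so that the example runs without the package installed.

```python
import time


class AlreadyLocked(Exception):
    """Stand-in for portalocker.AlreadyLocked (defined locally for the sketch)."""


def acquire_with_polling(try_once, timeout=5.0, check_interval=0.25,
                         fail_when_locked=True):
    """Retry try_once() every check_interval seconds until timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        slot = try_once()
        if slot is not None:
            return slot
        # Stop if another sleep would run past the deadline.
        if time.monotonic() + check_interval > deadline:
            break
        time.sleep(check_interval)
    if fail_when_locked:
        raise AlreadyLocked("no free slot before the timeout expired")
    return None
```

Under this model, `timeout=0` means a single attempt, and `fail_when_locked` only decides whether an unsuccessful wait ends in an exception or in a `None` return value.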

Resource pool management:

```python
import portalocker

def worker_process(worker_id: int):
    """Worker that processes items with limited concurrency"""

    # Limit to 5 concurrent workers across all processes
    semaphore = portalocker.NamedBoundedSemaphore(
        maximum=5,
        name='worker_pool',
        timeout=30.0  # Wait up to 30 seconds for a slot
    )

    try:
        with semaphore:
            print(f"Worker {worker_id} starting...")

            # Simulate work that should be limited
            process_cpu_intensive_task()

            print(f"Worker {worker_id} completed")

    except portalocker.AlreadyLocked:
        print(f"Worker {worker_id} could not get a slot - too many workers running")

# Called sequentially here for brevity; in practice each call would run
# in its own process
for i in range(10):
    worker_process(i)
```

Custom lock file locations and patterns:

```python
import portalocker

# Custom semaphore with specific lock file pattern
semaphore = portalocker.NamedBoundedSemaphore(
    maximum=4,
    name='video_processors',
    filename_pattern='video_{name}_slot_{number:03d}.lock',
    directory='/var/lock/myapp',
    timeout=60.0
)

with semaphore:
    # Lock files will be created like:
    # /var/lock/myapp/video_video_processors_slot_000.lock
    # /var/lock/myapp/video_video_processors_slot_001.lock
    # etc.
    process_video_file()
```

Cross-process coordination:

```python
import portalocker
import multiprocessing
import os
import time

def worker_function(worker_id):
    """Function to run in separate processes"""

    # Same semaphore name ensures coordination across processes.
    # The timeout is raised above the 5 second default so that waiting
    # workers do not give up while earlier ones still hold the slots.
    with portalocker.NamedBoundedSemaphore(2, name='shared_resource', timeout=30.0) as lock:
        print(f"Process {os.getpid()} (worker {worker_id}) acquired resource")

        # Simulate resource usage
        time.sleep(2)

        print(f"Process {os.getpid()} (worker {worker_id}) releasing resource")

# The guard is required on platforms that spawn new interpreters
if __name__ == '__main__':
    # Start multiple processes - only 2 will run concurrently
    processes = []
    for i in range(6):
        p = multiprocessing.Process(target=worker_function, args=(i,))
        p.start()
        processes.append(p)

    # Wait for all processes to complete
    for p in processes:
        p.join()
```

Fail-fast behavior for non-blocking semaphores:

```python
import sys

import portalocker

# Create semaphore that fails immediately if no slots available
semaphore = portalocker.NamedBoundedSemaphore(
    maximum=1,
    name='singleton_process',
    fail_when_locked=True,
    timeout=0  # Don't wait
)

try:
    with semaphore:
        # This will only succeed if no other instance is running
        print("Running singleton process...")
        run_singleton_task()
except portalocker.AlreadyLocked:
    print("Another instance is already running. Exiting.")
    sys.exit(1)
```

## Load Balancing

Semaphores automatically randomize lock file order to distribute load:

```python
import portalocker

# The semaphore will try lock files in random order
# to avoid all processes competing for the same slot
semaphore = portalocker.NamedBoundedSemaphore(10, name='load_balanced_workers')

with semaphore:
    # Processes will naturally distribute across available slots
    process_work_item()
```
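
The effect of a randomized probe order can be illustrated without the library. This sketch only shows the idea behind `get_random_filenames()`: shuffling a copy of the slot list so that contenders do not all start with slot 0. The helper `randomized_probe_order` and its `rng` parameter are hypothetical, and exactly when the installed portalocker version applies such an order is not shown here.

```python
import random
from pathlib import Path


def randomized_probe_order(filenames, rng=None):
    """Return a shuffled copy of the slot files, leaving the input untouched."""
    rng = rng if rng is not None else random.Random()
    order = list(filenames)
    rng.shuffle(order)
    return order


slots = [Path(f"load_balanced_workers.{n:02d}.lock") for n in range(10)]
print(randomized_probe_order(slots)[:3])  # first three slots to probe, varies per run
```

With a fixed order, every waiting process would contend for the lowest-numbered file first; a shuffled order spreads the first attempts across all slots.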

## Error Handling

Semaphores use the same exception types as other portalocker components:

```python
import portalocker

try:
    with portalocker.NamedBoundedSemaphore(3, name='workers') as lock:
        do_work()
except portalocker.AlreadyLocked:
    print("All semaphore slots are currently in use")
except portalocker.LockException as e:
    print(f"Semaphore error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
```

## Migration from BoundedSemaphore

To migrate from deprecated BoundedSemaphore to NamedBoundedSemaphore:

```python
import portalocker

# Old (deprecated)
semaphore = portalocker.BoundedSemaphore(5, name='my_workers')

# New (recommended)
semaphore = portalocker.NamedBoundedSemaphore(5, name='my_workers')

# The interface is identical, just change the class name
```

## Type Definitions

```python { .api }
import pathlib
import tempfile
import typing

# Filename types
Filename = typing.Union[str, pathlib.Path]

# Default values
DEFAULT_TIMEOUT = 5.0
DEFAULT_CHECK_INTERVAL = 0.25

# Semaphore uses Lock objects internally
from portalocker.utils import Lock
```