tessl/pypi-billiard

Python multiprocessing fork with improvements and bugfixes for distributed task processing

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/billiard@4.2.x

To install, run

npx @tessl/cli install tessl/pypi-billiard@4.2.0

# Billiard

A fork of Python's multiprocessing package providing enhanced functionality for distributed task processing. Billiard offers improved process pools with timeouts, enhanced error handling, worker management, and specialized features for task queue systems like Celery.

## Package Information

- **Package Name**: billiard
- **Language**: Python
- **Installation**: `pip install billiard`
- **Version**: 4.2.1

## Core Imports

```python
import billiard
```

Common imports for different components:

```python
from billiard import Process, Pool, Queue, Lock, Event
from billiard import current_process, active_children, cpu_count
```

## Basic Usage

```python
import billiard as mp
from billiard import Process, Pool, Queue

# Create and start processes
def worker_task(name, result_queue):
    result = f"Hello from {name}"
    result_queue.put(result)

if __name__ == '__main__':
    # Create a queue for results
    queue = Queue()

    # Create and start processes
    processes = []
    for i in range(3):
        p = Process(target=worker_task, args=(f"worker-{i}", queue))
        p.start()
        processes.append(p)

    # Collect results
    results = []
    for _ in processes:
        results.append(queue.get())

    # Wait for completion
    for p in processes:
        p.join()

    print("Results:", results)

# Use process pool for parallel execution
def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        numbers = [1, 2, 3, 4, 5]
        squared = pool.map(square, numbers)
        print("Squared:", squared)
```

## Architecture

Billiard extends Python's multiprocessing architecture with several key enhancements:

- **Enhanced Process Pool**: Advanced pool implementation with timeouts, restart capabilities, and worker monitoring
- **Robust Error Handling**: Comprehensive exception hierarchy including worker loss detection and time limit management
- **Multiple Start Methods**: Support for fork, spawn, and forkserver process creation methods
- **Context Management**: Configurable process contexts for different execution environments
- **Celery Integration**: Specialized features optimized for distributed task processing systems

The package maintains API compatibility with Python's standard multiprocessing module while providing additional stability, performance optimizations, and features needed for production-scale distributed computing applications.

## Capabilities

### Process Management

Core process creation, lifecycle management, and process introspection functionality.

```python { .api }
class Process:
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None): ...
    def start(self): ...
    def join(self, timeout=None): ...
    def terminate(self): ...
    def is_alive(self) -> bool: ...

def current_process() -> Process: ...
def active_children() -> list[Process]: ...
def cpu_count() -> int: ...
```
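A minimal usage sketch of the introspection helpers, assuming they behave like their `multiprocessing` counterparts; the `report` worker function is illustrative:

```python
from billiard import Process, current_process, active_children, cpu_count

def report():
    # current_process() works inside workers as well as in the parent
    print("running in", current_process().name)

if __name__ == '__main__':
    print("CPUs available:", cpu_count())

    workers = [Process(target=report, name=f"reporter-{i}") for i in range(2)]
    for p in workers:
        p.start()

    # active_children() lists child processes that are still alive
    print("live children:", [c.name for c in active_children()])

    for p in workers:
        p.join()
```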

[Process Management](./process-management.md)

### Process Pools

Advanced process pool for parallel execution with timeout support, worker management, and enhanced error handling.

```python { .api }
class Pool:
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, timeout=None, soft_timeout=None,
                 lost_worker_timeout=None, max_restarts=None, max_restart_freq=1,
                 on_process_up=None, on_process_down=None, on_timeout_set=None,
                 on_timeout_cancel=None, threads=True, semaphore=None, putlocks=False,
                 allow_restart=False, synack=False, on_process_exit=None,
                 context=None, max_memory_per_child=None, enable_timeouts=False): ...
    def apply(self, func, args=(), kwds={}): ...
    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): ...
    def map(self, func, iterable, chunksize=None): ...
    def close(self): ...
    def terminate(self): ...
    def join(self): ...
```
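A short sketch of typical pool usage based on the signature above; `slow_square` is illustrative, and billiard-specific options such as `soft_timeout` and `max_memory_per_child` are covered in the linked capability doc. The `TimeoutError` used here is the one listed under Exception Types:

```python
import billiard
from billiard import Pool

def slow_square(x):
    return x * x

if __name__ == '__main__':
    # processes/maxtasksperchild follow the signature above
    with Pool(processes=2, maxtasksperchild=10) as pool:
        print(pool.map(slow_square, range(5)))       # [0, 1, 4, 9, 16]

        async_result = pool.apply_async(slow_square, (7,))
        try:
            print(async_result.get(timeout=5))       # 49
        except billiard.TimeoutError:
            print("no result within 5 seconds")
```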

[Process Pools](./process-pools.md)

### Queues

Thread-safe queues for inter-process communication using pipes, with task completion tracking support.

```python { .api }
class Queue:
    def __init__(self, maxsize=0): ...
    def put(self, obj, block=True, timeout=None): ...
    def get(self, block=True, timeout=None): ...
    def qsize(self) -> int: ...
    def empty(self) -> bool: ...

class JoinableQueue(Queue):
    def task_done(self): ...
    def join(self): ...
```
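A producer/consumer sketch using `JoinableQueue` so the main process can block until every item has been handled; the `consumer` worker function is illustrative:

```python
from billiard import Process, JoinableQueue

def consumer(queue):
    while True:
        item = queue.get()
        try:
            print("processed", item)
        finally:
            queue.task_done()          # one task_done() per completed get()

if __name__ == '__main__':
    queue = JoinableQueue()
    worker = Process(target=consumer, args=(queue,), daemon=True)
    worker.start()

    for item in range(5):
        queue.put(item)

    queue.join()                       # blocks until every put() has been marked done
```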

[Queues](./queues.md)

### Synchronization

Synchronization primitives including locks, semaphores, events, conditions, and barriers for coordinating processes.

```python { .api }
class Lock:
    def acquire(self, block=True, timeout=None) -> bool: ...
    def release(self): ...

class Event:
    def set(self): ...
    def clear(self): ...
    def is_set(self) -> bool: ...
    def wait(self, timeout=None) -> bool: ...

class Semaphore:
    def __init__(self, value=1): ...
    def acquire(self, block=True, timeout=None) -> bool: ...
    def release(self): ...
```
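A brief sketch coordinating workers with `Event` and `Lock`, assuming these primitives support the same context-manager and wait semantics as `multiprocessing`:

```python
from billiard import Process, Lock, Event

def worker(lock, ready):
    ready.wait()                       # block until the parent signals go
    with lock:                         # Lock supports the context-manager protocol
        print("in the exclusive section")

if __name__ == '__main__':
    lock = Lock()
    ready = Event()

    procs = [Process(target=worker, args=(lock, ready)) for _ in range(3)]
    for p in procs:
        p.start()

    ready.set()                        # release all waiting workers at once
    for p in procs:
        p.join()
```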

[Synchronization](./synchronization.md)

### Communication

Inter-process communication through pipes and connections with support for both object and byte-level messaging.

```python { .api }
def Pipe(duplex=True, rnonblock=False, wnonblock=False) -> tuple[Connection, Connection]: ...

class Connection:
    def send(self, obj): ...
    def recv(self): ...
    def send_bytes(self, buf, offset=0, size=None): ...
    def recv_bytes(self, maxlength=None) -> bytes: ...
    def poll(self, timeout=None) -> bool: ...
    def close(self): ...
```
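A minimal parent/child sketch over `Pipe`; the `child` function and the 5-second poll are illustrative choices:

```python
from billiard import Process, Pipe

def child(conn):
    conn.send({"status": "ready"})     # any picklable object can be sent
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()   # duplex by default
    p = Process(target=child, args=(child_conn,))
    p.start()

    if parent_conn.poll(5):            # wait up to 5 seconds for data
        print(parent_conn.recv())      # {'status': 'ready'}

    p.join()
    parent_conn.close()
```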

[Communication](./communication.md)

### Shared Memory

Synchronized and unsynchronized shared memory objects for efficient data sharing between processes.

```python { .api }
def Value(typecode_or_type, *args, lock=True) -> SynchronizedBase: ...
def Array(typecode_or_type, size_or_initializer, lock=True) -> SynchronizedArray: ...
def RawValue(typecode_or_type, *args): ...
def RawArray(typecode_or_type, size_or_initializer): ...
```
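A small sketch mutating a synchronized `Value` and `Array` from a child process, assuming the synchronized wrapper exposes `get_lock()` as in `multiprocessing`; `scale` is illustrative:

```python
from billiard import Process, Value, Array

def scale(counter, samples):
    with counter.get_lock():           # synchronized Value exposes its lock
        counter.value += 1
    for i in range(len(samples)):
        samples[i] *= 2

if __name__ == '__main__':
    counter = Value('i', 0)            # 'i' = signed int typecode
    samples = Array('d', [1.0, 2.0, 3.0])

    p = Process(target=scale, args=(counter, samples))
    p.start()
    p.join()

    print(counter.value, list(samples))  # 1 [2.0, 4.0, 6.0]
```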

[Shared Memory](./shared-memory.md)

### Managers

Shared object managers for creating and managing shared objects across multiple processes.

```python { .api }
def Manager() -> SyncManager: ...

class SyncManager:
    def start(self): ...
    def shutdown(self): ...
    def dict(self) -> dict: ...
    def list(self) -> list: ...
    def Queue(self) -> Queue: ...
    def Lock(self) -> Lock: ...
```
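A sketch of sharing a managed dict and list with a child process; `Manager()` is assumed to return an already-started `SyncManager`, as in `multiprocessing`, and `record` is illustrative:

```python
from billiard import Manager, Process

def record(shared_dict, shared_list):
    shared_dict["answer"] = 42
    shared_list.append("done")

if __name__ == '__main__':
    manager = Manager()                # returns a started SyncManager
    d = manager.dict()
    items = manager.list()

    p = Process(target=record, args=(d, items))
    p.start()
    p.join()

    print(dict(d), list(items))        # {'answer': 42} ['done']
    manager.shutdown()
```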

[Managers](./managers.md)

### Context Management

Process context configuration for controlling process start methods and execution environments.

```python { .api }
def get_context(method=None) -> BaseContext: ...
def set_start_method(method, force=False): ...
def get_start_method(allow_none=False) -> str: ...
def get_all_start_methods() -> list[str]: ...
```
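A sketch of selecting a start method through an isolated context, assuming the returned `BaseContext` exposes the usual constructors (`Pool`, `Process`, `Queue`) as `multiprocessing` contexts do; `bump` is illustrative:

```python
import billiard

def bump(x):
    return x + 1

if __name__ == '__main__':
    print(billiard.get_all_start_methods())   # e.g. ['fork', 'spawn', 'forkserver'] on Linux

    ctx = billiard.get_context('spawn')       # isolated context; the global default is untouched
    with ctx.Pool(processes=2) as pool:
        print(pool.map(bump, [1, 2, 3]))      # [2, 3, 4]
```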

[Context Management](./context-management.md)

### Utility Functions

Additional utility functions for platform support and configuration.

```python { .api }
def freeze_support(): ...
def get_logger(): ...
def log_to_stderr(level=None): ...
def allow_connection_pickling(): ...
def set_executable(executable): ...
def set_forkserver_preload(module_names): ...
def soft_timeout_sighandler(signum, frame): ...
```
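A minimal sketch of the logging and freeze-support helpers in a script entry point:

```python
import logging
from billiard import freeze_support, get_logger, log_to_stderr

def main():
    get_logger().info("billiard logger configured")

if __name__ == '__main__':
    freeze_support()                   # no-op except in frozen Windows executables
    log_to_stderr(logging.INFO)        # route billiard's internal logging to stderr
    main()
```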

Platform support functions:

- **freeze_support()**: Windows frozen executable support
- **get_logger()**: Get billiard logger
- **log_to_stderr()**: Enable stderr logging
- **allow_connection_pickling()**: Enable connection pickling
- **set_executable()**: Set Python executable path
- **set_forkserver_preload()**: Set forkserver preload modules
- **soft_timeout_sighandler()**: Signal handler that raises SoftTimeLimitExceeded

## Exception Types

```python { .api }
class ProcessError(Exception): ...
class TimeoutError(ProcessError): ...
class AuthenticationError(ProcessError): ...
class BufferTooShort(ProcessError): ...
class TimeLimitExceeded(Exception): ...
class SoftTimeLimitExceeded(Exception): ...
class WorkerLostError(Exception): ...
class Terminated(Exception): ...
class RestartFreqExceeded(Exception): ...
class CoroStop(Exception): ...
```
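A hedged sketch of the usual handling pattern; the `billiard.exceptions` import path is the one conventionally used by Celery, and `guarded_task` is illustrative:

```python
from billiard import Pool
# Import path used by Celery; adjust if your billiard version exposes these elsewhere.
from billiard.exceptions import SoftTimeLimitExceeded, WorkerLostError

def guarded_task(x):
    try:
        return x * x                   # real work would go here
    except SoftTimeLimitExceeded:
        # raised inside the worker when a soft time limit expires, leaving room to clean up
        return None

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        result = pool.apply_async(guarded_task, (6,))
        try:
            print(result.get(timeout=10))      # 36
        except WorkerLostError:
            # the worker died (e.g. killed by the OS) before returning a result
            print("worker was lost while running the task")
```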

Common exceptions:

- **ProcessError**: Base exception for process-related errors
- **TimeoutError**: Operation exceeded timeout limit (subclass of ProcessError)
- **AuthenticationError**: Authentication failed (subclass of ProcessError)
- **BufferTooShort**: Buffer too short for message (subclass of ProcessError)
- **TimeLimitExceeded**: Hard time limit exceeded (immediate termination)
- **SoftTimeLimitExceeded**: Soft time limit exceeded (allows cleanup)
- **WorkerLostError**: Worker process died unexpectedly
- **Terminated**: Worker processing a job was terminated by user request
- **RestartFreqExceeded**: Worker restarts happening too frequently
- **CoroStop**: Coroutine exit (distinct from StopIteration)