# Billiard

A fork of Python's multiprocessing package providing enhanced functionality for distributed task processing. Billiard offers improved process pools with timeouts, enhanced error handling, worker management, and specialized features for task queue systems like Celery.

## Package Information

- **Package Name**: billiard
- **Language**: Python
- **Installation**: `pip install billiard`
- **Version**: 4.2.1

## Core Imports

```python
import billiard
```

Common imports for different components:

```python
from billiard import Process, Pool, Queue, Lock, Event
from billiard import current_process, active_children, cpu_count
```

## Basic Usage

```python
import billiard as mp
from billiard import Process, Pool, Queue


# Create and start processes
def worker_task(name, result_queue):
    result = f"Hello from {name}"
    result_queue.put(result)


if __name__ == '__main__':
    # Create a queue for results
    queue = Queue()

    # Create and start processes
    processes = []
    for i in range(3):
        p = Process(target=worker_task, args=(f"worker-{i}", queue))
        p.start()
        processes.append(p)

    # Collect results
    results = []
    for _ in processes:
        results.append(queue.get())

    # Wait for completion
    for p in processes:
        p.join()

    print("Results:", results)


# Use process pool for parallel execution
def square(x):
    return x * x


if __name__ == '__main__':
    with Pool(processes=4) as pool:
        numbers = [1, 2, 3, 4, 5]
        squared = pool.map(square, numbers)
        print("Squared:", squared)
```

## Architecture

Billiard extends Python's multiprocessing architecture with several key enhancements:

- **Enhanced Process Pool**: Advanced pool implementation with timeouts, restart capabilities, and worker monitoring
- **Robust Error Handling**: Comprehensive exception hierarchy including worker loss detection and time limit management
- **Multiple Start Methods**: Support for fork, spawn, and forkserver process creation methods
- **Context Management**: Configurable process contexts for different execution environments
- **Celery Integration**: Specialized features optimized for distributed task processing systems

The package maintains API compatibility with Python's standard multiprocessing module while providing additional stability, performance optimizations, and features needed for production-scale distributed computing applications.

## Capabilities

### Process Management

Core process creation, lifecycle management, and process introspection functionality.

```python { .api }
class Process:
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, daemon=None): ...
    def start(self): ...
    def join(self, timeout=None): ...
    def terminate(self): ...
    def is_alive(self) -> bool: ...

def current_process() -> Process: ...
def active_children() -> list[Process]: ...
def cpu_count() -> int: ...
```
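
The lifecycle calls listed above can be exercised with a short sketch like the following. The helper names (`describe_current`, `exit_with`, `main`) are illustrative only, and the fallback import is an assumption made so the sketch also runs where billiard is not installed; it relies on the multiprocessing-compatible API described earlier.

```python
try:
    import billiard as mp
except ImportError:
    # Assumption: fall back to the standard library, whose API billiard mirrors.
    import multiprocessing as mp


def describe_current():
    """Return the current process's name and the machine's CPU count."""
    return mp.current_process().name, mp.cpu_count()


def exit_with(code):
    """Child target (hypothetical): exit immediately with the given status."""
    raise SystemExit(code)


def main():
    child = mp.Process(target=exit_with, args=(3,), name="demo-child")
    child.start()
    # active_children() lists children that have not finished yet.
    print("active children:", [p.name for p in mp.active_children()])
    child.join(timeout=10)  # wait at most 10 seconds for the child
    print("is_alive:", child.is_alive(), "exitcode:", child.exitcode)


if __name__ == "__main__":
    print(describe_current())
    main()
```

Keeping the process creation behind the `__main__` guard matters for the spawn and forkserver start methods, which re-import the main module in the child.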

[Process Management](./process-management.md)

### Process Pools

Advanced process pool for parallel execution with timeout support, worker management, and enhanced error handling.

```python { .api }
class Pool:
    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, timeout=None, soft_timeout=None,
                 lost_worker_timeout=None, max_restarts=None, max_restart_freq=1,
                 on_process_up=None, on_process_down=None, on_timeout_set=None,
                 on_timeout_cancel=None, threads=True, semaphore=None, putlocks=False,
                 allow_restart=False, synack=False, on_process_exit=None,
                 context=None, max_memory_per_child=None, enable_timeouts=False): ...
    def apply(self, func, args=(), kwds={}): ...
    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): ...
    def map(self, func, iterable, chunksize=None): ...
    def close(self): ...
    def terminate(self): ...
    def join(self): ...
```
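
As a rough illustration of the pool methods above, the sketch below submits one asynchronous call with a callback and one `map` call, then shuts the pool down with `close()` followed by `join()`. The `cube` function and `main` helper are made up for this example, and the fallback import is an assumption so the sketch runs without billiard. Only the multiprocessing-compatible arguments are used; the billiard-specific keyword arguments in the signature (such as `timeout` and `soft_timeout`) are not exercised here.

```python
try:
    import billiard as mp
except ImportError:
    # Assumption: fall back to the standard library, whose API billiard mirrors.
    import multiprocessing as mp


def cube(x):
    """Work function (hypothetical); must be importable by the workers."""
    return x ** 3


def main():
    collected = []
    pool = mp.Pool(processes=2)
    try:
        # apply_async returns at once; the callback receives the result
        # in the parent process once it arrives.
        pending = pool.apply_async(cube, args=(3,), callback=collected.append)
        print("apply_async:", pending.get(timeout=30))
        print("map:", pool.map(cube, range(5), chunksize=2))
    finally:
        pool.close()  # stop accepting new work
        pool.join()   # wait for the workers to exit
    print("callback saw:", collected)


if __name__ == "__main__":
    main()
```

The callback list is read only after `join()`, so the sketch does not depend on whether the callback runs before or after `get()` returns.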

[Process Pools](./process-pools.md)

### Queues

Process- and thread-safe queues for inter-process communication, built on pipes. `JoinableQueue` adds task-completion tracking.

```python { .api }
class Queue:
    def __init__(self, maxsize=0): ...
    def put(self, obj, block=True, timeout=None): ...
    def get(self, block=True, timeout=None): ...
    def qsize(self) -> int: ...
    def empty(self) -> bool: ...

class JoinableQueue(Queue):
    def task_done(self): ...
    def join(self): ...
```
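
The sketch below shows the `get(timeout=...)` and `task_done()`/`join()` patterns inside a single process, which keeps it small; in real use the producer and consumer would usually be separate processes. The helper names are illustrative, and two assumptions are made: the fallback import, and that an empty queue is signalled with the standard library's `queue.Empty`, as in multiprocessing.

```python
from queue import Empty  # assumption: same exception as in multiprocessing

try:
    import billiard as mp
except ImportError:
    # Assumption: fall back to the standard library, whose API billiard mirrors.
    import multiprocessing as mp


def drain(q, wait=1.0):
    """Collect items until the queue stays empty for `wait` seconds."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=wait))
        except Empty:
            return items


def track_tasks(values):
    """Push values through a JoinableQueue, acknowledging each one."""
    jq = mp.JoinableQueue()
    for value in values:
        jq.put(value)
    seen = []
    for _ in values:
        seen.append(jq.get(timeout=5))
        jq.task_done()  # one acknowledgement per retrieved item
    jq.join()  # returns once every item has been acknowledged
    return seen


if __name__ == "__main__":
    q = mp.Queue()
    for word in ("a", "b", "c"):
        q.put(word)
    print(drain(q))
    print(track_tasks([1, 2, 3]))
```

A timeout-based drain is used instead of `empty()` because items pass through a background feeder thread, so `empty()` can briefly report `True` right after a `put()`.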

[Queues](./queues.md)

### Synchronization

Synchronization primitives including locks, semaphores, events, conditions, and barriers for coordinating processes.

```python { .api }
class Lock:
    def acquire(self, block=True, timeout=None) -> bool: ...
    def release(self): ...

class Event:
    def set(self): ...
    def clear(self): ...
    def is_set(self) -> bool: ...
    def wait(self, timeout=None) -> bool: ...

class Semaphore:
    def __init__(self, value=1): ...
    def acquire(self, block=True, timeout=None) -> bool: ...
    def release(self): ...
```
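
The return values of these primitives are easiest to see in a single process, so the following sketch does not start any children. `demo_primitives` is an illustrative name, and the fallback import is an assumption so the sketch runs without billiard.

```python
try:
    import billiard as mp
except ImportError:
    # Assumption: fall back to the standard library, whose API billiard mirrors.
    import multiprocessing as mp


def demo_primitives():
    lock = mp.Lock()
    with lock:  # acquire() on entry, release() on exit
        # A plain Lock is not reentrant, so a non-blocking attempt
        # while it is held reports failure instead of deadlocking.
        nested = lock.acquire(False)

    event = mp.Event()
    before = event.is_set()
    event.set()
    after = event.wait(timeout=1)  # returns True because the flag is set

    sem = mp.Semaphore(2)
    # Two slots are available; the third non-blocking attempt fails.
    grabs = [sem.acquire(False) for _ in range(3)]
    return nested, before, after, grabs


if __name__ == "__main__":
    print(demo_primitives())
```

In a multi-process program the same objects would be passed to child processes as arguments so that all of them coordinate on one underlying primitive.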

[Synchronization](./synchronization.md)

### Communication

Inter-process communication through pipes and connections with support for both object and byte-level messaging.

```python { .api }
def Pipe(duplex=True, rnonblock=False, wnonblock=False) -> tuple[Connection, Connection]: ...

class Connection:
    def send(self, obj): ...
    def recv(self): ...
    def send_bytes(self, buf, offset=0, size=None): ...
    def recv_bytes(self, maxlength=None) -> bytes: ...
    def poll(self, timeout=None) -> bool: ...
    def close(self): ...
```
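
To make the difference between object and byte-level messaging concrete, the sketch below sends one of each through a duplex pipe. Both ends live in the same process here purely to keep the example short; normally one end is handed to a child process. The names `round_trip`, `parent_end`, and `child_end` are illustrative, and the fallback import is an assumption.

```python
try:
    import billiard as mp
except ImportError:
    # Assumption: fall back to the standard library, whose API billiard mirrors.
    import multiprocessing as mp


def round_trip():
    parent_end, child_end = mp.Pipe()  # duplex by default
    try:
        # send()/recv() transfer pickled Python objects.
        parent_end.send({"job": 1, "args": (2, 3)})
        obj = child_end.recv() if child_end.poll(1.0) else None

        # send_bytes()/recv_bytes() transfer raw bytes without pickling.
        child_end.send_bytes(b"raw-bytes")
        raw = parent_end.recv_bytes() if parent_end.poll(1.0) else None

        idle = parent_end.poll(0)  # nothing left to read
    finally:
        parent_end.close()
        child_end.close()
    return obj, raw, idle


if __name__ == "__main__":
    print(round_trip())
```

Calling `poll()` with a timeout before `recv()` avoids blocking forever if the other side never sends.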

[Communication](./communication.md)

### Shared Memory

Synchronized and unsynchronized shared memory objects for efficient data sharing between processes.

```python { .api }
def Value(typecode_or_type, *args, lock=True) -> SynchronizedBase: ...
def Array(typecode_or_type, size_or_initializer, lock=True) -> SynchronizedArray: ...
def RawValue(typecode_or_type, *args): ...
def RawArray(typecode_or_type, size_or_initializer): ...
```
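
The following sketch contrasts the synchronized wrappers with their raw counterparts. It runs in one process to stay small; the same objects can be passed to child processes as arguments. `demo_shared` is an illustrative name, and the fallback import is an assumption so the sketch runs without billiard.

```python
try:
    import billiard as mp
except ImportError:
    # Assumption: fall back to the standard library, whose API billiard mirrors.
    import multiprocessing as mp


def demo_shared():
    counter = mp.Value("i", 0)  # synchronized C int, starts at 0
    with counter.get_lock():    # guard the read-modify-write as one step
        counter.value += 5

    samples = mp.Array("d", [0.5, 1.5])  # synchronized array of C doubles
    samples[0] = 2.5

    scratch = mp.RawArray("i", 3)  # no lock attached; zero-initialised
    scratch[1] = 7
    return counter.value, samples[:], scratch[:]


if __name__ == "__main__":
    print(demo_shared())
```

`counter.value += 5` is a read followed by a write, so the explicit `get_lock()` block is what makes the increment safe when several processes update the same value.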

[Shared Memory](./shared-memory.md)

### Managers

Shared object managers for creating and managing shared objects across multiple processes.

```python { .api }
def Manager() -> SyncManager: ...

class SyncManager:
    def start(self): ...
    def shutdown(self): ...
    def dict(self) -> dict: ...
    def list(self) -> list: ...
    def Queue(self) -> Queue: ...
    def Lock(self) -> Lock: ...
```
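
One way the manager API above might be used is sketched below: several processes update a shared dict and list through proxies, with a manager lock guarding the non-atomic update. The `record` and `main` helpers are hypothetical, and the fallback import is an assumption so the sketch runs without billiard.

```python
try:
    import billiard as mp
except ImportError:
    # Assumption: fall back to the standard library, whose API billiard mirrors.
    import multiprocessing as mp


def record(counts, log, lock, word):
    """Count a word; works on manager proxies and on plain containers."""
    with lock:  # the get-then-set below is not atomic on its own
        counts[word] = counts.get(word, 0) + 1
    log.append(word)


def main():
    manager = mp.Manager()  # starts the manager's server process
    try:
        counts, log, lock = manager.dict(), manager.list(), manager.Lock()
        workers = [
            mp.Process(target=record, args=(counts, log, lock, word))
            for word in ("a", "b", "a")
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        # Copy the proxies into plain containers before shutting down.
        print(dict(counts), sorted(log))
    finally:
        manager.shutdown()


if __name__ == "__main__":
    main()
```

Proxies forward each operation to the manager process, which is slower than shared memory but works for arbitrary picklable values.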

[Managers](./managers.md)

### Context Management

Process context configuration for controlling process start methods and execution environments.

```python { .api }
def get_context(method=None) -> BaseContext: ...
def set_start_method(method, force=False): ...
def get_start_method(allow_none=False) -> str: ...
def get_all_start_methods() -> list[str]: ...
```
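
A minimal sketch of choosing a start method without changing the global default: it asks which methods the platform offers and returns a context for the first preferred one. `pick_context` and its `preferred` parameter are hypothetical, and the fallback import is an assumption so the sketch runs without billiard.

```python
try:
    import billiard as mp
except ImportError:
    # Assumption: fall back to the standard library, whose API billiard mirrors.
    import multiprocessing as mp


def pick_context(preferred=("forkserver", "spawn")):
    """Return a context for the first preferred method that is available."""
    available = mp.get_all_start_methods()
    for method in preferred:
        if method in available:
            return mp.get_context(method)
    return mp.get_context()  # fall back to the platform default


if __name__ == "__main__":
    ctx = pick_context()
    print("available:", mp.get_all_start_methods())
    print("using:", ctx.get_start_method())
```

A context object exposes the same factories as the top-level package (for example `ctx.Process` and `ctx.Queue`), so a library can pick its own start method without calling `set_start_method()` on behalf of the whole program.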

[Context Management](./context-management.md)

### Utility Functions

Additional utility functions for platform support and configuration.

```python { .api }
def freeze_support(): ...
def get_logger(): ...
def log_to_stderr(level=None): ...
def allow_connection_pickling(): ...
def set_executable(executable): ...
def set_forkserver_preload(module_names): ...
def soft_timeout_sighandler(signum, frame): ...
```

Function summary:
- **freeze_support()**: Adds support for frozen Windows executables
- **get_logger()**: Returns the logger used by billiard
- **log_to_stderr()**: Enables logging to stderr
- **allow_connection_pickling()**: Enables pickling of connection objects
- **set_executable()**: Sets the path of the Python executable used to start child processes
- **set_forkserver_preload()**: Sets the modules preloaded by the forkserver process
- **soft_timeout_sighandler()**: Signal handler that raises SoftTimeLimitExceeded
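
The logging helpers from the list above could be wired into a program's entry point roughly as follows. `enable_debug_logging` is a hypothetical helper, and the fallback import is an assumption so the sketch runs without billiard.

```python
import logging

try:
    import billiard as mp
except ImportError:
    # Assumption: fall back to the standard library, whose API billiard mirrors.
    import multiprocessing as mp


def enable_debug_logging(level=logging.INFO):
    """Attach a stderr handler to the package logger and return the logger."""
    return mp.log_to_stderr(level)


if __name__ == "__main__":
    # Intended for frozen Windows executables; harmless elsewhere.
    mp.freeze_support()
    log = enable_debug_logging()
    log.info("logging configured")
```

Calling `freeze_support()` first inside the `__main__` guard is the conventional placement, since it only has an effect when the program runs as a frozen executable.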

## Exception Types

```python { .api }
class ProcessError(Exception): ...
class TimeoutError(ProcessError): ...
class AuthenticationError(ProcessError): ...
class BufferTooShort(ProcessError): ...
class TimeLimitExceeded(Exception): ...
class SoftTimeLimitExceeded(Exception): ...
class WorkerLostError(Exception): ...
class Terminated(Exception): ...
class RestartFreqExceeded(Exception): ...
class CoroStop(Exception): ...
```

Common exceptions:
- **ProcessError**: Base exception for process-related errors
- **TimeoutError**: Operation exceeded timeout limit (subclass of ProcessError)
- **AuthenticationError**: Authentication failed (subclass of ProcessError)
- **BufferTooShort**: Buffer too short for message (subclass of ProcessError)
- **TimeLimitExceeded**: Hard time limit exceeded (immediate termination)
- **SoftTimeLimitExceeded**: Soft time limit exceeded (allows cleanup)
- **WorkerLostError**: Worker process died unexpectedly
- **Terminated**: Worker processing job terminated by user request
- **RestartFreqExceeded**: Worker restarts happening too frequently
- **CoroStop**: Coroutine exit (distinct from StopIteration)
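
Because a soft time limit is delivered as an exception inside the task, the task can catch it and clean up before the hard limit applies. The sketch below shows that pattern in isolation. It assumes the class is importable from `billiard.exceptions`, defines a stand-in when billiard is not installed, and simulates the timeout by raising the exception directly rather than configuring a pool. `run_with_cleanup` and `slow_task` are hypothetical names.

```python
try:
    from billiard.exceptions import SoftTimeLimitExceeded
except ImportError:
    # Stand-in so the sketch runs without billiard (not billiard's own class).
    class SoftTimeLimitExceeded(Exception):
        pass


def run_with_cleanup(task, cleanup):
    """Run task(); if the soft limit fires, run cleanup() and report it."""
    try:
        return "done", task()
    except SoftTimeLimitExceeded:
        cleanup()  # release resources while there is still time to do so
        return "soft-timeout", None


def slow_task():
    # Simulates the soft-timeout signal handler interrupting the task.
    raise SoftTimeLimitExceeded()


if __name__ == "__main__":
    cleaned = []
    print(run_with_cleanup(lambda: 42, lambda: None))
    print(run_with_cleanup(slow_task, lambda: cleaned.append("temp files")))
    print("cleaned:", cleaned)
```

`TimeLimitExceeded` is not handled here because, per the list above, the hard limit terminates the worker immediately and leaves the task no chance to react.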