A robust implementation of concurrent.futures.ProcessPoolExecutor with reusable executors and transparent cloudpickle integration
npx @tessl/cli install tessl/pypi-loky@3.5.00
# Loky
1
2
A robust, cross-platform and cross-version implementation of the `ProcessPoolExecutor` class from `concurrent.futures`. Loky provides reusable executors, transparent cloudpickle integration, and deadlock-free process management for parallel Python computing.
3
4
## Package Information
5
6
- **Package Name**: loky
7
- **Language**: Python
8
- **Installation**: `pip install loky`
9
- **Requirements**: Python 3.9+, cloudpickle
10
11
## Core Imports
12
13
```python
14
import loky
15
from loky import get_reusable_executor, ProcessPoolExecutor
16
```
17
18
For specific functionality:
19
20
```python
21
from loky import (
22
ProcessPoolExecutor,
23
get_reusable_executor,
24
cpu_count,
25
wrap_non_picklable_objects,
26
set_loky_pickler,
27
BrokenProcessPool,
28
Future
29
)
30
```
31
32
## Basic Usage
33
34
```python
35
import os
36
from time import sleep
37
from loky import get_reusable_executor
38
39
def say_hello(k):
40
pid = os.getpid()
41
print(f"Hello from {pid} with arg {k}")
42
sleep(.01)
43
return pid
44
45
# Create an executor with 4 worker processes
46
# that will automatically shutdown after idling for 2s
47
executor = get_reusable_executor(max_workers=4, timeout=2)
48
49
# Submit a single task
50
res = executor.submit(say_hello, 1)
51
print("Got results:", res.result())
52
53
# Submit multiple tasks using map
54
results = executor.map(say_hello, range(10))
55
n_workers = len(set(results))
56
print("Number of used processes:", n_workers)
57
```
58
59
## Architecture
60
61
Loky provides a robust parallel processing architecture built around three core components:
62
63
- **ProcessPoolExecutor**: Drop-in replacement for `concurrent.futures.ProcessPoolExecutor` with enhanced robustness, consistent spawn behavior using fork+exec on POSIX systems, and better error handling
64
- **Reusable Executor**: Singleton pattern executor that can be reused across consecutive calls to reduce spawning overhead, with configurable automatic shutdown after idling
65
- **Cloudpickle Integration**: Transparent serialization for interactively defined functions and lambda expressions, with customizable pickling strategies
66
67
The library is designed for maximum reliability in parallel processing scenarios, particularly for scientific computing and data processing workflows where robust process management is critical.
68
69
## Capabilities
70
71
### Process Pool Executor
72
73
Core ProcessPoolExecutor implementation providing robust parallel task execution with configurable worker processes, timeout management, and enhanced error handling.
74
75
```python { .api }
76
class ProcessPoolExecutor(Executor):
77
def __init__(
78
self,
79
max_workers=None,
80
job_reducers=None,
81
result_reducers=None,
82
timeout=None,
83
context=None,
84
initializer=None,
85
initargs=(),
86
env=None
87
): ...
88
89
def submit(self, fn, *args, **kwargs): ...
90
def map(self, fn, *iterables, **kwargs): ...
91
def shutdown(self, wait=True, kill_workers=False): ...
92
```
93
94
[Process Pool Executor](./process-pool-executor.md)
95
96
### Reusable Executor Management
97
98
Singleton executor management for efficient resource usage across multiple parallel processing sessions.
99
100
```python { .api }
101
def get_reusable_executor(
102
max_workers=None,
103
context=None,
104
timeout=10,
105
kill_workers=False,
106
reuse="auto",
107
job_reducers=None,
108
result_reducers=None,
109
initializer=None,
110
initargs=(),
111
env=None
112
): ...
113
```
114
115
[Reusable Executor](./reusable-executor.md)
116
117
### Cloudpickle Integration
118
119
Functions for handling non-picklable objects and customizing serialization behavior.
120
121
```python { .api }
122
def wrap_non_picklable_objects(obj, keep_wrapper=True): ...
123
def set_loky_pickler(loky_pickler=None): ...
124
```
125
126
[Cloudpickle Integration](./cloudpickle-integration.md)
127
128
### Backend Context Management
129
130
Context and system information functions for multiprocessing configuration.
131
132
```python { .api }
133
def cpu_count(only_physical_cores=False): ...
134
```
135
136
[Backend Context](./backend-context.md)
137
138
### Error Handling
139
140
Exception classes and error management for robust parallel processing.
141
142
```python { .api }
143
class BrokenProcessPool(Exception): ...
144
class TerminatedWorkerError(BrokenProcessPool): ...
145
class ShutdownExecutorError(RuntimeError): ...
146
```
147
148
[Error Handling](./error-handling.md)
149
150
## Types
151
152
```python { .api }
153
class Future:
154
"""Enhanced Future implementation with improved callback handling."""
155
def __init__(self): ...
156
def result(self, timeout=None): ...
157
def exception(self, timeout=None): ...
158
def add_done_callback(self, fn): ...
159
def cancel(self): ...
160
def cancelled(self): ...
161
def running(self): ...
162
def done(self): ...
163
164
# Re-exported from concurrent.futures for convenience
165
def wait(fs, timeout=None, return_when=ALL_COMPLETED): ...
166
def as_completed(fs, timeout=None): ...
167
168
Executor = concurrent.futures.Executor
169
CancelledError = concurrent.futures.CancelledError
170
TimeoutError = concurrent.futures.TimeoutError
171
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
172
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
173
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
174
```