# Concurrent Decorators

Synchronous decorators for thread- and process-based execution that return `concurrent.futures.Future` objects. They are the simplest way to run functions concurrently with minimal code changes.

## Capabilities

### Thread Decorator

Executes the decorated function in a separate thread and returns a `concurrent.futures.Future` object. Ideal for I/O-bound tasks that benefit from concurrent execution without the overhead of process creation.

```python { .api }
def thread(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    pool: Optional[ThreadPool] = None
) -> Callable[..., Future]:
    """
    Decorator for thread-based concurrent execution.

    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Thread name for identification and debugging
    - daemon: Whether thread runs as daemon (doesn't prevent program exit)
    - pool: Existing ThreadPool instance to use instead of creating new thread

    Returns:
    Decorated function that returns concurrent.futures.Future when called
    """
```

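To make the mechanics concrete, here is a minimal standard-library sketch of a decorator that runs a function in a new thread and hands back a `concurrent.futures.Future`. It is not Pebble's implementation: `simple_thread`, `fetch_length`, and `fail` are hypothetical names used only for illustration, and the `pool` parameter is left out.

```python
import functools
import threading
from concurrent.futures import Future


def simple_thread(func=None, *, name=None, daemon=True):
    """Hypothetical stand-in for a thread decorator; not Pebble's code."""

    def decorate(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            future = Future()

            def run():
                # Mark the future as running; returns False if it was cancelled
                if not future.set_running_or_notify_cancel():
                    return
                try:
                    future.set_result(function(*args, **kwargs))
                except BaseException as error:
                    # Store the error so that result() re-raises it in the caller
                    future.set_exception(error)

            threading.Thread(target=run, name=name, daemon=daemon).start()
            return future

        return wrapper

    # Support both @simple_thread and @simple_thread(name=..., daemon=...)
    return decorate if func is None else decorate(func)


@simple_thread
def fetch_length(text):
    return len(text)


@simple_thread(name="worker", daemon=False)
def fail():
    raise ValueError("boom")


print(fetch_length("hello").result(timeout=5))
print(repr(fail().exception(timeout=5)))
```

The `func=None` plus keyword-only arguments pattern is what allows the decorator to be applied both bare and with parameters.
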
#### Usage Examples

```python
from pebble.concurrent import thread
import time

# Simple usage without parameters
@thread
def io_task(url):
    # Simulate I/O operation
    time.sleep(1)
    return f"Data from {url}"

# Usage with parameters
@thread(name="background-worker", daemon=False)
def background_task(data):
    # Process data in background
    return len(data) * 2

# Using with existing pool
from pebble import ThreadPool

pool = ThreadPool(max_workers=4)

@thread(pool=pool)
def pooled_task(x):
    return x ** 2

# Calling decorated functions
future1 = io_task("https://api.example.com")
future2 = background_task([1, 2, 3, 4])
future3 = pooled_task(5)

# Get results
result1 = future1.result()  # Blocks until complete
result2 = future2.result(timeout=10)  # With timeout
result3 = future3.result()

print(f"Results: {result1}, {result2}, {result3}")
```

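Because the decorated functions return standard `concurrent.futures.Future` objects, the standard-library helpers for futures should apply to them as well. The sketch below uses futures from a `ThreadPoolExecutor` as stand-ins (so that it runs without Pebble) to show `as_completed` and `add_done_callback`; `slow_square` is a hypothetical function.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x):
    # Hypothetical I/O-like task: larger inputs take slightly longer
    time.sleep(0.01 * x)
    return x * x


completed = []

with ThreadPoolExecutor(max_workers=3) as executor:
    # Map each future back to the input that produced it
    futures = {executor.submit(slow_square, x): x for x in (3, 1, 2)}

    # Callbacks run once the corresponding future has finished
    for future in futures:
        future.add_done_callback(lambda f: completed.append(f.result()))

    # as_completed yields futures in completion order, not submission order
    results = {futures[f]: f.result() for f in as_completed(futures, timeout=5)}

print(results)
```

Collecting results this way avoids blocking on the slowest task first, which is what calling `result()` on each future in submission order would do.
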
### Process Decorator

Executes the decorated function in a separate process and returns a `ProcessFuture` object. Well suited to CPU-intensive tasks, which gain true parallelism because each process runs its own interpreter and is not constrained by the parent's Global Interpreter Lock (GIL).

```python { .api }
def process(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    timeout: Optional[float] = None,
    mp_context: Optional[multiprocessing.context.BaseContext] = None,
    pool: Optional[ProcessPool] = None
) -> Callable[..., ProcessFuture]:
    """
    Decorator for process-based concurrent execution.

    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Process name for identification and debugging
    - daemon: Whether process runs as daemon (doesn't prevent program exit)
    - timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)
    - mp_context: Multiprocessing context for process creation (spawn, fork, forkserver)
    - pool: Existing ProcessPool instance to use instead of creating new process

    Returns:
    Decorated function that returns ProcessFuture when called
    """
```

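The `mp_context` parameter takes a standard `multiprocessing` context. As a small illustration using only the standard library, the start methods available on the current platform can be inspected before choosing one. The sketch picks `spawn` because it is available on every platform.

```python
import multiprocessing

# Start methods supported on this platform, e.g. fork, spawn, forkserver
available = multiprocessing.get_all_start_methods()

# get_context returns a context object bound to one start method,
# leaving the process-wide default untouched
ctx = multiprocessing.get_context("spawn")

print(available, ctx.get_start_method())
```

A context obtained this way could then be passed to the decorator as `mp_context=ctx`.
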
#### Usage Examples

```python
from pebble.concurrent import process
import multiprocessing
import time

# Simple usage for CPU-intensive task
@process
def cpu_intensive(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Usage with timeout
@process(timeout=5.0)
def timed_task(duration):
    time.sleep(duration)
    return "Completed"

# Usage with custom multiprocessing context
ctx = multiprocessing.get_context('spawn')

@process(mp_context=ctx, name="spawned-worker")
def spawned_task(data):
    return sum(data)

# Using with existing pool
from pebble import ProcessPool

pool = ProcessPool(max_workers=2)

@process(pool=pool)
def pooled_cpu_task(x, y):
    return x * y + (x ** y)

# Guard the entry point so that child processes can import this module safely
if __name__ == "__main__":
    # Calling decorated functions
    future1 = cpu_intensive(1000000)
    future2 = timed_task(2)  # Will complete within timeout
    future3 = spawned_task([1, 2, 3, 4, 5])
    future4 = pooled_cpu_task(3, 4)

    try:
        # Get results
        result1 = future1.result()
        result2 = future2.result()
        result3 = future3.result()
        result4 = future4.result()
        print(f"Results: {result1}, {result2}, {result3}, {result4}")
    except TimeoutError:
        print("Task exceeded timeout")
    except Exception as e:
        print(f"Task failed: {e}")
```

### Error Handling

Both decorators preserve exception tracebacks: an exception raised inside the decorated function is re-raised when `result()` is called on the returned future. The process decorator can additionally report timeouts and unexpected process termination:

```python
import time

from pebble import ProcessExpired
from pebble.concurrent import process

@process
def failing_task():
    raise ValueError("Something went wrong")

@process(timeout=1.0)
def timeout_task():
    time.sleep(10)  # Exceeds the 1-second timeout set on the decorator
    return "Never reached"

# Handle various error conditions
future1 = failing_task()
future2 = timeout_task()

try:
    result1 = future1.result()
except ValueError as e:
    print(f"Function error: {e}")

try:
    result2 = future2.result()
except TimeoutError:
    print("Task timed out")
except ProcessExpired as e:
    print(f"Process died unexpectedly: {e}")
```

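One distinction is worth keeping in mind: the `timeout` argument of `Future.result()` only limits how long the caller waits and does not stop the task, whereas the decorator's `timeout` parameter is documented above as bounding the execution itself. The sketch below shows the waiting side with a plain standard-library future; `pending` is a hypothetical future that is never completed. On Python versions before 3.11, `concurrent.futures.TimeoutError` is a separate class from the built-in `TimeoutError`, so catching the `concurrent.futures` name is the portable choice for wait timeouts.

```python
import concurrent.futures

pending = concurrent.futures.Future()  # Hypothetical task that never finishes

try:
    pending.result(timeout=0.05)
    wait_timed_out = False
except concurrent.futures.TimeoutError:
    # Only the wait gave up; the future itself is still pending
    wait_timed_out = True

print(wait_timed_out, pending.done())
```
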
### Function Requirements

Functions decorated with concurrent decorators must be:

1. **Serializable**: For the process decorator, the function, its arguments, and its return value must be picklable (not required for the thread decorator)
2. **Pure or thread-safe**: Should not depend on shared global state that could cause race conditions
3. **Import-accessible**: For the process decorator, the function must be defined at module level so that the child process can import it

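The serializability requirement can be checked ahead of time with `pickle`. The helper below is a hypothetical convenience and not part of Pebble. It relies on the fact that importable, module-level functions are pickled by reference, while lambdas and locally defined functions are not picklable with the standard `pickle` module.

```python
import json
import pickle


def is_picklable(obj):
    """Hypothetical helper: report whether obj survives pickle.dumps."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def make_local_task():
    def local_task(x):  # Defined inside another function: not importable by name
        return x + 1
    return local_task


# json.dumps is a module-level function, so it is pickled by reference
print(is_picklable(json.dumps))
print(is_picklable(make_local_task()))
print(is_picklable(lambda x: x + 1))
```

Running such a check on a function and its arguments before handing them to a process-based decorator can surface pickling problems early, in the parent process.
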
```python
from pebble.concurrent import process

# Good: Pure function, easily serializable
@process
def calculate_fibonacci(n):
    # Iterative on purpose: calling the decorated name recursively
    # would return a future, not a number
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

# Avoid: Depends on global state
global_counter = 0

@process  # Each call runs in a separate process with its own copy of global_counter
def increment_global():
    global global_counter
    global_counter += 1
    return global_counter  # The parent's global_counter is never updated
```