# Async Results

Asynchronous result handling with support for callbacks, timeouts, and iterators for processing results as they become available. The AsyncResult system provides fine-grained control over task execution and result retrieval.

## Capabilities

### AsyncResult Class

Main class for handling asynchronous results from `apply_async` operations.

```python { .api }
class AsyncResult:
    """Main class for handling asynchronous results from apply_async operations."""

    def __init__(self, cache: Dict, callback: Optional[Callable], error_callback: Optional[Callable],
                 job_id: Optional[int] = None, delete_from_cache: bool = True,
                 timeout: Optional[float] = None) -> None:
        """Create a result handle (API stub; see the library for parameter semantics)."""

    def ready(self) -> bool:
        """Check if the task has completed (either successfully or with error)."""

    def successful(self) -> bool:
        """Check if the task completed successfully (only valid once ready() is True)."""

    def get(self, timeout: Optional[float] = None) -> Any:
        """Retrieve the result, blocking until ready; raises the exception if the task failed.

        :param timeout: Maximum time to wait for the result, in seconds.
        """

    def wait(self, timeout: Optional[float] = None) -> None:
        """Wait for the task to complete without retrieving the result.

        :param timeout: Maximum time to wait, in seconds.
        """
```

**ready**: Check if the task has completed (either successfully or with error).

**successful**: Check if the task completed successfully (only valid after `ready()` returns `True`).

**get**: Retrieve the result, blocking until ready. Raises the task's exception if the task failed.
- `timeout` (Optional[float]): Maximum time to wait for result in seconds

**wait**: Wait for the task to complete without retrieving the result.
- `timeout` (Optional[float]): Maximum time to wait in seconds

### AsyncResult Iterator Classes

Iterator classes for handling collections of async results.

```python { .api }
class UnorderedAsyncResultIterator:
    """Iterator class for handling a collection of async results."""

    def __init__(self, cache: Dict, job_ids: List[int]) -> None:
        """Create the iterator (API stub; see the library for parameter semantics)."""

    def __iter__(self) -> Iterator:
        """Return the iterator object."""

    def __next__(self) -> Any:
        """Return the next result."""

class AsyncResultWithExceptionGetter(AsyncResult):
    """AsyncResult subclass with enhanced exception handling"""
    pass

class UnorderedAsyncExitResultIterator(UnorderedAsyncResultIterator):
    """Iterator for worker exit results"""
    pass
```

## Usage Examples

### Basic AsyncResult Usage

```python
from mpire import WorkerPool
import time

def slow_computation(x):
    """Return ``x`` squared after sleeping ``x * 0.1`` seconds to simulate slow work."""
    time.sleep(x * 0.1)
    return x ** 2

with WorkerPool(n_jobs=4) as pool:
    # Submit async task
    async_result = pool.apply_async(slow_computation, args=(5,))

    # Do other work while task runs
    print("Task submitted, doing other work...")
    time.sleep(0.2)

    # Check if ready
    if async_result.ready():
        print("Task completed!")
        # successful() is only meaningful once ready() has returned True
        if async_result.successful():
            result = async_result.get()
            print(f"Result: {result}")
    else:
        print("Task still running, waiting...")
        result = async_result.get()  # Block until ready
        print(f"Result: {result}")
```

### Multiple Async Tasks

```python
from mpire import WorkerPool
import time

def factorial(n):
    """Return ``n!`` computed iteratively; any ``n <= 1`` yields 1."""
    if n <= 1:
        return 1
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

with WorkerPool(n_jobs=3) as pool:
    # Submit multiple async tasks
    tasks = []
    for i in range(1, 11):
        async_result = pool.apply_async(factorial, args=(i,))
        tasks.append((i, async_result))

    # Process results as they become available
    completed = set()  # indices of tasks already handled; set gives O(1) membership tests
    while len(completed) < len(tasks):
        for i, (input_val, async_result) in enumerate(tasks):
            if i not in completed and async_result.ready():
                if async_result.successful():
                    result = async_result.get()
                    print(f"Factorial of {input_val} = {result}")
                else:
                    print(f"Task {input_val} failed")
                # Mark as handled whether it succeeded or failed, so the loop terminates
                completed.add(i)

        time.sleep(0.01)  # Small delay to prevent busy waiting
```

### Timeout Handling

```python
from mpire import WorkerPool
import time

def unreliable_task(duration):
    """Sleep for ``duration`` seconds, then return a completion message."""
    time.sleep(duration)
    return f"Completed after {duration} seconds"

with WorkerPool(n_jobs=2) as pool:
    # Submit tasks with different durations
    fast_task = pool.apply_async(unreliable_task, args=(1,))
    slow_task = pool.apply_async(unreliable_task, args=(5,))

    # Get results with timeout
    try:
        result1 = fast_task.get(timeout=2.0)
        print(f"Fast task: {result1}")
    except TimeoutError:
        print("Fast task timed out")

    try:
        result2 = slow_task.get(timeout=2.0)
        print(f"Slow task: {result2}")
    except TimeoutError:
        print("Slow task timed out")

    # Wait for slow task without timeout
    print("Waiting for slow task to complete...")
    result2 = slow_task.get()  # No timeout
    print(f"Slow task finally completed: {result2}")
```

### Callback Functions with AsyncResult

```python
from mpire import WorkerPool
import time

def process_data(data):
    """Return ``data * 2`` after a 0.5 second delay.

    Raises:
        ValueError: If ``data`` is negative.
    """
    time.sleep(0.5)
    if data < 0:
        raise ValueError(f"Negative data not allowed: {data}")
    return data * 2

def success_callback(result):
    """Print a success message containing the task's result."""
    print(f"✓ Task succeeded with result: {result}")

def error_callback(exception):
    """Print a failure message with the exception's type name and message."""
    print(f"✗ Task failed with error: {type(exception).__name__}: {exception}")

with WorkerPool(n_jobs=2) as pool:
    # Submit tasks with callbacks
    tasks = []
    test_data = [1, 2, -1, 3, -2, 4]

    for data in test_data:
        async_result = pool.apply_async(
            process_data,
            args=(data,),
            callback=success_callback,
            error_callback=error_callback,
        )
        tasks.append(async_result)

    # Wait for all tasks to complete
    for async_result in tasks:
        async_result.wait()

    print("All tasks completed")
```

### Conditional Result Processing

```python
from mpire import WorkerPool
import time
import random

def random_computation(x):
    """Return ``x`` squared after a random delay, failing at random about 20% of the time.

    Raises:
        RuntimeError: When the random failure is triggered.
    """
    # Simulate variable processing time
    sleep_time = random.uniform(0.1, 1.0)
    time.sleep(sleep_time)

    # Occasionally fail
    if random.random() < 0.2:
        raise RuntimeError(f"Random failure for input {x}")

    return x ** 2

with WorkerPool(n_jobs=3) as pool:
    # Submit batch of tasks
    async_results = []
    for i in range(10):
        result = pool.apply_async(random_computation, args=(i,))
        async_results.append(result)

    # Process results with different strategies
    successful_results = []
    failed_results = []

    for i, async_result in enumerate(async_results):
        async_result.wait()  # Wait for completion

        if async_result.successful():
            result = async_result.get()
            successful_results.append((i, result))
            print(f"✓ Task {i}: {result}")
        else:
            try:
                async_result.get()  # This will raise the exception
            except Exception as e:
                failed_results.append((i, str(e)))
                print(f"✗ Task {i}: {e}")

    print(f"\nSummary: {len(successful_results)} succeeded, {len(failed_results)} failed")
```

### Polling for Results

```python
from mpire import WorkerPool
import time

def long_running_task(task_id):
    """Sleep for ``task_id * 0.5`` seconds and return a completion message."""
    # Simulate different task durations
    duration = task_id * 0.5
    time.sleep(duration)
    return f"Task {task_id} completed after {duration}s"

with WorkerPool(n_jobs=2) as pool:
    # Submit multiple long-running tasks
    async_results = []
    for i in range(1, 6):
        result = pool.apply_async(long_running_task, args=(i,))
        async_results.append((i, result))

    # Poll for results and process them as they complete
    completed_tasks = set()

    while len(completed_tasks) < len(async_results):
        for task_id, async_result in async_results:
            if task_id not in completed_tasks and async_result.ready():
                result = async_result.get()
                print(f"Completed: {result}")
                completed_tasks.add(task_id)

        # Show progress
        print(f"Progress: {len(completed_tasks)}/{len(async_results)} tasks completed")
        time.sleep(0.1)

    print("All tasks completed!")
```

### Advanced Error Handling

```python
from mpire import WorkerPool
import time

def risky_operation(operation_id, fail_probability=0.3):
    """Sleep 0.5 seconds, then succeed or fail at random.

    Args:
        operation_id: Identifier used in the returned/raised messages; its
            parity selects which exception type a failure raises.
        fail_probability: Failure is triggered when ``random.random()`` is
            below this value.

    Returns:
        A success message naming the operation.

    Raises:
        ValueError: On failure when ``operation_id`` is even.
        RuntimeError: On failure when ``operation_id`` is odd.
    """
    # Local import: the example's top-level imports do not include random
    import random

    time.sleep(0.5)

    if random.random() < fail_probability:
        if operation_id % 2 == 0:
            raise ValueError(f"ValueError in operation {operation_id}")
        raise RuntimeError(f"RuntimeError in operation {operation_id}")

    return f"Operation {operation_id} successful"

with WorkerPool(n_jobs=3) as pool:
    async_results = []

    # Submit operations with error handling
    for i in range(10):
        result = pool.apply_async(risky_operation, args=(i,))
        async_results.append((i, result))

    # Categorize results by outcome
    success_count = 0
    value_errors = 0
    runtime_errors = 0
    other_errors = 0

    for operation_id, async_result in async_results:
        async_result.wait()

        if async_result.successful():
            result = async_result.get()
            print(f"✓ {result}")
            success_count += 1
        else:
            # get() re-raises the task's exception, so it can be classified by type
            try:
                async_result.get()
            except ValueError as e:
                print(f"ValueError in operation {operation_id}: {e}")
                value_errors += 1
            except RuntimeError as e:
                print(f"RuntimeError in operation {operation_id}: {e}")
                runtime_errors += 1
            except Exception as e:
                print(f"Unexpected error in operation {operation_id}: {e}")
                other_errors += 1

    print("\nResults Summary:")
    print(f"Successful: {success_count}")
    print(f"ValueErrors: {value_errors}")
    print(f"RuntimeErrors: {runtime_errors}")
    print(f"Other errors: {other_errors}")
```