or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

apply-functions.md · async-results.md · dashboard-integration.md · exception-handling.md · index.md · parallel-map.md · performance-insights.md · utility-functions.md · worker-configuration.md · workerpool-management.md

docs/async-results.md

# Async Results

Asynchronous result handling with support for callbacks, timeouts, and iterators for processing results as they become available. The AsyncResult system provides fine-grained control over task execution and result retrieval.

## Capabilities

### AsyncResult Class

Main class for handling asynchronous results from apply_async operations.

```python { .api }
class AsyncResult:
    def __init__(self, cache: Dict, callback: Optional[Callable], error_callback: Optional[Callable],
                 job_id: Optional[int] = None, delete_from_cache: bool = True,
                 timeout: Optional[float] = None) -> None
    def ready(self) -> bool
    def successful(self) -> bool
    def get(self, timeout: Optional[float] = None) -> Any
    def wait(self, timeout: Optional[float] = None) -> None
```

**ready**: Check if the task has completed (either successfully or with an error).

**successful**: Check if the task completed successfully (only valid after ready() returns True).

**get**: Retrieve the result, blocking until ready. Raises the exception if the task failed.

- `timeout` (Optional[float]): Maximum time to wait for the result, in seconds

**wait**: Wait for the task to complete without retrieving the result.

- `timeout` (Optional[float]): Maximum time to wait, in seconds

### AsyncResult Iterator Classes

Iterator classes for handling collections of async results.

```python { .api }
class UnorderedAsyncResultIterator:
    def __init__(self, cache: Dict, job_ids: List[int]) -> None
    def __iter__(self) -> Iterator
    def __next__(self) -> Any

class AsyncResultWithExceptionGetter(AsyncResult):
    """AsyncResult subclass with enhanced exception handling"""
    pass

class UnorderedAsyncExitResultIterator(UnorderedAsyncResultIterator):
    """Iterator for worker exit results"""
    pass
```

## Usage Examples

### Basic AsyncResult Usage

```python
from mpire import WorkerPool
import time

def slow_computation(x):
    time.sleep(x * 0.1)
    return x ** 2

with WorkerPool(n_jobs=4) as pool:
    # Submit async task
    async_result = pool.apply_async(slow_computation, args=(5,))

    # Do other work while task runs
    print("Task submitted, doing other work...")
    time.sleep(0.2)

    # Check if ready
    if async_result.ready():
        print("Task completed!")
        if async_result.successful():
            result = async_result.get()
            print(f"Result: {result}")
    else:
        print("Task still running, waiting...")
        result = async_result.get()  # Block until ready
        print(f"Result: {result}")
```

### Multiple Async Tasks

```python
from mpire import WorkerPool
import time

def factorial(n):
    if n <= 1:
        return 1
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

with WorkerPool(n_jobs=3) as pool:
    # Submit multiple async tasks
    tasks = []
    for i in range(1, 11):
        async_result = pool.apply_async(factorial, args=(i,))
        tasks.append((i, async_result))

    # Process results as they become available
    completed = []
    while len(completed) < len(tasks):
        for i, (input_val, async_result) in enumerate(tasks):
            if i not in completed and async_result.ready():
                if async_result.successful():
                    result = async_result.get()
                    print(f"Factorial of {input_val} = {result}")
                else:
                    print(f"Task {input_val} failed")
                completed.append(i)

        time.sleep(0.01)  # Small delay to prevent busy waiting
```

### Timeout Handling

```python
from mpire import WorkerPool
import time

def unreliable_task(duration):
    time.sleep(duration)
    return f"Completed after {duration} seconds"

with WorkerPool(n_jobs=2) as pool:
    # Submit tasks with different durations
    fast_task = pool.apply_async(unreliable_task, args=(1,))
    slow_task = pool.apply_async(unreliable_task, args=(5,))

    # Get results with timeout
    try:
        result1 = fast_task.get(timeout=2.0)
        print(f"Fast task: {result1}")
    except TimeoutError:
        print("Fast task timed out")

    try:
        result2 = slow_task.get(timeout=2.0)
        print(f"Slow task: {result2}")
    except TimeoutError:
        print("Slow task timed out")

    # Wait for slow task without timeout
    print("Waiting for slow task to complete...")
    result2 = slow_task.get()  # No timeout
    print(f"Slow task finally completed: {result2}")
```

### Callback Functions with AsyncResult

```python
from mpire import WorkerPool
import time

def process_data(data):
    time.sleep(0.5)
    if data < 0:
        raise ValueError(f"Negative data not allowed: {data}")
    return data * 2

def success_callback(result):
    print(f"✓ Task succeeded with result: {result}")

def error_callback(exception):
    print(f"✗ Task failed with error: {type(exception).__name__}: {exception}")

with WorkerPool(n_jobs=2) as pool:
    # Submit tasks with callbacks
    tasks = []
    test_data = [1, 2, -1, 3, -2, 4]

    for data in test_data:
        async_result = pool.apply_async(
            process_data,
            args=(data,),
            callback=success_callback,
            error_callback=error_callback
        )
        tasks.append(async_result)

    # Wait for all tasks to complete
    for async_result in tasks:
        async_result.wait()

    print("All tasks completed")
```

### Conditional Result Processing

```python
from mpire import WorkerPool
import time
import random

def random_computation(x):
    # Simulate variable processing time
    sleep_time = random.uniform(0.1, 1.0)
    time.sleep(sleep_time)

    # Occasionally fail
    if random.random() < 0.2:
        raise RuntimeError(f"Random failure for input {x}")

    return x ** 2

with WorkerPool(n_jobs=3) as pool:
    # Submit batch of tasks
    async_results = []
    for i in range(10):
        result = pool.apply_async(random_computation, args=(i,))
        async_results.append(result)

    # Process results with different strategies
    successful_results = []
    failed_results = []

    for i, async_result in enumerate(async_results):
        async_result.wait()  # Wait for completion

        if async_result.successful():
            result = async_result.get()
            successful_results.append((i, result))
            print(f"✓ Task {i}: {result}")
        else:
            try:
                async_result.get()  # This will raise the exception
            except Exception as e:
                failed_results.append((i, str(e)))
                print(f"✗ Task {i}: {e}")

    print(f"\nSummary: {len(successful_results)} succeeded, {len(failed_results)} failed")
```

### Polling for Results

```python
from mpire import WorkerPool
import time

def long_running_task(task_id):
    # Simulate different task durations
    duration = task_id * 0.5
    time.sleep(duration)
    return f"Task {task_id} completed after {duration}s"

with WorkerPool(n_jobs=2) as pool:
    # Submit multiple long-running tasks
    async_results = []
    for i in range(1, 6):
        result = pool.apply_async(long_running_task, args=(i,))
        async_results.append((i, result))

    # Poll for results and process them as they complete
    completed_tasks = set()

    while len(completed_tasks) < len(async_results):
        for task_id, async_result in async_results:
            if task_id not in completed_tasks and async_result.ready():
                result = async_result.get()
                print(f"Completed: {result}")
                completed_tasks.add(task_id)

        # Show progress
        print(f"Progress: {len(completed_tasks)}/{len(async_results)} tasks completed")
        time.sleep(0.1)

    print("All tasks completed!")
```

### Advanced Error Handling

```python
from mpire import WorkerPool
import time

def risky_operation(operation_id, fail_probability=0.3):
    time.sleep(0.5)

    import random
    if random.random() < fail_probability:
        if operation_id % 2 == 0:
            raise ValueError(f"ValueError in operation {operation_id}")
        else:
            raise RuntimeError(f"RuntimeError in operation {operation_id}")

    return f"Operation {operation_id} successful"

with WorkerPool(n_jobs=3) as pool:
    async_results = []

    # Submit operations with error handling
    for i in range(10):
        result = pool.apply_async(risky_operation, args=(i,))
        async_results.append((i, result))

    # Categorize results by outcome
    success_count = 0
    value_errors = 0
    runtime_errors = 0
    other_errors = 0

    for operation_id, async_result in async_results:
        async_result.wait()

        if async_result.successful():
            result = async_result.get()
            print(f"✓ {result}")
            success_count += 1
        else:
            try:
                async_result.get()
            except ValueError as e:
                print(f"ValueError in operation {operation_id}: {e}")
                value_errors += 1
            except RuntimeError as e:
                print(f"RuntimeError in operation {operation_id}: {e}")
                runtime_errors += 1
            except Exception as e:
                print(f"Unexpected error in operation {operation_id}: {e}")
                other_errors += 1

    print(f"\nResults Summary:")
    print(f"Successful: {success_count}")
    print(f"ValueErrors: {value_errors}")
    print(f"RuntimeErrors: {runtime_errors}")
    print(f"Other errors: {other_errors}")
```