# Utility Functions

Utility functions for coordinating and managing multiple Future objects. These functions provide powerful patterns for waiting on multiple asynchronous operations and processing results as they become available.
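As a quick, illustrative sketch of how the two helpers are typically used together (each is documented in detail below):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed, wait

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(pow, 2, n) for n in range(4)]

    # Block until every future has finished...
    done, not_done = wait(futures)
    print(len(done), len(not_done))  # 4 0

    # ...or handle each result as soon as it is ready
    for future in as_completed(futures):
        print(future.result())
```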

## Capabilities

### wait Function

Waits for Future objects to complete based on specified conditions and returns completed and pending futures.

```python { .api }
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """
    Wait for futures to complete based on specified conditions.

    Parameters:
    - fs (iterable): Sequence of Future objects to wait for
    - timeout (float, optional): Maximum time to wait in seconds
    - return_when (str): Condition for when to return:
        - ALL_COMPLETED: Wait for all futures to complete (default)
        - FIRST_COMPLETED: Return when any future completes
        - FIRST_EXCEPTION: Return when any future raises an exception

    Returns:
    DoneAndNotDoneFutures: Named tuple with 'done' and 'not_done' sets

    Note: The 'done' set contains completed futures, 'not_done' contains pending futures
    """
```

#### Usage Examples

**Basic wait usage:**

```python
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
import time

def task(n, delay):
    time.sleep(delay)
    return f"Task {n} completed"

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit multiple tasks
    futures_list = [
        executor.submit(task, 1, 0.5),
        executor.submit(task, 2, 1.0),
        executor.submit(task, 3, 0.3)
    ]

    # Wait for all to complete
    done, not_done = wait(futures_list)

    print(f"Completed: {len(done)}")  # 3
    print(f"Pending: {len(not_done)}")  # 0

    # Get all results
    for future in done:
        print(future.result())
```

**Wait with timeout:**

```python
with ThreadPoolExecutor(max_workers=2) as executor:
    futures_list = [
        executor.submit(task, 1, 0.5),
        executor.submit(task, 2, 2.0)  # Long-running task
    ]

    # Wait maximum 1 second
    done, not_done = wait(futures_list, timeout=1.0)

    print(f"Completed in 1s: {len(done)}")  # 1
    print(f"Still running: {len(not_done)}")  # 1

    # Process completed futures
    for future in done:
        print(f"Quick result: {future.result()}")

    # Wait for remaining futures
    if not_done:
        final_done, _ = wait(not_done)
        for future in final_done:
            print(f"Slow result: {future.result()}")
```

**Return when first completes:**

```python
from concurrent.futures import FIRST_COMPLETED

with ThreadPoolExecutor(max_workers=3) as executor:
    futures_list = [
        executor.submit(task, 1, 1.0),
        executor.submit(task, 2, 0.3),  # This will complete first
        executor.submit(task, 3, 2.0)
    ]

    # Return as soon as any future completes
    done, not_done = wait(futures_list, return_when=FIRST_COMPLETED)

    print(f"First completed: {len(done)}")  # 1
    print(f"Still running: {len(not_done)}")  # 2

    # Get the first result
    first_future = next(iter(done))
    print(f"First result: {first_future.result()}")
```

**Return when first exception occurs:**

```python
from concurrent.futures import FIRST_EXCEPTION

def failing_task(n):
    import time
    time.sleep(0.1 * n)
    if n == 2:
        raise ValueError(f"Task {n} failed")
    return f"Task {n} succeeded"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures_list = [
        executor.submit(failing_task, 1),
        executor.submit(failing_task, 2),  # Will fail
        executor.submit(failing_task, 3)
    ]

    # Return when first exception occurs
    done, not_done = wait(futures_list, return_when=FIRST_EXCEPTION)

    # Check results
    for future in done:
        try:
            result = future.result()
            print(f"Success: {result}")
        except Exception as e:
            print(f"Exception: {e}")
```

### as_completed Function

Returns an iterator that yields Future objects as they complete, regardless of the order in which they were submitted.

```python { .api }
def as_completed(fs, timeout=None):
    """
    Return iterator over futures as they complete.

    Parameters:
    - fs (iterable): Sequence of Future objects to monitor
    - timeout (float, optional): Maximum total time for iteration

    Yields:
    Future: Futures in order of completion

    Raises:
    TimeoutError: If entire iteration cannot complete before timeout

    Note: Duplicate futures in input are yielded only once
    """
```

#### Usage Examples

**Basic as_completed usage:**

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def timed_task(n, delay):
    time.sleep(delay)
    return f"Task {n} finished after {delay}s"

with ThreadPoolExecutor(max_workers=4) as executor:
    # Submit tasks with different delays
    futures_dict = {
        executor.submit(timed_task, 1, 0.5): 1,
        executor.submit(timed_task, 2, 0.2): 2,  # Second fastest
        executor.submit(timed_task, 3, 0.8): 3,
        executor.submit(timed_task, 4, 0.1): 4   # Fastest
    }

    # Process results as they complete
    for future in as_completed(futures_dict.keys()):
        task_id = futures_dict[future]
        try:
            result = future.result()
            print(f"Task {task_id}: {result}")
        except Exception as e:
            print(f"Task {task_id} failed: {e}")

    # Output order will be: Task 4, Task 2, Task 1, Task 3
```
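**Duplicate futures are yielded only once:** The deduplication noted in the `as_completed` docstring can be observed directly; a minimal sketch (the values here are purely illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 2, 10)

    # The same future passed twice is still yielded only once
    yielded = list(as_completed([future, future]))
    print(len(yielded))         # 1
    print(yielded[0].result())  # 1024
```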

**as_completed with timeout:**

```python
with ThreadPoolExecutor(max_workers=3) as executor:
    futures_list = [
        executor.submit(timed_task, 1, 0.3),
        executor.submit(timed_task, 2, 0.6),
        executor.submit(timed_task, 3, 1.5)  # Too slow
    ]

    try:
        # Only wait 1 second total
        for future in as_completed(futures_list, timeout=1.0):
            result = future.result()
            print(f"Completed: {result}")
    except TimeoutError:
        print("Timeout exceeded - some futures may still be running")

    # Check what's still pending
    for future in futures_list:
        if not future.done():
            print(f"Still running: {future}")
```

**Progress tracking with as_completed:**

```python
import time

def download_file(file_id, size):
    """Simulate file download"""
    time.sleep(size * 0.1)  # Simulate download time
    return f"File {file_id} ({size}MB) downloaded"

files_to_download = [
    (1, 5),  # file_id, size_mb
    (2, 12),
    (3, 3),
    (4, 8),
    (5, 15)
]

with ThreadPoolExecutor(max_workers=3) as executor:
    # Submit all download tasks
    future_to_file = {
        executor.submit(download_file, file_id, size): (file_id, size)
        for file_id, size in files_to_download
    }

    completed = 0
    total = len(future_to_file)

    # Show progress as downloads complete
    for future in as_completed(future_to_file.keys()):
        file_id, size = future_to_file[future]
        completed += 1

        try:
            result = future.result()
            print(f"[{completed}/{total}] {result}")
        except Exception as e:
            print(f"[{completed}/{total}] File {file_id} failed: {e}")
```

**Batch processing with as_completed:**

```python
def process_batch(batch_id, items):
    """Process a batch of items"""
    time.sleep(len(items) * 0.1)  # Processing time
    processed = [item.upper() for item in items]
    return {"batch_id": batch_id, "processed": processed}

# Split work into batches
all_items = ["apple", "banana", "cherry", "date", "elderberry",
             "fig", "grape", "honeydew", "kiwi", "lemon"]
batch_size = 3
batches = [all_items[i:i+batch_size] for i in range(0, len(all_items), batch_size)]

with ThreadPoolExecutor(max_workers=2) as executor:
    # Submit all batches
    batch_futures = [
        executor.submit(process_batch, i, batch)
        for i, batch in enumerate(batches)
    ]

    # Collect results as they complete
    all_processed = []
    for future in as_completed(batch_futures):
        try:
            result = future.result()
            all_processed.extend(result["processed"])
            print(f"Batch {result['batch_id']} completed")
        except Exception as e:
            print(f"Batch processing failed: {e}")

print(f"All processed items: {all_processed}")
```

### Return Types

```python { .api }
class DoneAndNotDoneFutures:
    """
    Named tuple returned by wait() function.

    Attributes:
    - done (set): Set of completed Future objects
    - not_done (set): Set of uncompleted Future objects
    """
    done = None      # set of Future objects
    not_done = None  # set of Future objects
```
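Because the return value is a named tuple, its fields can be read by attribute as well as by positional unpacking; a minimal sketch (the submitted work is purely illustrative):

```python
from concurrent.futures import ThreadPoolExecutor, wait

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(sum, range(10)) for _ in range(3)]

    result = wait(futures)

    # Attribute access on the named tuple...
    print(len(result.done), len(result.not_done))  # 3 0

    # ...or positional unpacking, as in the examples above
    done, not_done = result
```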

### Constants

```python { .api }
# Wait condition constants for use with wait()
FIRST_COMPLETED = 'FIRST_COMPLETED'  # Return when any future completes
FIRST_EXCEPTION = 'FIRST_EXCEPTION'  # Return when any future raises an exception
ALL_COMPLETED = 'ALL_COMPLETED'      # Return when all futures complete (default)
```

### Advanced Patterns

**Combining wait() and as_completed():**

```python
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED

def process_with_fallback(tasks):
    """Process tasks with timeout and fallback handling"""
    with ThreadPoolExecutor(max_workers=4) as executor:
        # task_func is the caller-supplied function that processes a single task
        futures_list = [executor.submit(task_func, task) for task in tasks]

        # First, try to get some quick results
        done, not_done = wait(futures_list, timeout=1.0, return_when=FIRST_COMPLETED)

        # Process any quick results
        quick_results = []
        for future in done:
            try:
                quick_results.append(future.result())
            except Exception as e:
                print(f"Quick task failed: {e}")

        # Continue processing remaining tasks as they complete
        # (as_completed raises TimeoutError if they take longer than 5 seconds)
        if not_done:
            for future in as_completed(not_done, timeout=5.0):
                try:
                    result = future.result()
                    quick_results.append(result)
                except Exception as e:
                    print(f"Slow task failed: {e}")

        return quick_results
```

**Racing tasks for the first successful result:**

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def first_successful_result(tasks, max_workers=3, timeout=None):
    """Return the first successful result and cancel the remaining futures"""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # task_func is the caller-supplied function that processes a single task
        futures_list = [executor.submit(task_func, task) for task in tasks]

        try:
            for future in as_completed(futures_list, timeout=timeout):
                try:
                    result = future.result()
                    # Got first successful result - cancel others
                    # (cancel() only succeeds for futures that have not started running)
                    for f in futures_list:
                        if f is not future:
                            f.cancel()
                    return result
                except Exception:
                    continue  # Try next future
            raise RuntimeError("All tasks failed")
        except TimeoutError:
            # Cancel all remaining futures if the timeout expires
            for future in futures_list:
                future.cancel()
            raise
```