# Thread Pools

Managed pools of green threads for controlling concurrency levels and resource usage in high-throughput applications. Thread pools help prevent resource exhaustion while maximizing performance.

## Capabilities

### Green Thread Pool

A pool that caps the number of concurrently running green threads, providing controlled concurrency and resource management.

```python { .api }
class GreenPool:
    """
    A pool of green threads with a maximum size constraint.
    Manages concurrency levels and provides convenient methods for
    spawning work across the pool.
    """

    def __init__(self, size=1000):
        """
        Create a green thread pool.

        Parameters:
        - size: int, maximum number of concurrent greenthreads
        """

    def spawn(self, func, *args, **kwargs):
        """
        Spawn a function in the pool, blocking if the pool is full.

        Parameters:
        - func: callable to run in the pool
        - *args: positional arguments for func
        - **kwargs: keyword arguments for func

        Returns:
        GreenThread instance for retrieving results
        """

    def spawn_n(self, func, *args, **kwargs):
        """
        Spawn a function in the pool without return value access.

        Parameters:
        - func: callable to run in the pool
        - *args: positional arguments for func
        - **kwargs: keyword arguments for func

        Returns:
        None
        """

    def waitall(self):
        """
        Wait for all spawned greenthreads in the pool to complete.

        Returns:
        None
        """

    def resize(self, new_size):
        """
        Change the maximum size of the pool.

        Parameters:
        - new_size: int, new maximum pool size

        Returns:
        None
        """

    def running(self):
        """
        Get the number of currently running greenthreads.

        Returns:
        int: number of active greenthreads
        """

    def free(self):
        """
        Get the number of available slots in the pool.

        Returns:
        int: number of free slots (size - running)
        """

    def starmap(self, func, iterable):
        """
        Apply func to each item in iterable, spawning across the pool.
        Similar to itertools.starmap but with green thread pool execution.

        Parameters:
        - func: callable to apply to each item
        - iterable: iterable of argument tuples for func

        Returns:
        iterator of results
        """

    def imap(self, func, *iterables):
        """
        Apply func to items from iterables in parallel across the pool.
        Similar to the builtin map but with green thread pool execution.

        Parameters:
        - func: callable to apply to items
        - *iterables: one or more iterables

        Returns:
        iterator of results in order
        """
```
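
The difference between `spawn` and `spawn_n` is easiest to see side by side. A minimal sketch (the `report` worker function is invented for illustration):

```python
import eventlet

def report(name):
    """Hypothetical worker used only for this sketch"""
    eventlet.sleep(0.1)
    return f"{name} done"

pool = eventlet.GreenPool(2)

# spawn returns a GreenThread, so the result can be retrieved later
gt = pool.spawn(report, "tracked")

# spawn_n is cheaper but discards the return value
pool.spawn_n(report, "fire-and-forget")

print(pool.running(), pool.free())  # likely: 2 0
pool.waitall()
print(gt.wait())  # "tracked done"
```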

### Green Pile

A data structure for managing I/O-related tasks that provides easy iteration over results as they become available.

```python { .api }
class GreenPile:
    """
    A pile of green threads for I/O operations.
    Results can be retrieved as they become available through iteration.
    """

    def __init__(self, size_or_pool=1000):
        """
        Create a green pile.

        Parameters:
        - size_or_pool: int (pool size) or GreenPool instance
        """

    def spawn(self, func, *args, **kwargs):
        """
        Spawn a function and add its result to the pile.

        Parameters:
        - func: callable to run
        - *args: positional arguments for func
        - **kwargs: keyword arguments for func

        Returns:
        GreenThread instance
        """

    def __iter__(self):
        """
        Iterate over results, yielding each one as soon as its task
        completes. Results arrive in spawn order, so a slow early task
        delays the delivery of later results.

        Returns:
        iterator of results
        """

    def next(self):
        """
        Get the next available result. Also exposed as __next__, so the
        builtin next() and for-loop iteration work as well.

        Returns:
        Next result, in spawn order

        Raises:
        StopIteration: when no more results available
        """

    def __len__(self):
        """
        Get the number of spawned tasks whose results have not yet
        been retrieved.

        Returns:
        int: number of pending tasks in the pile
        """
```
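
Passing an existing GreenPool as `size_or_pool` lets several piles share one concurrency limit. A minimal sketch (the `lookup` function is invented for illustration):

```python
import eventlet

def lookup(key):
    """Hypothetical task used only for this sketch"""
    eventlet.sleep(0.1)
    return key.upper()

# Both piles draw from the same pool, so at most 4 tasks
# run concurrently across the two of them combined
shared_pool = eventlet.GreenPool(4)
pile_a = eventlet.GreenPile(shared_pool)
pile_b = eventlet.GreenPile(shared_pool)

for key in ("a", "b", "c"):
    pile_a.spawn(lookup, key)
for key in ("x", "y"):
    pile_b.spawn(lookup, key)

print(list(pile_a))  # ['A', 'B', 'C']
print(list(pile_b))  # ['X', 'Y']
```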

## Usage Examples

### Basic Pool Usage

```python
import eventlet

def worker(task_id, duration):
    """Simulate some work"""
    print(f"Task {task_id} starting")
    eventlet.sleep(duration)
    print(f"Task {task_id} completed")
    return f"Result {task_id}"

# Create a pool with max 5 concurrent greenthreads
pool = eventlet.GreenPool(5)

# Spawn multiple tasks
results = []
for i in range(10):
    gt = pool.spawn(worker, i, 1.0)
    results.append(gt)

# Wait for all tasks to complete
pool.waitall()

# Collect results
final_results = [gt.wait() for gt in results]
print(f"All results: {final_results}")
```

### Pool with Map Operations

```python
import eventlet

def fetch_url(url):
    """Simulate fetching a URL"""
    # In real usage, would use eventlet.green.urllib or similar
    eventlet.sleep(0.5)  # Simulate network delay
    return f"Content from {url}"

urls = [
    "http://example.com/page1",
    "http://example.com/page2",
    "http://example.com/page3",
    "http://example.com/page4",
    "http://example.com/page5"
]

pool = eventlet.GreenPool(3)  # Max 3 concurrent requests

# Use imap to process URLs and get results in order
for result in pool.imap(fetch_url, urls):
    print(f"Got: {result}")

# Use starmap with argument tuples
def fetch_with_headers(url, headers):
    eventlet.sleep(0.3)
    return f"Content from {url} with headers {headers}"

url_header_pairs = [
    ("http://api.example.com/users", {"Authorization": "Bearer token1"}),
    ("http://api.example.com/posts", {"Authorization": "Bearer token2"}),
    ("http://api.example.com/comments", {"Authorization": "Bearer token3"})
]

for result in pool.starmap(fetch_with_headers, url_header_pairs):
    print(f"API result: {result}")
```

### Green Pile Usage

```python
import eventlet
import random

def fetch_data(source):
    """Simulate fetching data from different sources"""
    delay = random.uniform(0.1, 2.0)  # Random delay
    eventlet.sleep(delay)
    return f"Data from {source} (took {delay:.2f}s)"

# Create a pile for managing multiple I/O operations
pile = eventlet.GreenPile(10)

# Spawn multiple data fetches
sources = ["database", "api", "cache", "file", "network"]
for source in sources:
    pile.spawn(fetch_data, source)

# Process each result as soon as its task completes
# (results are yielded in the order the tasks were spawned)
print("Processing results as they arrive:")
for result in pile:
    print(f"Received: {result}")

print("All operations completed")
```

### Dynamic Pool Resizing

```python
import eventlet

def monitoring_task():
    """Simulate a monitoring task"""
    eventlet.sleep(1.0)
    return "Monitor check complete"

pool = eventlet.GreenPool(2)  # Start with small pool

# Spawn some initial tasks
# (the third spawn blocks until one of the first two frees a slot)
for i in range(3):
    pool.spawn(monitoring_task)

print(f"Pool stats - Running: {pool.running()}, Free: {pool.free()}")

# Increase pool size based on demand
if pool.free() == 0:
    print("Pool full, increasing size")
    pool.resize(5)
    print(f"After resize - Running: {pool.running()}, Free: {pool.free()}")

# Spawn more tasks with increased capacity
for i in range(3, 7):
    pool.spawn(monitoring_task)

# Wait for completion
pool.waitall()
print("All monitoring tasks completed")
```

### Producer-Consumer with Pools

```python
import eventlet

def producer(pool, queue, num_items):
    """Produce items using a pool of workers"""
    def create_item(item_id):
        eventlet.sleep(0.1)  # Simulate work
        return f"item_{item_id}"

    # Use pool to create items concurrently
    for i in range(num_items):
        gt = pool.spawn(create_item, i)
        # Put the greenthread in queue so consumer can wait for result
        queue.put(gt)

def consumer(queue, num_items):
    """Consume items as they're produced"""
    for _ in range(num_items):
        gt = queue.get()  # Get the greenthread
        item = gt.wait()  # Wait for the actual result
        print(f"Consumed: {item}")

# Set up producer-consumer with pool
production_pool = eventlet.GreenPool(5)
item_queue = eventlet.Queue()

# Start producer and consumer, then wait for both to finish
producer_gt = eventlet.spawn(producer, production_pool, item_queue, 10)
consumer_gt = eventlet.spawn(consumer, item_queue, 10)
producer_gt.wait()
consumer_gt.wait()
```

## Pool Management Best Practices

### Choosing Pool Size

```python
import eventlet

# For I/O-bound tasks (network, disk)
io_pool = eventlet.GreenPool(100)  # Higher concurrency

# For CPU-bound tasks or resource-limited operations
cpu_pool = eventlet.GreenPool(10)  # Lower concurrency

# For database connections (limited by DB connection pool)
db_pool = eventlet.GreenPool(20)  # Match DB pool size
```

### Error Handling in Pools

```python
import eventlet

def risky_task(task_id):
    """Task that might fail"""
    if task_id % 3 == 0:
        raise ValueError(f"Task {task_id} failed!")
    return f"Success {task_id}"

pool = eventlet.GreenPool(5)
greenthreads = []

# Spawn tasks
for i in range(10):
    gt = pool.spawn(risky_task, i)
    greenthreads.append(gt)

# Collect results with error handling
# (GreenThread.wait() re-raises any exception the task raised)
results = []
for gt in greenthreads:
    try:
        result = gt.wait()
        results.append(result)
    except Exception as e:
        print(f"Task failed: {e}")
        results.append(None)

print(f"Successful results: {[r for r in results if r is not None]}")
```
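
The same pattern applies when iterating a GreenPile: a failed task's exception is re-raised at the point its result is retrieved, so wrap the retrieval itself. A minimal sketch (the `flaky` function is invented for illustration):

```python
import eventlet

def flaky(n):
    """Hypothetical task that fails for one input"""
    if n == 2:
        raise RuntimeError(f"task {n} failed")
    return n * n

pile = eventlet.GreenPile(5)
for n in range(4):
    pile.spawn(flaky, n)

# len(pile) is captured once up front, since the pile shrinks
# as results are consumed
results = []
for _ in range(len(pile)):
    try:
        results.append(next(pile))
    except RuntimeError as e:
        print(f"Task failed: {e}")

print(results)  # [0, 1, 9]
```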