or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-greenlets.md index.md monkey-patching.md networking.md pooling.md queues.md servers.md synchronization.md timeouts.md

pooling.md docs/

0

# Pooling

1

2

Tools for managing groups of greenlets, including pools with size limits, groups for coordination, and thread pools for running blocking or CPU-bound calls outside the event loop. These provide structured concurrency management and resource control.

3

4

## Capabilities

5

6

### Greenlet Pools

7

8

Limited-size pools of greenlets for controlling concurrency.

9

10

```python { .api }

11

class Pool:

12

"""

13

Pool of greenlets with maximum size limit.

14

"""

15

16

def __init__(self, size=None, greenlet_class=None):

17

"""

18

Create a pool.

19

20

Parameters:

21

- size: int, maximum pool size (None for unlimited)

22

- greenlet_class: class, greenlet class to use

23

"""

24

25

def spawn(self, function, *args, **kwargs) -> Greenlet:

26

"""

27

Spawn function in pool, blocking if pool is full.

28

29

Parameters:

30

- function: callable to execute

31

- *args: positional arguments

32

- **kwargs: keyword arguments

33

34

Returns:

35

Greenlet object

36

"""

37

38

def map(self, func, iterable, maxsize=None):

39

"""

40

Map function over iterable using pool.

41

42

Parameters:

43

- func: function to apply

44

- iterable: sequence to process

45

- maxsize: int, buffer size for results

46

47

Returns:

48

list of results in order

49

"""

50

51

def imap(self, func, iterable, maxsize=None):

52

"""

53

Lazy map function over iterable using pool.

54

55

Parameters:

56

- func: function to apply

57

- iterable: sequence to process

58

- maxsize: int, buffer size for results

59

60

Yields:

61

Results in input order, each as soon as it becomes available

62

"""

63

64

def imap_unordered(self, func, iterable, maxsize=None):

65

"""

66

Lazy map returning results in completion order.

67

68

Parameters:

69

- func: function to apply

70

- iterable: sequence to process

71

- maxsize: int, buffer size for results

72

73

Yields:

74

Results as they complete (unordered)

75

"""

76

77

def apply(self, func, args=(), kwds={}):

78

"""

79

Synchronously apply function with arguments.

80

81

Parameters:

82

- func: function to call

83

- args: tuple, positional arguments

84

- kwds: dict, keyword arguments

85

86

Returns:

87

Function result

88

"""

89

90

def apply_async(self, func, args=(), kwds={}, callback=None):

91

"""

92

Asynchronously apply function with arguments.

93

94

Parameters:

95

- func: function to call

96

- args: tuple, positional arguments

97

- kwds: dict, keyword arguments

98

- callback: callable, called with result

99

100

Returns:

101

Greenlet object (call its get() method to obtain the result)

102

"""

103

104

def join(self, timeout=None, raise_error=False):

105

"""

106

Wait for all greenlets in pool to finish.

107

108

Parameters:

109

- timeout: float, maximum time to wait

110

- raise_error: bool, raise exception if any failed

111

112

Returns:

113

None

114

"""

115

116

def kill(self, exception=GreenletExit, block=True, timeout=None):

117

"""

118

Kill all greenlets in pool.

119

120

Parameters:

121

- exception: exception to raise in greenlets

122

- block: bool, wait for greenlets to die

123

- timeout: float, maximum time to wait

124

125

Returns:

126

None

127

"""

128

129

@property

130

def size(self) -> int:

131

"""Maximum number of greenlets allowed in the pool (None if unlimited); use len(pool) for the current count."""

132

133

@property

134

def free_count(self) -> int:

135

"""Number of available slots in pool."""

136

137

class PoolFull(Exception):

138

"""Exception raised when pool is full and cannot accept more greenlets."""

139

```

140

141

### Greenlet Groups

142

143

Unbounded collections of greenlets for coordination.

144

145

```python { .api }

146

class Group:

147

"""

148

Collection of greenlets for coordination without size limits.

149

"""

150

151

def add(self, greenlet):

152

"""

153

Add greenlet to group.

154

155

Parameters:

156

- greenlet: Greenlet object to add

157

158

Returns:

159

None

160

"""

161

162

def discard(self, greenlet):

163

"""

164

Remove greenlet from group.

165

166

Parameters:

167

- greenlet: Greenlet object to remove

168

169

Returns:

170

None

171

"""

172

173

def spawn(self, function, *args, **kwargs) -> Greenlet:

174

"""

175

Spawn greenlet and add to group.

176

177

Parameters:

178

- function: callable to execute

179

- *args: positional arguments

180

- **kwargs: keyword arguments

181

182

Returns:

183

Greenlet object

184

"""

185

186

def map(self, func, iterable):

187

"""

188

Map function over iterable using group greenlets.

189

190

Parameters:

191

- func: function to apply

192

- iterable: sequence to process

193

194

Returns:

195

list of results in order

196

"""

197

198

def imap(self, func, iterable):

199

"""

200

Lazy map function over iterable.

201

202

Parameters:

203

- func: function to apply

204

- iterable: sequence to process

205

206

Yields:

207

Results in input order, each as soon as it becomes available

208

"""

209

210

def join(self, timeout=None, raise_error=False):

211

"""

212

Wait for all greenlets in group to finish.

213

214

Parameters:

215

- timeout: float, maximum time to wait

216

- raise_error: bool, raise exception if any failed

217

218

Returns:

219

None

220

"""

221

222

def kill(self, exception=GreenletExit, block=True, timeout=None):

223

"""

224

Kill all greenlets in group.

225

226

Parameters:

227

- exception: exception to raise in greenlets

228

- block: bool, wait for greenlets to die

229

- timeout: float, maximum time to wait

230

231

Returns:

232

None

233

"""

234

235

def killone(self, greenlet, exception=GreenletExit, block=True, timeout=None):

236

"""

237

Kill one greenlet in group.

238

239

Parameters:

240

- greenlet: Greenlet object to kill

241

- exception: exception to raise

242

- block: bool, wait for greenlet to die

243

- timeout: float, maximum time to wait

244

245

Returns:

246

None

247

"""

248

```

249

250

### Thread Pools

251

252

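Thread pools for running blocking or CPU-bound operations in native OS threads so they do not stall the gevent event loop. Note that under CPython's GIL, threads do not execute pure-Python bytecode in parallel; the benefit is keeping the event loop responsive and overlapping work that releases the GIL.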

253

254

```python { .api }

255

class ThreadPool:

256

"""

257

Pool of OS threads for CPU-bound operations.

258

"""

259

260

def __init__(self, maxsize, hub=None):

261

"""

262

Create thread pool.

263

264

Parameters:

265

- maxsize: int, maximum number of threads

266

- hub: Hub, hub to use for coordination

267

"""

268

269

def spawn(self, func, *args, **kwargs) -> ThreadResult:

270

"""

271

Execute function in thread pool.

272

273

Parameters:

274

- func: function to execute

275

- *args: positional arguments

276

- **kwargs: keyword arguments

277

278

Returns:

279

ThreadResult object

280

"""

281

282

def apply(self, func, args=(), kwds={}):

283

"""

284

Synchronously execute function in thread.

285

286

Parameters:

287

- func: function to execute

288

- args: tuple, positional arguments

289

- kwds: dict, keyword arguments

290

291

Returns:

292

Function result

293

"""

294

295

def apply_async(self, func, args=(), kwds={}, callback=None):

296

"""

297

Asynchronously execute function in thread.

298

299

Parameters:

300

- func: function to execute

301

- args: tuple, positional arguments

302

- kwds: dict, keyword arguments

303

- callback: callable, called with result

304

305

Returns:

306

AsyncResult object

307

"""

308

309

def kill(self):

310

"""

311

Kill all threads in pool.

312

313

Returns:

314

None

315

"""

316

317

class ThreadResult:

318

"""

319

Result object for thread pool operations.

320

"""

321

322

def get(self, timeout=None):

323

"""

324

Get the result, blocking if necessary.

325

326

Parameters:

327

- timeout: float, maximum time to wait

328

329

Returns:

330

Function result

331

"""

332

333

def ready(self) -> bool:

334

"""

335

Check if result is available.

336

337

Returns:

338

bool, True if result is ready

339

"""

340

```

341

342

## Usage Examples

343

344

### Pool for Concurrent Requests

345

346

```python

347

import gevent

348

from gevent import pool

349

from gevent import socket

350

import time

351

352

def fetch_url(url):

353

# Simulate HTTP request

354

print(f"Fetching {url}")

355

gevent.sleep(1) # Simulate network delay

356

return f"Content from {url}"

357

358

# Create pool with max 5 concurrent operations

359

request_pool = pool.Pool(5)

360

361

urls = [f"http://example.com/page{i}" for i in range(20)]

362

363

# Process URLs with pool

364

start_time = time.time()

365

results = request_pool.map(fetch_url, urls)

366

end_time = time.time()

367

368

print(f"Processed {len(urls)} URLs in {end_time - start_time:.2f} seconds")

369

print(f"First result: {results[0]}")

370

```

371

372

### Group for Related Tasks

373

374

```python

375

import gevent

376

from gevent import pool

377

378

def worker(task_id, duration):

379

print(f"Task {task_id} starting")

380

gevent.sleep(duration)

381

print(f"Task {task_id} completed")

382

return f"Result {task_id}"

383

384

# Create group for related tasks

385

task_group = pool.Group()

386

387

# Add tasks to group

388

for i in range(5):

389

task_group.spawn(worker, i, i * 0.5)

390

391

# Wait for all tasks

392

print("Waiting for all tasks to complete...")

393

task_group.join()

394

print("All tasks completed!")

395

```

396

397

### Thread Pool for CPU-Bound Work

398

399

```python

400

import gevent

401

from gevent import threadpool

402

import math

403

404

def cpu_intensive_task(n):

405

"""CPU-intensive task offloaded to a native thread so it does not block the event loop."""

406

result = 0

407

for i in range(n):

408

result += math.sqrt(i)

409

return result

410

411

# Create thread pool

412

thread_pool = threadpool.ThreadPool(4)

413

414

# Execute CPU-bound tasks

415

tasks = [100000, 200000, 300000, 400000]

416

results = []

417

418

for task in tasks:

419

result = thread_pool.spawn(cpu_intensive_task, task)

420

results.append(result)

421

422

# Get results

423

for i, result in enumerate(results):

424

value = result.get() # Blocks until complete

425

print(f"Task {i}: {value:.2f}")

426

427

thread_pool.kill()

428

```

429

430

### Pool with Error Handling

431

432

```python

433

import gevent

434

from gevent import pool

435

import random

436

437

def unreliable_task(task_id):

438

gevent.sleep(random.uniform(0.1, 0.5))

439

if random.random() < 0.3: # 30% chance of failure

440

raise Exception(f"Task {task_id} failed")

441

return f"Success {task_id}"

442

443

# Create pool

444

work_pool = pool.Pool(3)

445

446

# Process tasks with error handling

447

task_ids = range(10)

448

results = []

449

450

for task_id in task_ids:

451

greenlet = work_pool.spawn(unreliable_task, task_id)

452

results.append(greenlet)

453

454

# Check results

455

for i, greenlet in enumerate(results):

456

try:

457

result = greenlet.get()

458

print(f"Task {i}: {result}")

459

except Exception as e:

460

print(f"Task {i} failed: {e}")

461

```