# Pool Control

Manual pool control capabilities including stopping execution, checking pool state, and managing concurrency dynamically at runtime.

## Capabilities

### Stoppable Interface

Interface for manually controlling pool execution state.

```typescript { .api }
/**
 * Interface for stoppable promise pools
 */
interface Stoppable {
  /** Stop the promise pool and prevent processing of remaining items */
  stop(): void;

  /** Determine whether the pool is marked as stopped */
  isStopped(): boolean;
}
```

### UsesConcurrency Interface

Interface for managing concurrency settings at runtime.

```typescript { .api }
/**
 * Interface for concurrency management
 */
interface UsesConcurrency {
  /** Set the number of tasks to process concurrently */
  useConcurrency(concurrency: number): this;

  /** Returns the current concurrency level */
  concurrency(): number;
}
```
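These two contracts are small enough to pin down with a toy implementation. The sketch below is self-contained and invented for illustration — `MiniPool` and its default concurrency of 10 are not part of the library — but it satisfies both interfaces and shows why `useConcurrency` returns `this`:

```typescript
// Hypothetical MiniPool: a toy class satisfying Stoppable and
// UsesConcurrency. It is NOT @supercharge/promise-pool's implementation.
interface Stoppable {
  stop(): void;
  isStopped(): boolean;
}

interface UsesConcurrency {
  useConcurrency(concurrency: number): this;
  concurrency(): number;
}

class MiniPool implements Stoppable, UsesConcurrency {
  private stopped = false;
  private concurrencyLevel = 10; // arbitrary default for this sketch

  stop(): void {
    // Mark the pool as stopped; tasks check isStopped() cooperatively
    this.stopped = true;
  }

  isStopped(): boolean {
    return this.stopped;
  }

  useConcurrency(concurrency: number): this {
    if (!Number.isInteger(concurrency) || concurrency < 1) {
      throw new TypeError(`concurrency must be a positive integer, got ${concurrency}`);
    }
    this.concurrencyLevel = concurrency;
    return this; // returning `this` keeps the call chainable
  }

  concurrency(): number {
    return this.concurrencyLevel;
  }
}

const demo = new MiniPool().useConcurrency(4);
console.log(demo.concurrency()); // 4
demo.stop();
console.log(demo.isStopped()); // true
```

Returning `this` from `useConcurrency` is what makes the fluent method chaining used throughout the examples below possible.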

### Manual Pool Stopping

Stop the promise pool execution manually from within processing or error handling functions.

**Usage Examples:**

```typescript
import { PromisePool } from "@supercharge/promise-pool";

// Stop from within the process handler
await PromisePool
  .for(users)
  .process(async (user, index, pool) => {
    if (user.requiresManualReview) {
      console.log("Manual review required, stopping pool");
      pool.stop();
      return null;
    }

    return await processUser(user);
  });

// Stop based on results
let processedCount = 0;
const maxToProcess = 100;

await PromisePool
  .for(items)
  .process(async (item, index, pool) => {
    processedCount++;

    if (processedCount >= maxToProcess) {
      console.log(`Reached limit of ${maxToProcess} items, stopping`);
      pool.stop();
      return null;
    }

    return await processItem(item);
  });

// Stop from the error handler
await PromisePool
  .for(items)
  .handleError(async (error, item, pool) => {
    if (error instanceof CriticalSystemError) {
      console.error("Critical system error detected, stopping all processing");
      pool.stop();
      return;
    }

    // Handle non-critical errors normally
    console.warn(`Warning: ${error.message}`);
  })
  .process(async (item) => processItem(item));
```

### Pool State Checking

Check whether the pool has been stopped during execution.

**Usage Examples:**

```typescript
// Check pool state during processing
await PromisePool
  .for(items)
  .process(async (item, index, pool) => {
    // Perform some preliminary work
    const preprocessed = await preprocessItem(item);

    // Check if the pool was stopped by another task
    if (pool.isStopped()) {
      console.log("Pool was stopped, skipping remaining work");
      return null;
    }

    // Continue with main processing
    return await processItem(preprocessed);
  });

// Conditional processing based on pool state
await PromisePool
  .for(items)
  .onTaskStarted((item, pool) => {
    if (pool.isStopped()) {
      console.log("Pool is stopped, task should not have started");
    }
  })
  .process(async (item, index, pool) => {
    if (pool.isStopped()) {
      return null; // Skip processing if the pool is stopped
    }

    return await processItem(item);
  });
```

## Pool Control Patterns

### Pattern 1: Circuit Breaker

Stop processing when too many errors occur.

```typescript
class CircuitBreaker {
  private errorCount = 0;
  private readonly threshold: number;

  constructor(threshold: number = 5) {
    this.threshold = threshold;
  }

  async process<T>(items: T[], processor: (item: T) => Promise<any>) {
    return await PromisePool
      .for(items)
      .handleError((error, item, pool) => {
        this.errorCount++;

        if (this.errorCount >= this.threshold) {
          console.error(`Circuit breaker triggered: ${this.errorCount} errors exceeded threshold of ${this.threshold}`);
          pool.stop();
          return;
        }

        console.warn(`Error ${this.errorCount}/${this.threshold}: ${error.message}`);
      })
      .process(processor);
  }
}

// Usage
const circuitBreaker = new CircuitBreaker(3);

const { results } = await circuitBreaker.process(
  items,
  async (item) => processItem(item)
);
```

### Pattern 2: Time-Based Stopping

Stop processing after a given time limit.

```typescript
async function processWithTimeout<T>(
  items: T[],
  processor: (item: T) => Promise<any>,
  timeoutMs: number
) {
  const startTime = Date.now();

  return await PromisePool
    .for(items)
    .onTaskStarted((item, pool) => {
      const elapsed = Date.now() - startTime;

      if (elapsed > timeoutMs) {
        console.log(`Time limit of ${timeoutMs}ms exceeded, stopping pool`);
        pool.stop();
      }
    })
    .process(processor);
}

// Usage: stop processing after 30 seconds
const { results } = await processWithTimeout(
  items,
  async (item) => processItem(item),
  30000
);
```

### Pattern 3: Resource-Based Stopping

Stop when system resources are exhausted.

```typescript
import { memoryUsage } from 'node:process';

async function processWithMemoryLimit<T>(
  items: T[],
  processor: (item: T) => Promise<any>,
  maxMemoryMB: number = 500
) {
  return await PromisePool
    .for(items)
    .onTaskStarted((item, pool) => {
      const memUsage = memoryUsage();
      const currentMemoryMB = memUsage.heapUsed / 1024 / 1024;

      if (currentMemoryMB > maxMemoryMB) {
        console.log(`Memory usage (${currentMemoryMB.toFixed(1)}MB) exceeded limit (${maxMemoryMB}MB), stopping pool`);
        pool.stop();
      }
    })
    .process(processor);
}

// Usage
const { results } = await processWithMemoryLimit(
  largeDataSet,
  async (item) => processLargeItem(item),
  1000 // 1GB limit
);
```

### Pattern 4: Conditional Stopping with User Input

Stop processing based on external conditions or user input.

```typescript
class InteractiveProcessor {
  private shouldStop = false;

  constructor() {
    // Listen for user input to stop processing
    process.stdin.on('data', (data) => {
      const input = data.toString().trim();
      if (input === 'stop') {
        console.log('User requested stop');
        this.shouldStop = true;
      }
    });
  }

  async process<T>(items: T[], processor: (item: T) => Promise<any>) {
    console.log('Processing started. Type "stop" to halt execution.');

    return await PromisePool
      .for(items)
      .onTaskStarted((item, pool) => {
        if (this.shouldStop) {
          console.log('Stopping due to user request');
          pool.stop();
        }
      })
      .process(processor);
  }
}

// Usage
const interactiveProcessor = new InteractiveProcessor();

const { results } = await interactiveProcessor.process(
  items,
  async (item) => {
    // Long-running processing
    await new Promise(resolve => setTimeout(resolve, 1000));
    return processItem(item);
  }
);
```

### Pattern 5: Graceful Shutdown with Cleanup

Stop processing and perform cleanup operations.

```typescript
class GracefulProcessor {
  private resources: any[] = [];
  private isShuttingDown = false;

  async processWithCleanup<T>(
    items: T[],
    processor: (item: T) => Promise<any>
  ) {
    // Set up a signal handler for graceful shutdown
    process.on('SIGINT', () => {
      console.log('Received SIGINT, initiating graceful shutdown...');
      this.isShuttingDown = true;
    });

    try {
      const { results, errors } = await PromisePool
        .for(items)
        .onTaskStarted((item, pool) => {
          if (this.isShuttingDown && !pool.isStopped()) {
            console.log('Graceful shutdown initiated, stopping pool');
            pool.stop();
          }
        })
        .onTaskFinished((item, pool) => {
          // Track resources for cleanup
          this.resources.push(item);
        })
        .process(async (item, index, pool) => {
          if (pool.isStopped()) {
            return null;
          }

          return await processor(item);
        });

      return { results, errors };
    } finally {
      // Always perform cleanup
      await this.cleanup();
    }
  }

  private async cleanup() {
    console.log(`Cleaning up ${this.resources.length} resources...`);

    for (const resource of this.resources) {
      try {
        await this.cleanupResource(resource);
      } catch (error) {
        console.error('Error cleaning up resource:', error);
      }
    }

    console.log('Cleanup completed');
  }

  private async cleanupResource(resource: any) {
    // Implement resource-specific cleanup
    console.log(`Cleaning up resource: ${resource}`);
  }
}

// Usage
const gracefulProcessor = new GracefulProcessor();

const { results, errors } = await gracefulProcessor.processWithCleanup(
  items,
  async (item) => processItem(item)
);
```

### Dynamic Concurrency Control

Adjust concurrency at runtime based on system performance or conditions.

**Note:** Direct runtime concurrency adjustment is available through the internal executor, but the public PromisePool interface doesn't expose this directly. You can, however, implement adaptive patterns:

```typescript
// Pattern: restart with a different concurrency based on performance
async function adaptiveConcurrencyProcessing<T>(
  items: T[],
  processor: (item: T) => Promise<any>
) {
  let concurrency = 5; // Start with moderate concurrency
  const remainingItems = [...items];
  const allResults: any[] = [];

  while (remainingItems.length > 0) {
    const startTime = Date.now();
    const batchSize = Math.min(100, remainingItems.length);
    const currentBatch = remainingItems.splice(0, batchSize);

    console.log(`Processing batch of ${currentBatch.length} items with concurrency ${concurrency}`);

    const { results } = await PromisePool
      .withConcurrency(concurrency)
      .for(currentBatch)
      .process(processor);

    allResults.push(...results);

    // Adjust concurrency based on throughput
    const elapsed = Date.now() - startTime;
    const itemsPerSecond = batchSize / (elapsed / 1000);

    if (itemsPerSecond > 10 && concurrency < 20) {
      concurrency += 2; // Increase concurrency if processing is fast
      console.log(`Increasing concurrency to ${concurrency}`);
    } else if (itemsPerSecond < 2 && concurrency > 1) {
      concurrency = Math.max(1, concurrency - 1); // Decrease if slow
      console.log(`Decreasing concurrency to ${concurrency}`);
    }
  }

  return allResults;
}

// Usage
const results = await adaptiveConcurrencyProcessing(
  items,
  async (item) => processItem(item)
);
```