0
# Pool Control
1
2
Manual pool control capabilities including stopping execution, checking pool state, and managing concurrency dynamically during runtime.
3
4
## Capabilities
5
6
### Stoppable Interface
7
8
Interface for manually controlling pool execution state.
9
10
```typescript { .api }
11
/**
12
* Interface for stoppable promise pools
13
*/
14
interface Stoppable {
15
/** Stop the promise pool and prevent processing of remaining items */
16
stop(): void;
17
18
/** Determine whether the pool is marked as stopped */
19
isStopped(): boolean;
20
}
21
```
22
23
### UsesConcurrency Interface
24
25
Interface for managing concurrency settings at runtime.
26
27
```typescript { .api }
28
/**
29
* Interface for concurrency management
30
*/
31
interface UsesConcurrency {
32
/** Set the number of tasks to process concurrently */
33
useConcurrency(concurrency: number): this;
34
35
/** Returns the current concurrency level */
36
concurrency(): number;
37
}
38
```
39
40
### Manual Pool Stopping
41
42
Stop the promise pool execution manually from within processing or error handling functions.
43
44
**Usage Examples:**
45
46
```typescript
47
import { PromisePool } from "@supercharge/promise-pool";
48
49
// Stop from within process handler
50
await PromisePool
51
.for(users)
52
.process(async (user, index, pool) => {
53
if (user.requiresManualReview) {
54
console.log("Manual review required, stopping pool");
55
pool.stop();
56
return null;
57
}
58
59
return await processUser(user);
60
});
61
62
// Stop based on results
63
let processedCount = 0;
64
const maxToProcess = 100;
65
66
await PromisePool
67
.for(items)
68
.process(async (item, index, pool) => {
69
processedCount++;
70
71
if (processedCount >= maxToProcess) {
72
console.log(`Reached limit of ${maxToProcess} items, stopping`);
73
pool.stop();
74
return null;
75
}
76
77
return await processItem(item);
78
});
79
80
// Stop from error handler
81
await PromisePool
82
.for(items)
83
.handleError(async (error, item, pool) => {
84
if (error instanceof CriticalSystemError) {
85
console.error("Critical system error detected, stopping all processing");
86
pool.stop();
87
return;
88
}
89
90
// Handle non-critical errors normally
91
console.warn(`Warning: ${error.message}`);
92
})
93
.process(async (item) => processItem(item));
94
```
95
96
### Pool State Checking
97
98
Check if the pool has been stopped during execution.
99
100
**Usage Examples:**
101
102
```typescript
103
// Check pool state during processing
104
await PromisePool
105
.for(items)
106
.process(async (item, index, pool) => {
107
// Perform some preliminary work
108
const preprocessed = await preprocessItem(item);
109
110
// Check if pool was stopped by another task
111
if (pool.isStopped()) {
112
console.log("Pool was stopped, skipping remaining work");
113
return null;
114
}
115
116
// Continue with main processing
117
return await processItem(preprocessed);
118
});
119
120
// Conditional processing based on pool state
121
await PromisePool
122
.for(items)
123
.onTaskStarted((item, pool) => {
124
if (pool.isStopped()) {
125
console.log("Pool is stopped, task should not have started");
126
}
127
})
128
.process(async (item, index, pool) => {
129
if (pool.isStopped()) {
130
return null; // Skip processing if pool is stopped
131
}
132
133
return await processItem(item);
134
});
135
```
136
137
## Pool Control Patterns
138
139
### Pattern 1: Circuit Breaker
140
141
Stop processing when too many errors occur.
142
143
```typescript
144
class CircuitBreaker {
145
private errorCount = 0;
146
private readonly threshold: number;
147
148
constructor(threshold: number = 5) {
149
this.threshold = threshold;
150
}
151
152
async process<T>(items: T[], processor: (item: T) => Promise<any>) {
153
return await PromisePool
154
.for(items)
155
.handleError((error, item, pool) => {
156
this.errorCount++;
157
158
if (this.errorCount >= this.threshold) {
159
console.error(`Circuit breaker triggered: ${this.errorCount} errors exceeded threshold of ${this.threshold}`);
160
pool.stop();
161
return;
162
}
163
164
console.warn(`Error ${this.errorCount}/${this.threshold}: ${error.message}`);
165
})
166
.process(processor);
167
}
168
}
169
170
// Usage
171
const circuitBreaker = new CircuitBreaker(3);
172
173
const { results } = await circuitBreaker.process(
174
items,
175
async (item) => processItem(item)
176
);
177
```
178
179
### Pattern 2: Time-Based Stopping
180
181
Stop processing after a certain time limit.
182
183
```typescript
184
async function processWithTimeout<T>(
185
items: T[],
186
processor: (item: T) => Promise<any>,
187
timeoutMs: number
188
) {
189
const startTime = Date.now();
190
191
return await PromisePool
192
.for(items)
193
.onTaskStarted((item, pool) => {
194
const elapsed = Date.now() - startTime;
195
196
if (elapsed > timeoutMs) {
197
console.log(`Time limit of ${timeoutMs}ms exceeded, stopping pool`);
198
pool.stop();
199
}
200
})
201
.process(processor);
202
}
203
204
// Usage: Stop processing after 30 seconds
205
const { results } = await processWithTimeout(
206
items,
207
async (item) => processItem(item),
208
30000
209
);
210
```
211
212
### Pattern 3: Resource-Based Stopping
213
214
Stop when system resources are exhausted.
215
216
```typescript
217
import { memoryUsage } from 'process';
218
219
async function processWithMemoryLimit<T>(
220
items: T[],
221
processor: (item: T) => Promise<any>,
222
maxMemoryMB: number = 500
223
) {
224
return await PromisePool
225
.for(items)
226
.onTaskStarted((item, pool) => {
227
const memUsage = memoryUsage();
228
const currentMemoryMB = memUsage.heapUsed / 1024 / 1024;
229
230
if (currentMemoryMB > maxMemoryMB) {
231
console.log(`Memory usage (${currentMemoryMB.toFixed(1)}MB) exceeded limit (${maxMemoryMB}MB), stopping pool`);
232
pool.stop();
233
}
234
})
235
.process(processor);
236
}
237
238
// Usage
239
const { results } = await processWithMemoryLimit(
240
largeDataSet,
241
async (item) => processLargeItem(item),
242
1000 // 1GB limit
243
);
244
```
245
246
### Pattern 4: Conditional Stopping with User Input
247
248
Stop processing based on external conditions or user input.
249
250
```typescript
251
class InteractiveProcessor {
252
private shouldStop = false;
253
254
constructor() {
255
// Listen for user input to stop processing
256
process.stdin.on('data', (data) => {
257
const input = data.toString().trim();
258
if (input === 'stop') {
259
console.log('User requested stop');
260
this.shouldStop = true;
261
}
262
});
263
}
264
265
async process<T>(items: T[], processor: (item: T) => Promise<any>) {
266
console.log('Processing started. Type "stop" to halt execution.');
267
268
return await PromisePool
269
.for(items)
270
.onTaskStarted((item, pool) => {
271
if (this.shouldStop) {
272
console.log('Stopping due to user request');
273
pool.stop();
274
}
275
})
276
.process(processor);
277
}
278
}
279
280
// Usage
281
const interactiveProcessor = new InteractiveProcessor();
282
283
const { results } = await interactiveProcessor.process(
284
items,
285
async (item) => {
286
// Long-running processing
287
await new Promise(resolve => setTimeout(resolve, 1000));
288
return processItem(item);
289
}
290
);
291
```
292
293
### Pattern 5: Graceful Shutdown with Cleanup
294
295
Stop processing and perform cleanup operations.
296
297
```typescript
298
class GracefulProcessor {
299
private resources: any[] = [];
300
private isShuttingDown = false;
301
302
async processWithCleanup<T>(
303
items: T[],
304
processor: (item: T) => Promise<any>
305
) {
306
// Set up signal handlers for graceful shutdown
307
process.on('SIGINT', () => {
308
console.log('Received SIGINT, initiating graceful shutdown...');
309
this.isShuttingDown = true;
310
});
311
312
try {
313
const { results, errors } = await PromisePool
314
.for(items)
315
.onTaskStarted((item, pool) => {
316
if (this.isShuttingDown && !pool.isStopped()) {
317
console.log('Graceful shutdown initiated, stopping pool');
318
pool.stop();
319
}
320
})
321
.onTaskFinished((item, pool) => {
322
// Track resources for cleanup
323
this.resources.push(item);
324
})
325
.process(async (item, index, pool) => {
326
if (pool.isStopped()) {
327
return null;
328
}
329
330
return await processor(item);
331
});
332
333
return { results, errors };
334
} finally {
335
// Always perform cleanup
336
await this.cleanup();
337
}
338
}
339
340
private async cleanup() {
341
console.log(`Cleaning up ${this.resources.length} resources...`);
342
343
for (const resource of this.resources) {
344
try {
345
await this.cleanupResource(resource);
346
} catch (error) {
347
console.error(`Error cleaning up resource:`, error);
348
}
349
}
350
351
console.log('Cleanup completed');
352
}
353
354
private async cleanupResource(resource: any) {
355
// Implement resource-specific cleanup
356
console.log(`Cleaning up resource: ${resource}`);
357
}
358
}
359
360
// Usage
361
const gracefulProcessor = new GracefulProcessor();
362
363
const { results, errors } = await gracefulProcessor.processWithCleanup(
364
items,
365
async (item) => processItem(item)
366
);
367
```
368
369
### Dynamic Concurrency Control
370
371
Adjust concurrency during runtime based on system performance or conditions.
372
373
**Note:** Direct runtime concurrency adjustment is available through the internal executor, but the public PromisePool interface doesn't expose this directly. However, you can implement adaptive patterns:
374
375
```typescript
376
// Pattern: Restart with different concurrency based on performance
377
async function adaptiveConcurrencyProcessing<T>(
378
items: T[],
379
processor: (item: T) => Promise<any>
380
) {
381
let concurrency = 5; // Start with moderate concurrency
382
let remainingItems = [...items];
383
const allResults: any[] = [];
384
385
while (remainingItems.length > 0) {
386
const startTime = Date.now();
387
const batchSize = Math.min(100, remainingItems.length);
388
const currentBatch = remainingItems.splice(0, batchSize);
389
390
console.log(`Processing batch of ${currentBatch.length} items with concurrency ${concurrency}`);
391
392
const { results } = await PromisePool
393
.withConcurrency(concurrency)
394
.for(currentBatch)
395
.process(processor);
396
397
allResults.push(...results);
398
399
// Adjust concurrency based on performance
400
const elapsed = Date.now() - startTime;
401
const itemsPerSecond = batchSize / (elapsed / 1000);
402
403
if (itemsPerSecond > 10 && concurrency < 20) {
404
concurrency += 2; // Increase concurrency if processing fast
405
console.log(`Increasing concurrency to ${concurrency}`);
406
} else if (itemsPerSecond < 2 && concurrency > 1) {
407
concurrency = Math.max(1, concurrency - 1); // Decrease if slow
408
console.log(`Decreasing concurrency to ${concurrency}`);
409
}
410
}
411
412
return allResults;
413
}
414
415
// Usage
416
const results = await adaptiveConcurrencyProcessing(
417
items,
418
async (item) => processItem(item)
419
);
420
```