0
# Worker Context
1
2
Utilities for setting up worker processes, registering functions, and handling communication between the main thread and workers. This module provides the worker-side API for function registration, event emission, and cleanup handling.
3
4
## Capabilities
5
6
### Worker Function Registration
7
8
Register functions to be available for execution from the main thread. This is used in dedicated worker scripts to expose methods that can be called via `pool.exec()` or worker proxies.
9
10
```javascript { .api }
11
/**
12
* Register functions in a worker context
13
* @param {object} [methods] - Object containing functions to register
14
* @param {WorkerRegisterOptions} [options] - Registration configuration
15
*/
16
function worker(methods, options) -> void
17
```
18
19
**Usage Examples:**
20
21
```javascript
22
// myWorker.js - dedicated worker script
23
const workerpool = require('workerpool');
24
25
// Register individual functions
26
workerpool.worker({
27
fibonacci: function fibonacci(n) {
28
if (n < 2) return n;
29
return fibonacci(n - 2) + fibonacci(n - 1);
30
},
31
32
processData: function(data) {
33
return data.map(item => ({ ...item, processed: true }));
34
},
35
36
asyncTask: async function(config) {
37
await new Promise(resolve => setTimeout(resolve, config.delay));
38
return { status: 'completed', config };
39
}
40
});
41
42
// Register with cleanup handler
43
workerpool.worker({
44
longRunningTask: function(data) {
45
// Register cleanup for this specific task
46
this.worker.addAbortListener(async () => {
47
console.log('Cleaning up long running task...');
48
// Perform cleanup operations
49
await cleanupResources();
50
});
51
52
return processLongRunningTask(data);
53
}
54
}, {
55
onTerminate: async function(code) {
56
console.log('Worker terminating with code:', code);
57
await cleanup();
58
},
59
abortListenerTimeout: 5000 // 5 second timeout for cleanup
60
});
61
```
62
63
### Worker Event Emission
64
65
Emit events from workers to the main thread during task execution. This enables progress reporting, status updates, and real-time communication during long-running tasks.
66
67
```javascript { .api }
68
/**
69
* Emit an event from worker to main thread
70
* @param {any} payload - Event data to send
71
*/
72
function workerEmit(payload) -> void
73
```
74
75
**Usage Examples:**
76
77
```javascript
78
// In worker script
79
const workerpool = require('workerpool');
80
81
workerpool.worker({
82
processLargeDataset: function(dataset) {
83
const total = dataset.length;
84
const results = [];
85
86
for (let i = 0; i < total; i++) {
87
// Process item
88
const result = processItem(dataset[i]);
89
results.push(result);
90
91
// Emit progress updates
92
if (i % 100 === 0) {
93
workerpool.workerEmit({
94
type: 'progress',
95
completed: i,
96
total: total,
97
percentage: Math.round((i / total) * 100)
98
});
99
}
100
}
101
102
workerpool.workerEmit({
103
type: 'completed',
104
message: 'Processing finished successfully'
105
});
106
107
return results;
108
},
109
110
downloadAndProcess: async function(url) {
111
// Emit status updates
112
workerpool.workerEmit({ status: 'downloading', url });
113
114
const data = await fetch(url).then(r => r.json());
115
116
workerpool.workerEmit({
117
status: 'processing',
118
size: data.length
119
});
120
121
const processed = await processData(data);
122
123
workerpool.workerEmit({
124
status: 'complete',
125
resultSize: processed.length
126
});
127
128
return processed;
129
}
130
});
131
132
// In main thread
133
const result = await pool.exec('processLargeDataset', [data], {
134
on: function(event) {
135
if (event.type === 'progress') {
136
console.log(`Progress: ${event.percentage}%`);
137
} else if (event.type === 'completed') {
138
console.log(event.message);
139
}
140
}
141
});
142
```
143
144
### Worker API (Available within worker functions)
145
146
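When functions are registered with `workerpool.worker()`, they are invoked with a special `this` context whose `worker` property (`this.worker`) contains the worker utilities. Register methods as regular `function` expressions rather than arrow functions, since arrow functions do not receive this `this` binding.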
147
148
```javascript { .api }
149
interface WorkerAPI {
150
/**
151
* Register cleanup listener for task cancellation/timeout
152
* @param {function} listener - Async function to handle cleanup
153
*/
154
addAbortListener(listener: () => Promise<void>) -> void
155
156
/**
157
* Emit event to main thread (same as workerEmit)
158
* @param {any} payload - Event data to send
159
*/
160
emit(payload: any) -> void
161
}
162
```
163
164
**Usage Examples:**
165
166
```javascript
167
workerpool.worker({
168
taskWithCleanup: async function(config) {
169
let resources = null;
170
171
// Register cleanup handler
172
this.worker.addAbortListener(async () => {
173
console.log('Task was cancelled, cleaning up...');
174
if (resources) {
175
await resources.close();
176
resources = null;
177
}
178
});
179
180
// Initialize resources
181
resources = initializeResources(config);
182
183
// Emit progress during execution
184
this.worker.emit({ status: 'initialized', resourceCount: resources.length });
185
186
// Perform work
187
const result = performWork(resources);
188
189
// Cleanup on successful completion
190
await resources.close();
191
resources = null;
192
193
return result;
194
},
195
196
interruptibleTask: function(data) {
197
const processor = new DataProcessor(data);
198
199
// Handle graceful cancellation
200
this.worker.addAbortListener(async () => {
201
await processor.gracefulStop();
202
});
203
204
// Process with periodic progress reports
205
return processor.process((progress) => {
206
this.worker.emit({
207
type: 'progress',
208
current: progress.current,
209
total: progress.total,
210
eta: progress.estimatedTimeRemaining
211
});
212
});
213
}
214
});
215
```
216
217
### Built-in Worker Methods
218
219
These methods are automatically available in any worker and can be called from the main thread.
220
221
```javascript { .api }
222
/**
223
* Execute a stringified function (used for dynamic offloading)
224
* @param {string} fn - Stringified function code
225
* @param {array} args - Function arguments
226
* @returns {any} Function execution result
227
*/
228
function run(fn, args) -> any
229
230
/**
231
* Get list of available worker methods
232
* @returns {string[]} Array of method names
233
*/
234
function methods() -> string[]
235
```
236
237
**Usage Examples:**
238
239
```javascript
240
// These are called automatically by workerpool
241
// when using pool.exec() with functions or pool.proxy()
242
243
// Dynamic function execution (called internally)
244
// pool.exec(function(x) { return x * 2; }, [5])
245
// Internally calls: worker.run('function(x) { return x * 2; }', [5])
246
247
// Method discovery (called internally by proxy())
248
// const proxy = await pool.proxy();
249
// Internally calls: worker.methods() to discover available functions
250
```
251
252
## Worker Registration Options
253
254
```javascript { .api }
255
interface WorkerRegisterOptions {
256
/**
257
* Callback executed when worker is terminating
258
* Runs in worker context for cleanup
259
*/
260
onTerminate?: (code: number | undefined) => PromiseLike<void> | void
261
262
/**
263
* Timeout for abort listeners in milliseconds (default: 1000)
264
* If cleanup takes longer, worker will be forcefully terminated
265
*/
266
abortListenerTimeout?: number
267
}
268
```
269
270
**Configuration Examples:**
271
272
```javascript
273
// Database worker with connection cleanup
274
workerpool.worker({
275
queryDatabase: async function(query) {
276
const result = await db.query(query);
277
return result;
278
}
279
}, {
280
onTerminate: async function(code) {
281
console.log('Database worker terminating...');
282
if (db && db.connected) {
283
await db.close();
284
}
285
},
286
abortListenerTimeout: 10000 // Allow 10 seconds for cleanup
287
});
288
289
// File processing worker with temp file cleanup
290
workerpool.worker({
291
processFile: function(filePath) {
292
const tempFiles = [];
293
294
this.worker.addAbortListener(async () => {
295
// Clean up temporary files
296
for (const tempFile of tempFiles) {
297
try {
298
await fs.promises.unlink(tempFile);
299
} catch (err) {
300
console.warn('Failed to clean temp file:', err);
301
}
302
}
303
});
304
305
// Process file and create temp files
306
const result = processWithTempFiles(filePath, tempFiles);
307
308
// Clean up temp files on success
309
tempFiles.forEach(file => fs.unlinkSync(file));
310
311
return result;
312
}
313
}, {
314
abortListenerTimeout: 5000 // 5 second cleanup timeout
315
});
316
```
317
318
## Error Handling in Workers
319
320
```javascript
321
// Workers should handle errors appropriately
322
workerpool.worker({
323
riskyOperation: function(data) {
324
try {
325
// Validate input
326
if (!data || typeof data !== 'object') {
327
throw new Error('Invalid input data');
328
}
329
330
// Perform operation
331
return performRiskyOperation(data);
332
333
} catch (error) {
334
// Error will be propagated to main thread
335
// and reject the Promise returned by pool.exec()
336
throw new Error(`Operation failed: ${error.message}`);
337
}
338
},
339
340
asyncRiskyOperation: async function(config) {
341
try {
342
const result = await performAsyncOperation(config);
343
return result;
344
} catch (error) {
345
// Async errors are also properly propagated
346
throw new Error(`Async operation failed: ${error.message}`);
347
}
348
}
349
});
350
```