0
# Worker Tasks
1
2
Remote task execution framework allowing system services to run tasks on worker nodes with full system context, serializable parameters, and comprehensive error handling.
3
4
## Capabilities
5
6
### RunnableTask
7
8
Interface representing a task that can be launched by a Task worker service.
9
10
```java { .api }
11
/**
12
* RunnableTask represents a task that can be launched by a Task worker service.
13
*/
14
public interface RunnableTask {
15
/**
16
* Executes the task with the provided context.
17
* @param context the execution context for the task
18
* @throws Exception if task execution fails
19
*/
20
void run(RunnableTaskContext context) throws Exception;
21
}
22
```
23
24
**Usage Example:**
25
26
```java
27
import io.cdap.cdap.api.service.worker.RunnableTask;
28
import io.cdap.cdap.api.service.worker.RunnableTaskContext;
29
30
public class DataProcessingTask implements RunnableTask {
31
@Override
32
public void run(RunnableTaskContext context) throws Exception {
33
// Get task parameters
34
String param = context.getParam();
35
String namespace = context.getNamespace();
36
37
// Perform task logic
38
String result = processData(param, namespace);
39
40
// Write result back
41
context.writeResult(result.getBytes());
42
43
// Set cleanup task if needed
44
context.setCleanupTask(() -> {
45
// Cleanup resources
46
});
47
}
48
49
private String processData(String param, String namespace) {
50
// Task implementation
51
return "processed: " + param;
52
}
53
}
54
```
55
56
### RunnableTaskRequest
57
58
Request object for launching a runnable task with parameters and configuration.
59
60
```java { .api }
61
/**
62
* Request for launching a runnable task.
63
*/
64
public class RunnableTaskRequest {
65
/**
66
* Returns the task class name.
67
* @return class name of the task to execute
68
*/
69
public String getClassName();
70
71
/**
72
* Returns the task parameter.
73
* @return task parameter or null if not set
74
*/
75
@Nullable
76
public RunnableTaskParam getParam();
77
78
/**
79
* Returns the artifact ID.
80
* @return artifact ID or null if not set
81
*/
82
@Nullable
83
public ArtifactId getArtifactId();
84
85
/**
86
* Returns the namespace.
87
* @return namespace or null if not set
88
*/
89
@Nullable
90
public String getNamespace();
91
92
/**
93
* Returns builder for RunnableTaskRequest.
94
* @param taskClassName the class name of the task
95
* @return builder instance
96
*/
97
public static Builder getBuilder(String taskClassName);
98
99
/**
100
* Builder for RunnableTaskRequest.
101
*/
102
public static class Builder {
103
/**
104
* Sets parameter for the task.
105
* @param param parameter string
106
* @return builder instance
107
*/
108
public Builder withParam(String param);
109
110
/**
111
* Sets namespace for the task.
112
* @param namespace namespace string
113
* @return builder instance
114
*/
115
public Builder withNamespace(String namespace);
116
117
/**
118
* Sets artifact ID for the task.
119
* @param artifactId artifact identifier
120
* @return builder instance
121
*/
122
public Builder withArtifact(ArtifactId artifactId);
123
124
/**
125
* Sets embedded task request.
126
* @param embeddedTaskRequest nested task request
127
* @return builder instance
128
*/
129
public Builder withEmbeddedTaskRequest(RunnableTaskRequest embeddedTaskRequest);
130
131
/**
132
* Builds the request.
133
* @return constructed RunnableTaskRequest
134
*/
135
public RunnableTaskRequest build();
136
}
137
}
138
```
139
140
**Usage Example:**
141
142
```java
143
import io.cdap.cdap.api.service.worker.RunnableTaskRequest;
144
import io.cdap.cdap.api.artifact.ArtifactId;
145
146
// Create simple task request
147
RunnableTaskRequest simpleTask = RunnableTaskRequest
148
.getBuilder("com.example.DataProcessingTask")
149
.withParam("input-data")
150
.withNamespace("analytics")
151
.build();
152
153
// Create task request with artifact
154
ArtifactId artifact = new ArtifactId("my-plugin", "1.0.0", ArtifactScope.USER);
155
RunnableTaskRequest taskWithArtifact = RunnableTaskRequest
156
.getBuilder("com.example.PluginTask")
157
.withParam("plugin-config")
158
.withArtifact(artifact)
159
.withNamespace("default")
160
.build();
161
162
// Create nested task request
163
RunnableTaskRequest embeddedTask = RunnableTaskRequest
164
.getBuilder("com.example.SubTask")
165
.withParam("sub-param")
166
.build();
167
168
RunnableTaskRequest parentTask = RunnableTaskRequest
169
.getBuilder("com.example.ParentTask")
170
.withEmbeddedTaskRequest(embeddedTask)
171
.build();
172
```
173
174
### RunnableTaskContext
175
176
Context for RunnableTask execution, providing result writing and cleanup capabilities.
177
178
```java { .api }
179
/**
180
* Represents a context for a RunnableTask. This context is used for writing back
181
* the result of RunnableTask execution.
182
*/
183
public class RunnableTaskContext {
184
/**
185
* Constructor with task request.
186
* @param taskRequest the originating task request
187
*/
188
public RunnableTaskContext(RunnableTaskRequest taskRequest);
189
190
/**
191
* Constructor with task request and system app context.
192
* @param taskRequest the originating task request
193
* @param systemAppTaskContext system app task context (nullable)
194
*/
195
public RunnableTaskContext(RunnableTaskRequest taskRequest,
196
@Nullable SystemAppTaskContext systemAppTaskContext);
197
198
/**
199
* Writes result data.
200
* @param data result data as byte array
201
* @throws IOException if writing fails
202
*/
203
public void writeResult(byte[] data) throws IOException;
204
205
/**
206
* Sets cleanup task to run after task completion.
207
* @param cleanupTask cleanup runnable
208
*/
209
public void setCleanupTask(Runnable cleanupTask);
210
211
/**
212
* Executes the cleanup task.
213
*/
214
public void executeCleanupTask();
215
216
/**
217
* Sets whether to terminate the task runner on task completion.
218
* @param terminate true to terminate after completion
219
*/
220
public void setTerminateOnComplete(boolean terminate);
221
222
/**
223
* Returns true if terminate the task runner after the task completed.
224
* @return termination flag
225
*/
226
public boolean isTerminateOnComplete();
227
228
/**
229
* Gets the result as ByteBuffer.
230
* @return result buffer
231
*/
232
public ByteBuffer getResult();
233
234
/**
235
* Returns the class name.
236
* @return task class name
237
*/
238
public String getClassName();
239
240
/**
241
* Returns the parameter.
242
* @return parameter string or null
243
*/
244
@Nullable
245
public String getParam();
246
247
/**
248
* Returns embedded request.
249
* @return embedded task request or null
250
*/
251
@Nullable
252
public RunnableTaskRequest getEmbeddedRequest();
253
254
/**
255
* Returns namespace.
256
* @return namespace string or null
257
*/
258
@Nullable
259
public String getNamespace();
260
261
/**
262
* Returns artifact ID.
263
* @return artifact ID or null
264
*/
265
@Nullable
266
public ArtifactId getArtifactId();
267
268
/**
269
* Returns the system app task context.
270
* @return system app task context or null
271
*/
272
@Nullable
273
public SystemAppTaskContext getRunnableTaskSystemAppContext();
274
}
275
```
276
277
### RunnableTaskParam
278
279
Parameter wrapper for runnable task requests supporting both simple strings and embedded task requests.
280
281
```java { .api }
282
/**
283
* Class for the parameter of RunnableTaskRequest.
284
*/
285
public class RunnableTaskParam {
286
/**
287
* Constructor with simple parameter and embedded task request.
288
* @param simpleParam parameter string (nullable)
289
* @param embeddedTaskRequest embedded task request (nullable)
290
*/
291
public RunnableTaskParam(@Nullable String simpleParam,
292
@Nullable RunnableTaskRequest embeddedTaskRequest);
293
294
/**
295
* Returns embedded task request.
296
* @return embedded task request or null
297
*/
298
@Nullable
299
public RunnableTaskRequest getEmbeddedTaskRequest();
300
301
/**
302
* Returns simple parameter.
303
* @return parameter string or null
304
*/
305
@Nullable
306
public String getSimpleParam();
307
308
/**
309
* String representation.
310
* @return string representation
311
*/
312
public String toString();
313
314
/**
315
* Equals implementation.
316
* @param o object to compare
317
* @return true if equal
318
*/
319
public boolean equals(Object o);
320
321
/**
322
* Hash code implementation.
323
* @return hash code
324
*/
325
public int hashCode();
326
}
327
```
328
329
### SystemAppTaskContext
330
331
System App context for remote tasks with plugin configuration, artifact management, and macro evaluation.
332
333
```java { .api }
334
/**
335
* System App context for a remote task.
336
*/
337
public interface SystemAppTaskContext
338
extends ServiceDiscoverer, SecureStore, AutoCloseable, FeatureFlagsProvider {
339
340
/**
341
* Fetches preferences for the given namespace.
342
* @param namespace the namespace to get preferences for
343
* @param resolved whether to resolve macros in preference values
344
* @return map of preference key-value pairs
345
* @throws Exception if fetching preferences fails
346
*/
347
Map<String, String> getPreferencesForNamespace(String namespace, boolean resolved) throws Exception;
348
349
/**
350
* Creates a PluginConfigurer that can be used to instantiate plugins at runtime.
351
* @param namespace the namespace context for plugin configuration
352
* @return plugin configurer
353
* @throws IOException if plugin configurer creation fails
354
*/
355
PluginConfigurer createPluginConfigurer(String namespace) throws IOException;
356
357
/**
358
* Creates a ServicePluginConfigurer that can be used to instantiate plugins
359
* with macro evaluation.
360
* @param namespace the namespace context for plugin configuration
361
* @return service plugin configurer
362
*/
363
ServicePluginConfigurer createServicePluginConfigurer(String namespace);
364
365
/**
366
* Evaluates macros using provided macro evaluator with the provided parsing options.
367
* @param namespace the namespace context for macro evaluation
368
* @param macros map of properties containing macros to evaluate
369
* @param evaluator the macro evaluator to use
370
* @param options macro parsing options
371
* @return map with evaluated macros
372
* @throws InvalidMacroException if macro evaluation fails
373
*/
374
Map<String, String> evaluateMacros(String namespace,
375
Map<String, String> macros,
376
MacroEvaluator evaluator,
377
MacroParserOptions options)
378
throws InvalidMacroException;
379
380
/**
381
* Returns ArtifactManager for artifact listing and class loading.
382
* @return artifact manager
383
*/
384
ArtifactManager getArtifactManager();
385
386
/**
387
* Returns String service name.
388
* @return service name
389
*/
390
String getServiceName();
391
}
392
```
393
394
### Exception Handling
395
396
Specialized exception classes for handling errors in remote task execution.
397
398
```java { .api }
399
/**
400
* An exception class for wrapping an Exception coming from remote task execution.
401
*/
402
public class RemoteExecutionException extends Exception {
403
/**
404
* Constructor with remote task exception cause.
405
* @param cause the remote task exception
406
*/
407
public RemoteExecutionException(RemoteTaskException cause);
408
409
/**
410
* Returns the remote task exception cause.
411
* @return remote task exception cause
412
*/
413
public RemoteTaskException getCause();
414
415
/**
416
* Converts a BasicThrowable to a RemoteExecutionException.
417
* @param basicThrowable the basic throwable to convert
418
* @return converted remote execution exception
419
*/
420
public static RemoteExecutionException fromBasicThrowable(BasicThrowable basicThrowable);
421
}
422
423
/**
424
* Captures the stacktrace of exceptions from remote task.
425
*/
426
public class RemoteTaskException extends Exception {
427
/**
428
* Constructor with remote exception class name, message and cause.
429
* @param remoteExceptionClassName the remote exception class name
430
* @param message the exception message
431
* @param cause the underlying cause (nullable)
432
*/
433
public RemoteTaskException(String remoteExceptionClassName, String message, @Nullable Throwable cause);
434
435
/**
436
* Returns the remote exception class name.
437
* @return remote exception class name
438
*/
439
public String getRemoteExceptionClassName();
440
441
/**
442
* String representation.
443
* @return string representation
444
*/
445
public String toString();
446
}
447
```
448
449
## Usage Patterns
450
451
### Basic Task Execution
452
453
```java
454
// In a system HTTP service handler
455
@POST
456
@Path("/process")
457
public void processData(HttpServiceRequest request, HttpServiceResponder responder) {
458
try {
459
// Create task request
460
RunnableTaskRequest taskRequest = RunnableTaskRequest
461
.getBuilder("com.example.DataProcessingTask")
462
.withParam("input-data")
463
.withNamespace("analytics")
464
.build();
465
466
// Execute task
467
byte[] result = getContext().runTask(taskRequest);
468
responder.sendBytes(result, "application/json");
469
} catch (Exception e) {
470
responder.sendError(500, "Task execution failed: " + e.getMessage());
471
}
472
}
473
```
474
475
### Task with Cleanup
476
477
```java
478
public class ResourceIntensiveTask implements RunnableTask {
479
@Override
480
public void run(RunnableTaskContext context) throws Exception {
481
// Allocate resources
482
ExternalResource resource = allocateResource();
483
484
// Set cleanup task
485
context.setCleanupTask(() -> {
486
resource.close();
487
LOG.info("Resource cleaned up");
488
});
489
490
try {
491
// Perform work with resource
492
String result = resource.process(context.getParam());
493
context.writeResult(result.getBytes());
494
} catch (Exception e) {
495
// Cleanup will still be called
496
throw e;
497
}
498
}
499
}
500
```
501
502
## Important Notes
503
504
- Tasks are executed on remote worker nodes, not in the originating service
505
- Task classes must be available on the worker node classpath or specified via artifact
506
- All task parameters must be serializable
507
- Cleanup tasks are always executed, even if the main task fails
508
- Use `SystemAppTaskContext` for advanced plugin configuration and macro evaluation
509
- Remote task execution must be enabled in CDAP configuration
510
- Task results are limited by available memory and should be kept reasonably small