# Worker Programs

Worker programs in CDAP provide long-running background processing capabilities that run as separate threads with explicit transaction control and lifecycle management.

## Core Worker Interfaces

### Worker

```java { .api }
public interface Worker extends Runnable, ProgramLifecycle<WorkerContext> {
10
void configure(WorkerConfigurer configurer);
11
12
@TransactionPolicy(TransactionControl.EXPLICIT)
13
void initialize(WorkerContext context) throws Exception;
14
15
@TransactionPolicy(TransactionControl.EXPLICIT)
16
void destroy();
17
18
void stop();
19
}
```

Base interface for worker programs. Unlike other program types, workers have explicit transaction control and implement `Runnable` for background execution.

### AbstractWorker

```java { .api }
public abstract class AbstractWorker implements Worker {
28
public abstract void configure(WorkerConfigurer configurer);
29
30
@Override
31
public void initialize(WorkerContext context) throws Exception {
32
// Optional initialization logic
33
}
34
35
@Override
36
public void destroy() {
37
// Optional cleanup logic
38
}
39
40
@Override
41
public void stop() {
42
// Default stop implementation - should be overridden for graceful shutdown
43
}
44
}
```

Base implementation class for worker programs providing default lifecycle behavior.

## Worker Configuration

### WorkerConfigurer

```java { .api }
public interface WorkerConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
55
void setInstances(int instances);
56
void setResources(Resources resources);
57
}
```

Interface for configuring worker programs, including resource allocation and instance count.

### WorkerSpecification

```java { .api }
public class WorkerSpecification implements ProgramSpecification {
66
public String getName();
67
public String getDescription();
68
public String getClassName();
69
public Map<String, String> getProperties();
70
public Resources getResources();
71
public int getInstances();
72
public Set<String> getDatasets();
73
}
```

Complete specification of a worker program.

## Worker Context

### WorkerContext

```java { .api }
public interface WorkerContext extends RuntimeContext, DatasetContext, Transactional {
84
int getInstanceCount();
85
int getInstanceId();
86
87
PluginContext getPluginContext();
88
ServiceDiscoverer getServiceDiscoverer();
89
Metrics getMetrics();
90
Admin getAdmin();
91
92
void execute(TxRunnable runnable) throws TransactionFailureException;
93
<T> T execute(Callable<T> callable) throws TransactionFailureException;
94
}
```

Runtime context for worker programs providing access to datasets, transactions, and CDAP services with explicit transaction management.

## Usage Examples

### Basic Worker

```java
public class DataProcessingWorker extends AbstractWorker {
105
106
@Override
107
public void configure(WorkerConfigurer configurer) {
108
configurer.setName("DataProcessor");
109
configurer.setDescription("Processes incoming data continuously");
110
configurer.setInstances(2);
111
configurer.setResources(new Resources(1024)); // 1GB memory
112
configurer.useDataset("inputQueue");
113
configurer.useDataset("processedData");
114
}
115
116
@Override
117
public void run() {
118
WorkerContext context = getContext();
119
120
while (!Thread.currentThread().isInterrupted()) {
121
try {
122
context.execute(new TxRunnable() {
123
@Override
124
public void run(DatasetContext context) throws Exception {
125
ObjectStore<DataRecord> inputQueue = context.getDataset("inputQueue");
126
ObjectStore<DataRecord> processedData = context.getDataset("processedData");
127
128
// Process data records
129
DataRecord record = inputQueue.read("next");
130
if (record != null) {
131
DataRecord processed = processRecord(record);
132
processedData.write(processed.getId(), processed);
133
inputQueue.delete("next");
134
135
context.getMetrics().count("records.processed", 1);
136
}
137
}
138
});
139
140
// Wait before next processing cycle
141
Thread.sleep(1000);
142
143
} catch (InterruptedException e) {
144
Thread.currentThread().interrupt();
145
break;
146
} catch (Exception e) {
147
// Log error and continue
148
getContext().getMetrics().count("processing.errors", 1);
149
}
150
}
151
}
152
153
private DataRecord processRecord(DataRecord record) {
154
// Implement processing logic
155
return new DataRecord(record.getId(), "processed_" + record.getData());
156
}
157
}
```

### Worker with Plugin Integration

```java
public class PluginWorker extends AbstractWorker {
164
165
@Override
166
public void configure(WorkerConfigurer configurer) {
167
configurer.setName("PluginProcessor");
168
configurer.useDataset("workQueue");
169
170
configurer.usePlugin("processor", "dataProcessor", "processor1",
171
PluginProperties.builder()
172
.add("batchSize", "100")
173
.add("timeout", "30")
174
.build());
175
}
176
177
@Override
178
public void run() {
179
WorkerContext context = getContext();
180
DataProcessor processor = context.getPluginContext().newPluginInstance("processor1");
181
182
while (!Thread.currentThread().isInterrupted()) {
183
try {
184
context.execute(new TxRunnable() {
185
@Override
186
public void run(DatasetContext datasetContext) throws Exception {
187
ObjectStore<WorkItem> workQueue = datasetContext.getDataset("workQueue");
188
189
List<WorkItem> batch = new ArrayList<>();
190
for (int i = 0; i < 100; i++) {
191
WorkItem item = workQueue.read("item_" + i);
192
if (item != null) {
193
batch.add(item);
194
workQueue.delete("item_" + i);
195
}
196
}
197
198
if (!batch.isEmpty()) {
199
List<WorkItem> processed = processor.processBatch(batch);
200
for (WorkItem item : processed) {
201
workQueue.write("processed_" + item.getId(), item);
202
}
203
204
context.getMetrics().count("batch.processed", batch.size());
205
}
206
}
207
});
208
209
Thread.sleep(5000); // Wait 5 seconds between batches
210
211
} catch (InterruptedException e) {
212
Thread.currentThread().interrupt();
213
break;
214
} catch (Exception e) {
215
context.getMetrics().count("processing.errors", 1);
216
}
217
}
218
}
219
}
```

### Worker with Service Communication

```java
public class ServiceIntegrationWorker extends AbstractWorker {
226
227
@Override
228
public void configure(WorkerConfigurer configurer) {
229
configurer.setName("ServiceWorker");
230
configurer.useDataset("tasks");
231
configurer.useDataset("results");
232
}
233
234
@Override
235
public void run() {
236
WorkerContext context = getContext();
237
ServiceDiscoverer serviceDiscoverer = context.getServiceDiscoverer();
238
239
while (!Thread.currentThread().isInterrupted()) {
240
try {
241
context.execute(new TxRunnable() {
242
@Override
243
public void run(DatasetContext datasetContext) throws Exception {
244
ObjectStore<Task> tasks = datasetContext.getDataset("tasks");
245
ObjectStore<TaskResult> results = datasetContext.getDataset("results");
246
247
// Get next task
248
Task task = tasks.read("nextTask");
249
if (task != null) {
250
// Call external service for processing
251
Discoverable serviceEndpoint = serviceDiscoverer.discover("processingService");
252
if (serviceEndpoint != null) {
253
TaskResult result = callProcessingService(serviceEndpoint, task);
254
results.write(task.getId(), result);
255
tasks.delete("nextTask");
256
257
context.getMetrics().count("tasks.completed", 1);
258
}
259
}
260
}
261
});
262
263
Thread.sleep(2000);
264
265
} catch (InterruptedException e) {
266
Thread.currentThread().interrupt();
267
break;
268
} catch (Exception e) {
269
context.getMetrics().count("service.errors", 1);
270
}
271
}
272
}
273
274
private TaskResult callProcessingService(Discoverable endpoint, Task task) {
275
// Implement service call logic
276
return new TaskResult(task.getId(), "processed");
277
}
278
}
```

### Graceful Shutdown Worker

```java
public class GracefulShutdownWorker extends AbstractWorker {
285
private volatile boolean stopped = false;
286
287
@Override
288
public void configure(WorkerConfigurer configurer) {
289
configurer.setName("GracefulWorker");
290
configurer.useDataset("workItems");
291
}
292
293
@Override
294
public void run() {
295
WorkerContext context = getContext();
296
297
while (!stopped && !Thread.currentThread().isInterrupted()) {
298
try {
299
boolean hasWork = context.execute(new Callable<Boolean>() {
300
@Override
301
public Boolean call(DatasetContext datasetContext) throws Exception {
302
ObjectStore<WorkItem> workItems = datasetContext.getDataset("workItems");
303
304
WorkItem item = workItems.read("currentWork");
305
if (item != null) {
306
// Process the work item
307
processWorkItem(item);
308
workItems.delete("currentWork");
309
context.getMetrics().count("work.completed", 1);
310
return true;
311
}
312
return false;
313
}
314
});
315
316
if (!hasWork) {
317
Thread.sleep(1000); // Wait when no work available
318
}
319
320
} catch (InterruptedException e) {
321
Thread.currentThread().interrupt();
322
break;
323
} catch (Exception e) {
324
context.getMetrics().count("work.errors", 1);
325
}
326
}
327
}
328
329
@Override
330
public void stop() {
331
stopped = true;
332
}
333
334
private void processWorkItem(WorkItem item) {
335
// Implement work processing logic
336
}
337
}
```

### Multi-Instance Coordination Worker

```java
public class CoordinatedWorker extends AbstractWorker {
344
345
@Override
346
public void configure(WorkerConfigurer configurer) {
347
configurer.setName("CoordinatedWorker");
348
configurer.setInstances(3); // Run 3 instances
349
configurer.useDataset("sharedQueue");
350
configurer.useDataset("instanceStatus");
351
}
352
353
@Override
354
public void run() {
355
WorkerContext context = getContext();
356
int instanceId = context.getInstanceId();
357
int totalInstances = context.getInstanceCount();
358
359
while (!Thread.currentThread().isInterrupted()) {
360
try {
361
context.execute(new TxRunnable() {
362
@Override
363
public void run(DatasetContext datasetContext) throws Exception {
364
KeyValueTable instanceStatus = datasetContext.getDataset("instanceStatus");
365
ObjectStore<WorkItem> sharedQueue = datasetContext.getDataset("sharedQueue");
366
367
// Update instance heartbeat
368
instanceStatus.write("instance_" + instanceId,
369
String.valueOf(System.currentTimeMillis()));
370
371
// Process work assigned to this instance
372
String workKey = "work_" + (instanceId % totalInstances);
373
WorkItem work = sharedQueue.read(workKey);
374
375
if (work != null) {
376
processWork(work);
377
sharedQueue.delete(workKey);
378
context.getMetrics().count("instance_" + instanceId + ".work", 1);
379
}
380
}
381
});
382
383
Thread.sleep(5000);
384
385
} catch (InterruptedException e) {
386
Thread.currentThread().interrupt();
387
break;
388
} catch (Exception e) {
389
context.getMetrics().count("instance_" + instanceId + ".errors", 1);
390
}
391
}
392
}
393
394
private void processWork(WorkItem work) {
395
// Instance-specific work processing
396
}
397
}
```

Worker programs are ideal for continuous background processing, data pipeline coordination, periodic cleanup tasks, and real-time data monitoring within the CDAP platform.