Core application programming interface for the Cask Data Application Platform enabling development of scalable data processing applications on Hadoop ecosystems.
Worker programs in CDAP provide long-running background processing. Each worker runs in its own thread, with explicit transaction control and full lifecycle management.
public interface Worker extends Runnable, ProgramLifecycle<WorkerContext> {
  void configure(WorkerConfigurer configurer);

  @TransactionPolicy(TransactionControl.EXPLICIT)
  void initialize(WorkerContext context) throws Exception;

  @TransactionPolicy(TransactionControl.EXPLICIT)
  void destroy();

  void stop();
}

Base interface for worker programs. Unlike other program types, workers have explicit transaction control and implement Runnable for background execution.
public abstract class AbstractWorker implements Worker {
  public abstract void configure(WorkerConfigurer configurer);

  @Override
  public void initialize(WorkerContext context) throws Exception {
    // Optional initialization logic
  }

  @Override
  public void destroy() {
    // Optional cleanup logic
  }

  @Override
  public void stop() {
    // Default stop implementation; override for graceful shutdown
  }
}

Base implementation class for worker programs providing default lifecycle behavior.
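Workers are deployed as part of a CDAP application. A minimal sketch of registering the DataProcessingWorker defined later in this document, assuming the standard addWorker() registration on AbstractApplication from this same API; the application name and description are illustrative:

public class ProcessingApp extends AbstractApplication {
  @Override
  public void configure() {
    setName("ProcessingApp"); // illustrative application name
    setDescription("Application hosting a background data-processing worker");
    // Deploys the worker with the instance count and resources set in its configure()
    addWorker(new DataProcessingWorker());
  }
}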
public interface WorkerConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void setInstances(int instances);
  void setResources(Resources resources);
}

Interface for configuring worker programs, including resource allocation and instance count.
public class WorkerSpecification implements ProgramSpecification {
  public String getName();
  public String getDescription();
  public String getClassName();
  public Map<String, String> getProperties();
  public Resources getResources();
  public int getInstances();
  public Set<String> getDatasets();
}

Complete specification of a worker program.
public interface WorkerContext extends RuntimeContext, DatasetContext, Transactional {
  int getInstanceCount();
  int getInstanceId();
  PluginContext getPluginContext();
  ServiceDiscoverer getServiceDiscoverer();
  Metrics getMetrics();
  Admin getAdmin();
  void execute(TxRunnable runnable) throws TransactionFailureException;
  <T> T execute(Callable<T> callable) throws TransactionFailureException;
}

Runtime context for worker programs providing access to datasets, transactions, and CDAP services with explicit transaction management.
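Because TxRunnable declares a single run(DatasetContext) method, both execute overloads can also be driven with Java 8 lambdas. A minimal sketch of explicit transaction use inside a worker's run() method; the "counts" dataset name is an illustrative assumption:

WorkerContext context = getContext();
try {
  // TxRunnable overload: the runnable receives a transactional DatasetContext
  context.execute(datasetContext -> {
    KeyValueTable counts = datasetContext.getDataset("counts");
    counts.write("lastRun", String.valueOf(System.currentTimeMillis()));
  });

  // Callable overload for value-returning transactions; since WorkerContext
  // extends DatasetContext, datasets can also be fetched from the context itself
  boolean hasPending = context.execute(() -> {
    KeyValueTable counts = context.getDataset("counts");
    return counts.read("pending") != null;
  });
} catch (TransactionFailureException e) {
  // React to the failed transaction, e.g. record a metric and retry later
}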
public class DataProcessingWorker extends AbstractWorker {
  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("DataProcessor");
    configurer.setDescription("Processes incoming data continuously");
    configurer.setInstances(2);
    configurer.setResources(new Resources(1024)); // 1GB memory
    configurer.useDataset("inputQueue");
    configurer.useDataset("processedData");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    while (!Thread.currentThread().isInterrupted()) {
      try {
        context.execute(new TxRunnable() {
          @Override
          public void run(DatasetContext datasetContext) throws Exception {
            ObjectStore<DataRecord> inputQueue = datasetContext.getDataset("inputQueue");
            ObjectStore<DataRecord> processedData = datasetContext.getDataset("processedData");
            // Process data records
            DataRecord record = inputQueue.read("next");
            if (record != null) {
              DataRecord processed = processRecord(record);
              processedData.write(processed.getId(), processed);
              inputQueue.delete("next");
              // Metrics come from the enclosing WorkerContext, not the transactional DatasetContext
              context.getMetrics().count("records.processed", 1);
            }
          }
        });
        // Wait before next processing cycle
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        // Log error and continue
        context.getMetrics().count("processing.errors", 1);
      }
    }
  }

  private DataRecord processRecord(DataRecord record) {
    // Implement processing logic
    return new DataRecord(record.getId(), "processed_" + record.getData());
  }
}

public class PluginWorker extends AbstractWorker {
  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("PluginProcessor");
    configurer.useDataset("workQueue");
    configurer.usePlugin("processor", "dataProcessor", "processor1",
                         PluginProperties.builder()
                           .add("batchSize", "100")
                           .add("timeout", "30")
                           .build());
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    // newPluginInstance declares a checked InstantiationException, which Runnable.run() cannot propagate
    final DataProcessor processor;
    try {
      processor = context.getPluginContext().newPluginInstance("processor1");
    } catch (InstantiationException e) {
      throw new RuntimeException("Failed to instantiate plugin 'processor1'", e);
    }
    while (!Thread.currentThread().isInterrupted()) {
      try {
        context.execute(new TxRunnable() {
          @Override
          public void run(DatasetContext datasetContext) throws Exception {
            ObjectStore<WorkItem> workQueue = datasetContext.getDataset("workQueue");
            List<WorkItem> batch = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
              WorkItem item = workQueue.read("item_" + i);
              if (item != null) {
                batch.add(item);
                workQueue.delete("item_" + i);
              }
            }
            if (!batch.isEmpty()) {
              List<WorkItem> processed = processor.processBatch(batch);
              for (WorkItem item : processed) {
                workQueue.write("processed_" + item.getId(), item);
              }
              context.getMetrics().count("batch.processed", batch.size());
            }
          }
        });
        Thread.sleep(5000); // Wait 5 seconds between batches
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        context.getMetrics().count("processing.errors", 1);
      }
    }
  }
}

public class ServiceIntegrationWorker extends AbstractWorker {
  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("ServiceWorker");
    configurer.useDataset("tasks");
    configurer.useDataset("results");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    ServiceDiscoverer serviceDiscoverer = context.getServiceDiscoverer();
    while (!Thread.currentThread().isInterrupted()) {
      try {
        context.execute(new TxRunnable() {
          @Override
          public void run(DatasetContext datasetContext) throws Exception {
            ObjectStore<Task> tasks = datasetContext.getDataset("tasks");
            ObjectStore<TaskResult> results = datasetContext.getDataset("results");
            // Get next task
            Task task = tasks.read("nextTask");
            if (task != null) {
              // Call external service for processing
              Discoverable serviceEndpoint = serviceDiscoverer.discover("processingService");
              if (serviceEndpoint != null) {
                TaskResult result = callProcessingService(serviceEndpoint, task);
                results.write(task.getId(), result);
                tasks.delete("nextTask");
                context.getMetrics().count("tasks.completed", 1);
              }
            }
          }
        });
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        context.getMetrics().count("service.errors", 1);
      }
    }
  }

  private TaskResult callProcessingService(Discoverable endpoint, Task task) {
    // Implement service call logic
    return new TaskResult(task.getId(), "processed");
  }
}

public class GracefulShutdownWorker extends AbstractWorker {
  private volatile boolean stopped = false;

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("GracefulWorker");
    configurer.useDataset("workItems");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      try {
        boolean hasWork = context.execute(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            // Callable.call() takes no arguments; since WorkerContext extends
            // DatasetContext, datasets are fetched from the worker context itself
            ObjectStore<WorkItem> workItems = context.getDataset("workItems");
            WorkItem item = workItems.read("currentWork");
            if (item != null) {
              // Process the work item
              processWorkItem(item);
              workItems.delete("currentWork");
              context.getMetrics().count("work.completed", 1);
              return true;
            }
            return false;
          }
        });
        if (!hasWork) {
          Thread.sleep(1000); // Wait when no work available
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        context.getMetrics().count("work.errors", 1);
      }
    }
  }

  @Override
  public void stop() {
    stopped = true;
  }

  private void processWorkItem(WorkItem item) {
    // Implement work processing logic
  }
}

public class CoordinatedWorker extends AbstractWorker {
  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("CoordinatedWorker");
    configurer.setInstances(3); // Run 3 instances
    configurer.useDataset("sharedQueue");
    configurer.useDataset("instanceStatus");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    int instanceId = context.getInstanceId();
    int totalInstances = context.getInstanceCount();
    while (!Thread.currentThread().isInterrupted()) {
      try {
        context.execute(new TxRunnable() {
          @Override
          public void run(DatasetContext datasetContext) throws Exception {
            KeyValueTable instanceStatus = datasetContext.getDataset("instanceStatus");
            ObjectStore<WorkItem> sharedQueue = datasetContext.getDataset("sharedQueue");
            // Update instance heartbeat
            instanceStatus.write("instance_" + instanceId,
                                 String.valueOf(System.currentTimeMillis()));
            // Process work assigned to this instance
            String workKey = "work_" + (instanceId % totalInstances);
            WorkItem work = sharedQueue.read(workKey);
            if (work != null) {
              processWork(work);
              sharedQueue.delete(workKey);
              context.getMetrics().count("instance_" + instanceId + ".work", 1);
            }
          }
        });
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        context.getMetrics().count("instance_" + instanceId + ".errors", 1);
      }
    }
  }

  private void processWork(WorkItem work) {
    // Instance-specific work processing
  }
}

Worker programs are ideal for continuous background processing, data pipeline coordination, periodic cleanup tasks, and real-time data monitoring within the CDAP platform.
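As one more sketch of the periodic-cleanup use case mentioned above: a worker that purges an expired entry once a minute. The "sessions" dataset, its key layout, and the timestamp encoding are illustrative assumptions, not part of the API; Bytes is the CDAP byte-array utility class.

public class CleanupWorker extends AbstractWorker {
  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("CleanupWorker");
    configurer.useDataset("sessions"); // hypothetical KeyValueTable of session expiry timestamps
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    while (!Thread.currentThread().isInterrupted()) {
      try {
        context.execute(new TxRunnable() {
          @Override
          public void run(DatasetContext datasetContext) throws Exception {
            KeyValueTable sessions = datasetContext.getDataset("sessions");
            // Illustrative: a single well-known key holding a millisecond expiry timestamp
            byte[] expiry = sessions.read("session_expiry");
            if (expiry != null && Long.parseLong(Bytes.toString(expiry)) < System.currentTimeMillis()) {
              sessions.delete(Bytes.toBytes("session_expiry"));
              context.getMetrics().count("sessions.purged", 1);
            }
          }
        });
        Thread.sleep(60000); // Run cleanup once a minute
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        context.getMetrics().count("cleanup.errors", 1);
      }
    }
  }
}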
Install with Tessl CLI
npx tessl i tessl/maven-co-cask-cdap--cdap-api