CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-co-cask-cdap--cdap-api

Core application programming interface for the Cask Data Application Platform enabling development of scalable data processing applications on Hadoop ecosystems.

Pending
Overview
Eval results
Files

docs/worker-programs.md

Worker Programs

Worker programs in CDAP provide long-running background processing capabilities that run as separate threads with explicit transaction control and lifecycle management.

Core Worker Interfaces

Worker

// Base contract for a long-running background program. Extends Runnable so the
// platform can execute run() on a dedicated thread, and ProgramLifecycle for
// initialize/destroy hooks.
public interface Worker extends Runnable, ProgramLifecycle<WorkerContext> {
    // Deployment-time hook: declare name, description, instances, resources, datasets.
    void configure(WorkerConfigurer configurer);
    
    // Invoked before run(). EXPLICIT policy: no implicit transaction is started;
    // use WorkerContext.execute(...) when dataset access is needed.
    @TransactionPolicy(TransactionControl.EXPLICIT)
    void initialize(WorkerContext context) throws Exception;
    
    // Invoked after run() returns; also executes outside any implicit transaction.
    @TransactionPolicy(TransactionControl.EXPLICIT)
    void destroy();
    
    // Requests shutdown; implementations should make run() return promptly.
    void stop();
}

Base interface for worker programs. Unlike other program types, workers have explicit transaction control and implement Runnable for background execution.

AbstractWorker

// Convenience base class: subclasses must provide configure(), and may override
// the no-op lifecycle methods below as needed.
public abstract class AbstractWorker implements Worker {
    public abstract void configure(WorkerConfigurer configurer);
    
    @Override
    public void initialize(WorkerContext context) throws Exception {
        // Optional initialization logic
    }
    
    @Override
    public void destroy() {
        // Optional cleanup logic
    }
    
    @Override
    public void stop() {
        // Default stop implementation - should be overridden for graceful shutdown
        // (e.g. set a volatile flag that the run() loop checks).
    }
}

Base implementation class for worker programs providing default lifecycle behavior.

Worker Configuration

WorkerConfigurer

// Configurer passed to Worker.configure(); inherits name/description setters,
// dataset declaration, and plugin registration from its parent interfaces.
public interface WorkerConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
    // Number of parallel instances of this worker to run.
    void setInstances(int instances);
    // Resource (memory/CPU) allocation per instance.
    void setResources(Resources resources);
}

Interface for configuring worker programs including resource allocation and instance count.

WorkerSpecification

// Immutable specification captured at deployment from Worker.configure().
public class WorkerSpecification implements ProgramSpecification {
    public String getName();
    public String getDescription();
    // Fully-qualified class name of the Worker implementation.
    public String getClassName();
    public Map<String, String> getProperties();
    public Resources getResources();
    public int getInstances();
    // Names of datasets declared via useDataset(...).
    public Set<String> getDatasets();
}

Complete specification of a worker program.

Worker Context

WorkerContext

// Runtime context handed to a worker. Extends DatasetContext, so datasets can be
// obtained directly; transactional work goes through the execute(...) methods.
public interface WorkerContext extends RuntimeContext, DatasetContext, Transactional {
    // Total number of instances of this worker currently running.
    int getInstanceCount();
    // Zero-based id of this instance, in [0, getInstanceCount()).
    int getInstanceId();
    
    PluginContext getPluginContext();
    ServiceDiscoverer getServiceDiscoverer();
    Metrics getMetrics();
    Admin getAdmin();
    
    // Runs the given body inside a single short-lived transaction.
    void execute(TxRunnable runnable) throws TransactionFailureException;
    // Transactional variant that returns a value from the transaction body.
    <T> T execute(Callable<T> callable) throws TransactionFailureException;
}

Runtime context for worker programs providing access to datasets, transactions, and CDAP services with explicit transaction management.

Usage Examples

Basic Worker

/**
 * Continuously polls the "inputQueue" dataset, transforms each record, and
 * writes the result to "processedData". Each poll cycle runs in one explicit
 * transaction obtained from the WorkerContext.
 */
public class DataProcessingWorker extends AbstractWorker {
    
    @Override
    public void configure(WorkerConfigurer configurer) {
        configurer.setName("DataProcessor");
        configurer.setDescription("Processes incoming data continuously");
        configurer.setInstances(2);
        configurer.setResources(new Resources(1024)); // 1GB memory
        configurer.useDataset("inputQueue");
        configurer.useDataset("processedData");
    }
    
    @Override
    public void run() {
        WorkerContext context = getContext();
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                context.execute(new TxRunnable() {
                    @Override
                    // FIX: parameter must NOT be named "context" -- that would shadow the
                    // outer WorkerContext, and DatasetContext has no getMetrics(), so the
                    // metrics call below would not compile.
                    public void run(DatasetContext datasetContext) throws Exception {
                        ObjectStore<DataRecord> inputQueue = datasetContext.getDataset("inputQueue");
                        ObjectStore<DataRecord> processedData = datasetContext.getDataset("processedData");
                        
                        // Process data records
                        DataRecord record = inputQueue.read("next");
                        if (record != null) {
                            DataRecord processed = processRecord(record);
                            processedData.write(processed.getId(), processed);
                            inputQueue.delete("next");
                            
                            // Metrics come from the enclosing WorkerContext.
                            context.getMetrics().count("records.processed", 1);
                        }
                    }
                });
                
                // Wait before next processing cycle
                Thread.sleep(1000);
                
            } catch (InterruptedException e) {
                // Restore the interrupt flag and exit the loop.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // Count the failure and keep processing subsequent records.
                context.getMetrics().count("processing.errors", 1);
            }
        }
    }
    
    /** Transforms a single record; replace with real processing logic. */
    private DataRecord processRecord(DataRecord record) {
        return new DataRecord(record.getId(), "processed_" + record.getData());
    }
}

Worker with Plugin Integration

/**
 * Worker that drains up to 100 items per cycle from the "workQueue" dataset,
 * hands them to a configured plugin as a batch, and writes the processed items
 * back under "processed_"-prefixed keys.
 */
public class PluginWorker extends AbstractWorker {
    
    @Override
    public void configure(WorkerConfigurer configurer) {
        configurer.setName("PluginProcessor");
        configurer.useDataset("workQueue");
        
        configurer.usePlugin("processor", "dataProcessor", "processor1",
                            PluginProperties.builder()
                                .add("batchSize", "100")
                                .add("timeout", "30")
                                .build());
    }
    
    @Override
    public void run() {
        WorkerContext context = getContext();
        DataProcessor processor = context.getPluginContext().newPluginInstance("processor1");
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // One transaction per batch cycle.
                context.execute((DatasetContext dsContext) -> {
                    ObjectStore<WorkItem> queue = dsContext.getDataset("workQueue");
                    
                    // Collect a batch of up to 100 pending items, removing each as it is claimed.
                    List<WorkItem> pending = new ArrayList<>();
                    for (int idx = 0; idx < 100; idx++) {
                        String key = "item_" + idx;
                        WorkItem candidate = queue.read(key);
                        if (candidate != null) {
                            pending.add(candidate);
                            queue.delete(key);
                        }
                    }
                    
                    if (!pending.isEmpty()) {
                        // Delegate batch processing to the plugin and persist the results.
                        for (WorkItem done : processor.processBatch(pending)) {
                            queue.write("processed_" + done.getId(), done);
                        }
                        
                        context.getMetrics().count("batch.processed", pending.size());
                    }
                });
                
                Thread.sleep(5000); // Wait 5 seconds between batches
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                context.getMetrics().count("processing.errors", 1);
            }
        }
    }
}

Worker with Service Communication

/**
 * Worker that forwards tasks to an external CDAP service for processing.
 *
 * FIX: the external service call is performed OUTSIDE the transaction. Blocking
 * network I/O inside a transaction body keeps the transaction open for the full
 * call duration, risking timeouts and conflicts. The cycle is now:
 *   tx 1 (short): read the pending task
 *   no tx:        discover endpoint and call the service
 *   tx 2 (short): persist the result and remove the task
 */
public class ServiceIntegrationWorker extends AbstractWorker {
    
    @Override
    public void configure(WorkerConfigurer configurer) {
        configurer.setName("ServiceWorker");
        configurer.useDataset("tasks");
        configurer.useDataset("results");
    }
    
    @Override
    public void run() {
        WorkerContext context = getContext();
        ServiceDiscoverer serviceDiscoverer = context.getServiceDiscoverer();
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Single-element holder so the transaction body can hand the task out
                // (anonymous classes may only capture effectively-final locals).
                final Task[] pending = new Task[1];
                
                // Transaction 1: claim the next task, if any.
                context.execute(new TxRunnable() {
                    @Override
                    public void run(DatasetContext datasetContext) throws Exception {
                        ObjectStore<Task> tasks = datasetContext.getDataset("tasks");
                        pending[0] = tasks.read("nextTask");
                    }
                });
                
                Task task = pending[0];
                if (task != null) {
                    // Network call happens outside any transaction.
                    Discoverable serviceEndpoint = serviceDiscoverer.discover("processingService");
                    if (serviceEndpoint != null) {
                        final TaskResult result = callProcessingService(serviceEndpoint, task);
                        
                        // Transaction 2: persist the result and remove the completed task.
                        context.execute(new TxRunnable() {
                            @Override
                            public void run(DatasetContext datasetContext) throws Exception {
                                ObjectStore<TaskResult> results = datasetContext.getDataset("results");
                                ObjectStore<Task> tasks = datasetContext.getDataset("tasks");
                                results.write(task.getId(), result);
                                tasks.delete("nextTask");
                            }
                        });
                        
                        context.getMetrics().count("tasks.completed", 1);
                    }
                }
                
                Thread.sleep(2000);
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                context.getMetrics().count("service.errors", 1);
            }
        }
    }
    
    /** Invokes the discovered service endpoint; replace with real HTTP call logic. */
    private TaskResult callProcessingService(Discoverable endpoint, Task task) {
        return new TaskResult(task.getId(), "processed");
    }
}

Graceful Shutdown Worker

/**
 * Worker demonstrating cooperative shutdown: stop() flips a volatile flag that
 * the run() loop checks, so in-flight work finishes before the worker exits.
 */
public class GracefulShutdownWorker extends AbstractWorker {
    // volatile: written by the platform thread calling stop(), read by run().
    private volatile boolean stopped = false;
    
    @Override
    public void configure(WorkerConfigurer configurer) {
        configurer.setName("GracefulWorker");
        configurer.useDataset("workItems");
    }
    
    @Override
    public void run() {
        WorkerContext context = getContext();
        
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            try {
                boolean hasWork = context.execute(new Callable<Boolean>() {
                    @Override
                    // FIX: java.util.concurrent.Callable.call() takes no arguments; the
                    // previous call(DatasetContext) did not override it and would not
                    // compile. Datasets are obtained from the WorkerContext instead,
                    // which itself extends DatasetContext.
                    public Boolean call() throws Exception {
                        ObjectStore<WorkItem> workItems = context.getDataset("workItems");
                        
                        WorkItem item = workItems.read("currentWork");
                        if (item != null) {
                            // Process the work item
                            processWorkItem(item);
                            workItems.delete("currentWork");
                            context.getMetrics().count("work.completed", 1);
                            return true;
                        }
                        return false;
                    }
                });
                
                if (!hasWork) {
                    Thread.sleep(1000); // Wait when no work available
                }
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                context.getMetrics().count("work.errors", 1);
            }
        }
    }
    
    @Override
    public void stop() {
        // Signal the run() loop to exit after the current iteration.
        stopped = true;
    }
    
    /** Performs the actual work; replace with real processing logic. */
    private void processWorkItem(WorkItem item) {
    }
}

Multi-Instance Coordination Worker

/**
 * Worker demonstrating multi-instance coordination: each of the 3 instances
 * heartbeats into "instanceStatus" and consumes only the queue key assigned
 * to its own instance id.
 */
public class CoordinatedWorker extends AbstractWorker {
    
    @Override
    public void configure(WorkerConfigurer configurer) {
        configurer.setName("CoordinatedWorker");
        configurer.setInstances(3); // Run 3 instances
        configurer.useDataset("sharedQueue");
        configurer.useDataset("instanceStatus");
    }
    
    @Override
    public void run() {
        WorkerContext context = getContext();
        final int instanceId = context.getInstanceId();
        
        while (!Thread.currentThread().isInterrupted()) {
            try {
                context.execute(new TxRunnable() {
                    @Override
                    public void run(DatasetContext datasetContext) throws Exception {
                        KeyValueTable instanceStatus = datasetContext.getDataset("instanceStatus");
                        ObjectStore<WorkItem> sharedQueue = datasetContext.getDataset("sharedQueue");
                        
                        // Update instance heartbeat
                        instanceStatus.write("instance_" + instanceId, 
                                           String.valueOf(System.currentTimeMillis()));
                        
                        // FIX: instanceId is already in [0, instanceCount), so the former
                        // "instanceId % totalInstances" was a no-op; the key is simply
                        // partitioned by instance id.
                        String workKey = "work_" + instanceId;
                        WorkItem work = sharedQueue.read(workKey);
                        
                        if (work != null) {
                            processWork(work);
                            sharedQueue.delete(workKey);
                            context.getMetrics().count("instance_" + instanceId + ".work", 1);
                        }
                    }
                });
                
                Thread.sleep(5000);
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                context.getMetrics().count("instance_" + instanceId + ".errors", 1);
            }
        }
    }
    
    /** Instance-specific work processing; replace with real logic. */
    private void processWork(WorkItem work) {
    }
}

Worker programs are ideal for continuous background processing, data pipeline coordination, periodic cleanup tasks, and real-time data monitoring within the CDAP platform.

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-api

docs

annotations.md

application-framework.md

dataset-management.md

index.md

mapreduce-programs.md

plugin-framework.md

scheduling.md

service-programs.md

spark-programs.md

system-services.md

transactions.md

worker-programs.md

workflow-programs.md

tile.json