# Worker Programs

Worker programs in CDAP provide long-running background processing. Each worker runs as a separate thread with explicit transaction control and full lifecycle management.
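
A worker is packaged inside a CDAP application and registered in the application's `configure` method. A minimal sketch, assuming a `DataProcessingWorker` like the one shown under Usage Examples below:

```java
public class ProcessingApp extends AbstractApplication {

  @Override
  public void configure() {
    setName("ProcessingApp");
    setDescription("Application hosting a background worker");
    addWorker(new DataProcessingWorker()); // registers the worker program
  }
}
```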

## Core Worker Interfaces

### Worker

```java { .api }
public interface Worker extends Runnable, ProgramLifecycle<WorkerContext> {
  void configure(WorkerConfigurer configurer);

  @TransactionPolicy(TransactionControl.EXPLICIT)
  void initialize(WorkerContext context) throws Exception;

  @TransactionPolicy(TransactionControl.EXPLICIT)
  void destroy();

  void stop();
}
```

Base interface for worker programs. Unlike other program types, workers implement Runnable for background execution and control their own transactions: `initialize` and `destroy` are annotated with `@TransactionPolicy(TransactionControl.EXPLICIT)`, so they run outside of a transaction unless one is started through the context.
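
Because `Worker` extends `Runnable`, a minimal worker is just a `configure` method plus a `run` loop. A bare-bones sketch implementing the interface directly (in practice most workers extend `AbstractWorker`, described next):

```java
public class HeartbeatWorker implements Worker {
  private volatile boolean stopped;

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("HeartbeatWorker");
  }

  @Override
  public void initialize(WorkerContext context) throws Exception {
    // No initialization needed for this sketch.
  }

  @Override
  public void run() {
    while (!stopped && !Thread.currentThread().isInterrupted()) {
      // Lightweight periodic work would go here.
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  @Override
  public void stop() {
    stopped = true; // signals the run() loop to exit
  }

  @Override
  public void destroy() {
    // No cleanup needed for this sketch.
  }
}
```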

### AbstractWorker

```java { .api }
public abstract class AbstractWorker implements Worker {
  public abstract void configure(WorkerConfigurer configurer);

  @Override
  public void initialize(WorkerContext context) throws Exception {
    // Optional initialization logic
  }

  @Override
  public void destroy() {
    // Optional cleanup logic
  }

  @Override
  public void stop() {
    // Default stop implementation - should be overridden for graceful shutdown
  }

  // Returns the context set during initialization; used by subclasses in run()
  protected WorkerContext getContext();
}
```

Base implementation class for worker programs. It provides no-op lifecycle methods and access to the runtime context via `getContext()`, so subclasses typically only implement `configure` and `run`.
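
Because lifecycle methods use explicit transaction control, dataset access in `initialize` must be wrapped in a transaction started through the context. A minimal sketch, assuming a hypothetical `state` KeyValueTable that records the worker's start time:

```java
public class StatefulWorker extends AbstractWorker {

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("StatefulWorker");
  }

  @Override
  public void initialize(WorkerContext context) throws Exception {
    super.initialize(context);
    // initialize() is not wrapped in a transaction; start one explicitly.
    context.execute(datasetContext -> {
      KeyValueTable state = datasetContext.getDataset("state"); // hypothetical dataset
      state.write("startTime", String.valueOf(System.currentTimeMillis()));
    });
  }

  @Override
  public void run() {
    // Main background loop would go here.
  }
}
```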

## Worker Configuration

### WorkerConfigurer

```java { .api }
public interface WorkerConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
  void setInstances(int instances);
  void setResources(Resources resources);
}
```

Interface for configuring worker programs including resource allocation and instance count.
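
For example, a `configure` implementation can scale a worker horizontally and set per-instance resources. A short sketch (names and values are illustrative):

```java
@Override
public void configure(WorkerConfigurer configurer) {
  configurer.setName("QueueDrainer");
  configurer.setDescription("Drains the shared work queue");
  configurer.setInstances(3);                     // three parallel instances
  configurer.setResources(new Resources(512, 1)); // 512 MB and 1 virtual core per instance
}
```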

### WorkerSpecification

```java { .api }
public class WorkerSpecification implements ProgramSpecification {
  public String getName();
  public String getDescription();
  public String getClassName();
  public Map<String, String> getProperties();
  public Resources getResources();
  public int getInstances();
  public Set<String> getDatasets();
}
```

Complete specification of a worker program.

## Worker Context

### WorkerContext

```java { .api }
public interface WorkerContext extends RuntimeContext, DatasetContext, Transactional {
  int getInstanceCount();
  int getInstanceId();

  PluginContext getPluginContext();
  ServiceDiscoverer getServiceDiscoverer();
  Metrics getMetrics();
  Admin getAdmin();

  void execute(TxRunnable runnable) throws TransactionFailureException;
  <T> T execute(TxCallable<T> callable) throws TransactionFailureException;
}
```

Runtime context for worker programs. It provides access to datasets, plugins, metrics, and CDAP services, and manages transactions explicitly through the `execute` methods.
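
Since `TxRunnable` has a single abstract method, a transaction body can be written compactly as a lambda. A minimal sketch, assuming a hypothetical `status` KeyValueTable:

```java
@Override
public void run() {
  WorkerContext context = getContext();
  try {
    context.execute(datasetContext -> {
      KeyValueTable status = datasetContext.getDataset("status"); // hypothetical dataset
      status.write("lastHeartbeat", String.valueOf(System.currentTimeMillis()));
    });
  } catch (TransactionFailureException e) {
    context.getMetrics().count("tx.failures", 1);
  }
}
```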

## Usage Examples

### Basic Worker

```java
public class DataProcessingWorker extends AbstractWorker {

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("DataProcessor");
    configurer.setDescription("Processes incoming data continuously");
    configurer.setInstances(2);
    configurer.setResources(new Resources(1024)); // 1GB memory
    configurer.useDataset("inputQueue");
    configurer.useDataset("processedData");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();

    while (!Thread.currentThread().isInterrupted()) {
      try {
        context.execute(new TxRunnable() {
          @Override
          public void run(DatasetContext datasetContext) throws Exception {
            ObjectStore<DataRecord> inputQueue = datasetContext.getDataset("inputQueue");
            ObjectStore<DataRecord> processedData = datasetContext.getDataset("processedData");

            // Process data records
            DataRecord record = inputQueue.read("next");
            if (record != null) {
              DataRecord processed = processRecord(record);
              processedData.write(processed.getId(), processed);
              inputQueue.delete("next");

              context.getMetrics().count("records.processed", 1);
            }
          }
        });

        // Wait before next processing cycle
        Thread.sleep(1000);

      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        // Log error and continue
        context.getMetrics().count("processing.errors", 1);
      }
    }
  }

  private DataRecord processRecord(DataRecord record) {
    // Implement processing logic
    return new DataRecord(record.getId(), "processed_" + record.getData());
  }
}
```

### Worker with Plugin Integration

```java
public class PluginWorker extends AbstractWorker {

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("PluginProcessor");
    configurer.useDataset("workQueue");

    configurer.usePlugin("processor", "dataProcessor", "processor1",
                         PluginProperties.builder()
                           .add("batchSize", "100")
                           .add("timeout", "30")
                           .build());
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    final DataProcessor processor;
    try {
      processor = context.getPluginContext().newPluginInstance("processor1");
    } catch (InstantiationException e) {
      throw new RuntimeException("Failed to instantiate plugin 'processor1'", e);
    }

    while (!Thread.currentThread().isInterrupted()) {
      try {
        context.execute(new TxRunnable() {
          @Override
          public void run(DatasetContext datasetContext) throws Exception {
            ObjectStore<WorkItem> workQueue = datasetContext.getDataset("workQueue");

            List<WorkItem> batch = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
              WorkItem item = workQueue.read("item_" + i);
              if (item != null) {
                batch.add(item);
                workQueue.delete("item_" + i);
              }
            }

            if (!batch.isEmpty()) {
              List<WorkItem> processed = processor.processBatch(batch);
              for (WorkItem item : processed) {
                workQueue.write("processed_" + item.getId(), item);
              }

              context.getMetrics().count("batch.processed", batch.size());
            }
          }
        });

        Thread.sleep(5000); // Wait 5 seconds between batches

      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        context.getMetrics().count("processing.errors", 1);
      }
    }
  }
}
```

### Worker with Service Communication

```java
public class ServiceIntegrationWorker extends AbstractWorker {

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("ServiceWorker");
    configurer.useDataset("tasks");
    configurer.useDataset("results");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    ServiceDiscoverer serviceDiscoverer = context.getServiceDiscoverer();

    while (!Thread.currentThread().isInterrupted()) {
      try {
        context.execute(new TxRunnable() {
          @Override
          public void run(DatasetContext datasetContext) throws Exception {
            ObjectStore<Task> tasks = datasetContext.getDataset("tasks");
            ObjectStore<TaskResult> results = datasetContext.getDataset("results");

            // Get next task
            Task task = tasks.read("nextTask");
            if (task != null) {
              // Look up the endpoint of the service that does the processing
              URL serviceURL = serviceDiscoverer.getServiceURL("processingService");
              if (serviceURL != null) {
                TaskResult result = callProcessingService(serviceURL, task);
                results.write(task.getId(), result);
                tasks.delete("nextTask");

                context.getMetrics().count("tasks.completed", 1);
              }
            }
          }
        });

        Thread.sleep(2000);

      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        context.getMetrics().count("service.errors", 1);
      }
    }
  }

  private TaskResult callProcessingService(URL serviceURL, Task task) {
    // Implement service call logic (e.g., an HTTP request to serviceURL)
    return new TaskResult(task.getId(), "processed");
  }
}
```

### Graceful Shutdown Worker

```java
public class GracefulShutdownWorker extends AbstractWorker {
  private volatile boolean stopped = false;

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("GracefulWorker");
    configurer.useDataset("workItems");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();

    while (!stopped && !Thread.currentThread().isInterrupted()) {
      try {
        boolean hasWork = context.execute(new TxCallable<Boolean>() {
          @Override
          public Boolean call(DatasetContext datasetContext) throws Exception {
            ObjectStore<WorkItem> workItems = datasetContext.getDataset("workItems");

            WorkItem item = workItems.read("currentWork");
            if (item != null) {
              // Process the work item
              processWorkItem(item);
              workItems.delete("currentWork");
              context.getMetrics().count("work.completed", 1);
              return true;
            }
            return false;
          }
        });

        if (!hasWork) {
          Thread.sleep(1000); // Wait when no work available
        }

      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        context.getMetrics().count("work.errors", 1);
      }
    }
  }

  @Override
  public void stop() {
    stopped = true;
  }

  private void processWorkItem(WorkItem item) {
    // Implement work processing logic
  }
}
```

### Multi-Instance Coordination Worker

```java
public class CoordinatedWorker extends AbstractWorker {

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("CoordinatedWorker");
    configurer.setInstances(3); // Run 3 instances
    configurer.useDataset("sharedQueue");
    configurer.useDataset("instanceStatus");
  }

  @Override
  public void run() {
    WorkerContext context = getContext();
    int instanceId = context.getInstanceId();

    while (!Thread.currentThread().isInterrupted()) {
      try {
        context.execute(new TxRunnable() {
          @Override
          public void run(DatasetContext datasetContext) throws Exception {
            KeyValueTable instanceStatus = datasetContext.getDataset("instanceStatus");
            ObjectStore<WorkItem> sharedQueue = datasetContext.getDataset("sharedQueue");

            // Update instance heartbeat
            instanceStatus.write("instance_" + instanceId,
                                 String.valueOf(System.currentTimeMillis()));

            // Process the key partition owned by this instance
            // (instance ids range from 0 to getInstanceCount() - 1)
            String workKey = "work_" + instanceId;
            WorkItem work = sharedQueue.read(workKey);

            if (work != null) {
              processWork(work);
              sharedQueue.delete(workKey);
              context.getMetrics().count("instance_" + instanceId + ".work", 1);
            }
          }
        });

        Thread.sleep(5000);

      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (Exception e) {
        context.getMetrics().count("instance_" + instanceId + ".errors", 1);
      }
    }
  }

  private void processWork(WorkItem work) {
    // Instance-specific work processing
  }
}
```

Worker programs are ideal for continuous background processing, data pipeline coordination, periodic cleanup tasks, and real-time data monitoring within the CDAP platform.
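
As a sketch of the periodic-cleanup use case, a worker can wake on an interval and delete expired entries. The dataset name `entries` and its value encoding (an 8-byte expiry timestamp per key) are assumptions for illustration:

```java
public class CleanupWorker extends AbstractWorker {

  @Override
  public void configure(WorkerConfigurer configurer) {
    configurer.setName("CleanupWorker");
    configurer.setDescription("Periodically deletes expired entries");
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        getContext().execute(datasetContext -> {
          KeyValueTable entries = datasetContext.getDataset("entries"); // hypothetical dataset
          long now = System.currentTimeMillis();
          try (CloseableIterator<KeyValue<byte[], byte[]>> scan = entries.scan(null, null)) {
            while (scan.hasNext()) {
              KeyValue<byte[], byte[]> entry = scan.next();
              // Each value holds an expiry timestamp; delete anything past due.
              if (Bytes.toLong(entry.getValue()) < now) {
                entries.delete(entry.getKey());
              }
            }
          }
        });
        TimeUnit.MINUTES.sleep(30); // run twice per hour
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (Exception e) {
        getContext().getMetrics().count("cleanup.errors", 1);
      }
    }
  }
}
```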