or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

http-services.md · index.md · system-services.md · worker-tasks.md

worker-tasks.md · docs/

0

# Worker Tasks

1

2

Remote task execution framework allowing system services to run tasks on worker nodes with full system context, serializable parameters, and comprehensive error handling.

3

4

## Capabilities

5

6

### RunnableTask

7

8

Interface representing a task that can be launched by a Task worker service.

9

10

```java { .api }

11

/**

12

* RunnableTask represents a task that can be launched by a Task worker service.

13

*/

14

public interface RunnableTask {

15

/**

16

* Executes the task with the provided context.

17

* @param context the execution context for the task

18

* @throws Exception if task execution fails

19

*/

20

void run(RunnableTaskContext context) throws Exception;

21

}

22

```

23

24

**Usage Example:**

25

26

```java

27

import io.cdap.cdap.api.service.worker.RunnableTask;

28

import io.cdap.cdap.api.service.worker.RunnableTaskContext;

29

30

public class DataProcessingTask implements RunnableTask {

31

@Override

32

public void run(RunnableTaskContext context) throws Exception {

33

// Get task parameters

34

String param = context.getParam();

35

String namespace = context.getNamespace();

36

37

// Perform task logic

38

String result = processData(param, namespace);

39

40

// Write result back

41

context.writeResult(result.getBytes());

42

43

// Set cleanup task if needed

44

context.setCleanupTask(() -> {

45

// Cleanup resources

46

});

47

}

48

49

private String processData(String param, String namespace) {

50

// Task implementation

51

return "processed: " + param;

52

}

53

}

54

```

55

56

### RunnableTaskRequest

57

58

Request object for launching a runnable task with parameters and configuration.

59

60

```java { .api }

61

/**

62

* Request for launching a runnable task.

63

*/

64

public class RunnableTaskRequest {

65

/**

66

* Returns the task class name.

67

* @return class name of the task to execute

68

*/

69

public String getClassName();

70

71

/**

72

* Returns the task parameter.

73

* @return task parameter or null if not set

74

*/

75

@Nullable

76

public RunnableTaskParam getParam();

77

78

/**

79

* Returns the artifact ID.

80

* @return artifact ID or null if not set

81

*/

82

@Nullable

83

public ArtifactId getArtifactId();

84

85

/**

86

* Returns the namespace.

87

* @return namespace or null if not set

88

*/

89

@Nullable

90

public String getNamespace();

91

92

/**

93

* Returns a builder for RunnableTaskRequest.

94

* @param taskClassName the class name of the task

95

* @return builder instance

96

*/

97

public static Builder getBuilder(String taskClassName);

98

99

/**

100

* Builder for RunnableTaskRequest.

101

*/

102

public static class Builder {

103

/**

104

* Sets parameter for the task.

105

* @param param parameter string

106

* @return builder instance

107

*/

108

public Builder withParam(String param);

109

110

/**

111

* Sets namespace for the task.

112

* @param namespace namespace string

113

* @return builder instance

114

*/

115

public Builder withNamespace(String namespace);

116

117

/**

118

* Sets artifact ID for the task.

119

* @param artifactId artifact identifier

120

* @return builder instance

121

*/

122

public Builder withArtifact(ArtifactId artifactId);

123

124

/**

125

* Sets embedded task request.

126

* @param embeddedTaskRequest nested task request

127

* @return builder instance

128

*/

129

public Builder withEmbeddedTaskRequest(RunnableTaskRequest embeddedTaskRequest);

130

131

/**

132

* Builds the request.

133

* @return constructed RunnableTaskRequest

134

*/

135

public RunnableTaskRequest build();

136

}

137

}

138

```

139

140

**Usage Example:**

141

142

```java

143

import io.cdap.cdap.api.service.worker.RunnableTaskRequest;

144

import io.cdap.cdap.api.artifact.ArtifactId;

145

146

// Create simple task request

147

RunnableTaskRequest simpleTask = RunnableTaskRequest

148

.getBuilder("com.example.DataProcessingTask")

149

.withParam("input-data")

150

.withNamespace("analytics")

151

.build();

152

153

// Create task request with artifact

154

ArtifactId artifact = new ArtifactId("my-plugin", "1.0.0", ArtifactScope.USER);

155

RunnableTaskRequest taskWithArtifact = RunnableTaskRequest

156

.getBuilder("com.example.PluginTask")

157

.withParam("plugin-config")

158

.withArtifact(artifact)

159

.withNamespace("default")

160

.build();

161

162

// Create nested task request

163

RunnableTaskRequest embeddedTask = RunnableTaskRequest

164

.getBuilder("com.example.SubTask")

165

.withParam("sub-param")

166

.build();

167

168

RunnableTaskRequest parentTask = RunnableTaskRequest

169

.getBuilder("com.example.ParentTask")

170

.withEmbeddedTaskRequest(embeddedTask)

171

.build();

172

```

173

174

### RunnableTaskContext

175

176

Context for RunnableTask execution, providing result writing and cleanup capabilities.

177

178

```java { .api }

179

/**

180

* Represents a context for a RunnableTask. This context is used for writing back

181

* the result of RunnableTask execution.

182

*/

183

public class RunnableTaskContext {

184

/**

185

* Constructor with task request.

186

* @param taskRequest the originating task request

187

*/

188

public RunnableTaskContext(RunnableTaskRequest taskRequest);

189

190

/**

191

* Constructor with task request and system app context.

192

* @param taskRequest the originating task request

193

* @param systemAppTaskContext system app task context (nullable)

194

*/

195

public RunnableTaskContext(RunnableTaskRequest taskRequest,

196

@Nullable SystemAppTaskContext systemAppTaskContext);

197

198

/**

199

* Writes result data.

200

* @param data result data as byte array

201

* @throws IOException if writing fails

202

*/

203

public void writeResult(byte[] data) throws IOException;

204

205

/**

206

* Sets cleanup task to run after task completion.

207

* @param cleanupTask cleanup runnable

208

*/

209

public void setCleanupTask(Runnable cleanupTask);

210

211

/**

212

* Executes the cleanup task.

213

*/

214

public void executeCleanupTask();

215

216

/**

217

* Sets whether to terminate the task runner on task completion.

218

* @param terminate true to terminate after completion

219

*/

220

public void setTerminateOnComplete(boolean terminate);

221

222

/**

223

* Returns whether the task runner should be terminated after the task completes.

224

* @return termination flag

225

*/

226

public boolean isTerminateOnComplete();

227

228

/**

229

* Gets the result as ByteBuffer.

230

* @return result buffer

231

*/

232

public ByteBuffer getResult();

233

234

/**

235

* Returns the class name.

236

* @return task class name

237

*/

238

public String getClassName();

239

240

/**

241

* Returns the parameter.

242

* @return parameter string or null

243

*/

244

@Nullable

245

public String getParam();

246

247

/**

248

* Returns embedded request.

249

* @return embedded task request or null

250

*/

251

@Nullable

252

public RunnableTaskRequest getEmbeddedRequest();

253

254

/**

255

* Returns namespace.

256

* @return namespace string or null

257

*/

258

@Nullable

259

public String getNamespace();

260

261

/**

262

* Returns artifact ID.

263

* @return artifact ID or null

264

*/

265

@Nullable

266

public ArtifactId getArtifactId();

267

268

/**

269

* Returns the system app task context.

270

* @return system app task context or null

271

*/

272

@Nullable

273

public SystemAppTaskContext getRunnableTaskSystemAppContext();

274

}

275

```

276

277

### RunnableTaskParam

278

279

Parameter wrapper for runnable task requests supporting both simple strings and embedded task requests.

280

281

```java { .api }

282

/**

283

* Class for the parameter of RunnableTaskRequest.

284

*/

285

public class RunnableTaskParam {

286

/**

287

* Constructor with simple parameter and embedded task request.

288

* @param simpleParam parameter string (nullable)

289

* @param embeddedTaskRequest embedded task request (nullable)

290

*/

291

public RunnableTaskParam(@Nullable String simpleParam,

292

@Nullable RunnableTaskRequest embeddedTaskRequest);

293

294

/**

295

* Returns embedded task request.

296

* @return embedded task request or null

297

*/

298

@Nullable

299

public RunnableTaskRequest getEmbeddedTaskRequest();

300

301

/**

302

* Returns simple parameter.

303

* @return parameter string or null

304

*/

305

@Nullable

306

public String getSimpleParam();

307

308

/**

309

* String representation.

310

* @return string representation

311

*/

312

public String toString();

313

314

/**

315

* Equals implementation.

316

* @param o object to compare

317

* @return true if equal

318

*/

319

public boolean equals(Object o);

320

321

/**

322

* Hash code implementation.

323

* @return hash code

324

*/

325

public int hashCode();

326

}

327

```

328

329

### SystemAppTaskContext

330

331

System App context for remote tasks with plugin configuration, artifact management, and macro evaluation.

332

333

```java { .api }

334

/**

335

* System App context for a remote task.

336

*/

337

public interface SystemAppTaskContext

338

extends ServiceDiscoverer, SecureStore, AutoCloseable, FeatureFlagsProvider {

339

340

/**

341

* Fetches preferences for the given namespace.

342

* @param namespace the namespace to get preferences for

343

* @param resolved whether to resolve macros in preference values

344

* @return map of preference key-value pairs

345

* @throws Exception if fetching preferences fails

346

*/

347

Map<String, String> getPreferencesForNamespace(String namespace, boolean resolved) throws Exception;

348

349

/**

350

* Creates a PluginConfigurer that can be used to instantiate plugins at runtime.

351

* @param namespace the namespace context for plugin configuration

352

* @return plugin configurer

353

* @throws IOException if plugin configurer creation fails

354

*/

355

PluginConfigurer createPluginConfigurer(String namespace) throws IOException;

356

357

/**

358

* Creates a ServicePluginConfigurer that can be used to instantiate plugins

359

* with macro evaluation.

360

* @param namespace the namespace context for plugin configuration

361

* @return service plugin configurer

362

*/

363

ServicePluginConfigurer createServicePluginConfigurer(String namespace);

364

365

/**

366

* Evaluates macros using the provided macro evaluator and parsing options.

367

* @param namespace the namespace context for macro evaluation

368

* @param macros map of properties containing macros to evaluate

369

* @param evaluator the macro evaluator to use

370

* @param options macro parsing options

371

* @return map with evaluated macros

372

* @throws InvalidMacroException if macro evaluation fails

373

*/

374

Map<String, String> evaluateMacros(String namespace,

375

Map<String, String> macros,

376

MacroEvaluator evaluator,

377

MacroParserOptions options)

378

throws InvalidMacroException;

379

380

/**

381

* Returns ArtifactManager for artifact listing and class loading.

382

* @return artifact manager

383

*/

384

ArtifactManager getArtifactManager();

385

386

/**

387

* Returns the service name.

388

* @return service name

389

*/

390

String getServiceName();

391

}

392

```

393

394

### Exception Handling

395

396

Specialized exception classes for handling errors in remote task execution.

397

398

```java { .api }

399

/**

400

* An exception class for wrapping an Exception coming from remote task execution.

401

*/

402

public class RemoteExecutionException extends Exception {

403

/**

404

* Constructor with remote task exception cause.

405

* @param cause the remote task exception

406

*/

407

public RemoteExecutionException(RemoteTaskException cause);

408

409

/**

410

* Returns the remote task exception cause.

411

* @return remote task exception cause

412

*/

413

public RemoteTaskException getCause();

414

415

/**

416

* Converts a BasicThrowable to a RemoteExecutionException.

417

* @param basicThrowable the basic throwable to convert

418

* @return converted remote execution exception

419

*/

420

public static RemoteExecutionException fromBasicThrowable(BasicThrowable basicThrowable);

421

}

422

423

/**

424

* Captures the stack trace of exceptions thrown by a remote task.

425

*/

426

public class RemoteTaskException extends Exception {

427

/**

428

* Constructor with remote exception class name, message and cause.

429

* @param remoteExceptionClassName the remote exception class name

430

* @param message the exception message

431

* @param cause the underlying cause (nullable)

432

*/

433

public RemoteTaskException(String remoteExceptionClassName, String message, @Nullable Throwable cause);

434

435

/**

436

* Returns the remote exception class name.

437

* @return remote exception class name

438

*/

439

public String getRemoteExceptionClassName();

440

441

/**

442

* String representation.

443

* @return string representation

444

*/

445

public String toString();

446

}

447

```

448

449

## Usage Patterns

450

451

### Basic Task Execution

452

453

```java

454

// In a system HTTP service handler

455

@POST

456

@Path("/process")

457

public void processData(HttpServiceRequest request, HttpServiceResponder responder) {

458

try {

459

// Create task request

460

RunnableTaskRequest taskRequest = RunnableTaskRequest

461

.getBuilder("com.example.DataProcessingTask")

462

.withParam("input-data")

463

.withNamespace("analytics")

464

.build();

465

466

// Execute task

467

byte[] result = getContext().runTask(taskRequest);

468

responder.sendBytes(result, "application/json");

469

} catch (Exception e) {

470

responder.sendError(500, "Task execution failed: " + e.getMessage());

471

}

472

}

473

```

474

475

### Task with Cleanup

476

477

```java

478

public class ResourceIntensiveTask implements RunnableTask {

479

@Override

480

public void run(RunnableTaskContext context) throws Exception {

481

// Allocate resources

482

ExternalResource resource = allocateResource();

483

484

// Set cleanup task

485

context.setCleanupTask(() -> {

486

resource.close();

487

LOG.info("Resource cleaned up");

488

});

489

490

try {

491

// Perform work with resource

492

String result = resource.process(context.getParam());

493

context.writeResult(result.getBytes());

494

} catch (Exception e) {

495

// Cleanup will still be called

496

throw e;

497

}

498

}

499

}

500

```

501

502

## Important Notes

503

504

- Tasks are executed on remote worker nodes, not in the originating service

505

- Task classes must be available on the worker node classpath or be specified via an artifact

506

- All task parameters must be serializable

507

- Cleanup tasks are always executed, even if the main task fails

508

- Use `SystemAppTaskContext` for advanced plugin configuration and macro evaluation

509

- Remote task execution must be enabled in CDAP configuration

510

- Task results are limited by available memory and should be kept reasonably small