or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-framework.md data-management.md data-processing.md index.md operational.md plugin-system.md security-metadata.md

docs/application-framework.md

0

# Application Framework

1

2

The CDAP Application Framework provides the core building blocks for creating enterprise data applications. It enables developers to compose applications from different types of programs including services, workflows, workers, MapReduce, and Spark programs.

3

4

## Application Architecture

5

6

### Application Definition

7

8

An application is the top-level container for all programs and resources in CDAP:

9

10

```java { .api }

11

import io.cdap.cdap.api.app.*;

12

import io.cdap.cdap.api.*;

13

14

// Base application interface

15

public interface Application<T extends Config> {

16

void configure(ApplicationConfigurer configurer, ApplicationContext<T> context);

17

18

default boolean isUpdateSupported() {

19

return false;

20

}

21

22

default ApplicationUpdateResult<T> updateConfig(ApplicationUpdateContext applicationUpdateContext)

23

throws Exception {

24

throw new UnsupportedOperationException("Application config update operation is not supported.");

25

}

26

}

27

28

// Abstract base implementation

29

public abstract class AbstractApplication<T extends Config>

30

extends AbstractPluginConfigurable<ApplicationConfigurer>

31

implements Application<T> {

32

33

@Override

34

public void configure(ApplicationConfigurer configurer,

35

ApplicationContext<T> context) {

36

// Override in subclass to configure application

37

}

38

39

@Override

40

public boolean isUpdateSupported() {

41

return false;

42

}

43

44

@Override

45

public ApplicationUpdateResult<T> updateConfig(ApplicationUpdateContext context) {

46

throw new UnsupportedOperationException();

47

}

48

}

49

```

50

51

### Application Configuration

52

53

```java { .api }

54

// Application configurer interface

55

public interface ApplicationConfigurer extends PluginConfigurer, DatasetConfigurer, FeatureFlagsProvider {

56

void setName(String name);

57

void setDescription(String description);

58

59

// Add program types

60

void addMapReduce(MapReduce mapReduce);

61

void addSpark(Spark spark);

62

void addService(Service service);

63

void addWorker(Worker worker);

64

void addWorkflow(Workflow workflow);

65

66

// Schedule workflows

67

ScheduleBuilder buildSchedule(String scheduleName, ProgramType programType, String programName);

68

void schedule(ScheduleCreationSpec scheduleCreationSpec);

69

70

// Additional methods

71

void emitMetadata(Metadata metadata, MetadataScope scope);

72

TriggerFactory getTriggerFactory();

73

RuntimeConfigurer getRuntimeConfigurer();

74

String getDeployedNamespace();

75

ApplicationSpecification getDeployedApplicationSpec();

76

}

77

78

// Application context

79

public interface ApplicationContext<T extends Config> {

80

T getConfig();

81

}

82

83

// Application update support

84

public class ApplicationUpdateResult<T extends Config> {

85

public T getNewConfig() { /* returns updated configuration */ }

86

public ApplicationConfigUpdateAction getUpdateAction() { /* returns update action */ }

87

}

88

89

public enum ApplicationConfigUpdateAction {

90

UPGRADE_ARTIFACT, // Upgrade to new artifact version

91

UPDATE_CONFIG // Update application configuration

92

}

93

```

94

95

### Runtime Context

96

97

All CDAP programs receive runtime context providing access to system services:

98

99

```java { .api }

100

// Base runtime context

101

public interface RuntimeContext extends FeatureFlagsProvider {

102

ApplicationSpecification getApplicationSpecification();

103

Map<String, String> getRuntimeArguments();

104

String getClusterName();

105

String getNamespace();

106

RunId getRunId();

107

Admin getAdmin();

108

DataTracer getDataTracer(String dataTracerName);

109

}

110

```

111

112

## Services

113

114

Services provide HTTP endpoints for real-time data access and application interaction.

115

116

### Service Definition

117

118

```java { .api }

119

import io.cdap.cdap.api.service.*;

120

import io.cdap.cdap.api.service.http.*;

121

122

// Service interface

123

public interface Service extends ProgramLifecycle<ServiceContext> {

124

void configure(ServiceConfigurer configurer);

125

}

126

127

// Abstract service implementation

128

public abstract class AbstractService implements Service {

129

@Override

130

public void initialize(ServiceContext context) throws Exception {

131

// Initialize service resources

132

}

133

134

@Override

135

public void destroy() {

136

// Cleanup service resources

137

}

138

}

139

140

// Basic service with HTTP handlers

141

public class BasicService extends AbstractService {

142

@Override

143

public void configure(ServiceConfigurer configurer) {

144

configurer.setName("MyService");

145

configurer.setDescription("HTTP service for data access");

146

configurer.addHandler(new MyHttpHandler());

147

configurer.setInstances(2);

148

configurer.setResources(new Resources(1024, 2));

149

}

150

}

151

```

152

153

### HTTP Service Handlers

154

155

```java { .api }

156

// HTTP service handler interface

157

public interface HttpServiceHandler extends ProgramLifecycle<HttpServiceContext> {

158

// Lifecycle methods inherited from ProgramLifecycle

159

}

160

161

// Abstract handler implementation

162

public abstract class AbstractHttpServiceHandler implements HttpServiceHandler {

163

@Override

164

public void initialize(HttpServiceContext context) throws Exception {

165

// Initialize handler

166

}

167

168

@Override

169

public void destroy() {

170

// Cleanup handler

171

}

172

}

173

174

// HTTP service context

175

public interface HttpServiceContext

176

extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {

177

178

int getInstanceId();

179

int getInstanceCount();

180

DiscoveryServiceClient getDiscoveryServiceClient();

181

}

182

```

183

184

### HTTP Request Handling

185

186

```java { .api }

187

// HTTP request and response interfaces

188

public interface HttpServiceRequest {

189

String getMethod();

190

String getUri();

191

Map<String, List<String>> getAllHeaders();

192

String getHeader(String name);

193

Map<String, List<String>> getAllParameters();

194

String getParameter(String name);

195

byte[] getContent();

196

String getContentType();

197

int getContentLength();

198

}

199

200

public interface HttpServiceResponder {

201

void sendString(int status, String data, String contentType);

202

void sendBytes(int status, byte[] data, String contentType);

203

void sendJson(int status, Object object);

204

void sendError(int status, String errorMessage);

205

void send(int status, ByteBuffer content, String contentType, Map<String, String> headers);

206

}

207

208

// Content streaming interfaces

209

public interface HttpContentProducer {

210

ByteBuffer nextChunk(TransferContext transferContext) throws Exception;

211

void onFinish() throws Exception;

212

void onError(Throwable failureCause);

213

}

214

215

public interface HttpContentConsumer {

216

void onReceived(ByteBuffer chunk, TransferContext transferContext) throws Exception;

217

void onFinish() throws Exception;

218

void onError(Throwable failureCause);

219

}

220

```

221

222

### Service Examples

223

224

```java { .api }

225

// Example HTTP service handler

226

@Path("/data")

227

public class DataServiceHandler extends AbstractHttpServiceHandler {

228

229

@UseDataSet("users")

230

private Table users;

231

232

@GET

233

@Path("/user/{id}")

234

public void getUser(HttpServiceRequest request, HttpServiceResponder responder,

235

@PathParam("id") String userId) {

236

try {

237

Row row = users.get(Bytes.toBytes(userId));

238

if (row.isEmpty()) {

239

responder.sendError(404, "User not found");

240

} else {

241

String userData = row.getString("data");

242

responder.sendString(200, userData, "application/json");

243

}

244

} catch (Exception e) {

245

responder.sendError(500, "Internal error: " + e.getMessage());

246

}

247

}

248

249

@POST

250

@Path("/user")

251

public void createUser(HttpServiceRequest request, HttpServiceResponder responder) {

252

try {

253

String content = Charset.forName("UTF-8").decode(

254

ByteBuffer.wrap(request.getContent())).toString();

255

256

JsonObject user = new JsonParser().parse(content).getAsJsonObject();

257

String userId = user.get("id").getAsString();

258

259

users.put(Bytes.toBytes(userId), "data", content);

260

responder.sendString(201, "User created", "text/plain");

261

} catch (Exception e) {

262

responder.sendError(400, "Invalid request: " + e.getMessage());

263

}

264

}

265

}

266

```

267

268

## Workers

269

270

Workers are long-running background programs for continuous data processing, monitoring, or housekeeping tasks.

271

272

### Worker Definition

273

274

```java { .api }

275

import io.cdap.cdap.api.worker.*;

276

277

// Worker interface

278

public interface Worker extends ProgramLifecycle<WorkerContext> {

279

void configure(WorkerConfigurer configurer);

280

void run() throws Exception;

281

void stop();

282

}

283

284

// Abstract worker implementation

285

public abstract class AbstractWorker

286

extends AbstractPluginConfigurable<WorkerConfigurer>

287

implements ProgramLifecycle<WorkerContext>, Worker {

288

289

@Override

290

public void initialize(WorkerContext context) throws Exception {

291

// Initialize worker resources

292

}

293

294

@Override

295

public abstract void run() throws Exception;

296

297

@Override

298

public void stop() {

299

// Graceful shutdown logic

300

}

301

302

@Override

303

public void destroy() {

304

// Cleanup resources

305

}

306

}

307

308

// Worker context

309

public interface WorkerContext

310

extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {

311

312

WorkerSpecification getSpecification();

313

int getInstanceId();

314

int getInstanceCount();

315

}

316

```

317

318

### Worker Configuration

319

320

```java { .api }

321

// Worker configurer interface

322

public interface WorkerConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {

323

void setName(String name);

324

void setDescription(String description);

325

void setInstances(int instances);

326

void setResources(Resources resources);

327

}

328

329

// Worker specification

330

public class WorkerSpecification extends AbstractProgramSpecification {

331

public int getInstances() { /* returns number of instances */ }

332

public Resources getResources() { /* returns resource allocation */ }

333

}

334

```

335

336

### Worker Examples

337

338

```java { .api }

339

// Example data ingestion worker

340

public class DataIngestionWorker extends AbstractWorker {

341

342

private volatile boolean running;

343

344

@Override

345

public void configure(WorkerConfigurer configurer) {

346

configurer.setName("DataIngestionWorker");

347

configurer.setDescription("Continuously ingests data from external source");

348

configurer.setInstances(3);

349

configurer.setResources(new Resources(512, 1));

350

}

351

352

@Override

353

public void run() throws Exception {

354

running = true;

355

356

while (running) {

357

try {

358

// Get context and datasets

359

WorkerContext context = getContext();

360

Table outputTable = context.getDataset("ingested_data");

361

362

// Ingest data (example)

363

List<DataRecord> records = fetchDataFromSource();

364

for (DataRecord record : records) {

365

outputTable.put(

366

Bytes.toBytes(record.getId()),

367

"data", record.getData(),

368

"timestamp", System.currentTimeMillis()

369

);

370

}

371

372

// Sleep before next iteration

373

Thread.sleep(5000);

374

375

} catch (InterruptedException e) {

376

Thread.currentThread().interrupt();

377

break;

378

} catch (Exception e) {

379

LOG.error("Error in data ingestion", e);

380

Thread.sleep(10000); // Wait before retry

381

}

382

}

383

}

384

385

@Override

386

public void stop() {

387

running = false;

388

}

389

390

private List<DataRecord> fetchDataFromSource() {

391

// Implementation for fetching data

392

return new ArrayList<>();

393

}

394

}

395

396

// Example monitoring worker

397

public class MetricsCollectionWorker extends AbstractWorker {

398

399

@Override

400

public void configure(WorkerConfigurer configurer) {

401

configurer.setName("MetricsCollector");

402

configurer.setDescription("Collects and aggregates application metrics");

403

}

404

405

@Override

406

public void run() throws Exception {

407

WorkerContext context = getContext();

408

Metrics metrics = context.getMetrics();

409

410

while (context.getState().equals(ProgramRunStatus.RUNNING)) {

411

// Collect custom metrics

412

collectSystemMetrics(metrics);

413

collectApplicationMetrics(context, metrics);

414

415

Thread.sleep(60000); // Collect every minute

416

}

417

}

418

419

private void collectSystemMetrics(Metrics metrics) {

420

// Emit system-level metrics

421

metrics.gauge("system.memory.used", getUsedMemory());

422

metrics.gauge("system.cpu.usage", getCpuUsage());

423

}

424

425

private void collectApplicationMetrics(WorkerContext context, Metrics metrics) {

426

// Collect application-specific metrics

427

Table userTable = context.getDataset("users");

428

long userCount = countTableRows(userTable);

429

metrics.gauge("app.users.count", userCount);

430

}

431

}

432

```

433

434

## Workflows

435

436

Workflows orchestrate the execution of multiple programs in a defined sequence, with support for conditional logic, parallel execution, and data passing.

437

438

### Workflow Definition

439

440

```java { .api }

441

import io.cdap.cdap.api.workflow.*;

442

443

// Workflow interface

444

public interface Workflow {

445

void configure(WorkflowConfigurer configurer);

446

}

447

448

// Abstract workflow implementation

449

public abstract class AbstractWorkflow

450

extends AbstractPluginConfigurable<WorkflowConfigurer>

451

implements Workflow {

452

453

@Override

454

public abstract void configure(WorkflowConfigurer configurer);

455

}

456

457

// Workflow context

458

public interface WorkflowContext

459

extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {

460

461

WorkflowToken getToken();

462

WorkflowInfo getWorkflowInfo();

463

WorkflowNodeState getNodeState(String nodeId);

464

}

465

```

466

467

### Workflow Configuration

468

469

```java { .api }

470

// Workflow configurer interface

471

public interface WorkflowConfigurer

472

extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {

473

474

// Add program execution nodes

475

void addMapReduce(String mapReduce);

476

void addSpark(String spark);

477

void addAction(WorkflowAction action);

478

479

// Control flow constructs

480

WorkflowForkConfigurer fork();

481

WorkflowConditionConfigurer condition(Predicate<WorkflowContext> predicate);

482

483

// Resource allocation

484

void setDriverResources(Resources resources);

485

}

486

487

// Fork configurer for parallel execution

488

public interface WorkflowForkConfigurer {

489

WorkflowForkConfigurer addMapReduce(String mapReduce);

490

WorkflowForkConfigurer addSpark(String spark);

491

WorkflowForkConfigurer addAction(WorkflowAction action);

492

WorkflowForkConfigurer fork();

493

WorkflowForkConfigurer condition(Predicate<WorkflowContext> predicate);

494

WorkflowConfigurer join();

495

}

496

497

// Condition configurer for conditional execution

498

public interface WorkflowConditionConfigurer {

499

WorkflowConditionConfigurer addMapReduce(String mapReduce);

500

WorkflowConditionConfigurer addSpark(String spark);

501

WorkflowConditionConfigurer addAction(WorkflowAction action);

502

WorkflowConditionConfigurer fork();

503

WorkflowConditionConfigurer condition(Predicate<WorkflowContext> predicate);

504

WorkflowConditionConfigurer otherwise();

505

WorkflowConfigurer end();

506

}

507

```

508

509

### Workflow Tokens

510

511

Workflows use tokens to pass data between nodes:

512

513

```java { .api }

514

// Workflow token interface

515

public interface WorkflowToken {

516

void put(String key, String value);

517

void put(String key, String value, WorkflowToken.Scope scope);

518

519

Value get(String key);

520

Value get(String key, String nodeName);

521

Value get(String key, WorkflowToken.Scope scope);

522

523

Map<String, Value> getAll();

524

Map<String, Value> getAll(WorkflowToken.Scope scope);

525

Map<String, Map<String, Value>> getAllFromNodes();

526

527

// Token scopes

528

enum Scope {

529

SYSTEM, // System-wide token data

530

USER // User-defined token data

531

}

532

}

533

534

// Token value container

535

public class Value {

536

public String toString() { /* returns string representation */ }

537

public long getAsLong() { /* returns as long value */ }

538

public double getAsDouble() { /* returns as double value */ }

539

public boolean getAsBoolean() { /* returns as boolean value */ }

540

}

541

542

// Node value for specific workflow nodes

543

public class NodeValue {

544

public String getNodeName() { /* returns node name */ }

545

public Value getValue() { /* returns node value */ }

546

}

547

```

548

549

### Workflow Nodes and Status

550

551

```java { .api }

552

// Workflow node types

553

public enum WorkflowNodeType {

554

ACTION, // Custom action node

555

MAPREDUCE, // MapReduce program node

556

SPARK, // Spark program node

557

FORK, // Parallel execution fork

558

JOIN, // Fork join point

559

CONDITION // Conditional execution node

560

}

561

562

// Workflow node interface

563

public interface WorkflowNode {

564

String getNodeId();

565

WorkflowNodeType getType();

566

}

567

568

// Specific node implementations

569

public class WorkflowActionNode implements WorkflowNode {

570

public WorkflowActionSpecification getProgram() { /* returns action spec */ }

571

}

572

573

public class WorkflowForkNode implements WorkflowNode {

574

public List<List<WorkflowNode>> getBranches() { /* returns fork branches */ }

575

}

576

577

public class WorkflowConditionNode implements WorkflowNode {

578

public List<WorkflowNode> getIfBranch() { /* returns if branch */ }

579

public List<WorkflowNode> getElseBranch() { /* returns else branch */ }

580

public Predicate<WorkflowContext> getPredicate() { /* returns condition */ }

581

}

582

583

// Node status and state

584

public enum NodeStatus {

585

STARTING, // Node is initializing

586

RUNNING, // Node is executing

587

COMPLETED, // Node completed successfully

588

FAILED, // Node failed with error

589

KILLED // Node was terminated

590

}

591

592

public class WorkflowNodeState {

593

public String getNodeId() { /* returns node ID */ }

594

public NodeStatus getNodeStatus() { /* returns current status */ }

595

public String getFailureCause() { /* returns failure reason if failed */ }

596

}

597

```

598

599

### Custom Actions

600

601

Create custom workflow actions for specialized processing:

602

603

```java { .api }

604

// Custom action interface

605

public interface CustomAction extends ProgramLifecycle<CustomActionContext> {

606

void configure(CustomActionConfigurer configurer);

607

void run(CustomActionContext context) throws Exception;

608

}

609

610

// Abstract custom action

611

public abstract class AbstractCustomAction implements CustomAction {

612

@Override

613

public void initialize(CustomActionContext context) throws Exception {

614

// Initialize action resources

615

}

616

617

@Override

618

public abstract void run(CustomActionContext context) throws Exception;

619

620

@Override

621

public void destroy() {

622

// Cleanup action resources

623

}

624

}

625

626

// Custom action context

627

public interface CustomActionContext

628

extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {

629

630

WorkflowToken getWorkflowToken();

631

CustomActionSpecification getSpecification();

632

}

633

```

634

635

### Workflow Examples

636

637

```java { .api }

638

// Example data processing workflow

639

public class DataProcessingWorkflow extends AbstractWorkflow {

640

641

@Override

642

public void configure(WorkflowConfigurer configurer) {

643

configurer.setName("DataProcessingWorkflow");

644

configurer.setDescription("Complete data processing pipeline");

645

646

// Sequential execution

647

configurer.addAction(new DataValidationAction());

648

configurer.addMapReduce("DataCleaningMapReduce");

649

650

// Conditional processing

651

configurer.condition(new DataQualityCondition())

652

.addSpark("DataTransformationSpark")

653

.addMapReduce("DataAggregationMapReduce")

654

.otherwise()

655

.addAction(new DataRepairAction())

656

.end();

657

658

// Parallel processing

659

configurer.fork()

660

.addSpark("ModelTrainingSpark")

661

.fork()

662

.addMapReduce("ReportGeneration")

663

.addAction(new NotificationAction())

664

.join()

665

.join();

666

667

configurer.addAction(new CleanupAction());

668

}

669

}

670

671

// Example condition implementation

672

public class DataQualityCondition implements Predicate<WorkflowContext> {

673

@Override

674

public boolean apply(WorkflowContext context) {

675

WorkflowToken token = context.getToken();

676

Value errorRate = token.get("data.error_rate");

677

678

if (errorRate != null) {

679

double rate = errorRate.getAsDouble();

680

return rate < 0.05; // Proceed only if error rate < 5%

681

}

682

683

return false; // Default to repair path if no data

684

}

685

}

686

687

// Example custom action

688

public class DataValidationAction extends AbstractCustomAction {

689

690

@Override

691

public void configure(CustomActionConfigurer configurer) {

692

configurer.setName("DataValidation");

693

configurer.setDescription("Validates input data quality");

694

}

695

696

@Override

697

public void run(CustomActionContext context) throws Exception {

698

Table inputData = context.getDataset("raw_data");

699

WorkflowToken token = context.getWorkflowToken();

700

701

// Perform data validation

702

long totalRecords = 0;

703

long errorRecords = 0;

704

705

Scanner scanner = inputData.scan(null, null);

706

try {

707

Row row;

708

while ((row = scanner.next()) != null) {

709

totalRecords++;

710

if (!isValidRecord(row)) {

711

errorRecords++;

712

}

713

}

714

} finally {

715

scanner.close();

716

}

717

718

// Store results in workflow token

719

double errorRate = (double) errorRecords / totalRecords;

720

token.put("data.total_records", String.valueOf(totalRecords));

721

token.put("data.error_records", String.valueOf(errorRecords));

722

token.put("data.error_rate", String.valueOf(errorRate));

723

724

context.getMetrics().gauge("validation.error_rate", errorRate);

725

}

726

727

private boolean isValidRecord(Row row) {

728

// Implement validation logic

729

return row.get("id") != null && row.get("data") != null;

730

}

731

}

732

```

733

734

## Application State Management

735

736

Applications can persist state across runs using the App State Store:

737

738

```java { .api }

739

// Application state store interface

740

public interface AppStateStore {

741

void saveState(String key, byte[] value) throws IOException;

742

byte[] getState(String key) throws IOException;

743

void deleteState(String key) throws IOException;

744

}

745

746

// Usage in application context

747

public class StatefulApplication extends AbstractApplication {

748

@Override

749

public void configure(ApplicationConfigurer configurer, ApplicationContext context) {

750

// Application can access state store through admin interface

751

// State persists across application updates and restarts

752

}

753

}

754

```

755

756

The Application Framework provides the foundation for building complex, multi-component data applications with enterprise-grade operational features including service discovery, resource management, state persistence, and orchestration capabilities.