0
# Application Framework
1
2
The CDAP Application Framework provides the core building blocks for creating enterprise data applications. It enables developers to compose applications from different types of programs including services, workflows, workers, MapReduce, and Spark programs.
3
4
## Application Architecture
5
6
### Application Definition
7
8
An application is the top-level container for all programs and resources in CDAP:
9
10
```java { .api }
11
import io.cdap.cdap.api.app.*;
12
import io.cdap.cdap.api.*;
13
14
// Base application interface
15
public interface Application<T extends Config> {
16
void configure(ApplicationConfigurer configurer, ApplicationContext<T> context);
17
18
default boolean isUpdateSupported() {
19
return false;
20
}
21
22
default ApplicationUpdateResult<T> updateConfig(ApplicationUpdateContext applicationUpdateContext)
23
throws Exception {
24
throw new UnsupportedOperationException("Application config update operation is not supported.");
25
}
26
}
27
28
// Abstract base implementation
29
public abstract class AbstractApplication<T extends Config>
30
extends AbstractPluginConfigurable<ApplicationConfigurer>
31
implements Application<T> {
32
33
@Override
34
public void configure(ApplicationConfigurer configurer,
35
ApplicationContext<T> context) {
36
// Override in subclass to configure application
37
}
38
39
@Override
40
public boolean isUpdateSupported() {
41
return false;
42
}
43
44
@Override
45
public ApplicationUpdateResult<T> updateConfig(ApplicationUpdateContext context) {
46
throw new UnsupportedOperationException();
47
}
48
}
49
```
50
51
### Application Configuration
52
53
```java { .api }
54
// Application configurer interface
55
public interface ApplicationConfigurer extends PluginConfigurer, DatasetConfigurer, FeatureFlagsProvider {
56
void setName(String name);
57
void setDescription(String description);
58
59
// Add program types
60
void addMapReduce(MapReduce mapReduce);
61
void addSpark(Spark spark);
62
void addService(Service service);
63
void addWorker(Worker worker);
64
void addWorkflow(Workflow workflow);
65
66
// Schedule workflows
67
ScheduleBuilder buildSchedule(String scheduleName, ProgramType programType, String programName);
68
void schedule(ScheduleCreationSpec scheduleCreationSpec);
69
70
// Additional methods
71
void emitMetadata(Metadata metadata, MetadataScope scope);
72
TriggerFactory getTriggerFactory();
73
RuntimeConfigurer getRuntimeConfigurer();
74
String getDeployedNamespace();
75
ApplicationSpecification getDeployedApplicationSpec();
76
}
77
78
// Application context
79
public interface ApplicationContext<T extends Config> {
80
T getConfig();
81
}
82
83
// Application update support
84
public class ApplicationUpdateResult<T extends Config> {
85
public T getNewConfig() { /* returns updated configuration */ }
86
public ApplicationConfigUpdateAction getUpdateAction() { /* returns update action */ }
87
}
88
89
public enum ApplicationConfigUpdateAction {
90
UPGRADE_ARTIFACT, // Upgrade to new artifact version
91
UPDATE_CONFIG // Update application configuration
92
}
93
```
94
95
### Runtime Context
96
97
All CDAP programs receive a runtime context that provides access to system services:
98
99
```java { .api }
100
// Base runtime context
101
public interface RuntimeContext extends FeatureFlagsProvider {
102
ApplicationSpecification getApplicationSpecification();
103
Map<String, String> getRuntimeArguments();
104
String getClusterName();
105
String getNamespace();
106
RunId getRunId();
107
Admin getAdmin();
108
DataTracer getDataTracer(String dataTracerName);
109
}
110
```
111
112
## Services
113
114
Services provide HTTP endpoints for real-time data access and application interaction.
115
116
### Service Definition
117
118
```java { .api }
119
import io.cdap.cdap.api.service.*;
120
import io.cdap.cdap.api.service.http.*;
121
122
// Service interface
123
public interface Service extends ProgramLifecycle<ServiceContext> {
124
void configure(ServiceConfigurer configurer);
125
}
126
127
// Abstract service implementation
128
public abstract class AbstractService implements Service {
129
@Override
130
public void initialize(ServiceContext context) throws Exception {
131
// Initialize service resources
132
}
133
134
@Override
135
public void destroy() {
136
// Cleanup service resources
137
}
138
}
139
140
// Basic service with HTTP handlers
141
public class BasicService extends AbstractService {
142
@Override
143
public void configure(ServiceConfigurer configurer) {
144
configurer.setName("MyService");
145
configurer.setDescription("HTTP service for data access");
146
configurer.addHandler(new MyHttpHandler());
147
configurer.setInstances(2);
148
configurer.setResources(new Resources(1024, 2));
149
}
150
}
151
```
152
153
### HTTP Service Handlers
154
155
```java { .api }
156
// HTTP service handler interface
157
public interface HttpServiceHandler extends ProgramLifecycle<HttpServiceContext> {
158
// Lifecycle methods inherited from ProgramLifecycle
159
}
160
161
// Abstract handler implementation
162
public abstract class AbstractHttpServiceHandler implements HttpServiceHandler {
163
@Override
164
public void initialize(HttpServiceContext context) throws Exception {
165
// Initialize handler
166
}
167
168
@Override
169
public void destroy() {
170
// Cleanup handler
171
}
172
}
173
174
// HTTP service context
175
public interface HttpServiceContext
176
extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {
177
178
int getInstanceId();
179
int getInstanceCount();
180
DiscoveryServiceClient getDiscoveryServiceClient();
181
}
182
```
183
184
### HTTP Request Handling
185
186
```java { .api }
187
// HTTP request and response interfaces
188
public interface HttpServiceRequest {
189
String getMethod();
190
String getUri();
191
Map<String, List<String>> getAllHeaders();
192
String getHeader(String name);
193
Map<String, List<String>> getAllParameters();
194
String getParameter(String name);
195
byte[] getContent();
196
String getContentType();
197
int getContentLength();
198
}
199
200
public interface HttpServiceResponder {
201
void sendString(int status, String data, String contentType);
202
void sendBytes(int status, byte[] data, String contentType);
203
void sendJson(int status, Object object);
204
void sendError(int status, String errorMessage);
205
void send(int status, ByteBuffer content, String contentType, Map<String, String> headers);
206
}
207
208
// Content streaming interfaces
209
public interface HttpContentProducer {
210
ByteBuffer nextChunk(TransferContext transferContext) throws Exception;
211
void onFinish() throws Exception;
212
void onError(Throwable failureCause);
213
}
214
215
public interface HttpContentConsumer {
216
void onReceived(ByteBuffer chunk, TransferContext transferContext) throws Exception;
217
void onFinish() throws Exception;
218
void onError(Throwable failureCause);
219
}
220
```
221
222
### Service Examples
223
224
```java { .api }
225
// Example HTTP service handler
226
@Path("/data")
227
public class DataServiceHandler extends AbstractHttpServiceHandler {
228
229
@UseDataSet("users")
230
private Table users;
231
232
@GET
233
@Path("/user/{id}")
234
public void getUser(HttpServiceRequest request, HttpServiceResponder responder,
235
@PathParam("id") String userId) {
236
try {
237
Row row = users.get(Bytes.toBytes(userId));
238
if (row.isEmpty()) {
239
responder.sendError(404, "User not found");
240
} else {
241
String userData = row.getString("data");
242
responder.sendString(200, userData, "application/json");
243
}
244
} catch (Exception e) {
245
responder.sendError(500, "Internal error: " + e.getMessage());
246
}
247
}
248
249
@POST
250
@Path("/user")
251
public void createUser(HttpServiceRequest request, HttpServiceResponder responder) {
252
try {
253
String content = Charset.forName("UTF-8").decode(
254
ByteBuffer.wrap(request.getContent())).toString();
255
256
JsonObject user = new JsonParser().parse(content).getAsJsonObject();
257
String userId = user.get("id").getAsString();
258
259
users.put(Bytes.toBytes(userId), "data", content);
260
responder.sendString(201, "User created", "text/plain");
261
} catch (Exception e) {
262
responder.sendError(400, "Invalid request: " + e.getMessage());
263
}
264
}
265
}
266
```
267
268
## Workers
269
270
Workers are long-running background programs for continuous data processing, monitoring, or housekeeping tasks.
271
272
### Worker Definition
273
274
```java { .api }
275
import io.cdap.cdap.api.worker.*;
276
277
// Worker interface
278
public interface Worker extends ProgramLifecycle<WorkerContext> {
279
void configure(WorkerConfigurer configurer);
280
void run() throws Exception;
281
void stop();
282
}
283
284
// Abstract worker implementation
285
public abstract class AbstractWorker
286
extends AbstractPluginConfigurable<WorkerConfigurer>
287
implements ProgramLifecycle<WorkerContext>, Worker {
288
289
@Override
290
public void initialize(WorkerContext context) throws Exception {
291
// Initialize worker resources
292
}
293
294
@Override
295
public abstract void run() throws Exception;
296
297
@Override
298
public void stop() {
299
// Graceful shutdown logic
300
}
301
302
@Override
303
public void destroy() {
304
// Cleanup resources
305
}
306
}
307
308
// Worker context
309
public interface WorkerContext
310
extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {
311
312
WorkerSpecification getSpecification();
313
int getInstanceId();
314
int getInstanceCount();
315
}
316
```
317
318
### Worker Configuration
319
320
```java { .api }
321
// Worker configurer interface
322
public interface WorkerConfigurer extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
323
void setName(String name);
324
void setDescription(String description);
325
void setInstances(int instances);
326
void setResources(Resources resources);
327
}
328
329
// Worker specification
330
public class WorkerSpecification extends AbstractProgramSpecification {
331
public int getInstances() { /* returns number of instances */ }
332
public Resources getResources() { /* returns resource allocation */ }
333
}
334
```
335
336
### Worker Examples
337
338
```java { .api }
339
// Example data ingestion worker
340
public class DataIngestionWorker extends AbstractWorker {
341
342
private volatile boolean running;
343
344
@Override
345
public void configure(WorkerConfigurer configurer) {
346
configurer.setName("DataIngestionWorker");
347
configurer.setDescription("Continuously ingests data from external source");
348
configurer.setInstances(3);
349
configurer.setResources(new Resources(512, 1));
350
}
351
352
@Override
353
public void run() throws Exception {
354
running = true;
355
356
while (running) {
357
try {
358
// Get context and datasets
359
WorkerContext context = getContext();
360
Table outputTable = context.getDataset("ingested_data");
361
362
// Ingest data (example)
363
List<DataRecord> records = fetchDataFromSource();
364
for (DataRecord record : records) {
365
outputTable.put(
366
Bytes.toBytes(record.getId()),
367
"data", record.getData(),
368
"timestamp", System.currentTimeMillis()
369
);
370
}
371
372
// Sleep before next iteration
373
Thread.sleep(5000);
374
375
} catch (InterruptedException e) {
376
Thread.currentThread().interrupt();
377
break;
378
} catch (Exception e) {
379
LOG.error("Error in data ingestion", e);
380
Thread.sleep(10000); // Wait before retry
381
}
382
}
383
}
384
385
@Override
386
public void stop() {
387
running = false;
388
}
389
390
private List<DataRecord> fetchDataFromSource() {
391
// Implementation for fetching data
392
return new ArrayList<>();
393
}
394
}
395
396
// Example monitoring worker
397
public class MetricsCollectionWorker extends AbstractWorker {
398
399
@Override
400
public void configure(WorkerConfigurer configurer) {
401
configurer.setName("MetricsCollector");
402
configurer.setDescription("Collects and aggregates application metrics");
403
}
404
405
@Override
406
public void run() throws Exception {
407
WorkerContext context = getContext();
408
Metrics metrics = context.getMetrics();
409
410
while (context.getState().equals(ProgramRunStatus.RUNNING)) {
411
// Collect custom metrics
412
collectSystemMetrics(metrics);
413
collectApplicationMetrics(context, metrics);
414
415
Thread.sleep(60000); // Collect every minute
416
}
417
}
418
419
private void collectSystemMetrics(Metrics metrics) {
420
// Emit system-level metrics
421
metrics.gauge("system.memory.used", getUsedMemory());
422
metrics.gauge("system.cpu.usage", getCpuUsage());
423
}
424
425
private void collectApplicationMetrics(WorkerContext context, Metrics metrics) {
426
// Collect application-specific metrics
427
Table userTable = context.getDataset("users");
428
long userCount = countTableRows(userTable);
429
metrics.gauge("app.users.count", userCount);
430
}
431
}
432
```
433
434
## Workflows
435
436
Workflows orchestrate the execution of multiple programs in a defined sequence, with support for conditional logic, parallel execution, and data passing.
437
438
### Workflow Definition
439
440
```java { .api }
441
import io.cdap.cdap.api.workflow.*;
442
443
// Workflow interface
444
public interface Workflow {
445
void configure(WorkflowConfigurer configurer);
446
}
447
448
// Abstract workflow implementation
449
public abstract class AbstractWorkflow
450
extends AbstractPluginConfigurable<WorkflowConfigurer>
451
implements Workflow {
452
453
@Override
454
public abstract void configure(WorkflowConfigurer configurer);
455
}
456
457
// Workflow context
458
public interface WorkflowContext
459
extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {
460
461
WorkflowToken getToken();
462
WorkflowInfo getWorkflowInfo();
463
WorkflowNodeState getNodeState(String nodeId);
464
}
465
```
466
467
### Workflow Configuration
468
469
```java { .api }
470
// Workflow configurer interface
471
public interface WorkflowConfigurer
472
extends ProgramConfigurer, DatasetConfigurer, PluginConfigurer {
473
474
// Add program execution nodes
475
void addMapReduce(String mapReduce);
476
void addSpark(String spark);
477
void addAction(WorkflowAction action);
478
479
// Control flow constructs
480
WorkflowForkConfigurer fork();
481
WorkflowConditionConfigurer condition(Predicate<WorkflowContext> predicate);
482
483
// Resource allocation
484
void setDriverResources(Resources resources);
485
}
486
487
// Fork configurer for parallel execution
488
public interface WorkflowForkConfigurer {
489
WorkflowForkConfigurer addMapReduce(String mapReduce);
490
WorkflowForkConfigurer addSpark(String spark);
491
WorkflowForkConfigurer addAction(WorkflowAction action);
492
WorkflowForkConfigurer fork();
493
WorkflowForkConfigurer condition(Predicate<WorkflowContext> predicate);
494
WorkflowConfigurer join();
495
}
496
497
// Condition configurer for conditional execution
498
public interface WorkflowConditionConfigurer {
499
WorkflowConditionConfigurer addMapReduce(String mapReduce);
500
WorkflowConditionConfigurer addSpark(String spark);
501
WorkflowConditionConfigurer addAction(WorkflowAction action);
502
WorkflowConditionConfigurer fork();
503
WorkflowConditionConfigurer condition(Predicate<WorkflowContext> predicate);
504
WorkflowConditionConfigurer otherwise();
505
WorkflowConfigurer end();
506
}
507
```
508
509
### Workflow Tokens
510
511
Workflows use tokens to pass data between nodes:
512
513
```java { .api }
514
// Workflow token interface
515
public interface WorkflowToken {
516
void put(String key, String value);
517
void put(String key, String value, WorkflowToken.Scope scope);
518
519
Value get(String key);
520
Value get(String key, String nodeName);
521
Value get(String key, WorkflowToken.Scope scope);
522
523
Map<String, Value> getAll();
524
Map<String, Value> getAll(WorkflowToken.Scope scope);
525
Map<String, Map<String, Value>> getAllFromNodes();
526
527
// Token scopes
528
enum Scope {
529
SYSTEM, // System-wide token data
530
USER // User-defined token data
531
}
532
}
533
534
// Token value container
535
public class Value {
536
public String toString() { /* returns string representation */ }
537
public long getAsLong() { /* returns as long value */ }
538
public double getAsDouble() { /* returns as double value */ }
539
public boolean getAsBoolean() { /* returns as boolean value */ }
540
}
541
542
// Node value for specific workflow nodes
543
public class NodeValue {
544
public String getNodeName() { /* returns node name */ }
545
public Value getValue() { /* returns node value */ }
546
}
547
```
548
549
### Workflow Nodes and Status
550
551
```java { .api }
552
// Workflow node types
553
public enum WorkflowNodeType {
554
ACTION, // Custom action node
555
MAPREDUCE, // MapReduce program node
556
SPARK, // Spark program node
557
FORK, // Parallel execution fork
558
JOIN, // Fork join point
559
CONDITION // Conditional execution node
560
}
561
562
// Workflow node interface
563
public interface WorkflowNode {
564
String getNodeId();
565
WorkflowNodeType getType();
566
}
567
568
// Specific node implementations
569
public class WorkflowActionNode implements WorkflowNode {
570
public WorkflowActionSpecification getProgram() { /* returns action spec */ }
571
}
572
573
public class WorkflowForkNode implements WorkflowNode {
574
public List<List<WorkflowNode>> getBranches() { /* returns fork branches */ }
575
}
576
577
public class WorkflowConditionNode implements WorkflowNode {
578
public List<WorkflowNode> getIfBranch() { /* returns if branch */ }
579
public List<WorkflowNode> getElseBranch() { /* returns else branch */ }
580
public Predicate<WorkflowContext> getPredicate() { /* returns condition */ }
581
}
582
583
// Node status and state
584
public enum NodeStatus {
585
STARTING, // Node is initializing
586
RUNNING, // Node is executing
587
COMPLETED, // Node completed successfully
588
FAILED, // Node failed with error
589
KILLED // Node was terminated
590
}
591
592
public class WorkflowNodeState {
593
public String getNodeId() { /* returns node ID */ }
594
public NodeStatus getNodeStatus() { /* returns current status */ }
595
public String getFailureCause() { /* returns failure reason if failed */ }
596
}
597
```
598
599
### Custom Actions
600
601
Create custom workflow actions for specialized processing:
602
603
```java { .api }
604
// Custom action interface
605
public interface CustomAction extends ProgramLifecycle<CustomActionContext> {
606
void configure(CustomActionConfigurer configurer);
607
void run(CustomActionContext context) throws Exception;
608
}
609
610
// Abstract custom action
611
public abstract class AbstractCustomAction implements CustomAction {
612
@Override
613
public void initialize(CustomActionContext context) throws Exception {
614
// Initialize action resources
615
}
616
617
@Override
618
public abstract void run(CustomActionContext context) throws Exception;
619
620
@Override
621
public void destroy() {
622
// Cleanup action resources
623
}
624
}
625
626
// Custom action context
627
public interface CustomActionContext
628
extends RuntimeContext, DatasetContext, ServiceDiscoverer, PluginContext {
629
630
WorkflowToken getWorkflowToken();
631
CustomActionSpecification getSpecification();
632
}
633
```
634
635
### Workflow Examples
636
637
```java { .api }
638
// Example data processing workflow
639
public class DataProcessingWorkflow extends AbstractWorkflow {
640
641
@Override
642
public void configure(WorkflowConfigurer configurer) {
643
configurer.setName("DataProcessingWorkflow");
644
configurer.setDescription("Complete data processing pipeline");
645
646
// Sequential execution
647
configurer.addAction(new DataValidationAction());
648
configurer.addMapReduce("DataCleaningMapReduce");
649
650
// Conditional processing
651
configurer.condition(new DataQualityCondition())
652
.addSpark("DataTransformationSpark")
653
.addMapReduce("DataAggregationMapReduce")
654
.otherwise()
655
.addAction(new DataRepairAction())
656
.end();
657
658
// Parallel processing
659
configurer.fork()
660
.addSpark("ModelTrainingSpark")
661
.fork()
662
.addMapReduce("ReportGeneration")
663
.addAction(new NotificationAction())
664
.join()
665
.join();
666
667
configurer.addAction(new CleanupAction());
668
}
669
}
670
671
// Example condition implementation
672
public class DataQualityCondition implements Predicate<WorkflowContext> {
673
@Override
674
public boolean apply(WorkflowContext context) {
675
WorkflowToken token = context.getToken();
676
Value errorRate = token.get("data.error_rate");
677
678
if (errorRate != null) {
679
double rate = errorRate.getAsDouble();
680
return rate < 0.05; // Proceed only if error rate < 5%
681
}
682
683
return false; // Default to repair path if no data
684
}
685
}
686
687
// Example custom action
688
public class DataValidationAction extends AbstractCustomAction {
689
690
@Override
691
public void configure(CustomActionConfigurer configurer) {
692
configurer.setName("DataValidation");
693
configurer.setDescription("Validates input data quality");
694
}
695
696
@Override
697
public void run(CustomActionContext context) throws Exception {
698
Table inputData = context.getDataset("raw_data");
699
WorkflowToken token = context.getWorkflowToken();
700
701
// Perform data validation
702
long totalRecords = 0;
703
long errorRecords = 0;
704
705
Scanner scanner = inputData.scan(null, null);
706
try {
707
Row row;
708
while ((row = scanner.next()) != null) {
709
totalRecords++;
710
if (!isValidRecord(row)) {
711
errorRecords++;
712
}
713
}
714
} finally {
715
scanner.close();
716
}
717
718
// Store results in workflow token
719
double errorRate = (double) errorRecords / totalRecords;
720
token.put("data.total_records", String.valueOf(totalRecords));
721
token.put("data.error_records", String.valueOf(errorRecords));
722
token.put("data.error_rate", String.valueOf(errorRate));
723
724
context.getMetrics().gauge("validation.error_rate", errorRate);
725
}
726
727
private boolean isValidRecord(Row row) {
728
// Implement validation logic
729
return row.get("id") != null && row.get("data") != null;
730
}
731
}
732
```
733
734
## Application State Management
735
736
Applications can persist state across runs using the App State Store:
737
738
```java { .api }
739
// Application state store interface
740
public interface AppStateStore {
741
void saveState(String key, byte[] value) throws IOException;
742
byte[] getState(String key) throws IOException;
743
void deleteState(String key) throws IOException;
744
}
745
746
// Usage in application context
747
public class StatefulApplication extends AbstractApplication {
748
@Override
749
public void configure(ApplicationConfigurer configurer, ApplicationContext context) {
750
// Application can access state store through admin interface
751
// State persists across application updates and restarts
752
}
753
}
754
```
755
756
The Application Framework provides the foundation for building complex, multi-component data applications with enterprise-grade operational features including service discovery, resource management, state persistence, and orchestration capabilities.