# CDAP ETL API

A comprehensive Java API for building Extract, Transform, Load (ETL) pipelines in the Cask Data Application Platform (CDAP). This API provides a rich set of interfaces and classes for developing data processing plugins including sources, sinks, transforms, aggregators, joiners, and actions.

## Package Information

**Maven Dependency:**

```xml
<dependency>
  <groupId>io.cdap.cdap</groupId>
  <artifactId>cdap-etl-api</artifactId>
  <version>6.11.0</version>
</dependency>
```

**Java Package:** `io.cdap.cdap.etl.api`

**Java Version:** 8+

## Core Imports

```java
// Core Pipeline Components
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.Transformation;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageContext;
import io.cdap.cdap.etl.api.TransformContext;

// Batch Processing
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchAggregator;
import io.cdap.cdap.etl.api.batch.BatchJoiner;

// Data Types
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

// Plugin Configuration
import io.cdap.cdap.etl.api.PipelineConfigurable;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.StageLifecycle;

// Actions and Conditions
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.condition.Condition;

// Error Handling and Validation
import io.cdap.cdap.etl.api.ErrorEmitter;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.FailureCollector;
```

## Basic Usage

### Simple Transform Plugin

```java
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("MyTransform")
public class MyTransform extends Transform<StructuredRecord, StructuredRecord> {

  private final Config config;

  // The framework instantiates the plugin with its populated PluginConfig
  public MyTransform(Config config) {
    this.config = config;
  }

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    Schema inputSchema = stageConfigurer.getInputSchema();

    // The input schema can be null when it is not known until runtime
    if (inputSchema != null) {
      // buildOutputSchema derives the output schema from the input (implementation omitted)
      stageConfigurer.setOutputSchema(buildOutputSchema(inputSchema));
    }
  }

  @Override
  public void initialize(TransformContext context) throws Exception {
    super.initialize(context);
  }

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord.Builder builder = StructuredRecord.builder(getContext().getOutputSchema());

    // Copy every input field to the output record
    for (Schema.Field field : input.getSchema().getFields()) {
      builder.set(field.getName(), input.get(field.getName()));
    }

    emitter.emit(builder.build());
  }
}
```

### Basic Pipeline Configuration

```java
public class MyPipeline implements PipelineConfigurable {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

    // Validate the configuration, collecting every failure before failing
    FailureCollector collector = stageConfigurer.getFailureCollector();
    validateConfig(collector);

    // Propagate the output schema when the input schema is already known
    Schema inputSchema = stageConfigurer.getInputSchema();
    if (inputSchema != null) {
      stageConfigurer.setOutputSchema(inputSchema);
    }
  }
}
```
## Architecture

The CDAP ETL API is built around several core concepts:

### Pipeline Stages
- **Sources** - Read data from external systems
- **Transforms** - Process and modify data records
- **Sinks** - Write data to external systems
- **Aggregators** - Perform grouping and aggregation operations
- **Joiners** - Combine data from multiple inputs
- **Actions** - Execute custom logic or external operations

### Execution Engines
- **MapReduce** - Traditional Hadoop MapReduce execution
- **Spark** - Apache Spark execution for better performance
- **Native** - CDAP native execution engine

### Data Flow
Data flows through pipeline stages using the **Emitter Pattern**:
- `Emitter<T>` - Emit regular data records
- `ErrorEmitter<T>` - Emit error records for error handling
- `AlertEmitter` - Emit alerts and notifications
- `MultiOutputEmitter<T>` - Emit to multiple output ports
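
The pattern can be illustrated with a self-contained sketch (the `Emitter` interface below is a simplified stand-in, not the CDAP class): a stage pushes any number of records to its emitter instead of returning a single value, and invalid records are routed to the error flow.

```java
import java.util.ArrayList;
import java.util.List;

public class EmitterPatternDemo {

  // Hypothetical stand-in combining Emitter<T> and ErrorEmitter<T>
  interface Emitter<T> {
    void emit(T value);
    void emitError(T invalid, int errorCode, String message);
  }

  // Collects emitted records and errors into lists for inspection
  static class ListEmitter<T> implements Emitter<T> {
    final List<T> records = new ArrayList<>();
    final List<String> errors = new ArrayList<>();
    public void emit(T value) { records.add(value); }
    public void emitError(T invalid, int errorCode, String message) {
      errors.add(errorCode + ": " + message);
    }
  }

  // A toy transform: upper-cases strings, routes blanks to the error flow
  static void transform(String input, Emitter<String> emitter) {
    if (input.isEmpty()) {
      emitter.emitError(input, 1, "empty record");
    } else {
      emitter.emit(input.toUpperCase());
    }
  }

  public static void main(String[] args) {
    ListEmitter<String> emitter = new ListEmitter<>();
    transform("hello", emitter);
    transform("", emitter);
    System.out.println(emitter.records);  // [HELLO]
    System.out.println(emitter.errors);   // [1: empty record]
  }
}
```

Because the stage never returns records directly, the same plugin code works whether one, many, or zero outputs result from a single input.
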
## Capabilities

### Core Pipeline Development

Build ETL pipeline components with comprehensive lifecycle management, configuration validation, and multi-engine support.

```java
// Transform interface
public void transform(IN input, Emitter<OUT> emitter) throws Exception

// Aggregator interface
public void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
                      Emitter<OUT> emitter) throws Exception

// Joiner interface
public OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)
  throws Exception
```
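
The aggregator contract can be made concrete with a standalone sketch (plain Java with hypothetical names, not the CDAP classes): each record is first mapped to a group key, then each group's values are reduced in a single aggregate call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AggregatorContractDemo {

  // Phase 1: derive a group key for each record (mirrors the groupBy step)
  static String groupKey(String record) {
    return record.split(",")[0];  // first CSV field is the key
  }

  // Phase 2: reduce one group's values to a single output (mirrors aggregate)
  static String aggregate(String key, Iterator<String> values) {
    int count = 0;
    while (values.hasNext()) {
      values.next();
      count++;
    }
    return key + "=" + count;
  }

  public static List<String> run(List<String> records) {
    // Group records by key (TreeMap keeps output order deterministic)
    Map<String, List<String>> groups = new TreeMap<>();
    for (String r : records) {
      groups.computeIfAbsent(groupKey(r), k -> new ArrayList<>()).add(r);
    }
    // One aggregate call per group
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : groups.entrySet()) {
      out.add(aggregate(e.getKey(), e.getValue().iterator()));
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList("a,1", "b,2", "a,3")));  // [a=2, b=1]
  }
}
```

The two-phase shape is what lets the execution engine shuffle records by key between the phases, so groups can be processed in parallel.
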

[Core Pipeline →](./core-pipeline.md)

### Batch Processing

Comprehensive support for batch data processing with sources, sinks, and specialized batch operations.

```java
// Batch source
public abstract class BatchSource<KEY_IN, VAL_IN, OUT>
  extends BatchConfigurable<BatchSourceContext>

// Batch sink
public abstract class BatchSink<IN, KEY_OUT, VAL_OUT>
  extends BatchConfigurable<BatchSinkContext>
```

[Batch Processing →](./batch-processing.md)

### Data Connectors

Framework for building data connectors with browse, sample, and specification generation capabilities.

```java
// Connector interface
BrowseDetail browse(ConnectorContext context, BrowseRequest request)
SampleDetail sample(ConnectorContext context, SampleRequest request)
ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request)
```

[Data Connectors →](./data-connectors.md)

### Join Operations

Advanced join operations with automatic join optimization and comprehensive join definitions.

```java
// AutoJoiner interface
JoinDefinition define(AutoJoinerContext context)

// Join definition builder
JoinDefinition.builder()
  .select(fields)
  .from(stages)
  .on(joinConditions)
  .build()
```

[Join Operations →](./join-operations.md)

### Validation Framework

Robust validation system for early error detection and structured error reporting.

```java
// Collect validation failures; the corrective action is the second argument
FailureCollector collector = context.getFailureCollector();
collector.addFailure("Invalid configuration", "Set the property to a valid value")
  .withConfigProperty("propertyName");

// Throw a single exception carrying all collected failures
throw collector.getOrThrowException();
```
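
The collect-then-throw behavior can be sketched standalone (the `Collector` class below is a hypothetical stand-in for `FailureCollector`): failures accumulate during validation so a single exception reports all of them at once, rather than failing on the first problem.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectThenThrowDemo {

  // Stand-in for FailureCollector: accumulates messages instead of failing fast
  static class Collector {
    final List<String> failures = new ArrayList<>();

    void addFailure(String message, String correctiveAction) {
      failures.add(message + " (" + correctiveAction + ")");
    }

    // Throws once, carrying every collected failure
    void getOrThrowException() {
      if (!failures.isEmpty()) {
        throw new IllegalStateException(
            failures.size() + " validation failure(s): " + failures);
      }
    }
  }

  public static void main(String[] args) {
    Collector collector = new Collector();
    collector.addFailure("'port' is not a number", "Set 'port' to an integer");
    collector.addFailure("'host' is empty", "Provide a hostname");
    try {
      collector.getOrThrowException();
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());  // reports both failures together
    }
  }
}
```

Reporting every problem in one pass is what lets pipeline studios show all misconfigured properties to the user in a single validation round-trip.
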

[Validation →](./validation.md)

### SQL Engine Support

SQL engine integration for advanced query processing and optimization.

```java
// SQL Engine interface
boolean canTransform(SQLTransformDefinition transformDefinition)
SQLDataset transform(SQLTransformRequest transformRequest)
SQLDataset join(SQLJoinRequest joinRequest)
```

[SQL Engine →](./sql-engine.md)

### Lineage and Metadata

Track data lineage and field-level transformations for governance and debugging.

```java
// StageContext extends LineageRecorder, so field operations are recorded on the context
context.record(Arrays.asList(
  new FieldReadOperation("read", "Read source field",
                         EndPoint.of("default", "source"),
                         Collections.singletonList("input.field")),
  new FieldTransformOperation("transform", "Transform field",
                              Arrays.asList("input.field"), Arrays.asList("output.field"))
));
```

[Lineage & Metadata →](./lineage-metadata.md)

### Actions and Conditions

Pipeline actions and conditional execution for workflow control and external integrations.

```java
// Action implementation
public abstract class Action implements PipelineConfigurable,
                                        SubmitterLifecycle<ActionContext>,
                                        StageLifecycle<ActionContext>

// Condition evaluation
public abstract boolean apply(ConditionContext context) throws Exception
```

[Actions & Conditions →](./actions-conditions.md)

## Plugin Types

The API supports these plugin types for the CDAP plugin system:

| Plugin Type | Constant | Description |
|-------------|----------|-------------|
| Transform | `"transform"` | Data transformation stages |
| Splitter Transform | `"splittertransform"` | Multi-output transformation stages |
| Error Transform | `"errortransform"` | Error record transformation stages |
| Batch Source | `"batchsource"` | Batch data sources |
| Batch Sink | `"batchsink"` | Batch data sinks |
| Batch Aggregator | `"batchaggregator"` | Batch aggregation operations |
| Batch Joiner | `"batchjoiner"` | Batch join operations |
| Action | `"action"` | Pipeline actions |
| Post Action | `"postaction"` | Post-run pipeline actions |
| Condition | `"condition"` | Conditional execution |
| Alert Publisher | `"alertpublisher"` | Alert publishing |
| Connector | `"connector"` | Data connectors |
| SQL Engine | `"sqlengine"` | SQL query processing engines |

## Key Interfaces

### Type Definitions

```java { .api }
// Core transformation interface
interface Transformation<IN, OUT> {
  void transform(IN input, Emitter<OUT> emitter) throws Exception;
}

// Data emission interface
interface Emitter<T> extends AlertEmitter, ErrorEmitter<T> {
  void emit(T value);
}

// Pipeline configuration interface
interface PipelineConfigurable {
  void configurePipeline(PipelineConfigurer pipelineConfigurer);
}

// Pipeline configurer interface
interface PipelineConfigurer extends PluginConfigurer, DatasetConfigurer, FeatureFlagsProvider {
  StageConfigurer getStageConfigurer();
}

// Stage configurer interface
interface StageConfigurer {
  @Nullable Schema getInputSchema();
  String getStageName();
  void setOutputSchema(@Nullable Schema outputSchema);
  FailureCollector getFailureCollector();
}

// Stage lifecycle interface
interface StageLifecycle<T> extends Destroyable {
  void initialize(T context) throws Exception;
}

// Runtime context interface
interface StageContext extends ServiceDiscoverer, MetadataReader, MetadataWriter,
                               LineageRecorder, FeatureFlagsProvider {
  String getStageName();
  StageMetrics getMetrics();
  PluginProperties getPluginProperties();
  PluginProperties getPluginProperties(String pluginId);
  <T> T newPluginInstance(String pluginId) throws InstantiationException;
  Schema getInputSchema();
  Map<String, Schema> getInputSchemas();
  Schema getOutputSchema();
  Map<String, Schema> getOutputSchemas();
  Arguments getArguments();
}
```