# CDAP ETL API

A comprehensive Java API for building Extract, Transform, Load (ETL) pipelines in the Cloud Data Application Platform (CDAP). This API provides a rich set of interfaces and classes for developing data processing plugins including sources, sinks, transforms, aggregators, joiners, and actions.

## Package Information

**Maven Dependency:**

```xml
<dependency>
  <groupId>io.cdap.cdap</groupId>
  <artifactId>cdap-etl-api</artifactId>
  <version>6.11.0</version>
</dependency>
```

**Java Package:** `io.cdap.cdap.etl.api`

**Java Version:** 8+

## Core Imports

```java
// Core Pipeline Components
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.Transformation;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageContext;
import io.cdap.cdap.etl.api.TransformContext;

// Batch Processing
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchAggregator;
import io.cdap.cdap.etl.api.batch.BatchJoiner;

// Data Types
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;

// Plugin Configuration
import io.cdap.cdap.etl.api.PipelineConfigurable;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.StageLifecycle;

// Actions and Conditions
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.condition.Condition;

// Error Handling and Validation
import io.cdap.cdap.etl.api.ErrorEmitter;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.validation.FailureCollector;
```

## Basic Usage

### Simple Transform Plugin

```java
@Plugin(type = Transform.PLUGIN_TYPE)
@Name("MyTransform")
public class MyTransform extends Transform<StructuredRecord, StructuredRecord> {

  private Config config;

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
    Schema inputSchema = stageConfigurer.getInputSchema();

    if (inputSchema != null) {
      Schema outputSchema = buildOutputSchema(inputSchema);
      stageConfigurer.setOutputSchema(outputSchema);
    }
  }

  @Override
  public void initialize(TransformContext context) throws Exception {
    this.config = getConfig();
  }

  @Override
  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter) throws Exception {
    StructuredRecord.Builder builder = StructuredRecord.builder(getContext().getOutputSchema());

    // Transform logic here
    for (Schema.Field field : input.getSchema().getFields()) {
      builder.set(field.getName(), input.get(field.getName()));
    }

    emitter.emit(builder.build());
  }
}
```

### Basic Pipeline Configuration

```java
public class MyPipeline implements PipelineConfigurable {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();

    // Configure stage properties
    stageConfigurer.setName("MyStage");
    stageConfigurer.setDescription("A sample ETL stage");

    // Set input/output schemas
    stageConfigurer.setInputSchema(inputSchema);
    stageConfigurer.setOutputSchema(outputSchema);

    // Validate configuration
    FailureCollector collector = stageConfigurer.getFailureCollector();
    validateConfig(collector);
  }
}
```

## Architecture

The CDAP ETL API is built around several core concepts:

### Pipeline Stages

- **Sources** - Read data from external systems
- **Transforms** - Process and modify data records
- **Sinks** - Write data to external systems
- **Aggregators** - Perform grouping and aggregation operations
- **Joiners** - Combine data from multiple inputs
- **Actions** - Execute custom logic or external operations

### Execution Engines

- **MapReduce** - Traditional Hadoop MapReduce execution
- **Spark** - Apache Spark execution for better performance
- **Native** - CDAP native execution engine

### Data Flow

Data flows through pipeline stages using the **Emitter Pattern**:

- `Emitter<T>` - Emit regular data records
- `ErrorEmitter<T>` - Emit error records for error handling
- `AlertEmitter` - Emit alerts and notifications
- `MultiOutputEmitter<T>` - Emit to multiple output ports

## Capabilities

### Core Pipeline Development

Build ETL pipeline components with comprehensive lifecycle management, configuration validation, and multi-engine support.

```java
// Transform interface
public void transform(IN input, Emitter<OUT> emitter) throws Exception

// Aggregator interface
public void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues,
                      Emitter<OUT> emitter) throws Exception

// Joiner interface
public OUT merge(JOIN_KEY joinKey, Iterable<JoinElement<INPUT_RECORD>> joinResult)
  throws Exception
```

[Core Pipeline →](./core-pipeline.md)

### Batch Processing

Comprehensive support for batch data processing with sources, sinks, and specialized batch operations.

```java
// Batch source
public abstract class BatchSource<KEY_IN, VAL_IN, OUT>
    extends BatchConfigurable<BatchSourceContext>

// Batch sink
public abstract class BatchSink<IN, KEY_OUT, VAL_OUT>
    extends BatchConfigurable<BatchSinkContext>
```

[Batch Processing →](./batch-processing.md)

### Data Connectors

Framework for building data connectors with browse, sample, and specification generation capabilities.

```java
// Connector interface
BrowseDetail browse(ConnectorContext context, BrowseRequest request)
SampleDetail sample(ConnectorContext context, SampleRequest request)
ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request)
```

[Data Connectors →](./data-connectors.md)

### Join Operations

Advanced join operations with automatic join optimization and comprehensive join definitions.

```java
// AutoJoiner interface
JoinDefinition define(AutoJoinerContext context)

// Join definition builder
JoinDefinition.builder()
  .select(fields)
  .from(stages)
  .on(joinConditions)
  .build()
```

[Join Operations →](./join-operations.md)

### Validation Framework

Robust validation system for early error detection and structured error reporting.

```java
// Validation failure collection
FailureCollector collector = context.getFailureCollector();
ValidationFailure failure = collector.addFailure("Invalid configuration", "Use valid value")
  .withConfigProperty("propertyName")
  .withCorrectiveAction("Set property to valid range");

// Exception with structured failures
throw new ValidationException(collector.getValidationFailures());
```

[Validation →](./validation.md)

### SQL Engine Support

SQL engine integration for advanced query processing and optimization.

```java
// SQL Engine interface
boolean canTransform(SQLTransformDefinition transformDefinition)
SQLDataset transform(SQLTransformRequest transformRequest)
SQLDataset join(SQLJoinRequest joinRequest)
```

[SQL Engine →](./sql-engine.md)

### Lineage and Metadata

Track data lineage and field-level transformations for governance and debugging.

```java
// Field lineage recording
LineageRecorder recorder = context.getLineageRecorder();
recorder.record(Arrays.asList(
    new FieldReadOperation("read", "Read source field",
        Collections.singletonList("input.field")),
    new FieldTransformOperation("transform", "Transform field",
        Arrays.asList("input.field"), Arrays.asList("output.field"))
));
```

[Lineage & Metadata →](./lineage-metadata.md)

### Actions and Conditions

Pipeline actions and conditional execution for workflow control and external integrations.

```java
// Action implementation
public abstract class Action implements PipelineConfigurable,
    SubmitterLifecycle<ActionContext>,
    StageLifecycle<ActionContext>

// Condition evaluation
public abstract ConditionResult apply() throws Exception
```

[Actions & Conditions →](./actions-conditions.md)

## Plugin Types

The API supports these plugin types for the CDAP plugin system:

| Plugin Type | Constant | Description |
|-------------|----------|-------------|
| Transform | `"transform"` | Data transformation stages |
| Splitter Transform | `"splittertransform"` | Multi-output transformation stages |
| Error Transform | `"errortransform"` | Error record transformation stages |
| Batch Source | `"batchsource"` | Batch data sources |
| Batch Sink | `"batchsink"` | Batch data sinks |
| Batch Aggregator | `"batchaggregator"` | Batch aggregation operations |
| Batch Joiner | `"batchjoiner"` | Batch join operations |
| Action | `"action"` | Pipeline actions |
| Post Action | `"postaction"` | Post-run pipeline actions |
| Condition | `"condition"` | Conditional execution |
| Alert Publisher | `"alertpublisher"` | Alert publishing |
| Connector | `"connector"` | Data connectors |
| SQL Engine | `"sqlengine"` | SQL query processing engines |

## Key Interfaces

### Type Definitions

```java { .api }
// Core transformation interface
interface Transformation<IN, OUT> {
  void transform(IN input, Emitter<OUT> emitter) throws Exception;
}

// Data emission interface
interface Emitter<T> extends AlertEmitter, ErrorEmitter<T> {
  void emit(T value);
}

// Pipeline configuration interface
interface PipelineConfigurable {
  void configurePipeline(PipelineConfigurer pipelineConfigurer);
}

// Pipeline configurer interface
interface PipelineConfigurer extends PluginConfigurer, DatasetConfigurer, FeatureFlagsProvider {
  StageConfigurer getStageConfigurer();
}

// Stage configurer interface
interface StageConfigurer {
  @Nullable Schema getInputSchema();
  String getStageName();
  void setOutputSchema(@Nullable Schema outputSchema);
  FailureCollector getFailureCollector();
}

// Stage lifecycle interface
interface StageLifecycle<T> extends Destroyable {
  void initialize(T context) throws Exception;
}

// Runtime context interface
interface StageContext extends ServiceDiscoverer, MetadataReader, MetadataWriter,
    LineageRecorder, FeatureFlagsProvider {
  String getStageName();
  StageMetrics getMetrics();
  PluginProperties getPluginProperties();
  PluginProperties getPluginProperties(String pluginId);
  <T> T newPluginInstance(String pluginId) throws InstantiationException;
  Schema getInputSchema();
  Map<String, Schema> getInputSchemas();
  Schema getOutputSchema();
  Map<String, Schema> getOutputSchemas();
  Arguments getArguments();
}
```