Java API interfaces and abstract classes for developing Apache Spark-based ETL operations within the CDAP ecosystem
npx @tessl/cli install tessl/maven-co-cask-cdap--cdap-etl-api-spark@5.1.00
# CDAP ETL API Spark
CDAP ETL API Spark provides Java API interfaces and abstract classes for developing Apache Spark-based ETL (Extract, Transform, Load) operations within the CDAP (Cask Data Application Platform) ecosystem. It enables developers to create custom Spark transformations, sinks, and streaming components that integrate seamlessly with CDAP's data processing pipelines.
## Package Information
- **Package Name**: cdap-etl-api-spark
- **Package Type**: maven
- **Language**: Java
- **Group ID**: co.cask.cdap
- **Artifact ID**: cdap-etl-api-spark
- **Installation**:

Maven:

```xml
<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-etl-api-spark</artifactId>
  <version>5.1.2</version>
</dependency>
```

Gradle:

```gradle
implementation 'co.cask.cdap:cdap-etl-api-spark:5.1.2'
```

## Core Imports

```java
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.api.streaming.Windower;
```

## Basic Usage

```java
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Example Spark sink implementation; MyRecord is an illustrative user-defined record type
public class MyDataSink extends SparkSink<MyRecord> {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Configure pipeline datasets if needed
    super.configurePipeline(pipelineConfigurer);
  }

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Prepare for the batch run - configure Spark settings if needed
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // Persist RDD data to storage
    JavaPairRDD<String, MyRecord> keyedData = input.mapToPair(record ->
        new Tuple2<>(record.getKey(), record));
    context.saveAsDataset(keyedData, "output-dataset");
  }

  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // Clean up after the batch run completes
    super.onRunFinish(succeeded, context);
  }
}
```

## Architecture

CDAP ETL API Spark is built around several key components:

- **Batch Processing**: `SparkSink` and `SparkCompute` for batch ETL operations with RDD support
- **Streaming Processing**: `StreamingSource` and `Windower` for real-time data processing with DStream support
- **Context Management**: Context interfaces providing access to datasets, streams, metrics, and configuration
- **Plugin Framework**: Abstract base classes implementing CDAP's plugin lifecycle and configuration patterns
- **Integration Layer**: Integration with CDAP's data platform, including lineage tracking and transactional operations

## Capabilities

### Batch ETL Operations

Core batch processing capabilities for Spark-based ETL pipelines. Provides abstract classes for implementing data sinks and compute transformations with full access to Spark RDDs and CDAP datasets.

```java { .api }
// Spark sink for data persistence
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> {
  public abstract void run(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
}

// Spark compute for data transformations
public abstract class SparkCompute<IN, OUT> implements PipelineConfigurable {
  public abstract JavaRDD<OUT> transform(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
  public void initialize(SparkExecutionPluginContext context) throws Exception;
}
```

[Batch ETL Operations](./batch-etl.md)
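
To give a feel for the contract: a `SparkCompute.transform` body receives a `JavaRDD<IN>` and returns a derived `JavaRDD<OUT>`, and the per-element logic it passes to `filter`/`map` is ordinary Java. The sketch below shows such filtering logic on a plain `List` so it stands alone without a Spark cluster; the `Event` class, method name, and threshold are illustrative, not part of the CDAP API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ComputeSketch {
  // Illustrative record type; a real plugin would use its pipeline's record class.
  static class Event {
    final String key;
    final int value;
    Event(String key, int value) { this.key = key; this.value = value; }
  }

  // The same predicate a SparkCompute.transform body would pass to
  // input.filter(...) on a JavaRDD, applied here to a plain List.
  static List<Event> keepAboveThreshold(List<Event> input, int threshold) {
    List<Event> out = new ArrayList<>();
    for (Event e : input) {
      if (e.value > threshold) {
        out.add(e);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Event> input = Arrays.asList(new Event("a", 5), new Event("b", 1), new Event("c", 9));
    System.out.println(keepAboveThreshold(input, 3).size()); // prints 2
  }
}
```

In a real `SparkCompute`, the same predicate would simply be inlined as `input.filter(e -> e.getValue() > threshold)`.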
### Streaming ETL Operations

Real-time data processing capabilities for Spark Streaming pipelines. Provides interfaces and abstract classes for streaming sources and windowing operations with DStream support.

```java { .api }
// Streaming source for real-time data
public abstract class StreamingSource<T> implements PipelineConfigurable {
  public abstract JavaDStream<T> getStream(StreamingContext context) throws Exception;
  public int getRequiredExecutors();
}

// Windowing for time-based aggregations
public abstract class Windower implements PipelineConfigurable {
  public abstract long getWidth();
  public abstract long getSlideInterval();
}
```

[Streaming ETL Operations](./streaming-etl.md)
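
`Windower` mirrors Spark Streaming's windowing model: every slide interval, a window covering the most recent `width` of data is produced, and Spark requires both durations to be multiples of the batch interval. As a self-contained sketch of that slicing over numbered micro-batches (the class and method names here are illustrative, not CDAP API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowSketch {
  // Simulate sliding windows over a sequence of micro-batches: every `slide`
  // batches, emit a window containing (up to) the last `width` batches.
  static List<List<Integer>> windows(List<Integer> batches, int width, int slide) {
    List<List<Integer>> result = new ArrayList<>();
    for (int end = slide; end <= batches.size(); end += slide) {
      int start = Math.max(0, end - width);
      result.add(new ArrayList<>(batches.subList(start, end)));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Integer> batches = Arrays.asList(1, 2, 3, 4, 5, 6);
    // width = 3 batches, slide = 2 batches
    for (List<Integer> w : windows(batches, 3, 2)) {
      System.out.println(w);
    }
    // prints:
    // [1, 2]
    // [2, 3, 4]
    // [4, 5, 6]
  }
}
```

Note how consecutive windows overlap when the slide interval is smaller than the width, which is what makes time-based rolling aggregations possible.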
### Execution Context

Context interfaces provide access to the Spark execution environment, CDAP datasets, streams, and runtime configuration. They are essential for implementing ETL operations that integrate with CDAP's data platform.

```java { .api }
// Primary execution context for Spark operations
public interface SparkExecutionPluginContext extends DatasetContext, TransformContext {
  JavaSparkContext getSparkContext();
  long getLogicalStartTime();
  Map<String, String> getRuntimeArguments();
  <K, V> JavaPairRDD<K, V> fromDataset(String datasetName);
  <K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName);
  JavaRDD<StreamEvent> fromStream(String streamName);
}
```

[Execution Context](./execution-context.md)
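
Because `fromDataset` and `saveAsDataset` exchange `JavaPairRDD`s, plugin code typically keys its records before saving. The pairing step can be sketched standalone, with `java.util` `Map.Entry` standing in for `scala.Tuple2` so the fragment runs without Spark; the class and method names are illustrative:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class PairingSketch {
  // Keying records before a saveAsDataset-style call; in a real plugin this
  // would be input.mapToPair(r -> new Tuple2<>(r.getKey(), r)) on a JavaRDD.
  static List<Map.Entry<String, String>> keyByFirstChar(List<String> records) {
    List<Map.Entry<String, String>> pairs = new ArrayList<>();
    for (String r : records) {
      pairs.add(new SimpleEntry<>(r.substring(0, 1), r));
    }
    return pairs;
  }

  public static void main(String[] args) {
    for (Map.Entry<String, String> e : keyByFirstChar(Arrays.asList("apple", "banana"))) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
    // prints:
    // a -> apple
    // b -> banana
  }
}
```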
## Type Definitions

### Core Parent Types

```java { .api }
/**
 * Base class for Batch run configuration methods.
 *
 * @param <T> batch execution context
 */
@Beta
public abstract class BatchConfigurable<T extends BatchContext> implements PipelineConfigurable, SubmitterLifecycle<T> {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Default no-op implementation
  }

  /**
   * Prepare the Batch run. Used to configure the job before starting the run.
   *
   * @param context batch execution context
   * @throws Exception if there's an error during this method invocation
   */
  @Override
  public abstract void prepareRun(T context) throws Exception;

  /**
   * Invoked after the Batch run finishes. Used to perform any end-of-run logic.
   *
   * @param succeeded defines the result of batch execution: true if the run succeeded, false otherwise
   * @param context batch execution context
   */
  @Override
  public void onRunFinish(boolean succeeded, T context) {
    // Default no-op implementation
  }
}

/**
 * Context passed to Batch Source and Sink.
 */
@Beta
public interface BatchContext extends DatasetContext, TransformContext {

  /**
   * Create a new dataset instance.
   *
   * @param datasetName the name of the new dataset
   * @param typeName the type of the dataset to create
   * @param properties the properties for the new dataset
   * @throws InstanceConflictException if the dataset already exists
   * @throws DatasetManagementException for any issues encountered in the dataset system
   */
  void createDataset(String datasetName, String typeName, DatasetProperties properties)
      throws DatasetManagementException;

  /**
   * Check whether a dataset exists in the current namespace.
   *
   * @param datasetName the name of the dataset
   * @return whether a dataset of that name exists
   * @throws DatasetManagementException for any issues encountered in the dataset system
   */
  boolean datasetExists(String datasetName) throws DatasetManagementException;

  /**
   * Returns settable pipeline arguments.
   *
   * @return settable pipeline arguments
   */
  @Override
  SettableArguments getArguments();
}

/**
 * Interface for configuring ETL pipelines.
 */
public interface PipelineConfigurable {

  /**
   * Configure a pipeline.
   *
   * @param pipelineConfigurer the configurer used to add required datasets and streams
   * @throws IllegalArgumentException if the given config is invalid
   */
  void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException;
}
```