# Batch ETL Operations

Core batch processing capabilities for Spark-based ETL pipelines within CDAP. Provides abstract classes for implementing data sinks and compute transformations with full access to Spark RDDs and CDAP datasets.

## Capabilities

### SparkSink

Abstract class for implementing the final stage of a batch ETL pipeline. SparkSink performs RDD operations and is responsible for persisting data to external storage systems or CDAP datasets.

```java { .api }
/**
 * SparkSink composes a final, optional stage of a Batch ETL Pipeline. In addition to configuring the Batch run, it
 * can also perform RDD operations on the key value pairs provided by the Batch run.
 *
 * {@link SparkSink#run} method is called inside the Batch Run while {@link SparkSink#prepareRun} and
 * {@link SparkSink#onRunFinish} methods are called on the client side, which launches the Batch run, before the
 * Batch run starts and after it finishes respectively.
 *
 * @param <IN> The type of input record to the SparkSink.
 */
@Beta
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> implements Serializable {

  public static final String PLUGIN_TYPE = "sparksink";

  private static final long serialVersionUID = -8600555200583639593L;

  /**
   * User Spark job which will be executed and is responsible for persisting any data.
   * @param context {@link SparkExecutionPluginContext} for this job
   * @param input the input from previous stages of the Batch run.
   */
  public abstract void run(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;

  // Inherited from BatchConfigurable<SparkPluginContext>:

  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Default no-op implementation
  }

  /**
   * Prepare the Batch run. Used to configure the job before starting the run.
   * @param context batch execution context
   * @throws Exception if there's an error during this method invocation
   */
  @Override
  public abstract void prepareRun(SparkPluginContext context) throws Exception;

  /**
   * Invoked after the Batch run finishes. Used to perform any end of the run logic.
   * @param succeeded defines the result of batch execution: true if run succeeded, false otherwise
   * @param context batch execution context
   */
  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // Default no-op implementation
  }
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.PipelineConfigurer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class DatabaseSink extends SparkSink<UserRecord> {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Configure any required datasets or streams
    super.configurePipeline(pipelineConfigurer);
  }

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Configure Spark settings for this run
    SparkConf sparkConf = new SparkConf()
        .set("spark.executor.memory", "2g")
        .set("spark.executor.cores", "2");

    context.setSparkConf(sparkConf);
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<UserRecord> input) throws Exception {
    // Transform to key-value pairs and save to a CDAP dataset
    JavaPairRDD<String, UserRecord> keyedData = input.mapToPair(record ->
        new Tuple2<>(record.getId(), record));

    context.saveAsDataset(keyedData, "user-database");
  }

  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // Cleanup logic if needed
    super.onRunFinish(succeeded, context);
  }
}
```

### SparkCompute

Abstract class for implementing data transformation stages in a batch ETL pipeline. SparkCompute transforms an input RDD into an output RDD, potentially with a different type and structure.

```java { .api }
/**
 * Spark Compute stage for data transformations.
 * @param <IN> Type of input object
 * @param <OUT> Type of output object
 */
@Beta
public abstract class SparkCompute<IN, OUT> implements PipelineConfigurable, Serializable {

  public static final String PLUGIN_TYPE = "sparkcompute";

  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Default no-op implementation
  }

  /**
   * Initialize the plugin before any transform calls are made.
   * @param context SparkExecutionPluginContext for this job
   */
  public void initialize(SparkExecutionPluginContext context) throws Exception {
    // Default no-op implementation
  }

  /**
   * Transform the input RDD and return the output RDD for the next pipeline stage.
   * @param context SparkExecutionPluginContext for this job
   * @param input Input RDD to be transformed
   * @return Transformed output RDD
   */
  public abstract JavaRDD<OUT> transform(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaRDD;

public class DataCleaner extends SparkCompute<RawRecord, CleanRecord> {

  @Override
  public void initialize(SparkExecutionPluginContext context) throws Exception {
    // Initialize any resources needed before transform() is called; this example keeps no state
  }

  @Override
  public JavaRDD<CleanRecord> transform(SparkExecutionPluginContext context, JavaRDD<RawRecord> input) throws Exception {
    return input
        .filter(record -> record.isValid())         // Filter invalid records
        .map(record -> cleanAndNormalize(record))   // Transform to clean records
        .filter(record -> record != null);          // Remove null results
  }

  private CleanRecord cleanAndNormalize(RawRecord raw) {
    // Clean and normalize the raw record fields
    return new CleanRecord(raw.getName().trim(), raw.getAge(), raw.getEmail().toLowerCase());
  }
}
```
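
Because `transform` executes inside the Spark job, standard Spark facilities such as broadcast variables can also be used from there. The following is an illustrative sketch only: it assumes `SparkExecutionPluginContext` exposes the underlying `JavaSparkContext` via `getSparkContext()`, and the `CleanRecord#getEmail()` accessor, the `EnrichedRecord` type, and the `loadLookupTable()` helper are hypothetical.

```java
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;

import java.util.Collections;
import java.util.Map;

public class CountryEnricher extends SparkCompute<CleanRecord, EnrichedRecord> {

  @Override
  public JavaRDD<EnrichedRecord> transform(SparkExecutionPluginContext context,
                                           JavaRDD<CleanRecord> input) throws Exception {
    // Broadcast a small lookup table once so every executor can join against it locally
    // (assumes getSparkContext() returns the underlying JavaSparkContext)
    Map<String, String> countryByDomain = loadLookupTable();
    Broadcast<Map<String, String>> lookup = context.getSparkContext().broadcast(countryByDomain);

    return input.map(record -> {
      // Hypothetical accessor: CleanRecord#getEmail()
      String email = record.getEmail();
      String domain = email.substring(email.indexOf('@') + 1);
      String country = lookup.value().getOrDefault(domain, "UNKNOWN");
      return new EnrichedRecord(record, country); // Hypothetical output type
    });
  }

  private Map<String, String> loadLookupTable() {
    // Hypothetical helper; in practice this could come from plugin configuration or a dataset
    return Collections.singletonMap("example.com", "US");
  }
}
```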

### SparkPluginContext

Context interface for Spark plugins during the preparation phase. Provides access to batch context capabilities and Spark configuration management.

```java { .api }
/**
 * Context passed to spark plugin types during prepare run phase.
 */
@Beta
public interface SparkPluginContext extends BatchContext {

  /**
   * Sets a {@link SparkConf} to be used for the Spark execution.
   *
   * If your configuration will not change between pipeline runs,
   * use {@link PipelineConfigurer#setPipelineProperties}
   * instead. This method should only be used when you need different
   * configuration settings for each run.
   *
   * Due to limitations in Spark Streaming, this method cannot be used
   * in realtime data pipelines. Calling this method will throw an
   * {@link UnsupportedOperationException} in realtime pipelines.
   *
   * @param sparkConf Spark configuration for the execution
   * @throws UnsupportedOperationException in realtime data pipelines
   */
  void setSparkConf(SparkConf sparkConf);
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

public class MySparkSink extends SparkSink<MyRecord> {

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Configure Spark settings for this run
    SparkConf sparkConf = new SparkConf()
        .set("spark.executor.memory", "2g")
        .set("spark.executor.cores", "2")
        .set("spark.sql.adaptive.enabled", "true");

    context.setSparkConf(sparkConf);
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // Spark job implementation
  }
}
```
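
The javadoc above recommends `PipelineConfigurer#setPipelineProperties` when the configuration is identical for every run. A minimal sketch, assuming that method accepts a `Map<String, String>` of engine properties and reusing the hypothetical `MyRecord` type:

```java
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import org.apache.spark.api.java.JavaRDD;

import java.util.HashMap;
import java.util.Map;

public class StaticConfSink extends SparkSink<MyRecord> {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Properties that never change between runs are set once at configure time
    // (assumes setPipelineProperties(Map<String, String>) as referenced in the javadoc above)
    Map<String, String> properties = new HashMap<>();
    properties.put("spark.executor.memory", "2g");
    pipelineConfigurer.setPipelineProperties(properties);
  }

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // No run-specific configuration needed
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // Spark job implementation
  }
}
```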

## Plugin Types

The batch ETL package defines two plugin types that can be used in CDAP ETL pipelines (a registration sketch follows the list):

- **sparksink**: For implementing data persistence operations using `SparkSink`
- **sparkcompute**: For implementing data transformation operations using `SparkCompute`
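
In CDAP, a plugin class is typically made discoverable under one of these types by annotating it. A minimal sketch, assuming the standard `co.cask.cdap.api.annotation` annotations (`@Plugin`, `@Name`, `@Description`) are available:

```java
import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import org.apache.spark.api.java.JavaRDD;

// Registers this class under the "sparksink" plugin type
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name("DatabaseSink")
@Description("Persists user records to a CDAP dataset.")
public class AnnotatedDatabaseSink extends SparkSink<UserRecord> {

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // No per-run configuration needed
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<UserRecord> input) throws Exception {
    // Spark job implementation (see the DatabaseSink example above)
  }
}
```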

## Lifecycle Methods

SparkSink and SparkCompute expose the following lifecycle methods, some declared directly and others inherited from their parent classes (a skeleton illustrating the invocation order follows the list):

- **configurePipeline()**: Called during pipeline configuration to add required datasets and streams
- **prepareRun()**: Called on the client side before the batch run starts (SparkSink only)
- **initialize()**: Called before any transform calls are made (SparkCompute only)
- **run()/transform()**: Called during Spark execution to perform the actual data processing
- **onRunFinish()**: Called on the client side after the batch run completes (SparkSink only)
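
The following SparkSink skeleton summarizes that ordering with comments; it is a sketch in which the `MyRecord` type is hypothetical and each body is left empty:

```java
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import org.apache.spark.api.java.JavaRDD;

public class LifecycleSink extends SparkSink<MyRecord> {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // 1. Pipeline configuration: declare required datasets and streams
  }

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // 2. Client side, before the Batch run starts: per-run setup such as setSparkConf
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // 3. Inside the Batch run: process and persist the incoming RDD
  }

  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // 4. Client side, after the Batch run completes: end-of-run cleanup
  }
}
```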