# Execution Context

Rich context interfaces that provide access to the Spark execution environment, CDAP datasets, streams, and runtime configuration. They are essential for implementing ETL operations that integrate with CDAP's data platform.

## Capabilities

### SparkExecutionPluginContext

The primary execution context interface for Spark operations. It provides access to the Spark runtime, CDAP datasets, streams, and platform services during ETL execution.

```java { .api }
/**
 * Context passed to Spark plugin types during execution.
 */
@Beta
public interface SparkExecutionPluginContext extends DatasetContext, TransformContext {

  /**
   * Returns the logical start time of the batch job.
   * @return Time in milliseconds since epoch (January 1, 1970 UTC)
   */
  @Override
  long getLogicalStartTime();

  /**
   * Returns the runtime arguments of the batch job.
   * @return Map of runtime arguments
   */
  Map<String, String> getRuntimeArguments();

  /**
   * Returns the JavaSparkContext used during execution.
   * @return The Spark context for RDD operations
   */
  JavaSparkContext getSparkContext();

  /**
   * Returns a serializable PluginContext for requesting plugin instances.
   * Can be used in Spark program closures.
   * @return Serializable PluginContext
   */
  PluginContext getPluginContext();

  /**
   * Creates a new SparkInterpreter for Scala code compilation and interpretation.
   * @return New SparkInterpreter instance
   * @throws IOException If a local directory for compiled classes cannot be created
   */
  SparkInterpreter createSparkInterpreter() throws IOException;
}
```
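
In practice, `getLogicalStartTime()` is often used to derive deterministic, time-based partition keys for output data. The sketch below is plain Java with no CDAP dependency; the `partitionFor` helper and the daily `yyyy-MM-dd` layout are illustrative choices, not part of the API:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionKeys {

    // Formats an epoch-millis logical start time as a daily partition key (UTC).
    // Using the logical start time (rather than wall-clock time) keeps the key
    // stable across retries of the same run.
    static String partitionFor(long logicalStartTimeMillis) {
        return DateTimeFormatter.ofPattern("yyyy-MM-dd")
                .withZone(ZoneOffset.UTC)
                .format(Instant.ofEpochMilli(logicalStartTimeMillis));
    }

    public static void main(String[] args) {
        // 1614556800000 ms = 2021-03-01T00:00:00Z
        System.out.println(partitionFor(1614556800000L)); // prints 2021-03-01
    }
}
```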

### Dataset Operations

Methods for reading from and writing to CDAP datasets as Spark RDDs.

```java { .api }
/**
 * Creates a JavaPairRDD from the given Dataset.
 * @param datasetName Name of the Dataset
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD instance that reads from the Dataset
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName);

/**
 * Creates a JavaPairRDD from the given Dataset with arguments.
 * @param datasetName Name of the Dataset
 * @param arguments Dataset arguments
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD instance that reads from the Dataset
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments);

/**
 * Creates a JavaPairRDD from the given Dataset with custom splits.
 * @param datasetName Name of the Dataset
 * @param arguments Dataset arguments
 * @param splits Custom list of splits, or null for default splits
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD instance that reads from the Dataset
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments,
                                     @Nullable Iterable<? extends Split> splits);

/**
 * Saves the given JavaPairRDD to the given Dataset.
 * @param rdd JavaPairRDD to be saved
 * @param datasetName Name of the Dataset
 * @param <K> Key type
 * @param <V> Value type
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName);

/**
 * Saves the given JavaPairRDD to the given Dataset with arguments.
 * @param rdd JavaPairRDD to be saved
 * @param datasetName Name of the Dataset
 * @param arguments Dataset arguments
 * @param <K> Key type
 * @param <V> Value type
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName, Map<String, String> arguments);
```

**Dataset Usage Example:**

```java
import java.util.HashMap;
import java.util.Map;

import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaPairRDD;

public class DataProcessor {

  public void processData(SparkExecutionPluginContext context) throws Exception {
    // Read from the input dataset
    JavaPairRDD<String, UserRecord> inputData = context.fromDataset("user-input");

    // Keep only active users and normalize their records
    JavaPairRDD<String, UserRecord> processedData = inputData
        .filter(tuple -> tuple._2().isActive())
        .mapValues(user -> normalizeUser(user));

    // Save to the output dataset with custom arguments
    Map<String, String> outputArgs = new HashMap<>();
    outputArgs.put("compression", "snappy");
    outputArgs.put("format", "parquet");

    context.saveAsDataset(processedData, "user-output", outputArgs);
  }
}
```

### Stream Operations

Methods for reading from CDAP streams with various decoding options.

```java { .api }
/**
 * Creates a JavaRDD representing all events from the given stream.
 * @param streamName Name of the stream
 * @return JavaRDD of StreamEvent objects
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
JavaRDD<StreamEvent> fromStream(String streamName);

/**
 * Creates a JavaRDD representing events from the stream in a time range.
 * @param streamName Name of the stream
 * @param startTime Starting time in milliseconds (inclusive)
 * @param endTime Ending time in milliseconds (exclusive)
 * @return JavaRDD of StreamEvent objects
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
JavaRDD<StreamEvent> fromStream(String streamName, long startTime, long endTime);

/**
 * Creates a JavaPairRDD with timestamp keys and decoded stream bodies.
 * Supports Text, String, and BytesWritable value types.
 * @param streamName Name of the stream
 * @param valueType Type of the stream body to decode to
 * @param <V> Value type
 * @return JavaPairRDD with timestamp keys and decoded values
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
<V> JavaPairRDD<Long, V> fromStream(String streamName, Class<V> valueType);

/**
 * Creates a JavaPairRDD with decoded stream events in a time range.
 * @param streamName Name of the stream
 * @param startTime Starting time in milliseconds (inclusive)
 * @param endTime Ending time in milliseconds (exclusive)
 * @param valueType Type of the stream body to decode to
 * @param <V> Value type
 * @return JavaPairRDD with timestamp keys and decoded values
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
<V> JavaPairRDD<Long, V> fromStream(String streamName, long startTime, long endTime, Class<V> valueType);

/**
 * Creates a JavaPairRDD with custom stream event decoding.
 * @param streamName Name of the stream
 * @param startTime Starting time in milliseconds (inclusive)
 * @param endTime Ending time in milliseconds (exclusive)
 * @param decoderClass StreamEventDecoder class for decoding events
 * @param keyType Decoded key type
 * @param valueType Decoded value type
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD with custom decoded keys and values
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromStream(String streamName, long startTime, long endTime,
                                    Class<? extends StreamEventDecoder<K, V>> decoderClass,
                                    Class<K> keyType, Class<V> valueType);
```

**Stream Usage Example:**

```java
import java.util.concurrent.TimeUnit;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class StreamProcessor {

  public void processStreamData(SparkExecutionPluginContext context) throws Exception {
    long endTime = System.currentTimeMillis();
    long startTime = endTime - TimeUnit.DAYS.toMillis(1); // 24 hours ago

    // Read raw stream events
    JavaRDD<StreamEvent> rawEvents = context.fromStream("user-events", startTime, endTime);

    // Read the same stream with Text decoding
    JavaPairRDD<Long, Text> textEvents = context.fromStream("user-events", startTime, endTime, Text.class);

    // Parse events, dropping any that fail to parse
    JavaRDD<UserEvent> userEvents = rawEvents
        .map(event -> parseUserEvent(event.getBody()))
        .filter(event -> event != null);

    // Key by user ID and save
    JavaPairRDD<String, UserEvent> keyedEvents = userEvents
        .mapToPair(event -> new Tuple2<>(event.getUserId(), event));

    context.saveAsDataset(keyedEvents, "processed-events");
  }
}
```
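
Because the time-range overloads treat `startTime` as inclusive and `endTime` as exclusive, aligned windows of the form `[start, start + width)` neither overlap nor drop events between consecutive runs. A dependency-free sketch of that alignment (the daily window width and helper names are illustrative choices):

```java
public class StreamWindows {

    static final long DAY_MILLIS = 24L * 60 * 60 * 1000;

    // Floors a timestamp to the start of its UTC day. With endTime exclusive,
    // consecutive windows [start, start + DAY_MILLIS) tile the timeline exactly.
    static long windowStart(long logicalStartTimeMillis) {
        return (logicalStartTimeMillis / DAY_MILLIS) * DAY_MILLIS;
    }

    static long windowEnd(long logicalStartTimeMillis) {
        return windowStart(logicalStartTimeMillis) + DAY_MILLIS;
    }

    public static void main(String[] args) {
        long t = 1614600000000L; // midday on 2021-03-01 UTC
        System.out.println(windowStart(t) + " .. " + windowEnd(t));
    }
}
```

The resulting pair would then be passed as the `startTime`/`endTime` arguments of `fromStream`.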

## Parent Context Interfaces

SparkExecutionPluginContext extends several parent interfaces that provide additional capabilities:

### DatasetContext
- Dataset instantiation and management
- Transaction support for dataset operations

### TransformContext
- Stage context with metrics, configuration, and schema access
- Lookup provider for reference data
- Lineage recording capabilities
- Service discovery and metadata operations

These inherited capabilities enable comprehensive integration with CDAP's data platform, including:

- **Metrics Collection**: Record custom metrics for monitoring and observability
- **Configuration Access**: Retrieve plugin properties and runtime arguments
- **Schema Management**: Access input/output schemas for type safety
- **Lineage Tracking**: Record data lineage for governance and compliance
- **Lookup Tables**: Access reference data for enrichment operations
- **Service Discovery**: Locate and interact with CDAP services
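
The lookup capability inherited from TransformContext is typically used for enrichment: joining each record against a small reference table. The pattern itself can be sketched without CDAP classes (the `Map`-backed lookup and the country-code field are illustrative assumptions, not the CDAP lookup API):

```java
import java.util.HashMap;
import java.util.Map;

public class Enrichment {

    // Replaces a country code with its display name from reference data,
    // falling back to the raw code when no reference entry exists.
    static String enrich(String countryCode, Map<String, String> lookup) {
        return lookup.getOrDefault(countryCode, countryCode);
    }

    public static void main(String[] args) {
        Map<String, String> countries = new HashMap<>();
        countries.put("US", "United States");
        countries.put("DE", "Germany");
        System.out.println(enrich("DE", countries)); // prints Germany
        System.out.println(enrich("FR", countries)); // prints FR (no entry)
    }
}
```

In a real plugin, the `Map` would be replaced by a lookup obtained from the context, with the same get-or-fallback shape applied inside the RDD transformation.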

## Error Handling

Common exceptions thrown by context methods:

- **DatasetInstantiationException**: Dataset doesn't exist or cannot be instantiated
- **IOException**: I/O errors during SparkInterpreter creation
- **TransactionFailureException**: Transaction failures during dataset operations (inherited)
- **DatasetManagementException**: Dataset management errors (inherited)
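
Transaction failures are frequently transient, so callers sometimes wrap dataset operations in a bounded retry. A generic, dependency-free sketch (the `Supplier`-based wrapper and the retry policy are illustrative assumptions, not something CDAP prescribes):

```java
import java.util.function.Supplier;

public class Retries {

    // Invokes the task up to maxAttempts times, rethrowing the last failure.
    // Assumes maxAttempts >= 1; real code might also back off between attempts.
    static <T> T withRetries(Supplier<T> task, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulate an operation that fails twice, then succeeds.
        String result = withRetries(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints ok after 3 attempts
    }
}
```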