# Execution Context

Rich context interfaces providing access to the Spark execution environment, CDAP datasets, streams, and runtime configuration. Essential for implementing ETL operations that integrate with CDAP's data platform.

## Capabilities

### SparkExecutionPluginContext

The primary execution context interface for Spark operations. It provides comprehensive access to the Spark runtime, CDAP datasets, streams, and platform services during ETL execution.

```java { .api }
/**
 * Context passed to spark plugin types during execution.
 */
@Beta
public interface SparkExecutionPluginContext extends DatasetContext, TransformContext {

  /**
   * Returns the logical start time of the Batch Job.
   * @return Time in milliseconds since epoch (January 1, 1970 UTC)
   */
  @Override
  long getLogicalStartTime();

  /**
   * Returns runtime arguments of the Batch Job.
   * @return Map of runtime arguments
   */
  Map<String, String> getRuntimeArguments();

  /**
   * Returns the JavaSparkContext used during execution.
   * @return The Spark Context for RDD operations
   */
  JavaSparkContext getSparkContext();

  /**
   * Returns a Serializable PluginContext for requesting plugin instances.
   * Can be used in Spark program closures.
   * @return Serializable PluginContext
   */
  PluginContext getPluginContext();

  /**
   * Creates a new SparkInterpreter for Scala code compilation and interpretation.
   * @return New SparkInterpreter instance
   * @throws IOException If failed to create local directory for compiled classes
   */
  SparkInterpreter createSparkInterpreter() throws IOException;
}
```
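
For orientation, here is a minimal sketch of pulling runtime information from this context inside a plugin's run method. The `ContextBasics` class and the `lookback.hours` argument key are illustrative, not part of the API:

```java
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaSparkContext;

public class ContextBasics {

  public void run(SparkExecutionPluginContext context) {
    // The logical start time anchors time-based processing to the scheduled run,
    // not to the wall clock at execution time.
    long logicalStart = context.getLogicalStartTime();

    // Runtime arguments allow per-run configuration; the key below is illustrative.
    int lookbackHours = Integer.parseInt(
        context.getRuntimeArguments().getOrDefault("lookback.hours", "24"));

    // Direct access to the JavaSparkContext for custom RDD work.
    JavaSparkContext jsc = context.getSparkContext();
    int parallelism = jsc.defaultParallelism();
  }
}
```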

### Dataset Operations

Methods for reading from and writing to CDAP datasets using Spark RDDs.

```java { .api }
/**
 * Creates a JavaPairRDD from the given Dataset.
 * @param datasetName Name of the Dataset
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD instance that reads from the Dataset
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName);

/**
 * Creates a JavaPairRDD from the given Dataset with arguments.
 * @param datasetName Name of the Dataset
 * @param arguments Dataset arguments
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD instance that reads from the Dataset
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments);

/**
 * Creates a JavaPairRDD from the given Dataset with custom splits.
 * @param datasetName Name of the Dataset
 * @param arguments Dataset arguments
 * @param splits Custom list of splits, or null for default splits
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD instance that reads from the Dataset
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromDataset(String datasetName, Map<String, String> arguments,
                                     @Nullable Iterable<? extends Split> splits);

/**
 * Saves the given JavaPairRDD to the given Dataset.
 * @param rdd JavaPairRDD to be saved
 * @param datasetName Name of the Dataset
 * @param <K> Key type
 * @param <V> Value type
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName);

/**
 * Saves the given JavaPairRDD to the given Dataset with arguments.
 * @param rdd JavaPairRDD to be saved
 * @param datasetName Name of the Dataset
 * @param arguments Dataset arguments
 * @param <K> Key type
 * @param <V> Value type
 * @throws DatasetInstantiationException If the Dataset doesn't exist
 */
<K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName, Map<String, String> arguments);
```

**Dataset Usage Example:**

```java
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaPairRDD;

import java.util.HashMap;
import java.util.Map;

public class DataProcessor {

  public void processData(SparkExecutionPluginContext context) throws Exception {
    // Read from input dataset
    JavaPairRDD<String, UserRecord> inputData = context.fromDataset("user-input");

    // Transform data
    JavaPairRDD<String, UserRecord> processedData = inputData
        .filter(tuple -> tuple._2().isActive())
        .mapValues(user -> normalizeUser(user));

    // Save to output dataset with custom arguments
    Map<String, String> outputArgs = new HashMap<>();
    outputArgs.put("compression", "snappy");
    outputArgs.put("format", "parquet");

    context.saveAsDataset(processedData, "user-output", outputArgs);
  }
}
```
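
The argument-taking `fromDataset` overload works the same way on the read side. A minimal sketch, assuming a dataset type that interprets a scan-scoping argument; the `ArgumentDrivenReader` class, the dataset name, and the `scan.start.time` key are illustrative:

```java
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaPairRDD;

import java.util.HashMap;
import java.util.Map;

public class ArgumentDrivenReader {

  public <K, V> JavaPairRDD<K, V> readLastHour(SparkExecutionPluginContext context) {
    // Dataset arguments are interpreted by the dataset type; this key is illustrative.
    Map<String, String> readArgs = new HashMap<>();
    readArgs.put("scan.start.time",
                 String.valueOf(context.getLogicalStartTime() - 60 * 60 * 1000L));

    return context.fromDataset("user-input", readArgs);
  }
}
```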

### Stream Operations

Methods for reading from CDAP streams with various decoding options.

```java { .api }
/**
 * Creates a JavaRDD representing all events from the given stream.
 * @param streamName Name of the stream
 * @return JavaRDD of StreamEvent objects
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
JavaRDD<StreamEvent> fromStream(String streamName);

/**
 * Creates a JavaRDD representing events from the stream in a time range.
 * @param streamName Name of the stream
 * @param startTime Starting time in milliseconds (inclusive)
 * @param endTime Ending time in milliseconds (exclusive)
 * @return JavaRDD of StreamEvent objects
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
JavaRDD<StreamEvent> fromStream(String streamName, long startTime, long endTime);

/**
 * Creates a JavaPairRDD with timestamp keys and decoded stream bodies.
 * Supports Text, String, and BytesWritable value types.
 * @param streamName Name of the stream
 * @param valueType Type of the stream body to decode to
 * @param <V> Value type
 * @return JavaPairRDD with timestamp keys and decoded values
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
<V> JavaPairRDD<Long, V> fromStream(String streamName, Class<V> valueType);

/**
 * Creates a JavaPairRDD with decoded stream events in a time range.
 * @param streamName Name of the stream
 * @param startTime Starting time in milliseconds (inclusive)
 * @param endTime Ending time in milliseconds (exclusive)
 * @param valueType Type of the stream body to decode to
 * @param <V> Value type
 * @return JavaPairRDD with timestamp keys and decoded values
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
<V> JavaPairRDD<Long, V> fromStream(String streamName, long startTime, long endTime, Class<V> valueType);

/**
 * Creates a JavaPairRDD with custom stream event decoding.
 * @param streamName Name of the stream
 * @param startTime Starting time in milliseconds (inclusive)
 * @param endTime Ending time in milliseconds (exclusive)
 * @param decoderClass StreamEventDecoder class for decoding events
 * @param keyType Decoded key type
 * @param valueType Decoded value type
 * @param <K> Key type
 * @param <V> Value type
 * @return JavaPairRDD with custom decoded keys and values
 * @throws DatasetInstantiationException If the Stream doesn't exist
 */
<K, V> JavaPairRDD<K, V> fromStream(String streamName, long startTime, long endTime,
                                    Class<? extends StreamEventDecoder<K, V>> decoderClass,
                                    Class<K> keyType, Class<V> valueType);
```

**Stream Usage Example:**

```java
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class StreamProcessor {

  public void processStreamData(SparkExecutionPluginContext context) throws Exception {
    long endTime = System.currentTimeMillis();
    long startTime = endTime - (24 * 60 * 60 * 1000); // 24 hours ago

    // Read raw stream events
    JavaRDD<StreamEvent> rawEvents = context.fromStream("user-events", startTime, endTime);

    // Read stream with Text decoding
    JavaPairRDD<Long, Text> textEvents = context.fromStream("user-events", startTime, endTime, Text.class);

    // Process stream data
    JavaRDD<UserEvent> userEvents = rawEvents
        .map(event -> parseUserEvent(event.getBody()))
        .filter(event -> event != null);

    // Convert to dataset format and save
    JavaPairRDD<String, UserEvent> keyedEvents = userEvents
        .mapToPair(event -> new Tuple2<>(event.getUserId(), event));

    context.saveAsDataset(keyedEvents, "processed-events");
  }
}
```
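
The custom-decoder overload is useful when timestamp-keyed Text/String decoding isn't enough. A sketch, assuming the `StreamEventDecoder` contract from CDAP's stream APIs (a `decode` method that fills a reusable `DecodeResult`); the `DecodedStreamReader` and `StringBodyDecoder` classes and the stream name are illustrative:

```java
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.stream.StreamEventDecoder;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaPairRDD;

import java.nio.charset.StandardCharsets;

public class DecodedStreamReader {

  // Illustrative decoder: keys each event by its timestamp and decodes the body as UTF-8 text.
  public static class StringBodyDecoder implements StreamEventDecoder<Long, String> {
    @Override
    public DecodeResult<Long, String> decode(StreamEvent event, DecodeResult<Long, String> result) {
      return result
          .setKey(event.getTimestamp())
          .setValue(StandardCharsets.UTF_8.decode(event.getBody()).toString());
    }
  }

  public JavaPairRDD<Long, String> read(SparkExecutionPluginContext context,
                                        long startTime, long endTime) {
    return context.fromStream("user-events", startTime, endTime,
                              StringBodyDecoder.class, Long.class, String.class);
  }
}
```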

## Parent Context Interfaces

SparkExecutionPluginContext extends several parent interfaces that provide additional capabilities:

### DatasetContext

- Dataset instantiation and management
- Transaction support for dataset operations

### TransformContext

- Stage context with metrics, configuration, and schema access
- Lookup provider for reference data
- Lineage recording capabilities
- Service discovery and metadata operations

These inherited capabilities enable comprehensive integration with CDAP's data platform, including (see the sketch after this list):

- **Metrics Collection**: Record custom metrics for monitoring and observability
- **Configuration Access**: Retrieve plugin properties and runtime arguments
- **Schema Management**: Access input/output schemas for type safety
- **Lineage Tracking**: Record data lineage for governance and compliance
- **Lookup Tables**: Access reference data for enrichment operations
- **Service Discovery**: Locate and interact with CDAP services
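
A minimal sketch exercising a few of these inherited capabilities, assuming the standard stage-context accessors (`getMetrics()`, `getStageName()`); the class, metric, and argument names are illustrative:

```java
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;

public class InheritedCapabilities {

  public void demonstrate(SparkExecutionPluginContext context) {
    // Metrics collection: record a custom counter scoped to this stage.
    context.getMetrics().count("records.seen", 1);

    // Configuration access: read a runtime argument with a default.
    String mode = context.getRuntimeArguments().getOrDefault("mode", "full");

    // Stage identity: useful for logging and for scoping metric names.
    String stageName = context.getStageName();
  }
}
```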

## Error Handling

Common exceptions thrown by context methods:

- **DatasetInstantiationException**: Dataset doesn't exist or cannot be instantiated
- **IOException**: I/O errors during SparkInterpreter creation
- **TransactionFailureException**: Transaction failures during dataset operations (inherited)
- **DatasetManagementException**: Dataset management errors (inherited)
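
As a sketch of call-site handling for the first of these, a hedged example (the `SafeDatasetReader` wrapper and its error message are illustrative):

```java
import co.cask.cdap.api.data.DatasetInstantiationException;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaPairRDD;

public class SafeDatasetReader {

  public <K, V> JavaPairRDD<K, V> readOrFail(SparkExecutionPluginContext context, String datasetName) {
    try {
      return context.fromDataset(datasetName);
    } catch (DatasetInstantiationException e) {
      // Typically indicates a missing or misconfigured dataset; fail with a clearer message.
      throw new IllegalArgumentException(
          "Input dataset '" + datasetName + "' does not exist or could not be instantiated", e);
    }
  }
}
```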