
# Batch ETL Operations

Core batch processing capabilities for Spark-based ETL pipelines within CDAP. Provides abstract classes for implementing data sinks and compute transformations with full access to Spark RDDs and CDAP datasets.

## Capabilities

### SparkSink

Abstract class for implementing the final stage of a batch ETL pipeline. SparkSink performs RDD operations and is responsible for persisting data to external storage systems or CDAP datasets.

```java { .api }
/**
 * SparkSink composes a final, optional stage of a Batch ETL Pipeline. In addition to configuring the Batch run, it
 * can also perform RDD operations on the key value pairs provided by the Batch run.
 *
 * {@link SparkSink#run} method is called inside the Batch Run while {@link SparkSink#prepareRun} and
 * {@link SparkSink#onRunFinish} methods are called on the client side, which launches the Batch run, before the
 * Batch run starts and after it finishes respectively.
 *
 * @param <IN> The type of input record to the SparkSink.
 */
@Beta
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> implements Serializable {

  public static final String PLUGIN_TYPE = "sparksink";

  private static final long serialVersionUID = -8600555200583639593L;

  /**
   * User Spark job which will be executed and is responsible for persisting any data.
   * @param context {@link SparkExecutionPluginContext} for this job
   * @param input the input from previous stages of the Batch run.
   */
  public abstract void run(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;

  // Inherited from BatchConfigurable<SparkPluginContext>:

  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Default no-op implementation
  }

  /**
   * Prepare the Batch run. Used to configure the job before starting the run.
   * @param context batch execution context
   * @throws Exception if there's an error during this method invocation
   */
  @Override
  public abstract void prepareRun(SparkPluginContext context) throws Exception;

  /**
   * Invoked after the Batch run finishes. Used to perform any end of the run logic.
   * @param succeeded defines the result of batch execution: true if run succeeded, false otherwise
   * @param context batch execution context
   */
  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // Default no-op implementation
  }
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.PipelineConfigurer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class DatabaseSink extends SparkSink<UserRecord> {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Configure any required datasets or streams
    super.configurePipeline(pipelineConfigurer);
  }

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Configure Spark settings for this job
    SparkConf sparkConf = new SparkConf()
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2");

    context.setSparkConf(sparkConf);
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<UserRecord> input) throws Exception {
    // Transform to key-value pairs and save to dataset
    JavaPairRDD<String, UserRecord> keyedData = input.mapToPair(record ->
        new Tuple2<>(record.getId(), record));

    context.saveAsDataset(keyedData, "user-database");
  }

  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // Cleanup logic if needed
    super.onRunFinish(succeeded, context);
  }
}
```
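
If a sink writes to a CDAP dataset, as `DatabaseSink` does with `"user-database"`, the dataset can also be registered at configure time. The fragment below is a hypothetical variant of `configurePipeline`, assuming `PipelineConfigurer`'s dataset-creation methods are available; `KeyValueTable` (from `co.cask.cdap.api.dataset.lib`) is only a placeholder, and the real dataset class must match the key and value types that `run()` saves.

```java
// Hypothetical variant of DatabaseSink.configurePipeline that registers the
// dataset written in run(). KeyValueTable is a placeholder; choose a dataset
// class compatible with the key/value types passed to saveAsDataset().
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
  pipelineConfigurer.createDataset("user-database", KeyValueTable.class);
}
```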

### SparkCompute

Abstract class for implementing data transformation stages in a batch ETL pipeline. SparkCompute transforms an input RDD into an output RDD whose element type and structure may differ.

```java { .api }
/**
 * Spark Compute stage for data transformations.
 * @param <IN> Type of input object
 * @param <OUT> Type of output object
 */
@Beta
public abstract class SparkCompute<IN, OUT> implements PipelineConfigurable, Serializable {

  public static final String PLUGIN_TYPE = "sparkcompute";

  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Default no-op implementation
  }

  /**
   * Initialize the plugin before any transform calls are made.
   * @param context SparkExecutionPluginContext for this job
   */
  public void initialize(SparkExecutionPluginContext context) throws Exception {
    // Default no-op implementation
  }

  /**
   * Transform the input RDD and return the output RDD for the next pipeline stage.
   * @param context SparkExecutionPluginContext for this job
   * @param input Input RDD to be transformed
   * @return Transformed output RDD
   */
  public abstract JavaRDD<OUT> transform(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.api.java.JavaRDD;

public class DataCleaner extends SparkCompute<RawRecord, CleanRecord> {

  @Override
  public void initialize(SparkExecutionPluginContext context) throws Exception {
    // Initialize any required resources or configuration before transform() is called
  }

  @Override
  public JavaRDD<CleanRecord> transform(SparkExecutionPluginContext context, JavaRDD<RawRecord> input) throws Exception {
    return input
      .filter(record -> record.isValid())          // Filter invalid records
      .map(record -> cleanAndNormalize(record))    // Transform to clean records
      .filter(record -> record != null);           // Remove null results
  }

  private CleanRecord cleanAndNormalize(RawRecord raw) {
    // Implementation for cleaning and normalizing data
    return new CleanRecord(raw.getName().trim(), raw.getAge(), raw.getEmail().toLowerCase());
  }
}
```

### SparkPluginContext

Context interface for Spark plugins during the preparation phase. Provides access to batch context capabilities and Spark configuration management.

```java { .api }
/**
 * Context passed to spark plugin types during prepare run phase.
 */
@Beta
public interface SparkPluginContext extends BatchContext {

  /**
   * Sets a {@link SparkConf} to be used for the Spark execution.
   *
   * If your configuration will not change between pipeline runs,
   * use {@link PipelineConfigurer#setPipelineProperties}
   * instead. This method should only be used when you need different
   * configuration settings for each run.
   *
   * Due to limitations in Spark Streaming, this method cannot be used
   * in realtime data pipelines. Calling this method will throw an
   * {@link UnsupportedOperationException} in realtime pipelines.
   *
   * @param sparkConf Spark configuration for the execution
   * @throws UnsupportedOperationException in realtime data pipelines
   */
  void setSparkConf(SparkConf sparkConf);
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;

public class MySparkSink extends SparkSink<MyRecord> {

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Configure Spark settings for this job
    SparkConf sparkConf = new SparkConf()
      .set("spark.executor.memory", "2g")
      .set("spark.executor.cores", "2")
      .set("spark.sql.adaptive.enabled", "true");

    context.setSparkConf(sparkConf);
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
    // Spark job implementation
  }
}
```
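
As the javadoc notes, configuration that does not change between runs is better supplied once at configure time. The sketch below illustrates that alternative; it assumes `setPipelineProperties` takes a plain `Map<String, String>` and that the engine honors Spark keys passed this way (both are assumptions, and the property names are illustrative).

```java
import java.util.HashMap;
import java.util.Map;

import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import org.apache.spark.api.java.JavaRDD;

// Hypothetical sink whose Spark settings never change between runs.
public class StaticConfSink extends SparkSink<String> {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Static settings set once at configure time (assumed signature:
    // setPipelineProperties(Map<String, String>); keys shown are illustrative).
    Map<String, String> properties = new HashMap<>();
    properties.put("spark.executor.memory", "2g");
    properties.put("spark.executor.cores", "2");
    pipelineConfigurer.setPipelineProperties(properties);
  }

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // No per-run SparkConf needed when the configuration is static.
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<String> input) throws Exception {
    // Spark job implementation
  }
}
```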

## Plugin Types

The batch ETL package defines two plugin types that can be used in CDAP ETL pipelines (a registration sketch follows the list):

- **sparksink**: For implementing data persistence operations using `SparkSink`
- **sparkcompute**: For implementing data transformation operations using `SparkCompute`
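
A plugin implementation is exposed to pipelines under one of these types through the standard CDAP plugin annotations. The sketch below is a minimal, hypothetical `sparksink` registration; the class, name, and description strings are illustrative, not part of the API.

```java
import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import org.apache.spark.api.java.JavaRDD;

// Registers this class under the "sparksink" plugin type (assumes the standard
// CDAP plugin annotations; NoOpSink is an illustrative name).
@Plugin(type = SparkSink.PLUGIN_TYPE)
@Name("NoOpSink")
@Description("Illustrative sink that registers under the sparksink plugin type")
public class NoOpSink extends SparkSink<String> {

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // Nothing to prepare for this illustrative sink
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<String> input) throws Exception {
    // A real sink would persist the RDD here
  }
}
```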

## Lifecycle Methods

SparkSink and SparkCompute expose the following lifecycle methods, several of which are inherited from their parent classes (a consolidated sketch follows the list):

- **configurePipeline()**: Called during pipeline configuration to add required datasets and streams
- **prepareRun()**: Called on the client side before the batch run starts (SparkSink only)
- **initialize()**: Called before any processing operations (SparkCompute only)
- **run()/transform()**: Called during Spark execution to perform the actual data processing
- **onRunFinish()**: Called on the client side after the batch run completes (SparkSink only)
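
The following hypothetical `SparkSink` skeleton consolidates the order in which these hooks are invoked; `SparkCompute`'s `initialize()` and `transform()` run at the corresponding points inside the Spark execution.

```java
import co.cask.cdap.etl.api.PipelineConfigurer;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import org.apache.spark.api.java.JavaRDD;

// Hypothetical skeleton annotating when and where each lifecycle hook runs.
public class LifecycleSketchSink extends SparkSink<String> {

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // 1. Pipeline configuration time: register datasets/streams the stage needs.
  }

  @Override
  public void prepareRun(SparkPluginContext context) throws Exception {
    // 2. Client side, before each batch run starts: e.g. set a per-run SparkConf.
  }

  @Override
  public void run(SparkExecutionPluginContext context, JavaRDD<String> input) throws Exception {
    // 3. Inside the Spark execution: process and persist the incoming RDD.
  }

  @Override
  public void onRunFinish(boolean succeeded, SparkPluginContext context) {
    // 4. Client side, after the run completes; succeeded reports the outcome.
  }
}
```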