
# CDAP ETL API Spark

CDAP ETL API Spark provides Java API interfaces and abstract classes for developing Apache Spark-based ETL (Extract, Transform, Load) operations within the CDAP (Cask Data Application Platform) ecosystem. It enables developers to create custom Spark transformations, sinks, and streaming components that integrate seamlessly with CDAP's data processing pipelines.

## Package Information

- **Package Name**: cdap-etl-api-spark
- **Package Type**: maven
- **Language**: Java
- **Group ID**: co.cask.cdap
- **Artifact ID**: cdap-etl-api-spark
- **Installation**:

Maven:

```xml
<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-etl-api-spark</artifactId>
  <version>5.1.2</version>
</dependency>
```

Gradle:

```gradle
implementation 'co.cask.cdap:cdap-etl-api-spark:5.1.2'
```

## Core Imports

```java
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import co.cask.cdap.etl.api.streaming.Windower;
```

## Basic Usage

```java
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
import co.cask.cdap.etl.api.PipelineConfigurer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// Example Spark sink implementation
public class MyDataSink extends SparkSink<MyRecord> {

    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        // Configure pipeline datasets if needed
        super.configurePipeline(pipelineConfigurer);
    }

    @Override
    public void prepareRun(SparkPluginContext context) throws Exception {
        // Prepare for the batch run - configure Spark settings if needed
    }

    @Override
    public void run(SparkExecutionPluginContext context, JavaRDD<MyRecord> input) throws Exception {
        // Persist RDD data to storage
        JavaPairRDD<String, MyRecord> keyedData = input.mapToPair(record ->
            new Tuple2<>(record.getKey(), record));
        context.saveAsDataset(keyedData, "output-dataset");
    }

    @Override
    public void onRunFinish(boolean succeeded, SparkPluginContext context) {
        // Clean up after the batch run completes
        super.onRunFinish(succeeded, context);
    }
}
```

## Architecture

CDAP ETL API Spark is built around several key components:

- **Batch Processing**: SparkSink and SparkCompute for batch ETL operations with RDD support
- **Streaming Processing**: StreamingSource and Windower for real-time data processing with DStream support
- **Context Management**: Rich context interfaces providing access to datasets, streams, metrics, and configuration
- **Plugin Framework**: Abstract base classes implementing CDAP's plugin lifecycle and configuration patterns
- **Integration Layer**: Integration with CDAP's data platform, including lineage tracking and transactional operations

## Capabilities

### Batch ETL Operations

Core batch processing capabilities for Spark-based ETL pipelines. Provides abstract classes for implementing data sinks and compute transformations with full access to Spark RDDs and CDAP datasets.

```java { .api }
// Spark sink for data persistence
public abstract class SparkSink<IN> extends BatchConfigurable<SparkPluginContext> {
    public abstract void run(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
}

// Spark compute for data transformations
public abstract class SparkCompute<IN, OUT> implements PipelineConfigurable {
    public abstract JavaRDD<OUT> transform(SparkExecutionPluginContext context, JavaRDD<IN> input) throws Exception;
    public void initialize(SparkExecutionPluginContext context) throws Exception;
}
```

[Batch ETL Operations](./batch-etl.md)
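Where a SparkSink consumes an RDD to persist it, a SparkCompute is a pure RDD-to-RDD transformation. The sketch below shows the shape of a transform body using a hypothetical `Event` record and a minimal stand-in for Spark's `JavaRDD` (reduced to `map`/`collect` so the example runs without a Spark dependency); in a real plugin the same logic would live in `SparkCompute.transform` and operate on `org.apache.spark.api.java.JavaRDD`.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SparkComputeSketch {
    // Hypothetical record type flowing through the pipeline (not part of the library).
    record Event(String user, long bytes) {}

    // Minimal stand-in for JavaRDD<T>, reduced to the two operations used below.
    interface Rdd<T> {
        <O> Rdd<O> map(Function<T, O> f);
        List<T> collect();
    }

    static <T> Rdd<T> rddOf(List<T> items) {
        return new Rdd<T>() {
            public <O> Rdd<O> map(Function<T, O> f) {
                return rddOf(items.stream().map(f).collect(Collectors.toList()));
            }
            public List<T> collect() { return items; }
        };
    }

    // The body of a hypothetical SparkCompute<Event, String>.transform(context, input)
    // would look much like this: derive a new RDD without side effects.
    static Rdd<String> transform(Rdd<Event> input) {
        return input.map(Event::user);
    }

    public static void main(String[] args) {
        Rdd<Event> input = rddOf(List.of(new Event("alice", 10), new Event("bob", 20)));
        System.out.println(transform(input).collect()); // prints [alice, bob]
    }
}
```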

### Streaming ETL Operations

Real-time data processing capabilities for Spark Streaming pipelines. Provides interfaces and abstract classes for streaming sources and windowing operations with DStream support.

```java { .api }
// Streaming source for real-time data
public abstract class StreamingSource<T> implements PipelineConfigurable {
    public abstract JavaDStream<T> getStream(StreamingContext context) throws Exception;
    public int getRequiredExecutors();
}

// Windowing for time-based aggregations
public abstract class Windower implements PipelineConfigurable {
    public abstract long getWidth();
    public abstract long getSlideInterval();
}
```

[Streaming ETL Operations](./streaming-etl.md)
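A Windower's `getWidth()` and `getSlideInterval()` together determine a series of overlapping windows over the stream (in CDAP both are typically interpreted as multiples of the pipeline's micro-batch interval). This hypothetical helper, not part of the API, shows how the two values map to window boundaries:

```java
import java.util.ArrayList;
import java.util.List;

public class WindowSketch {
    // Hypothetical helper (not part of the API): enumerate the [start, end)
    // ranges, in batch intervals, produced by a window of the given width
    // sliding by slideInterval, up to some horizon.
    static List<long[]> windows(long width, long slideInterval, long horizon) {
        List<long[]> result = new ArrayList<>();
        for (long start = 0; start + width <= horizon; start += slideInterval) {
            result.add(new long[] {start, start + width});
        }
        return result;
    }

    public static void main(String[] args) {
        // width = 6 intervals, slide = 3 intervals: consecutive windows
        // overlap by half their width.
        for (long[] w : windows(6, 3, 12)) {
            System.out.println(w[0] + ".." + w[1]); // 0..6, 3..9, 6..12
        }
    }
}
```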

### Execution Context

Rich context interfaces providing access to Spark execution environment, CDAP datasets, streams, and runtime configuration. Essential for implementing ETL operations that integrate with CDAP's data platform.

```java { .api }
// Primary execution context for Spark operations
public interface SparkExecutionPluginContext extends DatasetContext, TransformContext {
    JavaSparkContext getSparkContext();
    long getLogicalStartTime();
    Map<String, String> getRuntimeArguments();
    <K, V> JavaPairRDD<K, V> fromDataset(String datasetName);
    <K, V> void saveAsDataset(JavaPairRDD<K, V> rdd, String datasetName);
    JavaRDD<StreamEvent> fromStream(String streamName);
}
```

[Execution Context](./execution-context.md)
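Since `getRuntimeArguments()` returns a plain `Map<String, String>`, the usual pattern is to read optional settings with a default. A small sketch of that pattern (the argument key and default value are hypothetical, not defined by the library):

```java
import java.util.Map;

public class RuntimeArgsSketch {
    // Resolve the output dataset name from runtime arguments, falling back to
    // a default; a plugin's run(...) method would do the same with the map
    // returned by context.getRuntimeArguments().
    static String outputDataset(Map<String, String> runtimeArgs) {
        return runtimeArgs.getOrDefault("output.dataset", "output-dataset");
    }

    public static void main(String[] args) {
        System.out.println(outputDataset(Map.of()));                           // output-dataset
        System.out.println(outputDataset(Map.of("output.dataset", "events"))); // events
    }
}
```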

## Type Definitions

### Core Parent Types

```java { .api }
/**
 * Base class for Batch run configuration methods.
 * @param <T> batch execution context
 */
@Beta
public abstract class BatchConfigurable<T extends BatchContext> implements PipelineConfigurable, SubmitterLifecycle<T> {

    @Override
    public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
        // Default no-op implementation
    }

    /**
     * Prepare the Batch run. Used to configure the job before starting the run.
     * @param context batch execution context
     * @throws Exception if there's an error during this method invocation
     */
    @Override
    public abstract void prepareRun(T context) throws Exception;

    /**
     * Invoked after the Batch run finishes. Used to perform any end-of-run logic.
     * @param succeeded defines the result of batch execution: true if the run succeeded, false otherwise
     * @param context batch execution context
     */
    @Override
    public void onRunFinish(boolean succeeded, T context) {
        // Default no-op implementation
    }
}

/**
 * Context passed to Batch Source and Sink.
 */
@Beta
public interface BatchContext extends DatasetContext, TransformContext {

    /**
     * Create a new dataset instance.
     * @param datasetName the name of the new dataset
     * @param typeName the type of the dataset to create
     * @param properties the properties for the new dataset
     * @throws InstanceConflictException if the dataset already exists
     * @throws DatasetManagementException for any issues encountered in the dataset system
     */
    void createDataset(String datasetName, String typeName, DatasetProperties properties)
        throws DatasetManagementException;

    /**
     * Check whether a dataset exists in the current namespace.
     * @param datasetName the name of the dataset
     * @return whether a dataset of that name exists
     * @throws DatasetManagementException for any issues encountered in the dataset system
     */
    boolean datasetExists(String datasetName) throws DatasetManagementException;

    /**
     * Returns settable pipeline arguments.
     * @return settable pipeline arguments
     */
    @Override
    SettableArguments getArguments();
}

/**
 * Interface for configuring ETL pipelines.
 */
public interface PipelineConfigurable {

    /**
     * Configure a pipeline.
     * @param pipelineConfigurer the configurer used to add required datasets and streams
     * @throws IllegalArgumentException if the given config is invalid
     */
    void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException;
}
```
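The hooks on BatchConfigurable imply a fixed lifecycle: configurePipeline at pipeline deployment time, prepareRun before each run, and onRunFinish afterwards regardless of outcome. The stand-in below (a hypothetical driver and plugin, not the real CDAP runtime) sketches that call order:

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
    // Minimal stand-in mirroring BatchConfigurable's hooks; the real class is
    // co.cask.cdap.etl.api.batch.BatchConfigurable<T>.
    static abstract class Plugin {
        void configurePipeline(List<String> calls) { calls.add("configurePipeline"); }
        abstract void prepareRun(List<String> calls) throws Exception;
        void onRunFinish(boolean succeeded, List<String> calls) {
            calls.add("onRunFinish:" + succeeded);
        }
    }

    // Hypothetical driver loop invoking the hooks in lifecycle order.
    static List<String> drive(Plugin p) {
        List<String> calls = new ArrayList<>();
        p.configurePipeline(calls);   // once, at pipeline deployment
        boolean succeeded = true;
        try {
            p.prepareRun(calls);      // before the Spark job starts
            // ... the Spark job itself would run here ...
        } catch (Exception e) {
            succeeded = false;
        } finally {
            p.onRunFinish(succeeded, calls); // always runs, success or failure
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> calls = drive(new Plugin() {
            void prepareRun(List<String> c) { c.add("prepareRun"); }
        });
        System.out.println(calls); // [configurePipeline, prepareRun, onRunFinish:true]
    }
}
```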