
# Streaming ETL Operations

Real-time data processing capabilities for Spark Streaming pipelines within CDAP. This package provides the interfaces and abstract classes for implementing streaming sources and windowing operations with DStream support.

## Capabilities

### StreamingSource

Abstract class for implementing streaming data sources in CDAP ETL pipelines. StreamingSource creates and manages DStreams that provide continuous data input for real-time processing.

```java { .api }
/**
 * Source for Spark Streaming pipelines.
 * @param <T> Type of object contained in the stream
 */
@Beta
public abstract class StreamingSource<T> implements PipelineConfigurable, Serializable {

  public static final String PLUGIN_TYPE = "streamingsource";

  /**
   * Get the DStream to read from for streaming processing.
   * @param context The streaming context for this stage of the pipeline
   * @return JavaDStream providing continuous data input
   */
  public abstract JavaDStream<T> getStream(StreamingContext context) throws Exception;

  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Default no-op implementation
  }

  /**
   * Get number of required executors for the streaming source.
   * Override when the DStream is a union of multiple streams.
   * @return Number of executors required (defaults to 1)
   */
  public int getRequiredExecutors() {
    return 1;
  }
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class KafkaStreamingSource extends StreamingSource<String> {

  private KafkaConfig config; // Plugin configuration holder (brokers, topic, partition count)

  @Override
  public JavaDStream<String> getStream(StreamingContext context) throws Exception {
    // Create Kafka DStream
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", config.getBrokers());
    // The Kafka 0.8 consumer API expects "smallest"/"largest" here, not "latest"
    kafkaParams.put("auto.offset.reset", "largest");

    Set<String> topics = Collections.singleton(config.getTopic());

    return KafkaUtils.createDirectStream(
        context.getSparkStreamingContext(),
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topics
    ).map(tuple -> tuple._2()); // Extract the message value
  }

  @Override
  public int getRequiredExecutors() {
    return config.getPartitionCount(); // One executor per Kafka partition
  }
}
```

### Windower

Abstract class for implementing time-based windowing operations in streaming pipelines. Windower defines window parameters for aggregating streaming data over time intervals.

```java { .api }
/**
 * Windowing plugin for time-based data aggregation.
 */
@Beta
public abstract class Windower implements PipelineConfigurable, Serializable {

  public static final String PLUGIN_TYPE = "windower";

  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Default no-op implementation
  }

  /**
   * Get the width of the window in seconds.
   * Must be a multiple of the underlying batch interval.
   * @return Window width in seconds
   */
  public abstract long getWidth();

  /**
   * Get the slide interval of the window in seconds.
   * Must be a multiple of the underlying batch interval.
   * @return Window slide interval in seconds
   */
  public abstract long getSlideInterval();
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.streaming.Windower;

public class HourlyWindower extends Windower {

  @Override
  public long getWidth() {
    return 3600; // 1 hour window
  }

  @Override
  public long getSlideInterval() {
    return 300; // Slide every 5 minutes
  }
}

public class TumblingWindowTenMinutes extends Windower {

  @Override
  public long getWidth() {
    return 600; // 10 minute window
  }

  @Override
  public long getSlideInterval() {
    return 600; // Tumbling window (no overlap)
  }
}
```

### StreamingContext

Context interface for streaming plugin stages. Provides access to the Spark Streaming context, the CDAP execution context, and lineage registration capabilities.

```java { .api }
/**
 * Context for streaming plugin stages.
 */
@Beta
public interface StreamingContext extends StageContext, Transactional {

  /**
   * Get the Spark JavaStreamingContext for the pipeline.
   * @return JavaStreamingContext for creating and managing DStreams
   */
  JavaStreamingContext getSparkStreamingContext();

  /**
   * Get the CDAP JavaSparkExecutionContext for the pipeline.
   * @return CDAP execution context for accessing datasets and services
   */
  JavaSparkExecutionContext getSparkExecutionContext();

  /**
   * Register lineage for this Spark program using the given reference name.
   * @param referenceName Reference name used for source
   * @throws DatasetManagementException If error creating reference dataset
   * @throws TransactionFailureException If error fetching dataset for usage registration
   */
  void registerLineage(String referenceName) throws DatasetManagementException, TransactionFailureException;
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class FileStreamingSource extends StreamingSource<String> {

  @Override
  public JavaDStream<String> getStream(StreamingContext context) throws Exception {
    // Register data lineage
    context.registerLineage("file-input-source");

    // Create file-based DStream
    JavaStreamingContext jssc = context.getSparkStreamingContext();
    return jssc.textFileStream("/path/to/streaming/files");
  }
}
```
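
Because `StreamingContext` extends `Transactional`, a source can also read or write datasets transactionally while setting up its stream. A minimal sketch, assuming a `KeyValueTable` dataset named `source-offsets` exists in the pipeline (both the dataset name and the key are hypothetical):

```java
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.dataset.lib.KeyValueTable;

// Inside getStream(StreamingContext context): record setup metadata in a
// transaction. The "source-offsets" table and its key are illustrative.
context.execute((DatasetContext datasetContext) -> {
  KeyValueTable offsets = datasetContext.getDataset("source-offsets");
  offsets.write("last-start-time", String.valueOf(System.currentTimeMillis()));
});
```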

## Plugin Types

The streaming ETL package defines two plugin types for real-time processing (a registration sketch follows the list):

- **streamingsource**: For implementing streaming data sources using `StreamingSource`
- **windower**: For implementing time-based windowing operations using `Windower`
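
Concrete plugins are registered under these types with CDAP's standard plugin annotations. A minimal sketch, assuming the `@Plugin`, `@Name`, and `@Description` annotations from the broader CDAP API (they are not part of this package):

```java
import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.etl.api.streaming.Windower;

// PLUGIN_TYPE resolves to "windower"; the name and description are illustrative.
@Plugin(type = Windower.PLUGIN_TYPE)
@Name("HourlyWindow")
@Description("One-hour window that slides every five minutes.")
public class HourlyWindowPlugin extends Windower {

  @Override
  public long getWidth() {
    return 3600;
  }

  @Override
  public long getSlideInterval() {
    return 300;
  }
}
```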

## Window Constraints

Both window width and slide interval must be multiples of the underlying Spark Streaming batch interval (a validation sketch follows the list below). Common patterns:

- **Tumbling Windows**: Width equals slide interval (no overlap)
- **Sliding Windows**: Slide interval is smaller than width (overlapping windows)
- **Session Windows**: Not directly supported; requires custom logic in transformations
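
A minimal sketch of that multiple-of-batch-interval check, with an assumed 60-second batch interval (the helper itself is illustrative, not part of this API):

```java
// Illustrative check: width and slide must divide evenly by the batch interval.
static void validateWindow(Windower windower, long batchIntervalSeconds) {
  if (windower.getWidth() % batchIntervalSeconds != 0
      || windower.getSlideInterval() % batchIntervalSeconds != 0) {
    throw new IllegalArgumentException(String.format(
        "Window width %ds and slide interval %ds must both be multiples of the %ds batch interval",
        windower.getWidth(), windower.getSlideInterval(), batchIntervalSeconds));
  }
}
```

For example, `validateWindow(new HourlyWindower(), 60)` passes, since 3600 and 300 are both multiples of 60.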

## Integration with Spark Streaming

StreamingSource integrates with Spark Streaming by:

1. Creating DStreams from external data sources (Kafka, files, sockets, etc.)
2. Managing executor requirements based on data source characteristics
3. Providing lifecycle hooks for resource management
4. Enabling lineage tracking for data governance

Windower integrates by providing temporal parameters that downstream operations can use for time-based aggregations and transformations, as sketched below.
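
For illustration, a pipeline engine could translate those parameters into Spark's standard window operation roughly as follows (the helper is an assumption about engine internals, not part of this API):

```java
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;

// Hypothetical engine-side wiring: apply a Windower's width and slide
// (both in seconds) to a DStream.
static <T> JavaDStream<T> applyWindow(JavaDStream<T> stream, Windower windower) {
  return stream.window(
      Durations.seconds(windower.getWidth()),
      Durations.seconds(windower.getSlideInterval()));
}
```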