# Streaming ETL Operations

Real-time data processing capabilities for Spark Streaming pipelines within CDAP. Provides interfaces and abstract classes for streaming sources and windowing operations with DStream support.

## Capabilities

### StreamingSource

Abstract class for implementing streaming data sources in CDAP ETL pipelines. StreamingSource creates and manages DStreams that provide continuous data input for real-time processing.
```java { .api }
/**
 * Source for Spark Streaming pipelines.
 * @param <T> Type of object contained in the stream
 */
@Beta
public abstract class StreamingSource<T> implements PipelineConfigurable, Serializable {

  public static final String PLUGIN_TYPE = "streamingsource";

  /**
   * Get the DStream to read from for streaming processing.
   * @param context The streaming context for this stage of the pipeline
   * @return JavaDStream providing continuous data input
   */
  public abstract JavaDStream<T> getStream(StreamingContext context) throws Exception;

  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Default no-op implementation
  }

  /**
   * Get the number of executors required by the streaming source.
   * Override when the DStream is a union of multiple streams.
   * @return Number of executors required (defaults to 1)
   */
  public int getRequiredExecutors() {
    return 1;
  }
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class KafkaStreamingSource extends StreamingSource<String> {

  // Plugin configuration holding brokers, topic, and partition count
  private final KafkaConfig config;

  public KafkaStreamingSource(KafkaConfig config) {
    this.config = config;
  }

  @Override
  public JavaDStream<String> getStream(StreamingContext context) throws Exception {
    // Consumer settings for the Kafka 0.8 direct stream API
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", config.getBrokers());
    // The 0.8 decoder-based consumer expects "smallest" or "largest", not "latest"
    kafkaParams.put("auto.offset.reset", "largest");

    Set<String> topics = Collections.singleton(config.getTopic());

    return KafkaUtils.createDirectStream(
        context.getSparkStreamingContext(),
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topics
    ).map(tuple -> tuple._2()); // Extract the message value from the (key, value) pair
  }

  @Override
  public int getRequiredExecutors() {
    return config.getPartitionCount(); // One executor per Kafka partition
  }
}
```

### Windower

Abstract class for implementing time-based windowing operations in streaming pipelines. Windower defines window parameters for aggregating streaming data over time intervals.

```java { .api }
/**
 * Windowing plugin for time-based data aggregation.
 */
@Beta
public abstract class Windower implements PipelineConfigurable, Serializable {

  public static final String PLUGIN_TYPE = "windower";

  /**
   * Configure the ETL pipeline by adding required datasets and streams.
   * @param pipelineConfigurer The configurer for adding datasets and streams
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
    // Default no-op implementation
  }

  /**
   * Get the width of the window in seconds.
   * Must be a multiple of the underlying batch interval.
   * @return Window width in seconds
   */
  public abstract long getWidth();

  /**
   * Get the slide interval of the window in seconds.
   * Must be a multiple of the underlying batch interval.
   * @return Window slide interval in seconds
   */
  public abstract long getSlideInterval();
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.streaming.Windower;

// Sliding window: overlapping one-hour windows, recomputed every five minutes.
// (Each public class below would live in its own source file.)
public class HourlyWindower extends Windower {

  @Override
  public long getWidth() {
    return 3600; // 1 hour window
  }

  @Override
  public long getSlideInterval() {
    return 300; // Slide every 5 minutes
  }
}

// Tumbling window: width equals slide interval, so windows never overlap.
public class TumblingWindowTenMinutes extends Windower {

  @Override
  public long getWidth() {
    return 600; // 10 minute window
  }

  @Override
  public long getSlideInterval() {
    return 600; // Tumbling window (no overlap)
  }
}
```

### StreamingContext

Context interface for streaming plugin stages. Provides access to the Spark Streaming context, the CDAP execution context, and lineage registration capabilities.

```java { .api }
/**
 * Context for streaming plugin stages.
 */
@Beta
public interface StreamingContext extends StageContext, Transactional {

  /**
   * Get the Spark JavaStreamingContext for the pipeline.
   * @return JavaStreamingContext for creating and managing DStreams
   */
  JavaStreamingContext getSparkStreamingContext();

  /**
   * Get the CDAP JavaSparkExecutionContext for the pipeline.
   * @return CDAP execution context for accessing datasets and services
   */
  JavaSparkExecutionContext getSparkExecutionContext();

  /**
   * Register lineage for this Spark program using the given reference name.
   * @param referenceName Reference name used for the source
   * @throws DatasetManagementException If there is an error creating the reference dataset
   * @throws TransactionFailureException If there is an error fetching the dataset for usage registration
   */
  void registerLineage(String referenceName) throws DatasetManagementException, TransactionFailureException;
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.api.streaming.StreamingContext;
import co.cask.cdap.etl.api.streaming.StreamingSource;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class FileStreamingSource extends StreamingSource<String> {

  @Override
  public JavaDStream<String> getStream(StreamingContext context) throws Exception {
    // Register data lineage under a reference name for this source
    context.registerLineage("file-input-source");

    // Create a DStream that picks up new files written to the directory
    JavaStreamingContext jssc = context.getSparkStreamingContext();
    return jssc.textFileStream("/path/to/streaming/files");
  }
}
```

## Plugin Types

The streaming ETL package defines two plugin types for real-time processing (a registration sketch follows the list):

- **streamingsource**: For implementing streaming data sources using `StreamingSource`
- **windower**: For implementing time-based windowing operations using `Windower`
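
As a minimal sketch of how a plugin of either type is typically made discoverable, the class is annotated with CDAP's standard plugin annotations. The class name, `@Name` value, and `@Description` text here are hypothetical, not part of this package:

```java
import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.etl.api.streaming.Windower;

// Hypothetical example: registers this class under the "windower" plugin type
// so the pipeline framework can look it up by name.
@Plugin(type = Windower.PLUGIN_TYPE)
@Name("FiveMinuteTumbler")
@Description("Tumbling five-minute window")
public class FiveMinuteTumbler extends Windower {

  @Override
  public long getWidth() {
    return 300; // window width in seconds
  }

  @Override
  public long getSlideInterval() {
    return 300; // equal to width, so windows do not overlap
  }
}
```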

## Window Constraints

Both window width and slide interval must be multiples of the underlying Spark Streaming batch interval. Common patterns (see the sketch after this list):

- **Tumbling Windows**: Width equals slide interval (no overlap)
- **Sliding Windows**: Slide interval is smaller than width (overlapping windows)
- **Session Windows**: Not directly supported; requires custom logic in transformations
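
For intuition, here is a minimal sketch, not the CDAP engine's actual code, of how an engine might apply a `Windower`'s parameters to a DStream using Spark's standard windowing API. The `WindowApplier` helper is hypothetical:

```java
import co.cask.cdap.etl.api.streaming.Windower;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;

// Sketch: apply a Windower's width and slide interval to an input DStream.
// Spark requires both durations to be multiples of the batch interval,
// which is where the constraint on getWidth()/getSlideInterval() comes from.
public class WindowApplier {

  public static <T> JavaDStream<T> applyWindow(JavaDStream<T> input, Windower windower) {
    return input.window(
        Durations.seconds(windower.getWidth()),        // window width
        Durations.seconds(windower.getSlideInterval()) // slide interval
    );
  }
}
```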

## Integration with Spark Streaming

StreamingSource integrates with Spark Streaming (a plain-Spark sketch of this lifecycle follows the list) by:

1. Creating DStreams from external data sources (Kafka, files, sockets, etc.)
2. Managing executor requirements based on data source characteristics
3. Providing lifecycle hooks for resource management
4. Enabling lineage tracking for data governance
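
To make the wiring concrete, here is a self-contained sketch in plain Spark Streaming of the lifecycle the engine drives: build a streaming context with a batch interval, obtain a DStream (a socket stream stands in for a `StreamingSource` here), attach downstream work, and start the job. All names are illustrative, not CDAP APIs:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DriverSketch {
  public static void main(String[] args) throws Exception {
    // Batch interval: window widths and slide intervals must be multiples of this
    SparkConf conf = new SparkConf().setAppName("streaming-etl-sketch").setMaster("local[2]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    // In CDAP, this DStream would come from StreamingSource.getStream(context);
    // a socket text stream stands in for it here.
    JavaDStream<String> input = jssc.socketTextStream("localhost", 9999);

    // Downstream stages attach their operations to the DStream
    input.foreachRDD(rdd -> System.out.println("records in batch: " + rdd.count()));

    jssc.start();
    jssc.awaitTermination();
  }
}
```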

Windower integrates by providing temporal parameters that downstream operations can use for time-based aggregations and transformations.