# Data Sources and Stream Creation

The `StreamExecutionEnvironment` provides functionality for creating `DataStream`s from a variety of sources, including collections, files, sockets, and custom source functions.

## Collection-Based Sources

### From Elements

```scala { .api }
class StreamExecutionEnvironment {
  /**
   * Creates a bounded stream from the given elements.
   *
   * All elements must share the type `T`; the implicit `TypeInformation[T]` is
   * provided by `import org.apache.flink.streaming.api.scala._`.
   * The resulting source is non-parallel (parallelism 1).
   *
   * @param data the elements to emit, in order
   * @return a stream containing exactly `data`
   */
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
}
```

Create a stream from individual elements:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Each call creates a bounded, non-parallel stream from the listed values.
// The element type (and its TypeInformation) is inferred from the arguments.
val numbers = env.fromElements(1, 2, 3, 4, 5)
val strings = env.fromElements("apple", "banana", "cherry")
val tuples = env.fromElements(("Alice", 25), ("Bob", 30), ("Charlie", 35))
```

### From Collection

```scala { .api }
class StreamExecutionEnvironment {
  /**
   * Creates a bounded stream from a Scala `Seq` (e.g. `List`, `Vector`).
   * The source is non-parallel (parallelism 1).
   *
   * @param data the elements to emit, in order
   */
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]

  /**
   * Creates a stream from an iterator.
   *
   * The iterator is shipped with the job, so it must be serializable. It is
   * consumed by a single (non-parallel) source instance.
   *
   * @param data the iterator supplying the elements
   */
  def fromCollection[T: TypeInformation](data: Iterator[T]): DataStream[T]
}
```

Create streams from Scala collections or iterators:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// From Scala collections (any Seq works)
val listData = List(1, 2, 3, 4, 5)
val stream1 = env.fromCollection(listData)

val vectorData = Vector("a", "b", "c")
val stream2 = env.fromCollection(vectorData)

// From an iterator. The iterator is shipped with the job, so it must be
// serializable, and it is consumed by a single (non-parallel) source instance.
val iteratorData = (1 to 1000).iterator
val stream3 = env.fromCollection(iteratorData)
```

### From Parallel Collection

```scala { .api }
class StreamExecutionEnvironment {
  /**
   * Creates a parallel stream from a `SplittableIterator`.
   *
   * Each parallel source instance reads one of the parts produced by
   * `data.split(parallelism)`, so `split` must partition the data into
   * disjoint parts that together cover all elements.
   *
   * @param data the splittable iterator supplying the elements
   */
  def fromParallelCollection[T: TypeInformation](data: SplittableIterator[T]): DataStream[T]
}
```

Create parallel streams from splittable data:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.SplittableIterator

val env = StreamExecutionEnvironment.getExecutionEnvironment

/**
 * Iterates the inclusive integer range [from, to] and can be split into
 * disjoint contiguous sub-ranges, one per parallel source instance.
 *
 * The cursor is a Long so that `to == Int.MaxValue` cannot overflow and
 * make `hasNext` true forever.
 */
class NumberSplittableIterator(from: Int, to: Int) extends SplittableIterator[Int] {
  private var current: Long = from

  override def hasNext: Boolean = current <= to

  override def next(): Int = {
    // java.util.Iterator contract: signal exhaustion instead of running past `to`
    if (!hasNext) throw new NoSuchElementException("NumberSplittableIterator is exhausted")
    val result = current.toInt
    current += 1
    result
  }

  /**
   * Splits the not-yet-consumed remainder into `numPartitions` contiguous
   * ranges whose sizes differ by at most one; surplus partitions are empty.
   *
   * The return type must be exactly `Array[java.util.Iterator[Int]]`: Scala
   * arrays are invariant, so `Array[SplittableIterator[Int]]` would not
   * override the Java method `Iterator<T>[] split(int)`.
   */
  override def split(numPartitions: Int): Array[java.util.Iterator[Int]] = {
    require(numPartitions > 0, s"numPartitions must be positive, got $numPartitions")
    val remaining = math.max(0L, to.toLong - current + 1)
    val base = remaining / numPartitions
    val extra = remaining % numPartitions // the first `extra` partitions get one more element
    Array.tabulate[java.util.Iterator[Int]](numPartitions) { i =>
      val size = base + (if (i < extra) 1L else 0L)
      if (size == 0) {
        new NumberSplittableIterator(1, 0) // canonical empty range
      } else {
        val start = current + i * base + math.min(i.toLong, extra)
        new NumberSplittableIterator(start.toInt, (start + size - 1).toInt)
      }
    }
  }

  /** Number of elements in [from, to], clamped to [0, Int.MaxValue]. */
  override def getMaximumNumberOfSplits: Int =
    math.min(Int.MaxValue.toLong, math.max(0L, to.toLong - from + 1)).toInt
}

val parallelStream = env.fromParallelCollection(new NumberSplittableIterator(1, 10000))
```

## Sequence Generation

```scala { .api }
class StreamExecutionEnvironment {
  /**
   * Generates the numbers `from` to `to`, both inclusive, as a parallel source.
   *
   * Newer Flink releases deprecate this method in favour of
   * `fromSequence(from, to)`; check which one your version provides.
   *
   * @param from first number of the sequence (inclusive)
   * @param to   last number of the sequence (inclusive)
   */
  def generateSequence(from: Long, to: Long): DataStream[Long]
}
```

Generate numeric sequences:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Generate the sequence 1, 2, ..., 1000 (both bounds inclusive)
val sequence = env.generateSequence(1, 1000)

// Keep even numbers, double them, and print them to stdout
sequence
  .filter(_ % 2 == 0)
  .map(_ * 2)
  .print()

// The pipeline above is only a plan; nothing runs until the job is executed
env.execute("Sequence Generation Example")
```

## File-Based Sources

### Text File Reading

```scala { .api }
class StreamExecutionEnvironment {
  /**
   * Reads a text file line by line, decoding it as UTF-8.
   *
   * @param filePath path or URI of the file, e.g. "file:///path/to/file"
   */
  def readTextFile(filePath: String): DataStream[String]

  /**
   * Reads a text file line by line using the given charset.
   *
   * @param filePath    path or URI of the file
   * @param charsetName name of the charset used to decode the file, e.g. "ISO-8859-1"
   */
  def readTextFile(filePath: String, charsetName: String): DataStream[String]
}
```

Read text files line by line:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Read text file with default charset (UTF-8)
val textStream = env.readTextFile("/path/to/input.txt")

// Read with specific charset
val textStreamLatin1 = env.readTextFile("/path/to/input.txt", "ISO-8859-1")

// Word count. Splitting a line with leading whitespace yields an empty first
// token, so empty tokens are dropped before counting.
textStream
  .flatMap(_.split("\\s+"))
  .filter(_.nonEmpty)
  .map(word => (word, 1))
  .keyBy(_._1) // key selector instead of the deprecated positional keyBy(0)
  .sum(1)
  .print()

env.execute("Text File Word Count")
```

### Custom File Formats

```scala { .api }
class StreamExecutionEnvironment {
  /**
   * Reads `filePath` once with the given input format; the source then finishes.
   *
   * @param inputFormat format that turns file contents into records of type `T`
   * @param filePath    file or directory to read
   */
  def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataStream[T]

  /**
   * Reads `filePath` with the given input format according to `watchType`.
   *
   * @param inputFormat format that turns file contents into records of type `T`
   * @param filePath    file or directory to read or monitor
   * @param watchType   `PROCESS_ONCE` reads the current content and finishes;
   *                    `PROCESS_CONTINUOUSLY` periodically re-scans the path for
   *                    new or modified files. A modified file is re-read in full,
   *                    so appending to a file re-emits its earlier records.
   * @param interval    scan interval in milliseconds (used with `PROCESS_CONTINUOUSLY`)
   */
  def readFile[T: TypeInformation](
      inputFormat: FileInputFormat[T],
      filePath: String,
      watchType: FileProcessingMode,
      interval: Long
  ): DataStream[T]
}
```

Read files with custom input formats:

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.io.DelimitedInputFormat
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

/**
 * Minimal custom file format: emits each newline-delimited record as a UTF-8 string.
 *
 * `DelimitedInputFormat` (a `FileInputFormat`) already splits the file into
 * records, so a subclass only decodes one record's bytes. A real CSV format
 * would parse the fields here instead of returning the raw line.
 */
class CsvInputFormat extends DelimitedInputFormat[String] {
  override def readRecord(reuse: String, bytes: Array[Byte], offset: Int, numBytes: Int): String =
    new String(bytes, offset, numBytes, StandardCharsets.UTF_8)
}

// Read the file once with the custom format
val csvStream = env.readFile(new CsvInputFormat(), "/path/to/data.csv")

// Monitor a directory, checking for new or modified files every 1000 ms.
// A modified file is re-read in full, so appended data causes the file's
// earlier records to be emitted again.
val monitoredStream = env.readFile(
  new CsvInputFormat(),
  "/path/to/data/",
  FileProcessingMode.PROCESS_CONTINUOUSLY,
  1000L
)
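```

### File Stream Monitoring (Deprecated)

`readFileStream` is deprecated; use `readFile` with `FileProcessingMode.PROCESS_CONTINUOUSLY` instead (see "Custom File Formats" above).

```scala { .api }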
class StreamExecutionEnvironment {
  /**
   * Watches `streamPath` every `intervalMillis` ms and emits the contents of
   * files that appear there, as lines of text.
   *
   * @deprecated Use `readFile(inputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, interval)`.
   * @param streamPath     directory to watch
   * @param intervalMillis polling interval in milliseconds
   * @param watchType      which file changes are picked up
   */
  def readFileStream(streamPath: String, intervalMillis: Long, watchType: FileMonitoringFunction.WatchType): DataStream[String]
}
```

## Socket-Based Sources

```scala { .api }
class StreamExecutionEnvironment {
  /**
   * Reads text from a socket, one element per newline-terminated record.
   * The socket source is non-parallel (parallelism 1).
   */
  def socketTextStream(hostname: String, port: Int): DataStream[String]

  /**
   * Reads text from a socket, splitting records on `delimiter`.
   */
  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String]

  /**
   * Reads text from a socket, splitting records on `delimiter`.
   *
   * @param maxRetry maximum time in seconds to keep retrying (roughly once per
   *                 second) while the socket is down; 0 stops immediately and a
   *                 negative value retries forever
   */
  def socketTextStream(hostname: String, port: Int, delimiter: Char, maxRetry: Long): DataStream[String]
}
```

Connect to socket sources for real-time data:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Connect to socket with default settings (newline delimiter)
val socketStream = env.socketTextStream("localhost", 9999)

// Explicit newline delimiter; keep retrying for up to 5 seconds if the socket goes down
val socketStreamCustom = env.socketTextStream("localhost", 9999, '\n', 5)

// Running word count over the socket data
socketStream
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(_._1) // key selector instead of the deprecated positional keyBy(0)
  .sum(1)
  .print()

env.execute("Socket Word Count")
```

## Custom Source Functions

### Simple Source Function

```scala { .api }
class StreamExecutionEnvironment {
  /**
   * Creates a stream from a user-defined `SourceFunction`.
   * A plain `SourceFunction` is non-parallel; implement `ParallelSourceFunction`
   * to run several instances.
   */
  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]

  /**
   * Creates a stream from a function that emits elements through the `SourceContext`.
   *
   * The wrapping source cannot react to `cancel()`, so the function should
   * terminate on its own. Annotate the parameter type
   * (`(ctx: SourceContext[String]) => ...`), because `T` cannot be inferred
   * from the body.
   */
  def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T]
}
```

Create custom source functions:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
// SourceContext is a nested interface of SourceFunction, not a top-level type
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._

/** Emits 0, 1, ..., 999, one value every 100 ms, until finished or cancelled. */
class NumberGeneratorSource extends SourceFunction[Int] {
  // Cleared by cancel(), which is called from a different thread than run();
  // @volatile makes the update visible to the loop in run().
  @volatile private var isRunning = true

  override def run(ctx: SourceContext[Int]): Unit = {
    var counter = 0
    while (isRunning && counter < 1000) {
      ctx.collect(counter)
      counter += 1
      Thread.sleep(100) // Emit every 100ms
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Use custom source function
val customStream = env.addSource(new NumberGeneratorSource)

// Function-based source. The element type cannot be inferred from the body,
// so the context parameter is annotated. This form cannot react to cancel(),
// so it must terminate on its own (here after 100 messages).
val lambdaStream = env.addSource { (ctx: SourceContext[String]) =>
  for (i <- 1 to 100) {
    ctx.collect(s"Message $i")
    Thread.sleep(1000)
  }
}
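```

### Rich Source Function

A `RichSourceFunction` additionally provides the `open`/`close` lifecycle methods and access to the runtime context via `getRuntimeContext`:

```scala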
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.configuration.Configuration

/**
 * Source that reads the "source.config" global job parameter in open() and
 * then emits a message built from it once per second until cancelled.
 */
class ConfigurableSource extends RichSourceFunction[String] {
  // cancel() runs on a different thread than run(); without @volatile the loop
  // in run() may never observe the change and the source would not stop.
  @volatile private var isRunning = true
  private var config: String = _

  override def open(parameters: Configuration): Unit = {
    // Global job parameters are registered with env.getConfig.setGlobalJobParameters(...).
    // Fall back to "default" when none are registered or the key is absent.
    config = Option(getRuntimeContext.getExecutionConfig.getGlobalJobParameters)
      .flatMap(params => Option(params.toMap.get("source.config")))
      .getOrElse("default")
  }

  override def run(ctx: SourceContext[String]): Unit = {
    while (isRunning) {
      ctx.collect(s"Data from $config")
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
```

## Input Format Sources

```scala { .api }
class StreamExecutionEnvironment {
  /**
   * Creates a stream that reads records of type `T` through an arbitrary
   * `InputFormat`. The second type parameter is the format's input-split type.
   *
   * @param inputFormat the format that produces the records
   */
  def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T]
}
```

Create streams from custom input formats:

```scala
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.streaming.api.scala._

// Custom input format implementation (sketch).
// MyRecord is a placeholder for your record type and DatabaseInputSplit for an
// InputSplit implementation. A concrete InputFormat implements configure,
// getStatistics, createInputSplits, getInputSplitAssigner, open, reachedEnd,
// nextRecord and close; extending GenericInputFormat supplies defaults for most.
class DatabaseInputFormat extends InputFormat[MyRecord, DatabaseInputSplit] {
  // Implementation details...
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val dbStream = env.createInput(new DatabaseInputFormat)
```

## Source Configuration and Best Practices

### Parallelism Control

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Run 4 parallel source instances. This requires CustomSource to implement
// ParallelSourceFunction (e.g. extend RichParallelSourceFunction); a plain
// SourceFunction is non-parallel and any parallelism other than 1 is rejected.
val source = env.addSource(new CustomSource)
  .setParallelism(4)

// The socket source is always non-parallel: 1 is the only accepted value,
// so this call is redundant but makes the constraint explicit.
val socketSource = env.socketTextStream("localhost", 9999)
  .setParallelism(1)
```

### Source Watermark Assignment

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Assign event timestamps from MyEvent.timestamp (presumably epoch millis —
// confirm for your event type) and emit watermarks that tolerate events
// arriving up to 5 seconds out of order. WatermarkStrategy replaces the
// deprecated BoundedOutOfOrdernessTimestampExtractor.
val timestampedStream = env.addSource(new TimestampedSource)
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[MyEvent](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[MyEvent] {
        override def extractTimestamp(element: MyEvent, recordTimestamp: Long): Long =
          element.timestamp
      })
  )
```

## Complete Example: Multi-Source Application

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/** Unions a static, a file, and a socket source and counts events per user per window. */
object MultiSourceExample {

  /**
   * Parses a "user,action" line into a (user, action) pair.
   *
   * Returns None for a line with fewer than two comma-separated fields, so a
   * malformed line (e.g. typed into the socket) is dropped instead of failing
   * the whole job with an ArrayIndexOutOfBoundsException.
   */
  private def parseEvent(line: String): Option[(String, String)] =
    line.split(",") match {
      case Array(user, action, _*) => Some((user, action))
      case _                       => None
    }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Static data source
    val staticData = env.fromElements(
      ("user1", "login"),
      ("user2", "logout"),
      ("user3", "login")
    )

    // File-based source: one "user,action" event per line
    val fileData = env.readTextFile("/path/to/logs.txt")
      .flatMap(parseEvent(_).toList)

    // Socket-based real-time source, same line format
    val realtimeData = env.socketTextStream("localhost", 9999)
      .flatMap(parseEvent(_).toList)

    // Union all sources; they share the element type (String, String)
    val allData = staticData
      .union(fileData)
      .union(realtimeData)

    // Count events per user in 5-minute windows. Processing time is chosen
    // explicitly: no timestamps or watermarks are assigned, so event-time
    // windows would never fire.
    allData
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
      .apply {
        (user: String, window: TimeWindow, events: Iterable[(String, String)], out: Collector[(String, Int)]) =>
          out.collect((user, events.size))
      }
      .print()

    env.execute("Multi-Source Streaming Application")
  }
}
```