The StreamExecutionEnvironment is the entry point for creating DataStreams from a variety of sources: collections, files, sockets, and custom source functions.
class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
}

Create a stream from individual elements:
import org.apache.flink.streaming.api.scala._ // provides the implicit TypeInformation; assumed in all examples below

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create stream from individual elements
val numbers = env.fromElements(1, 2, 3, 4, 5)
val strings = env.fromElements("apple", "banana", "cherry")
val tuples = env.fromElements(("Alice", 25), ("Bob", 30), ("Charlie", 35))
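Case classes work as well; with the Scala API import in scope, the implicit TypeInformation is derived automatically (SensorReading below is a hypothetical type, used only for illustration):

// Hypothetical record type for illustration
case class SensorReading(id: String, temperature: Double)

val readings = env.fromElements(
  SensorReading("sensor-1", 21.5),
  SensorReading("sensor-2", 19.8)
)

class StreamExecutionEnvironment {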
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def fromCollection[T: TypeInformation](data: Iterator[T]): DataStream[T]
}

Create streams from Scala collections:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// From Scala collections
val listData = List(1, 2, 3, 4, 5)
val stream1 = env.fromCollection(listData)
val vectorData = Vector("a", "b", "c")
val stream2 = env.fromCollection(vectorData)
// From iterator
val iteratorData = (1 to 1000).iterator
val stream3 = env.fromCollection(iteratorData)

class StreamExecutionEnvironment {
  def fromParallelCollection[T: TypeInformation](data: SplittableIterator[T]): DataStream[T]
}

Create parallel streams from splittable data:
import org.apache.flink.util.SplittableIterator
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Custom splittable iterator for parallel processing
class NumberSplittableIterator(from: Int, to: Int) extends SplittableIterator[Int] {
  private var current = from

  override def hasNext: Boolean = current <= to

  override def next(): Int = { val result = current; current += 1; result }

  // In Java, SplittableIterator#split returns Iterator<T>[]; Scala arrays are
  // invariant, so the override must use java.util.Iterator as the element type
  override def split(numPartitions: Int): Array[java.util.Iterator[Int]] = {
    val range = (to - from + 1) / numPartitions
    (0 until numPartitions).map { i =>
      val start = from + i * range
      val end = if (i == numPartitions - 1) to else start + range - 1
      new NumberSplittableIterator(start, end): java.util.Iterator[Int]
    }.toArray
  }

  override def getMaximumNumberOfSplits: Int = to - from + 1
}

val parallelStream = env.fromParallelCollection(new NumberSplittableIterator(1, 10000))
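For numeric ranges you don't have to write this by hand: Flink ships org.apache.flink.util.NumberSequenceIterator, a ready-made SplittableIterator (a minimal sketch):

import org.apache.flink.util.NumberSequenceIterator

// Built-in splittable iterator over a range of Longs
val builtInParallel = env.fromParallelCollection(new NumberSequenceIterator(1L, 10000L))

class StreamExecutionEnvironment {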
  def generateSequence(from: Long, to: Long): DataStream[Long]
}

Generate numeric sequences:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Generate sequence from 1 to 1000
val sequence = env.generateSequence(1, 1000)
// Process the sequence
sequence
  .filter(_ % 2 == 0)
  .map(_ * 2)
  .print()
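Note that DataStream programs are lazily constructed; none of these pipelines run until the environment is executed:

// Required to actually run the pipeline; the job name is arbitrary
env.execute("Sequence Example")

class StreamExecutionEnvironment {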
  def readTextFile(filePath: String): DataStream[String]
  def readTextFile(filePath: String, charsetName: String): DataStream[String]
}

Read text files line by line:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Read text file with default charset (UTF-8)
val textStream = env.readTextFile("/path/to/input.txt")
// Read with specific charset
val textStreamLatin1 = env.readTextFile("/path/to/input.txt", "ISO-8859-1")
// Process text data
textStream
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .keyBy(0)
  .sum(1)
  .print()

class StreamExecutionEnvironment {
  def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataStream[T]

  def readFile[T: TypeInformation](
      inputFormat: FileInputFormat[T],
      filePath: String,
      watchType: FileProcessingMode,
      interval: Long
  ): DataStream[T]
}

Read files with custom input formats:
import org.apache.flink.api.common.io.FileInputFormat
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Custom CSV input format example
class CsvInputFormat extends FileInputFormat[String] {
  // Implementation details...
}
// Read with custom format
val csvStream = env.readFile(new CsvInputFormat(), "/path/to/data.csv")
// Monitor file changes (for streaming file ingestion)
val monitoredStream = env.readFile(
  new CsvInputFormat(),
  "/path/to/data/",
  FileProcessingMode.PROCESS_CONTINUOUSLY,
  1000 // Check for new data every 1000 ms
)
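If no custom format is needed, the built-in TextInputFormat can be used with readFile as well (a minimal sketch):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

val inputPath = "/path/to/data/"
// Reads the files under inputPath line by line
val lines = env.readFile(new TextInputFormat(new Path(inputPath)), inputPath)

class StreamExecutionEnvironment {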
  def readFileStream(streamPath: String, intervalMillis: Long, watchType: FileMonitoringFunction.WatchType): DataStream[String]
}

This legacy variant monitors a single file and emits its contents as lines; newer Flink releases deprecate it in favor of readFile with a FileProcessingMode.
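A minimal sketch, assuming the legacy FileMonitoringFunction.WatchType enum (ONLY_NEW_FILES, PROCESS_ONLY_APPENDED, REPROCESS_WITH_APPENDED):

import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction

// Re-check the file every second and reprocess it when data is appended
val legacyFileStream = env.readFileStream(
  "/path/to/input.txt",
  1000,
  FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED
)

class StreamExecutionEnvironment {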
  def socketTextStream(hostname: String, port: Int): DataStream[String]
  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String]
  def socketTextStream(hostname: String, port: Int, delimiter: Char, maxRetry: Long): DataStream[String]
}

Connect to socket sources for real-time data:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Connect to socket with default settings
val socketStream = env.socketTextStream("localhost", 9999)
// With custom delimiter and retry settings
val socketStreamCustom = env.socketTextStream("localhost", 9999, '\n', 5)
// Process socket data
socketStream
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(0)
  .sum(1)
  .print()
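For local testing, a simple socket server can be started with netcat (nc -lk 9999); lines typed into it appear in the stream.

class StreamExecutionEnvironment {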
  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]
  def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T]
}

Create custom source functions:
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext // SourceContext is nested in SourceFunction
// Custom source function class
class NumberGeneratorSource extends SourceFunction[Int] {
  @volatile private var isRunning = true

  override def run(ctx: SourceContext[Int]): Unit = {
    var counter = 0
    while (isRunning && counter < 1000) {
      ctx.collect(counter)
      counter += 1
      Thread.sleep(100) // Emit every 100 ms
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Use custom source function
val customStream = env.addSource(new NumberGeneratorSource)
// Lambda-based source function
val lambdaStream = env.addSource { ctx: SourceContext[String] => // annotating ctx lets the compiler pick the overload and infer T
  for (i <- 1 to 100) {
    ctx.collect(s"Message $i")
    Thread.sleep(1000)
  }
}

Rich source functions extend RichSourceFunction to access the runtime context during initialization:

import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.configuration.Configuration
class ConfigurableSource extends RichSourceFunction[String] {
  @volatile private var isRunning = true
  private var config: String = _

  override def open(parameters: Configuration): Unit = {
    // Read job-wide parameters registered on the ExecutionConfig
    val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.toMap
    config = globalParams.getOrDefault("source.config", "default")
  }

  override def run(ctx: SourceContext[String]): Unit = {
    while (isRunning) {
      ctx.collect(s"Data from $config")
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
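For the getGlobalJobParameters lookup above to return anything, the parameters must first be registered on the execution config, typically via ParameterTool (a sketch; args are the main-method arguments, e.g. --source.config production):

import org.apache.flink.api.java.utils.ParameterTool

val env = StreamExecutionEnvironment.getExecutionEnvironment
// ParameterTool implements GlobalJobParameters, so it can be registered directly
env.getConfig.setGlobalJobParameters(ParameterTool.fromArgs(args))
val configuredStream = env.addSource(new ConfigurableSource)

class StreamExecutionEnvironment {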
  def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T]
}

Create streams from custom input formats:
import org.apache.flink.api.common.io.InputFormat
// Custom input format implementation
class DatabaseInputFormat extends InputFormat[MyRecord, DatabaseInputSplit] {
  // Implementation details...
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dbStream = env.createInput(new DatabaseInputFormat)
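createInput also works with built-in formats, e.g. the TextInputFormat shown earlier (a minimal sketch):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

val textLines = env.createInput(new TextInputFormat(new Path("/path/to/input.txt")))

Source parallelism is configured per operator:

val env = StreamExecutionEnvironment.getExecutionEnvironment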
// Set specific parallelism for sources
val source = env.addSource(new CustomSource)
  .setParallelism(4) // 4 parallel source instances

// Some sources don't support parallelism > 1
val socketSource = env.socketTextStream("localhost", 9999)
  .setParallelism(1) // socket sources are non-parallel, so parallelism must stay 1
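A source only runs with parallelism greater than 1 if it implements ParallelSourceFunction; a minimal sketch:

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

// Each of the parallel instances executes this loop independently
class ParallelCounterSource extends ParallelSourceFunction[Long] {
  @volatile private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 0L
    while (isRunning && i < 1000) {
      ctx.collect(i)
      i += 1
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

val parallelCounter = env.addSource(new ParallelCounterSource).setParallelism(4)

Sources can assign event-time timestamps and watermarks directly:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor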
import org.apache.flink.streaming.api.windowing.time.Time
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create source with timestamp and watermark assignment
val timestampedStream = env.addSource(new TimestampedSource)
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(5)) {
      override def extractTimestamp(element: MyEvent): Long = element.timestamp
    }
  )
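In the API versions that provide BoundedOutOfOrdernessTimestampExtractor, event-time windowing also requires the time characteristic to be set on the environment:

import org.apache.flink.streaming.api.TimeCharacteristic

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

A complete application can combine static, file-based, and socket sources:

import org.apache.flink.streaming.api.scala._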
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object MultiSourceExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Static data source
    val staticData = env.fromElements(
      ("user1", "login"),
      ("user2", "logout"),
      ("user3", "login")
    )

    // File-based source
    val fileData = env.readTextFile("/path/to/logs.txt")
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1))
      }

    // Socket-based real-time source
    val realtimeData = env.socketTextStream("localhost", 9999)
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1))
      }

    // Union all sources
    val allData = staticData
      .union(fileData)
      .union(realtimeData)

    // Count events per key in 5-minute windows
    allData
      .keyBy(_._1) // key by the first tuple field; keeps the key type as String
      .timeWindow(Time.minutes(5))
      .apply { (key: String, window: TimeWindow, events: Iterable[(String, String)], out: Collector[(String, Int)]) =>
        out.collect((key, events.size))
      }
      .print()

    env.execute("Multi-Source Streaming Application")
  }
}