Data Sources and Stream Creation

The StreamExecutionEnvironment provides methods for creating DataStreams from a variety of sources: in-memory collections, files, sockets, and custom source functions.

Collection-Based Sources

From Elements

class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
}

Create a stream from individual elements:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create stream from individual elements
val numbers = env.fromElements(1, 2, 3, 4, 5)
val strings = env.fromElements("apple", "banana", "cherry")
val tuples = env.fromElements(("Alice", 25), ("Bob", 30), ("Charlie", 35))

From Collection

class StreamExecutionEnvironment {
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def fromCollection[T: TypeInformation](data: Iterator[T]): DataStream[T]
}

Create streams from Scala collections:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// From Scala collections
val listData = List(1, 2, 3, 4, 5)
val stream1 = env.fromCollection(listData)

val vectorData = Vector("a", "b", "c")
val stream2 = env.fromCollection(vectorData)

// From an iterator (note: iterator-backed sources run with parallelism 1)
val iteratorData = (1 to 1000).iterator
val stream3 = env.fromCollection(iteratorData)

From Parallel Collection

class StreamExecutionEnvironment {
  def fromParallelCollection[T: TypeInformation](data: SplittableIterator[T]): DataStream[T]
}

Create parallel streams from splittable data:

import org.apache.flink.util.SplittableIterator

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Custom splittable iterator for parallel processing
class NumberSplittableIterator(from: Int, to: Int) extends SplittableIterator[Int] {
  private var current = from
  
  override def hasNext: Boolean = current <= to
  override def next(): Int = { val result = current; current += 1; result }
  override def split(numPartitions: Int): Array[SplittableIterator[Int]] = {
    val range = (to - from + 1) / numPartitions
    (0 until numPartitions).map { i =>
      val start = from + i * range
      val end = if (i == numPartitions - 1) to else start + range - 1
      new NumberSplittableIterator(start, end)
    }.toArray
  }
  override def getMaximumNumberOfSplits: Int = to - from + 1
}

val parallelStream = env.fromParallelCollection(new NumberSplittableIterator(1, 10000))

Sequence Generation

class StreamExecutionEnvironment {
  def generateSequence(from: Long, to: Long): DataStream[Long]
}

Generate numeric sequences:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Generate sequence from 1 to 1000
val sequence = env.generateSequence(1, 1000)

// Process the sequence
sequence
  .filter(_ % 2 == 0)
  .map(_ * 2)
  .print()

File-Based Sources

Text File Reading

class StreamExecutionEnvironment {
  def readTextFile(filePath: String): DataStream[String]
  def readTextFile(filePath: String, charsetName: String): DataStream[String]
}

Read text files line by line:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Read text file with default charset (UTF-8)
val textStream = env.readTextFile("/path/to/input.txt")

// Read with specific charset
val textStreamLatin1 = env.readTextFile("/path/to/input.txt", "ISO-8859-1")

// Process text data
textStream
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .keyBy(0)
  .sum(1)
  .print()

Custom File Formats

class StreamExecutionEnvironment {
  def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataStream[T]
  def readFile[T: TypeInformation](
    inputFormat: FileInputFormat[T],
    filePath: String,
    watchType: FileProcessingMode,
    interval: Long
  ): DataStream[T]
}

Read files with custom input formats:

import org.apache.flink.api.common.io.{DelimitedInputFormat, FileInputFormat}
import org.apache.flink.streaming.api.functions.source.FileProcessingMode

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Minimal CSV-style input format: emits each newline-delimited line as a String
// (a real implementation would also parse the fields)
class CsvInputFormat extends DelimitedInputFormat[String] {
  override def readRecord(reuse: String, bytes: Array[Byte], offset: Int, numBytes: Int): String =
    new String(bytes, offset, numBytes, "UTF-8")
}

// Read with custom format
val csvStream = env.readFile(new CsvInputFormat(), "/path/to/data.csv")

// Monitor file changes (for streaming file ingestion)
val monitoredStream = env.readFile(
  new CsvInputFormat(),
  "/path/to/data/",
  FileProcessingMode.PROCESS_CONTINUOUSLY,
  1000  // Check every 1000ms
)

File Stream Monitoring (Deprecated)

class StreamExecutionEnvironment {
  def readFileStream(streamPath: String, intervalMillis: Long, watchType: FileMonitoringFunction.WatchType): DataStream[String]
}
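
This method is deprecated; prefer readFile with FileProcessingMode.PROCESS_CONTINUOUSLY (shown above). A minimal usage sketch, assuming your Flink version still ships the old API:

import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Re-scan the directory every second, emitting only appended data
val watchedStream = env.readFileStream(
  "/path/to/data/",
  1000,
  FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED
)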

Socket-Based Sources

class StreamExecutionEnvironment {
  def socketTextStream(hostname: String, port: Int): DataStream[String]
  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String]
  def socketTextStream(hostname: String, port: Int, delimiter: Char, maxRetry: Long): DataStream[String]
}

Connect to socket sources for real-time data:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Connect to socket with default settings
val socketStream = env.socketTextStream("localhost", 9999)

// With custom delimiter and retry settings
val socketStreamCustom = env.socketTextStream("localhost", 9999, '\n', 5)
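// Tip: for local testing, feed this source from a terminal with: nc -lk 9999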

// Process socket data
socketStream
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(0)
  .sum(1)
  .print()

Custom Source Functions

Simple Source Function

class StreamExecutionEnvironment {
  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]
  def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T]
}

Create custom source functions:

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Custom source function class
class NumberGeneratorSource extends SourceFunction[Int] {
  @volatile private var isRunning = true
  
  override def run(ctx: SourceContext[Int]): Unit = {
    var counter = 0
    while (isRunning && counter < 1000) {
      ctx.collect(counter)
      counter += 1
      Thread.sleep(100)  // Emit every 100ms
    }
  }
  
  override def cancel(): Unit = {
    isRunning = false
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Use custom source function
val customStream = env.addSource(new NumberGeneratorSource)

// Lambda-based source function: the element type must be given explicitly,
// and this variant has no cancel() hook, so it suits bounded or test sources
val lambdaStream = env.addSource[String] { ctx =>
  for (i <- 1 to 100) {
    ctx.collect(s"Message $i")
    Thread.sleep(1000)
  }
}

Rich Source Function

import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration

class ConfigurableSource extends RichSourceFunction[String] {
  @volatile private var isRunning = true
  private var config: String = _
  
  override def open(parameters: Configuration): Unit = {
    // Read a job-wide setting registered via env.getConfig.setGlobalJobParameters(...)
    val params = getRuntimeContext.getExecutionConfig
      .getGlobalJobParameters.asInstanceOf[ParameterTool]
    config = params.get("source.config", "default")
  }
  
  override def run(ctx: SourceContext[String]): Unit = {
    while (isRunning) {
      ctx.collect(s"Data from $config")
      Thread.sleep(1000)
    }
  }
  
  override def cancel(): Unit = {
    isRunning = false
  }
}
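
A minimal usage sketch, assuming the value is supplied through global job parameters (the key source.config is illustrative):

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Register job-wide parameters; open() reads them back through the runtime context
val params = ParameterTool.fromArgs(Array("--source.config", "production"))
env.getConfig.setGlobalJobParameters(params)

val configuredStream = env.addSource(new ConfigurableSource)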

Input Format Sources

class StreamExecutionEnvironment {
  def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T]
}

Create streams from custom input formats:

import org.apache.flink.api.common.io.InputFormat

// MyRecord and DatabaseInputSplit are placeholder types; a real format
// implements createInputSplits, open, reachedEnd, nextRecord, and close
class DatabaseInputFormat extends InputFormat[MyRecord, DatabaseInputSplit] {
  // Implementation details...
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val dbStream = env.createInput(new DatabaseInputFormat)
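
For a runnable variant without a custom class, Flink's built-in TextInputFormat also works here (a minimal sketch):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

// Read a file through the generic input-format API
val textFormat = new TextInputFormat(new Path("/path/to/input.txt"))
val textViaInputFormat = env.createInput(textFormat)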

Source Configuration and Best Practices

Parallelism Control

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Set specific parallelism for sources
val source = env.addSource(new CustomSource)
  .setParallelism(4)  // 4 parallel source instances

// Some sources do not support parallelism greater than 1
val socketSource = env.socketTextStream("localhost", 9999)
  .setParallelism(1)  // Socket sources are non-parallel; parallelism must stay 1

Source Watermark Assignment

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Placeholder event type carrying an event-time timestamp
case class MyEvent(id: String, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create source with timestamp and watermark assignment
// (TimestampedSource stands for any SourceFunction[MyEvent])
val timestampedStream = env.addSource(new TimestampedSource)
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(5)) {
      override def extractTimestamp(element: MyEvent): Long = element.timestamp
    }
  )

Complete Example: Multi-Source Application

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object MultiSourceExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    // Static data source
    val staticData = env.fromElements(
      ("user1", "login"),
      ("user2", "logout"),
      ("user3", "login")
    )
    
    // File-based source
    val fileData = env.readTextFile("/path/to/logs.txt")
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1))
      })
    
    // Socket-based real-time source
    val realtimeData = env.socketTextStream("localhost", 9999)
      .map(line => {
        val parts = line.split(",")
        (parts(0), parts(1))
      })
    
    // Union all sources
    val allData = staticData
      .union(fileData)
      .union(realtimeData)
    
    // Process combined data: count events per key in 5-minute windows
    allData
      .keyBy(_._1)
      .timeWindow(Time.minutes(5))
      .apply { (key: String, window: TimeWindow, events: Iterable[(String, String)], out: Collector[(String, Int)]) =>
        out.collect((key, events.size))
      }
      .print()
    
    env.execute("Multi-Source Streaming Application")
  }
}