CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-scala-2-12

Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.

Pending
Overview
Eval results
Files

sinks-output.mddocs/

Sinks and Output

Output operations for writing processed data to external systems, monitoring stream results, and collecting data for analysis. This includes various sink types, output formats, and result collection methods.

Capabilities

Basic Output Operations

Simple output operations for debugging and monitoring.

class DataStream[T] {
  /**
   * Print stream elements to standard output
   * @return DataStreamSink for output configuration
   */
  def print(): DataStreamSink[T]
  
  /**
   * Print stream elements with a sink identifier
   * @param sinkIdentifier Identifier to prefix output lines
   * @return DataStreamSink for output configuration
   */
  def print(sinkIdentifier: String): DataStreamSink[T]
  
  /**
   * Print stream elements to standard error
   * @return DataStreamSink for output configuration
   */
  def printToErr(): DataStreamSink[T]
  
  /**
   * Print stream elements to standard error with identifier
   * @param sinkIdentifier Identifier to prefix output lines
   * @return DataStreamSink for output configuration
   */
  def printToErr(sinkIdentifier: String): DataStreamSink[T]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._

val numbers = env.fromElements(1, 2, 3, 4, 5)

// Simple print to stdout
numbers.print()

// Print with identifier (useful for multiple sinks)
numbers.print("NumberStream")

// Print to stderr
numbers.printToErr("ErrorStream")

File Output Operations

Write stream data to files (deprecated but still available).

class DataStream[T] {
  /**
   * Write stream elements as text to a file (deprecated)
   * @param path Output file path
   * @return DataStreamSink for file output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsText(path: String): DataStreamSink[T]
  
  /**
   * Write stream elements as text with write mode (deprecated)
   * @param path Output file path
   * @param writeMode File write mode (overwrite/no_overwrite)
   * @return DataStreamSink for file output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T]
  
  /**
   * Write stream elements as CSV (deprecated)
   * @param path Output file path
   * @return DataStreamSink for CSV output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsCsv(path: String): DataStreamSink[T]
  
  /**
   * Write stream elements as CSV with custom delimiters (deprecated)
   * @param path Output file path
   * @param writeMode File write mode
   * @param rowDelimiter Delimiter string inserted between rows
   * @param fieldDelimiter Delimiter string inserted between fields
   * @return DataStreamSink for CSV output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsCsv(
    path: String, 
    writeMode: FileSystem.WriteMode, 
    rowDelimiter: String, 
    fieldDelimiter: String
  ): DataStreamSink[T]
}

Custom Sink Functions

Apply custom sink functions for specialized output handling.

class DataStream[T] {
  /**
   * Add a custom sink function
   * @param sinkFunction SinkFunction implementation
   * @return DataStreamSink for sink configuration
   */
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  
  /**
   * Add a sink using function syntax
   * @param fun Function to process elements
   * @return DataStreamSink for sink configuration
   */
  def addSink(fun: T => Unit): DataStreamSink[T]
  
  /**
   * Use the first-generation unified Sink interface
   * (superseded by the Sink V2 overload below, which is the recommended one)
   * @param sink Sink implementation
   * @return DataStreamSink for sink configuration
   */
  def sinkTo(sink: org.apache.flink.api.connector.sink.Sink[T, _, _, _]): DataStreamSink[T]
  
  /**
   * Use new Sink interface with Scala wrapper
   * @param sink Sink implementation
   * @return DataStreamSink for sink configuration
   */
  def sinkTo(sink: Sink[T]): DataStreamSink[T]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.api.connector.sink2.Sink

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor2", 25.0, 2000L)
)

// Custom sink function
class LoggingSink extends SinkFunction[SensorReading] {
  override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {
    println(s"Logging: ${value.sensorId} - ${value.temperature}°C at ${value.timestamp}")
    // Could write to database, send to external system, etc.
  }
}

readings.addSink(new LoggingSink)

// Using function syntax
readings.addSink(reading => 
  println(s"Processing: ${reading.sensorId} - ${reading.temperature}°C")
)

// Using new Sink interface (example with FileSink)
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path

val fileSink = FileSink
  .forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder[String]("UTF-8"))
  .build()

readings.map(_.toString).sinkTo(fileSink)

Output Format Operations

Use custom output formats for structured data writing.

class DataStream[T] {
  /**
   * Write using a custom OutputFormat
   * @param format OutputFormat implementation
   * @return DataStreamSink for output configuration
   */
  def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T]
}

Socket Output

Write stream data to network sockets.

class DataStream[T] {
  /**
   * Write elements to a network socket
   * @param hostname Target hostname
   * @param port Target port
   * @param schema Serialization schema for elements
   * @return DataStreamSink for socket output
   */
  def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T]): DataStreamSink[T]
}

Result Collection

Collect stream results for local processing and analysis.

class DataStream[T] {
  /**
   * Execute and collect all results (blocking operation)
   * @return CloseableIterator of all stream elements
   */
  def executeAndCollect(): CloseableIterator[T]
  
  /**
   * Execute and collect all results with job name (blocking operation)
   * @param jobExecutionName Name for the execution job
   * @return CloseableIterator of all stream elements
   */
  def executeAndCollect(jobExecutionName: String): CloseableIterator[T]
  
  /**
   * Execute and collect limited results (blocking operation)
   * @param limit Maximum number of elements to collect
   * @return List of collected elements
   */
  def executeAndCollect(limit: Int): List[T]
  
  /**
   * Execute and collect limited results with job name (blocking operation)
   * @param jobExecutionName Name for the execution job
   * @param limit Maximum number of elements to collect
   * @return List of collected elements
   */
  def executeAndCollect(jobExecutionName: String, limit: Int): List[T]
  
  /**
   * Collect results asynchronously (non-blocking)
   * @return CloseableIterator for async result access
   */
  def collectAsync(): CloseableIterator[T]
  
  /**
   * Collect results using a custom collector
   * @param collector Custom collector implementation
   */
  def collectAsync(collector: JavaStream.Collector[T]): Unit
}

Usage Examples:

import org.apache.flink.streaming.api.scala._

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val evenNumbers = numbers.filter(_ % 2 == 0)

// Collect all results (for small, bounded datasets only — this buffers the
// whole stream on the client). CloseableIterator extends Iterator[T], so it
// can be consumed directly; always close it to release the job resources.
val allResults = evenNumbers.executeAndCollect()
allResults.foreach(println)
allResults.close()

// Collect limited results
val firstThree = evenNumbers.executeAndCollect(3)
println(s"First 3 even numbers: $firstThree")

// Async collection (non-blocking)
val asyncResults = evenNumbers.collectAsync()
// Process results as they arrive
while (asyncResults.hasNext) {
  println(s"Received: ${asyncResults.next()}")
}
asyncResults.close()

DataStreamSink Configuration

Configure sink behavior and properties.

class DataStreamSink[T] {
  /**
   * Set the parallelism for this sink
   * @param parallelism Sink parallelism
   * @return This sink for chaining
   */
  def setParallelism(parallelism: Int): DataStreamSink[T]
  
  /**
   * Set a name for this sink operator
   * @param name Operator name
   * @return This sink for chaining
   */
  def name(name: String): DataStreamSink[T]
  
  /**
   * Set a unique identifier for this sink
   * @param uid Unique identifier
   * @return This sink for chaining
   */
  def uid(uid: String): DataStreamSink[T]
  
  /**
   * Set a description for this sink
   * @param description Operator description
   * @return This sink for chaining
   */
  def setDescription(description: String): DataStreamSink[T]
  
  /**
   * Disable chaining for this sink
   * @return This sink for chaining
   */
  def disableChaining(): DataStreamSink[T]
  
  /**
   * Set slot sharing group for this sink
   * @param slotSharingGroup Slot sharing group name
   * @return This sink for chaining
   */
  def slotSharingGroup(slotSharingGroup: String): DataStreamSink[T]
}

Usage Examples:

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor2", 25.0, 2000L)
)

// Configure sink properties
readings
  .addSink(new LoggingSink)
  .setParallelism(2)
  .name("Sensor Logging Sink")
  .uid("sensor-logging-sink-v1")
  .setDescription("Logs sensor readings to external system")

Types

// Sink function interface
trait SinkFunction[T] {
  /**
   * Process a single element
   * @param value Element to process
   * @param context Sink context
   */
  def invoke(value: T, context: SinkFunction.Context): Unit
  
  trait Context {
    def currentProcessingTime(): Long
    def currentWatermark(): Long
    def timestamp(): Long
  }
}

// Rich sink function with lifecycle methods
abstract class RichSinkFunction[T] extends SinkFunction[T] with RichFunction {
  override def open(parameters: Configuration): Unit = {}
  override def close(): Unit = {}
}

// Output format interface
trait OutputFormat[T] {
  def configure(parameters: Configuration): Unit
  def open(taskNumber: Int, numTasks: Int): Unit
  def writeRecord(record: T): Unit
  def close(): Unit
}

// Serialization schema for socket output
trait SerializationSchema[T] {
  def serialize(element: T): Array[Byte]
}

// File system write modes
object WriteMode extends Enumeration {
  val NO_OVERWRITE, OVERWRITE = Value
}

// Closeable iterator for result collection
trait CloseableIterator[T] extends Iterator[T] with AutoCloseable {
  def hasNext: Boolean
  def next(): T
  def close(): Unit
}

// New Sink interface (recommended)
trait Sink[IN] {
  def createWriter(context: InitContext): SinkWriter[IN]
  
  trait InitContext {
    def getSubtaskId: Int
    def getNumberOfParallelSubtasks: Int
    def getRestoredCheckpointId: Option[Long]
  }
}

trait SinkWriter[IN] extends AutoCloseable {
  def write(element: IN, context: Context): Unit
  def flush(endOfInput: Boolean): Unit
  def close(): Unit
  
  trait Context {
    def timestamp(): Long
    def currentWatermark(): Long
  }
}

// Built-in encoders
class SimpleStringEncoder[T](charset: String) extends Encoder[T] {
  def encode(element: T, stream: OutputStream): Unit
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12

docs

async-io.md

data-streams.md

execution-environment.md

index.md

keyed-streams.md

processing-functions.md

sinks-output.md

stream-connections.md

window-functions.md

windowing.md

tile.json