Input and Output Operations

Reading data from various sources and writing results to different sinks. These operations handle data ingestion and result persistence.

Capabilities

File Input Operations

Read data from various file formats and file systems.

class ExecutionEnvironment {
  /**
   * Reads a text file as DataSet of strings
   * @param filePath Path to the text file
   * @param charsetName Character encoding (default: UTF-8)
   * @return DataSet of text lines
   */
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  
  /**
   * Reads a text file as DataSet of StringValue objects
   * @param filePath Path to the text file
   * @param charsetName Character encoding (default: UTF-8)
   * @return DataSet of StringValue objects
   */
  def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
  
  /**
   * Reads a CSV file into typed DataSet
   * @param filePath Path to the CSV file
   * @return DataSet of parsed CSV records
   */
  def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
  
  /**
   * Reads primitive values from a file
   * @param filePath Path to the file
   * @param delimiter Value delimiter (default: newline)
   * @return DataSet of primitive values
   */
  def readFileOfPrimitives[T: ClassTag: TypeInformation](
    filePath: String, 
    delimiter: String = "\n"
  ): DataSet[T]
  
  /**
   * Reads file using custom input format
   * @param inputFormat Custom file input format
   * @param filePath Path to the file
   * @return DataSet with custom format parsing
   */
  def readFile[T: ClassTag: TypeInformation](
    inputFormat: FileInputFormat[T], 
    filePath: String
  ): DataSet[T]
}

Usage Examples:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Read text file
val textLines = env.readTextFile("hdfs://path/to/textfile.txt")

// Read CSV file with automatic parsing
case class Person(name: String, age: Int, city: String)
val people = env.readCsvFile[Person]("hdfs://path/to/people.csv")

// Read primitive values
val numbers = env.readFileOfPrimitives[Int]("hdfs://path/to/numbers.txt")

// Read with different encoding
val utf16Text = env.readTextFile("hdfs://path/to/file.txt", "UTF-16")
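
// CSV rows can also be parsed into Scala tuples when no case class is defined
val csvRows = env.readCsvFile[(String, Int, String)]("hdfs://path/to/people.csv")

// Read through an explicit file input format (a sketch using Flink's built-in
// TextInputFormat; readFile sets the given path on the format before reading)
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

val formattedText = env.readFile(
  new TextInputFormat(new Path("hdfs://path/to/textfile.txt")),
  "hdfs://path/to/textfile.txt")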

Custom Input Formats

Create DataSets from custom input sources and formats.

class ExecutionEnvironment {
  /**
   * Creates DataSet from custom input format
   * @param inputFormat Custom input format implementation
   * @return DataSet using custom input
   */
  def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}

Usage Examples:

import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.core.fs.FileInputSplit
import scala.reflect.ClassTag

// Custom input format for reading JSON files (sketch; the InputFormat methods to implement are listed under Types below)
class JsonInputFormat[T: ClassTag] extends InputFormat[T, FileInputSplit] {
  // Implementation for reading JSON data
}

val jsonData = env.createInput(new JsonInputFormat[MyDataClass])

Collection Input Sources

Create DataSets from in-memory collections and sequences.

class ExecutionEnvironment {
  /**
   * Creates DataSet from an iterable collection
   * @param data Iterable collection of elements
   * @return DataSet containing the collection elements
   */
  def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
  
  /**
   * Creates DataSet from an iterator
   * @param data Iterator of elements
   * @return DataSet containing the iterator elements
   */
  def fromCollection[T: TypeInformation: ClassTag](data: Iterator[T]): DataSet[T]
  
  /**
   * Creates DataSet from individual elements
   * @param data Variable arguments of elements
   * @return DataSet containing the elements
   */
  def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
  
  /**
   * Creates DataSet from a parallel collection
   * @param iterator Splittable iterator for parallel processing
   * @return DataSet from parallel collection
   */
  def fromParallelCollection[T: TypeInformation: ClassTag](iterator: SplittableIterator[T]): DataSet[T]
  
  /**
   * Generates a sequence of numbers
   * @param from Starting number (inclusive)
   * @param to Ending number (inclusive)
   * @return DataSet containing the number sequence
   */
  def generateSequence(from: Long, to: Long): DataSet[Long]
}

Usage Examples:

// From Scala collections
val listData = env.fromCollection(List(1, 2, 3, 4, 5))
val arrayData = env.fromCollection(Array("a", "b", "c"))

// From individual elements
val elementData = env.fromElements("apple", "banana", "cherry")

// Generate number sequence
val numbers = env.generateSequence(1, 1000000)
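
// From an iterator (the iterator is consumed when the job executes)
val iteratorData = env.fromCollection(Iterator.range(0, 100))

// From a splittable iterator, read in parallel across tasks (a sketch using Flink's
// NumberSequenceIterator, which yields boxed java.lang.Long values)
import org.apache.flink.util.NumberSequenceIterator
val parallelNumbers = env.fromParallelCollection(new NumberSequenceIterator(1L, 1000000L))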

File Output Operations

Write DataSets to various file formats and destinations.

class DataSet[T] {
  /**
   * Writes DataSet as text file
   * @param filePath Output file path
   * @param writeMode Write mode (default: NO_OVERWRITE)
   * @return DataSink for the write operation
   */
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = null): DataSink[T]
  
  /**
   * Writes DataSet as CSV file
   * @param filePath Output file path
   * @param rowDelimiter Row delimiter (default: newline)
   * @param fieldDelimiter Field delimiter (default: comma)
   * @param writeMode Write mode (default: NO_OVERWRITE)
   * @return DataSink for the write operation
   */
  def writeAsCsv(
    filePath: String,
    rowDelimiter: String = "\n",
    fieldDelimiter: String = ",",
    writeMode: FileSystem.WriteMode = null
  ): DataSink[T]
  
  /**
   * Writes using custom file output format
   * @param outputFormat Custom file output format
   * @param filePath Output file path
   * @param writeMode Write mode (default: NO_OVERWRITE)
   * @return DataSink for the write operation
   */
  def write(
    outputFormat: FileOutputFormat[T], 
    filePath: String, 
    writeMode: FileSystem.WriteMode = null
  ): DataSink[T]
  
  /**
   * Writes using custom output format
   * @param outputFormat Custom output format implementation
   * @return DataSink for the write operation
   */
  def output(outputFormat: OutputFormat[T]): DataSink[T]
}

Usage Examples:

// Write as text file
data.writeAsText("hdfs://path/to/output.txt")

// Write as CSV with custom delimiters
people.writeAsCsv("hdfs://path/to/people.csv", "\n", ";")

// Overwrite existing files
results.writeAsText("hdfs://path/to/results.txt", FileSystem.WriteMode.OVERWRITE)
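
// Write through an explicit output format (a sketch using Flink's built-in
// TextOutputFormat and assuming results is a DataSet[String]; FileSystem above
// refers to org.apache.flink.core.fs.FileSystem)
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.{FileSystem, Path}

results.write(
  new TextOutputFormat[String](new Path("hdfs://path/to/results.txt")),
  "hdfs://path/to/results.txt",
  FileSystem.WriteMode.OVERWRITE)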

Console Output Operations

Output DataSets to console for debugging and monitoring.

class DataSet[T] {
  /**
   * Prints all elements to standard output
   */
  def print(): Unit
  
  /**
   * Prints all elements to standard error
   */
  def printToErr(): Unit
  
  /**
   * Prints elements on task managers with identifier
   * @param sinkIdentifier Identifier for the print sink
   * @return DataSink for the print operation
   */
  def print(sinkIdentifier: String): DataSink[T]
  
  /**
   * Prints elements to standard error on task managers with identifier
   * @param sinkIdentifier Identifier for the print sink  
   * @return DataSink for the print operation
   */
  def printToErr(sinkIdentifier: String): DataSink[T]
  
  /**
   * Prints elements on task managers with prefix
   * @param prefix Prefix for each printed line
   * @return DataSink for the print operation
   */
  def printOnTaskManager(prefix: String): DataSink[T]
}

Usage Examples:

// Print to console (for small datasets)
smallData.print()

// Print with identifier in distributed environment
data.print("MyDataStream")

// Print with prefix for identification
results.printOnTaskManager("RESULT> ")

Data Collection

Collect DataSet elements to the driver program for inspection.

class DataSet[T] {
  /**
   * Collects all elements to the driver program
   * @return Sequence containing all elements
   */
  def collect(): Seq[T]
  
  /**
   * Counts the number of elements in the DataSet
   * @return Number of elements
   */
  def count(): Long
}

Usage Examples:

// Collect results (use carefully with large datasets)
val results = processedData.collect()
results.foreach(println)

// Count elements
val elementCount = largeDataset.count()
println(s"Dataset contains $elementCount elements")
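
// Note: print(), collect(), and count() are eager; they trigger execution of the
// program up to that point. File sinks created with writeAsText/writeAsCsv/write/output
// only run once the environment is executed explicitly:
processedData.writeAsText("hdfs://path/to/output.txt")
env.execute("write results")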

Write Modes

Control behavior when output files already exist.

object FileSystem {
  sealed trait WriteMode
  case object OVERWRITE extends WriteMode    // Overwrite existing files
  case object NO_OVERWRITE extends WriteMode // Fail if files exist (default)
}
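
A short usage sketch (WriteMode is nested in org.apache.flink.core.fs.FileSystem; people is the case class DataSet from the examples above):

import org.apache.flink.core.fs.FileSystem.WriteMode

// Default (NO_OVERWRITE): the job fails if the target path already exists
people.writeAsCsv("hdfs://path/to/people.csv")

// Explicitly replace any previous output
people.writeAsCsv("hdfs://path/to/people.csv", writeMode = WriteMode.OVERWRITE)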

Custom Output Formats

Create custom sinks for specialized output requirements.

// The OutputFormat contract implemented by custom sinks (for example, database writers)
abstract class OutputFormat[T] {
  /**
   * Configures the output format
   * @param parameters Configuration parameters
   */
  def configure(parameters: Configuration): Unit
  
  /**
   * Opens the output format
   * @param taskNumber Task number
   * @param numTasks Total number of tasks
   */
  def open(taskNumber: Int, numTasks: Int): Unit
  
  /**
   * Writes a record
   * @param record Record to write
   */
  def writeRecord(record: T): Unit
  
  /**
   * Closes the output format
   */
  def close(): Unit
}

Usage Examples:

// Custom database output format (Connection is java.sql.Connection)
import java.sql.Connection
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration

class DatabaseOutputFormat[T] extends OutputFormat[T] {
  private var connection: Connection = _
  
  override def configure(parameters: Configuration): Unit = {
    // Setup database connection parameters
  }
  
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    // Open database connection
  }
  
  override def writeRecord(record: T): Unit = {
    // Write record to database
  }
  
  override def close(): Unit = {
    // Close database connection
  }
}

// Use custom output format
data.output(new DatabaseOutputFormat[Person])

Broadcast Variables

Access side inputs in operations through broadcast variables.

class DataSet[T] {
  /**
   * Adds a broadcast DataSet that can be accessed in transformations
   * @param data DataSet to broadcast
   * @param name Name for accessing the broadcast data
   * @return DataSet with broadcast variable configured
   */
  def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
}

// In rich function implementations, broadcast data is read from the runtime context
// (obtained via getRuntimeContext in classes such as RichMapFunction):
trait RuntimeContext {
  /**
   * Gets a broadcast variable by name
   * @param name Broadcast variable name
   * @return List of broadcast elements
   */
  def getBroadcastVariable[X](name: String): java.util.List[X]
  
  /**
   * Gets a broadcast variable, transformed once per task by an initializer
   * @param name Broadcast variable name
   * @param initializer Transforms the broadcast elements into the desired shape
   * @return Initialized broadcast variable
   */
  def getBroadcastVariableWithInitializer[X, Y](
    name: String, 
    initializer: BroadcastVariableInitializer[X, Y]
  ): Y
}

Usage Examples:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

// Broadcast lookup data
val lookupData = env.fromElements(("key1", "value1"), ("key2", "value2"))

// Use broadcast data in transformation
val enrichedData = data
  .map(new RichMapFunction[String, String] {
    var lookup: Map[String, String] = _
    
    override def open(parameters: Configuration): Unit = {
      val broadcastData = getRuntimeContext.getBroadcastVariable[(String, String)]("lookup")
      lookup = broadcastData.asScala.toMap
    }
    
    override def map(value: String): String = {
      lookup.getOrElse(value, "unknown")
    }
  })
  .withBroadcastSet(lookupData, "lookup")
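
// A sketch of getBroadcastVariableWithInitializer: the initializer reshapes the
// broadcast list once per task instead of on every access (asScala comes from the
// JavaConverters import above; names are illustrative)
import org.apache.flink.api.common.functions.BroadcastVariableInitializer

class LookupInitializer extends BroadcastVariableInitializer[(String, String), Map[String, String]] {
  override def initializeBroadcastVariable(
      data: java.lang.Iterable[(String, String)]): Map[String, String] =
    data.asScala.toMap
}

// Inside open() of a rich function:
//   lookup = getRuntimeContext.getBroadcastVariableWithInitializer("lookup", new LookupInitializer)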

Types

trait InputFormat[T, InputSplit] {
  /**
   * Configures the input format
   * @param parameters Configuration parameters
   */
  def configure(parameters: Configuration): Unit
  
  /**
   * Creates input splits for parallel reading
   * @param minNumSplits Minimum number of splits
   * @return Array of input splits
   */
  def createInputSplits(minNumSplits: Int): Array[InputSplit]
  
  /**
   * Opens an input split for reading
   * @param split Input split to open
   */
  def open(split: InputSplit): Unit
  
  /**
   * Checks whether the end of the input has been reached
   * @return True if no more records are available
   */
  def reachedEnd(): Boolean
  
  /**
   * Reads the next record
   * @param reuse Object to reuse for the record
   * @return Next record or null if end reached
   */
  def nextRecord(reuse: T): T
  
  /**
   * Closes the input format
   */
  def close(): Unit
}

abstract class FileInputFormat[T] extends InputFormat[T, FileInputSplit] {
  /**
   * Sets the path to read from
   * @param filePath File path
   */
  def setFilePath(filePath: Path): Unit
  
  /**
   * Sets file paths to read from
   * @param filePaths Array of file paths
   */
  def setFilePaths(filePaths: Path*): Unit
}

class DataSink[T] {
  /**
   * Sets the parallelism for this sink
   * @param parallelism Degree of parallelism
   * @return DataSink with specified parallelism
   */
  def setParallelism(parallelism: Int): DataSink[T]
  
  /**
   * Gets the parallelism of this sink
   * @return Current parallelism setting
   */
  def getParallelism: Int
  
  /**
   * Sets the name for this sink
   * @param name Sink name
   * @return DataSink with specified name
   */
  def name(name: String): DataSink[T]
}
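
A short sketch of configuring the DataSink returned by a write operation (path and sink name are illustrative):

// Name the sink and force a single output file instead of one file per parallel task
data.writeAsText("hdfs://path/to/output.txt")
  .name("text-sink")
  .setParallelism(1)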

trait SplittableIterator[T] extends Iterator[T] {
  /**
   * Splits the iterator into multiple iterators
   * @param numPartitions Number of partitions
   * @return Array of split iterators
   */
  def split(numPartitions: Int): Array[Iterator[T]]
  
  /**
   * Gets the maximum number of splits
   * @return Maximum number of splits
   */
  def getMaximumNumberOfSplits: Int
}

class StringValue extends Comparable[StringValue] {
  /**
   * Creates StringValue from string
   * @param value String value
   */
  def this(value: String)
  
  /**
   * Gets the string value
   * @return String value
   */
  def getValue: String
  
  /**
   * Sets the string value
   * @param value String value
   */
  def setValue(value: String): Unit
}
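
A short sketch tying StringValue back to readTextFileWithValue (path illustrative):

// StringValue lines are mutable and can be reused to reduce object allocation
val valueLines = env.readTextFileWithValue("hdfs://path/to/textfile.txt")
val lineLengths = valueLines.map(v => v.getValue.length)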