Data Sources and Sinks

The Apache Flink Scala API provides comprehensive support for reading data from a variety of sources and writing results to different output formats, all with type-safe operations.

Data Sources

Collection Sources

Create DataSets from in-memory collections, useful for testing and small datasets.

class ExecutionEnvironment {
  // Create DataSet from Scala collections
  def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
  def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataSet[T]
  
  // Create DataSet from individual elements
  def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]
  
  // Create DataSet from parallel splittable iterator
  def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]): DataSet[T]
  
  // Generate sequence of numbers
  def generateSequence(from: Long, to: Long): DataSet[Long]
}
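
For example, fromParallelCollection can wrap Flink's NumberSequenceIterator (a SplittableIterator), so each parallel subtask generates a disjoint slice of the sequence. A minimal sketch, assuming the Scala API's implicit TypeInformation for java.lang.Long:

import org.apache.flink.api.scala._
import org.apache.flink.util.NumberSequenceIterator

val env = ExecutionEnvironment.getExecutionEnvironment

// Each parallel subtask produces a disjoint slice of 1..10000
val numbers = env.fromParallelCollection(new NumberSequenceIterator(1L, 10000L))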

File Sources

Read data from various file formats with configurable encoding and parsing options.

class ExecutionEnvironment {
  // Read text files
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
  
  // Read CSV files with type-safe parsing
  def readCsvFile[T: ClassTag : TypeInformation](filePath: String): CsvReader[T]
  
  // Read files containing primitive values
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String): DataSet[T]
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataSet[T]
}
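
For instance, a text file of delimiter-separated integers can be read with readFileOfPrimitives. A sketch following the signatures above (the file path is hypothetical):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Each newline-separated token in the (hypothetical) file is parsed as an Int
val ints: DataSet[Int] = env.readFileOfPrimitives[Int]("/path/to/numbers.txt", "\n")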

Custom Input Formats

class ExecutionEnvironment {
  // Use custom input format
  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}

Data Sinks

Basic Output Operations

class DataSet[T] {
  // Write to text files
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = NO_OVERWRITE): DataSink[T]
  
  // Write with custom text formatting
  def writeAsFormattedText(
    filePath: String, 
    writeMode: FileSystem.WriteMode = NO_OVERWRITE, 
    format: TextFormatter[T]
  ): DataSink[T]
  
  // Write as CSV
  def writeAsCsv(
    filePath: String,
    rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
    fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
    writeMode: FileSystem.WriteMode = NO_OVERWRITE
  ): DataSink[T]
}
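
As an illustration of writeAsFormattedText, a TextFormatter (in Flink, the nested interface TextOutputFormat.TextFormatter) turns each element into its output line. A sketch following the signature documented above, with a hypothetical output path:

import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment
val pairs = env.fromElements(("a", 1), ("b", 2))

// Render each tuple as "key=value" before writing
pairs.writeAsFormattedText(
  "/path/to/formatted.txt",
  WriteMode.OVERWRITE,
  new TextFormatter[(String, Int)] {
    override def format(value: (String, Int)): String = s"${value._1}=${value._2}"
  })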

Custom Output Formats

class DataSet[T] {
  // Use custom output format
  def write(
    outputFormat: OutputFormat[T], 
    filePath: String, 
    writeMode: FileSystem.WriteMode = NO_OVERWRITE
  ): DataSink[T]
  
  def output(outputFormat: OutputFormat[T]): DataSink[T]
}

Console Output

class DataSet[T] {
  // Print to standard output
  def print(): DataSink[T]
  
  // Print to standard error
  def printToErr(): DataSink[T]
  
  // Print on task manager (for debugging)
  def printOnTaskManager(sinkIdentifier: String): DataSink[T]
}
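
In recent Flink releases, print() executes the program eagerly and prints on the client, whereas printOnTaskManager registers a lazy sink whose output appears in the task managers' stdout, so it needs an explicit execute(). A sketch (the sink identifier is an arbitrary label):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements(1, 2, 3)

// Output lines on each task manager are prefixed with "debug-sink"
data.printOnTaskManager("debug-sink")
env.execute("print example")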

Collect Results

class DataSet[T] {
  // Collect all elements to the driver program
  def collect(): Seq[T]
  
  // Count elements
  def count(): Long
}
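
Both operations trigger execution of the program; collect() additionally ships every element to the client, so it should only be used for small results. A minimal sketch:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements(1, 2, 3, 4)

// Each action runs the program and returns its result to the driver
val howMany: Long = data.count()
val elements: Seq[Int] = data.collect()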

CSV Reader Configuration

class CsvReader[T] {
  // Configure field parsing
  def fieldDelimiter(delimiter: String): CsvReader[T]
  def lineDelimiter(delimiter: String): CsvReader[T]
  
  // Specify data types (these calls terminate configuration and return a DataSet)
  def types(types: Class[_]*): DataSet[T]
  def pojoType[P](pojoType: Class[P], fields: String*): DataSet[P]
  def tupleType[U](tupleClass: Class[U]): DataSet[U]
  
  // Configure parsing options
  def includeFields(mask: String): CsvReader[T]
  def includeFields(includeMask: Boolean*): CsvReader[T]
  def parseQuotedStrings(quoteCharacter: Char): CsvReader[T]
  def ignoreComments(commentPrefix: String): CsvReader[T]
  def ignoreInvalidLines(): CsvReader[T]
  def ignoreFirstLine(): CsvReader[T]
}
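
Tying these options together, a configured reader might look like the following sketch, which follows the builder interface documented above (the file path and field layout are hypothetical):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

case class Trade(symbol: String, price: Double)

// Parse quoted fields, skip the header line and '#' comments, drop malformed rows
val trades = env.readCsvFile[Trade]("/path/to/trades.csv")
  .fieldDelimiter(";")
  .parseQuotedStrings('"')
  .ignoreComments("#")
  .ignoreFirstLine()
  .ignoreInvalidLines()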

Usage Examples

Reading from Collections

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// From collection
val data1 = env.fromCollection(List(1, 2, 3, 4, 5))

// From elements
val data2 = env.fromElements("hello", "world", "flink")

// Generate sequence  
val sequence = env.generateSequence(1, 1000)

Reading Text Files

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Read text file
val lines = env.readTextFile("/path/to/file.txt")

// Process lines
val words = lines.flatMap(_.split(" "))

Reading CSV Files

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Define case class for CSV data
case class Person(name: String, age: Int, city: String)

// Read CSV with case class
val people = env.readCsvFile[Person]("/path/to/people.csv")
  .fieldDelimiter(",")
  .includeFields("111") // name, age, city
  .ignoreFirstLine()

// Read CSV as tuples
val tuples = env.readCsvFile[(String, Int, String)]("/path/to/people.csv")
  .fieldDelimiter(",")
  .types(classOf[String], classOf[Int], classOf[String])

Writing Results

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment
val result = env.fromElements(1, 2, 3, 4, 5).map(_ * 2)

// Write to text file
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE)

// Write as CSV
case class Result(id: Int, value: Int)
val resultData = env.fromElements(Result(1, 10), Result(2, 20))
resultData.writeAsCsv("/path/to/results.csv", "\n", ",", WriteMode.OVERWRITE)
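
// Note: file sinks are lazy; they run when the program executes.
// The print() and collect() calls below trigger execution, or call
// env.execute() explicitly.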

// Print to console
result.print()

// Collect to driver
val collected: Seq[Int] = result.collect()
println(s"Results: ${collected.mkString(", ")}")

Custom Input/Output Formats

import org.apache.flink.api.scala._
import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.Path

val env = ExecutionEnvironment.getExecutionEnvironment

// Custom input format; extending GenericInputFormat fixes the input split
// type, since InputFormat[String, _] cannot be extended directly
class MyInputFormat extends GenericInputFormat[String] {
  // Implementation details... (a real format would read records here)
  def reachedEnd(): Boolean = true
  def nextRecord(reuse: String): String = reuse
}

val customData = env.createInput(new MyInputFormat())

// Custom output format  
val data = env.fromElements("a", "b", "c")
data.output(new TextOutputFormat[String](new Path("/path/to/output")))
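
// Sinks are lazy; execute the program to actually write the output
env.execute("custom formats example")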