The Apache Flink Scala API provides type-safe distributed stream and batch processing with idiomatic Scala constructs, functional programming features, and seamless runtime integration.

This section covers reading data from various sources and writing results to different output formats with type-safe operations.
Create DataSets from in-memory collections, useful for testing and small datasets.

```scala
class ExecutionEnvironment {
// Create DataSet from Scala collections
def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataSet[T]
// Create DataSet from individual elements
def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]
// Create DataSet from parallel splittable iterator
def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]): DataSet[T]
// Generate sequence of numbers
def generateSequence(from: Long, to: Long): DataSet[Long]
}
```
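Where a splittable iterator is available, the source can be split across parallel subtasks. A minimal sketch using Flink's `NumberSequenceIterator`, which implements `SplittableIterator`:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.NumberSequenceIterator

val env = ExecutionEnvironment.getExecutionEnvironment

// NumberSequenceIterator is a SplittableIterator, so Flink can partition
// the range across parallel source subtasks
val parallelNumbers = env.fromParallelCollection(new NumberSequenceIterator(1L, 1000L))
```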
Read data from various file formats with configurable encoding and parsing options.

```scala
class ExecutionEnvironment {
// Read text files
def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
// Read CSV files with type-safe parsing
def readCsvFile[T: ClassTag : TypeInformation](filePath: String): CsvReader[T]
// Read files containing primitive values
def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String): DataSet[T]
def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataSet[T]
}
```
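With `readFileOfPrimitives`, a file of delimiter-separated primitive values can be read directly into a typed DataSet. A small sketch, assuming a hypothetical input file of comma-separated integers:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// e.g. a file containing "1,2,3,4,5" (the path is hypothetical)
val numbers = env.readFileOfPrimitives[Int]("/path/to/numbers.txt", ",")
```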
Create DataSets using custom input formats.

```scala
class ExecutionEnvironment {
// Use custom input format
def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}
```

Write results to text and CSV files with configurable formatting and overwrite behavior.

```scala
class DataSet[T] {
// Write to text files
def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = NO_OVERWRITE): DataSink[T]
// Write with custom text formatting
def writeAsFormattedText(
filePath: String,
writeMode: FileSystem.WriteMode = NO_OVERWRITE,
format: TextFormatter[T]
): DataSink[T]
// Write as CSV
def writeAsCsv(
filePath: String,
rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
writeMode: FileSystem.WriteMode = NO_OVERWRITE
): DataSink[T]
}
```
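`writeAsFormattedText` controls how each element is rendered as a line of text. A sketch based on the three-argument signature above; the output path is hypothetical:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment
val scores = env.fromElements(("alice", 3), ("bob", 7))

// render each tuple as "name=score" before writing
scores.writeAsFormattedText("/path/to/scores.txt", WriteMode.OVERWRITE,
  new TextFormatter[(String, Int)] {
    override def format(value: (String, Int)): String = s"${value._1}=${value._2}"
  })
```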
Write DataSets using custom output formats.

```scala
class DataSet[T] {
// Use custom output format
def write(
outputFormat: OutputFormat[T],
filePath: String,
writeMode: FileSystem.WriteMode = NO_OVERWRITE
): DataSink[T]
def output(outputFormat: OutputFormat[T]): DataSink[T]
}
```

Print elements to standard output or standard error for debugging.

```scala
class DataSet[T] {
// Print to standard output
def print(): DataSink[T]
// Print to standard error
def printToErr(): DataSink[T]
// Print on task manager (for debugging)
def printOnTaskManager(sinkIdentifier: String): DataSink[T]
}
```
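Unlike `print()`, `printOnTaskManager` writes to the standard output of the task managers that execute the sink, which is useful when debugging on a cluster. A minimal sketch:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements(1, 2, 3)

// each line on the task manager's stdout is prefixed with the sink identifier
data.printOnTaskManager("debug")
env.execute("debug printing")
```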
Retrieve results directly in the driver program.

```scala
class DataSet[T] {
// Collect all elements to driver program
def collect(): Seq[T]
// Count elements
def count(): Long
}
```
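Both `collect()` and `count()` trigger execution of the program and bring the result back to the driver, so they are best reserved for small datasets and tests. For example:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("a", "b", "c")

// count() executes the program and returns the number of elements
val total: Long = data.count()
```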
Configure CSV parsing with the CsvReader builder.

```scala
class CsvReader[T] {
// Configure field parsing
def fieldDelimiter(delimiter: String): CsvReader[T]
def lineDelimiter(delimiter: String): CsvReader[T]
// Configure data types
def types(types: Class[_]*): DataSet[T]
def pojoType[P](pojoType: Class[P], fields: String*): DataSet[P]
def tupleType[P](targetType: Class[P]): DataSet[P]
// Configure parsing options
def includeFields(mask: String): CsvReader[T]
def includeFields(includeMask: Boolean*): CsvReader[T]
def parseQuotedStrings(quoteCharacter: Char): CsvReader[T]
def ignoreComments(commentPrefix: String): CsvReader[T]
def ignoreInvalidLines(): CsvReader[T]
def ignoreFirstLine(): CsvReader[T]
}
```
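The parsing options above can be combined to handle messy input, for example quoted fields, comment lines, and malformed records. A sketch against the builder API shown above; the file path and case class are hypothetical:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

case class Trade(symbol: String, price: Double)

val trades = env.readCsvFile[Trade]("/path/to/trades.csv")
  .parseQuotedStrings('"')   // fields may be wrapped in double quotes
  .ignoreComments("#")       // skip lines starting with '#'
  .ignoreInvalidLines()      // drop rows that fail to parse
  .pojoType(classOf[Trade], "symbol", "price")
```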
Example: creating DataSets from collections, elements, and generated sequences.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
// From collection
val data1 = env.fromCollection(List(1, 2, 3, 4, 5))
// From elements
val data2 = env.fromElements("hello", "world", "flink")
// Generate sequence
val sequence = env.generateSequence(1, 1000)
```

Example: reading and processing a text file.

```scala
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// Read text file
val lines = env.readTextFile("/path/to/file.txt")
// Process lines
val words = lines.flatMap(_.split(" "))
```

Example: reading CSV files into case classes and tuples.

```scala
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// Define case class for CSV data
case class Person(name: String, age: Int, city: String)
// Read CSV with case class
val people = env.readCsvFile[Person]("/path/to/people.csv")
  .fieldDelimiter(",")
  .includeFields("111") // keep all three columns: name, age, city
  .ignoreFirstLine()
  .pojoType(classOf[Person], "name", "age", "city")
// Read CSV as tuples
val tuples = env.readCsvFile[(String, Int, String)]("/path/to/people.csv")
  .fieldDelimiter(",")
  .types(classOf[String], classOf[Int], classOf[String])
```

Example: writing, printing, and collecting results.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
val env = ExecutionEnvironment.getExecutionEnvironment
val result = env.fromElements(1, 2, 3, 4, 5).map(_ * 2)
// Write to text file
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE)
// Write as CSV
case class Result(id: Int, value: Int)
val resultData = env.fromElements(Result(1, 10), Result(2, 20))
resultData.writeAsCsv("/path/to/results.csv", "\n", ",", WriteMode.OVERWRITE)
// Print to console
result.print()
// Collect to driver
val collected: Seq[Int] = result.collect()
println(s"Results: ${collected.mkString(", ")}")import org.apache.flink.api.scala._
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.InputSplit
val env = ExecutionEnvironment.getExecutionEnvironment
// Custom input format
class MyInputFormat extends InputFormat[String, InputSplit] {
// Implementation details...
}
val customData = env.createInput(new MyInputFormat())
// Custom output format
val data = env.fromElements("a", "b", "c")
data.output(new TextOutputFormat[String](new Path("/path/to/output")))
```

Install with Tessl CLI:

```
npx tessl i tessl/maven-org-apache-flink--flink-scala-2-10
```