The Apache Flink Scala API provides comprehensive support for reading data from various sources and writing results to different output formats, with type-safe operations.
Create DataSets from in-memory collections, useful for testing and small datasets.
class ExecutionEnvironment {
  // Create DataSet from Scala collections
  def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
  def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataSet[T]

  // Create DataSet from individual elements
  def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]

  // Create DataSet from parallel splittable iterator
  def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]): DataSet[T]

  // Generate sequence of numbers
  def generateSequence(from: Long, to: Long): DataSet[Long]
}

Read data from various file formats with configurable encoding and parsing options.
class ExecutionEnvironment {
  // Read text files
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]

  // Read CSV files with type-safe parsing
  def readCsvFile[T: ClassTag : TypeInformation](filePath: String): CsvReader[T]

  // Read files containing primitive values
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String): DataSet[T]
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataSet[T]
}

Create DataSets from a custom InputFormat.

class ExecutionEnvironment {
  // Use custom input format
  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}

Write DataSets to text and CSV files.

class DataSet[T] {
  // Write to text files
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = NO_OVERWRITE): DataSink[T]

  // Write with custom text formatting
  def writeAsFormattedText(
      filePath: String,
      writeMode: FileSystem.WriteMode = NO_OVERWRITE,
      format: TextFormatter[T]
  ): DataSink[T]

  // Write as CSV
  def writeAsCsv(
      filePath: String,
      rowDelimiter: String = ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
      fieldDelimiter: String = ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER,
      writeMode: FileSystem.WriteMode = NO_OVERWRITE
  ): DataSink[T]
}

Write DataSets with a custom OutputFormat.

class DataSet[T] {
  // Use custom output format (write routes a file-based format to a path)
  def write(
      outputFormat: FileOutputFormat[T],
      filePath: String,
      writeMode: FileSystem.WriteMode = NO_OVERWRITE
  ): DataSink[T]
  def output(outputFormat: OutputFormat[T]): DataSink[T]
}

Print DataSets for debugging.

class DataSet[T] {
  // Print to standard output
  def print(): DataSink[T]

  // Print to standard error
  def printToErr(): DataSink[T]

  // Print on task manager (for debugging)
  def printOnTaskManager(sinkIdentifier: String): DataSink[T]
}

Retrieve results in the driver program.

class DataSet[T] {
  // Collect all elements to driver program
  def collect(): Seq[T]

  // Count elements
  def count(): Long
}

Configure CSV parsing on the CsvReader returned by readCsvFile.

class CsvReader[T] {
  // Configure field parsing
  def fieldDelimiter(delimiter: String): CsvReader[T]
  def lineDelimiter(delimiter: String): CsvReader[T]

  // Configure data types (these terminate configuration and return a DataSet)
  def types(types: Class[_]*): DataSet[T]
  def pojoType[P](pojoType: Class[P], fields: String*): DataSet[P]
  def tupleType[P](targetType: Class[P]): DataSet[P]

  // Configure parsing options
  def includeFields(mask: String): CsvReader[T]
  def includeFields(includeMask: Boolean*): CsvReader[T]
  def parseQuotedStrings(quoteCharacter: Char): CsvReader[T]
  def ignoreComments(commentPrefix: String): CsvReader[T]
  def ignoreInvalidLines(): CsvReader[T]
  def ignoreFirstLine(): CsvReader[T]
}

Create DataSets from collections.

import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// From collection
val data1 = env.fromCollection(List(1, 2, 3, 4, 5))
// From elements
val data2 = env.fromElements("hello", "world", "flink")
// Generate sequence
val sequence = env.generateSequence(1, 1000)
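
A parallel source can be sketched with fromParallelCollection; this assumes NumberSequenceIterator from org.apache.flink.util, a SplittableIterator over a range of numbers.

// Sketch: a splittable iterator lets the source itself run in parallel
import org.apache.flink.util.NumberSequenceIterator
val parallelSequence = env.fromParallelCollection(new NumberSequenceIterator(1L, 1000L))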

Read a text file.

import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// Read text file
val lines = env.readTextFile("/path/to/file.txt")
// Process lines
val words = lines.flatMap(_.split(" "))
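
Primitive-valued files can be read in the same way; the path and delimiter below are placeholders.

// Sketch: parse a file of newline-separated integers into a DataSet[Int]
val numbers = env.readFileOfPrimitives[Int]("/path/to/numbers.txt", "\n")
val total = numbers.reduce(_ + _)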

Read CSV files into case classes or tuples.

import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// Define case class for CSV data
case class Person(name: String, age: Int, city: String)
// Read CSV with case class
val people = env.readCsvFile[Person]("/path/to/people.csv")
  .fieldDelimiter(",")
  .includeFields("111") // name, age, city
  .ignoreFirstLine()
  .pojoType(classOf[Person], "name", "age", "city") // terminate configuration to obtain a DataSet[Person]
// Read CSV as tuples
val tuples = env.readCsvFile[(String, Int, String)]("/path/to/people.csv")
  .fieldDelimiter(",")
  .types(classOf[String], classOf[Int], classOf[String])
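
The remaining CsvReader options can be chained in the same way; the file layout and path here are hypothetical.

// Sketch: tolerate comments, bad lines, and quoted fields while parsing
val robust = env.readCsvFile[(String, Int)]("/path/to/messy.csv")
  .fieldDelimiter(";")
  .ignoreComments("#")      // skip lines starting with '#'
  .ignoreInvalidLines()     // drop unparsable lines instead of failing the job
  .parseQuotedStrings('"')  // honor quoted fields
  .types(classOf[String], classOf[Int])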

Write results to files, print them, and collect them to the driver.

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
val env = ExecutionEnvironment.getExecutionEnvironment
val result = env.fromElements(1, 2, 3, 4, 5).map(_ * 2)
// Write to text file
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE)
// Write as CSV
case class Result(id: Int, value: Int)
val resultData = env.fromElements(Result(1, 10), Result(2, 20))
resultData.writeAsCsv("/path/to/results.csv", "\n", ",", WriteMode.OVERWRITE)
// Print to console
result.print()
// Collect to driver
val collected: Seq[Int] = result.collect()
println(s"Results: ${collected.mkString(", ")}")import org.apache.flink.api.scala._

Use custom input and output formats.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.io.GenericInputFormat
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.Path
val env = ExecutionEnvironment.getExecutionEnvironment
// Custom input format
class MyInputFormat extends GenericInputFormat[String] {
  // Implementation details (override reachedEnd and nextRecord)...
}
val customData = env.createInput(new MyInputFormat())
// Custom output format
val data = env.fromElements("a", "b", "c")
data.output(new TextOutputFormat[String](new Path("/path/to/output")))
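
The write variant pairs a file-based output format with an explicit path; the path and job name are placeholders, and the final execute call is what actually runs the defined sinks in a batch program.

// Sketch: write() routes a FileOutputFormat to the given path
data.write(new TextOutputFormat[String](new Path("/path/to/custom-output")), "/path/to/custom-output")
env.execute("Custom format example")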