These operations read data from external sources and write results to sinks, handling data ingestion and result persistence.
Read data from various file formats and file systems.
class ExecutionEnvironment {
/**
* Reads a text file as DataSet of strings
* @param filePath Path to the text file
* @param charsetName Character encoding (default: UTF-8)
* @return DataSet of text lines
*/
def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
/**
* Reads a text file as DataSet of StringValue objects
* @param filePath Path to the text file
* @param charsetName Character encoding (default: UTF-8)
* @return DataSet of StringValue objects
*/
def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
/**
* Reads a CSV file into typed DataSet
* @param filePath Path to the CSV file
* @return DataSet of parsed CSV records
*/
def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
/**
* Reads primitive values from a file
* @param filePath Path to the file
* @param delimiter Value delimiter (default: newline)
* @return DataSet of primitive values
*/
def readFileOfPrimitives[T: ClassTag: TypeInformation](
filePath: String,
delimiter: String = "\n"
): DataSet[T]
/**
* Reads file using custom input format
* @param inputFormat Custom file input format
* @param filePath Path to the file
* @return DataSet with custom format parsing
*/
def readFile[T: ClassTag: TypeInformation](
inputFormat: FileInputFormat[T],
filePath: String
): DataSet[T]
}

Usage Examples:
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// Read text file
val textLines = env.readTextFile("hdfs://path/to/textfile.txt")
// Read CSV file with automatic parsing
case class Person(name: String, age: Int, city: String)
val people = env.readCsvFile[Person]("hdfs://path/to/people.csv")
// Read primitive values
val numbers = env.readFileOfPrimitives[Int]("hdfs://path/to/numbers.txt")
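// readTextFileWithValue yields mutable StringValue objects that can be reused
// to cut per-record allocations (sketch; paths here are illustrative)
val valueLines = env.readTextFileWithValue("hdfs://path/to/textfile.txt")
// Read through an explicit input format; TextInputFormat ships with Flink
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
val viaFormat = env.readFile(new TextInputFormat(new Path("hdfs://path/to/textfile.txt")), "hdfs://path/to/textfile.txt")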
// Read with different encoding
val utf16Text = env.readTextFile("hdfs://path/to/file.txt", "UTF-16")

Create DataSets from custom input sources and formats.
class ExecutionEnvironment {
/**
* Creates DataSet from custom input format
* @param inputFormat Custom input format implementation
* @return DataSet using custom input
*/
def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}

Usage Examples:
import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.core.fs.FileInputSplit
import scala.reflect.ClassTag
// Custom input format for reading JSON files (sketch; a real implementation
// must supply createInputSplits, open, reachedEnd, nextRecord, close, etc.)
class JsonInputFormat[T: ClassTag] extends InputFormat[T, FileInputSplit] {
  // Implementation for reading JSON data
}
val jsonData = env.createInput(new JsonInputFormat[MyDataClass])

Create DataSets from in-memory collections and sequences.
class ExecutionEnvironment {
/**
* Creates DataSet from an iterable collection
* @param data Iterable collection of elements
* @return DataSet containing the collection elements
*/
def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
/**
* Creates DataSet from an iterator
* @param data Iterator of elements
* @return DataSet containing the iterator elements
*/
def fromCollection[T: TypeInformation: ClassTag](data: Iterator[T]): DataSet[T]
/**
* Creates DataSet from individual elements
* @param data Variable arguments of elements
* @return DataSet containing the elements
*/
def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
/**
* Creates DataSet from a parallel collection
* @param iterator Splittable iterator for parallel processing
* @return DataSet from parallel collection
*/
def fromParallelCollection[T: TypeInformation: ClassTag](iterator: SplittableIterator[T]): DataSet[T]
/**
* Generates a sequence of numbers
* @param from Starting number (inclusive)
* @param to Ending number (inclusive)
* @return DataSet containing the number sequence
*/
def generateSequence(from: Long, to: Long): DataSet[Long]
}

Usage Examples:
// From Scala collections
val listData = env.fromCollection(List(1, 2, 3, 4, 5))
val arrayData = env.fromCollection(Array("a", "b", "c"))
// From individual elements
val elementData = env.fromElements("apple", "banana", "cherry")
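// From a splittable iterator: subtasks read disjoint splits in parallel
// (sketch assuming Flink's NumberSequenceIterator, a SplittableIterator over a long range)
import org.apache.flink.util.NumberSequenceIterator
val parallelNums = env.fromParallelCollection(new NumberSequenceIterator(1L, 1000000L))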
// Generate number sequence
val numbers = env.generateSequence(1, 1000000)

Write DataSets to various file formats and destinations.
class DataSet[T] {
/**
* Writes DataSet as text file
* @param filePath Output file path
* @param writeMode Write mode (default: NO_OVERWRITE)
* @return DataSink for the write operation
*/
def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = null): DataSink[T]
/**
* Writes DataSet as CSV file
* @param filePath Output file path
* @param rowDelimiter Row delimiter (default: newline)
* @param fieldDelimiter Field delimiter (default: comma)
* @param writeMode Write mode (default: NO_OVERWRITE)
* @return DataSink for the write operation
*/
def writeAsCsv(
filePath: String,
rowDelimiter: String = "\n",
fieldDelimiter: String = ",",
writeMode: FileSystem.WriteMode = null
): DataSink[T]
/**
* Writes using custom file output format
* @param outputFormat Custom file output format
* @param filePath Output file path
* @param writeMode Write mode (default: NO_OVERWRITE)
* @return DataSink for the write operation
*/
def write(
outputFormat: FileOutputFormat[T],
filePath: String,
writeMode: FileSystem.WriteMode = null
): DataSink[T]
/**
* Writes using custom output format
* @param outputFormat Custom output format implementation
* @return DataSink for the write operation
*/
def output(outputFormat: OutputFormat[T]): DataSink[T]
}

Usage Examples:
// Write as text file
data.writeAsText("hdfs://path/to/output.txt")
// Write as CSV with custom delimiters
people.writeAsCsv("hdfs://path/to/people.csv", "\n", ";")
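// Write through an explicit file output format; TextOutputFormat ships with
// Flink (sketch assuming a DataSet[String] and an illustrative path)
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.Path
textLines.write(new TextOutputFormat[String](new Path("hdfs://path/to/formatted")), "hdfs://path/to/formatted")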
// Overwrite existing files
results.writeAsText("hdfs://path/to/results.txt", FileSystem.WriteMode.OVERWRITE)

Output DataSets to console for debugging and monitoring.
class DataSet[T] {
/**
* Prints all elements to standard output
*/
def print(): Unit
/**
* Prints all elements to standard error
*/
def printToErr(): Unit
/**
* Prints elements on task managers with identifier
* @param sinkIdentifier Identifier for the print sink
* @return DataSink for the print operation
*/
def print(sinkIdentifier: String): DataSink[T]
/**
* Prints elements to standard error on task managers with identifier
* @param sinkIdentifier Identifier for the print sink
* @return DataSink for the print operation
*/
def printToErr(sinkIdentifier: String): DataSink[T]
/**
* Prints elements on task managers with prefix
* @param prefix Prefix for each printed line
* @return DataSink for the print operation
*/
def printOnTaskManager(prefix: String): DataSink[T]
}

Usage Examples:
// Print to console (for small datasets)
smallData.print()
// Print with identifier in distributed environment
data.print("MyDataStream")
// Print with prefix for identification
results.printOnTaskManager("RESULT> ")

Collect DataSet elements to the driver program for inspection.
class DataSet[T] {
/**
* Collects all elements to the driver program
* @return Sequence containing all elements
*/
def collect(): Seq[T]
/**
* Counts the number of elements in the DataSet
* @return Number of elements
*/
def count(): Long
}

Usage Examples:
// Collect results (use carefully with large datasets)
val results = processedData.collect()
results.foreach(println)
// Count elements
val elementCount = largeDataset.count()
println(s"Dataset contains $elementCount elements")Control behavior when output files already exist.
object FileSystem {
sealed trait WriteMode
case object OVERWRITE extends WriteMode // Overwrite existing files
case object NO_OVERWRITE extends WriteMode // Fail if files exist (default)
}

Create custom sinks for specialized output requirements.
// Example: Custom output format for writing to databases
abstract class OutputFormat[T] {
/**
* Configures the output format
* @param parameters Configuration parameters
*/
def configure(parameters: Configuration): Unit
/**
* Opens the output format
* @param taskNumber Task number
* @param numTasks Total number of tasks
*/
def open(taskNumber: Int, numTasks: Int): Unit
/**
* Writes a record
* @param record Record to write
*/
def writeRecord(record: T): Unit
/**
* Closes the output format
*/
def close(): Unit
}

Usage Examples:
// Custom database output format; the JDBC wiring below is a sketch with an
// illustrative connection URL and SQL statement
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration

class DatabaseOutputFormat[T] extends OutputFormat[T] {
  @transient private var connection: Connection = _
  @transient private var statement: PreparedStatement = _
  override def configure(parameters: Configuration): Unit = {
    // Read connection settings from the configuration if needed
  }
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    // Each parallel task opens its own connection (URL and table are illustrative)
    connection = DriverManager.getConnection("jdbc:derby:memory:flinkdb;create=true")
    statement = connection.prepareStatement("INSERT INTO records (value) VALUES (?)")
  }
  override def writeRecord(record: T): Unit = {
    // Map the record onto statement parameters (record-specific in practice)
    statement.setString(1, record.toString)
    statement.executeUpdate()
  }
  override def close(): Unit = {
    // Release JDBC resources when the task finishes
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}
// Use custom output format
data.output(new DatabaseOutputFormat[Person])

Access side inputs in operations through broadcast variables.
class DataSet[T] {
/**
* Adds a broadcast DataSet that can be accessed in transformations
* @param data DataSet to broadcast
* @param name Name for accessing the broadcast data
* @return DataSet with broadcast variable configured
*/
def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]
}
// In rich function implementations, broadcast data is accessed through the
// runtime context rather than on the function class itself:
abstract class RichMapFunction[T, O] extends AbstractRichFunction with MapFunction[T, O] {
  /**
   * Gets the runtime context, which exposes broadcast variables
   * @return Runtime context of this task
   */
  def getRuntimeContext: RuntimeContext
}
trait RuntimeContext {
  /**
   * Gets broadcast variable by name
   * @param name Broadcast variable name
   * @return List of broadcast elements
   */
  def getBroadcastVariable[X](name: String): java.util.List[X]
  /**
   * Gets broadcast variable, transformed once per task by an initializer
   * @param name Broadcast variable name
   * @param initializer Transformation applied to the broadcast elements
   * @return Transformed broadcast variable
   */
  def getBroadcastVariableWithInitializer[X, Y](
    name: String,
    initializer: BroadcastVariableInitializer[X, Y]
  ): Y
}

Usage Examples:
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._
// Broadcast lookup data
val lookupData = env.fromElements(("key1", "value1"), ("key2", "value2"))
// Use broadcast data in transformation
val enrichedData = data
.map(new RichMapFunction[String, String] {
var lookup: Map[String, String] = _
override def open(parameters: Configuration): Unit = {
val broadcastData = getRuntimeContext.getBroadcastVariable[(String, String)]("lookup")
lookup = broadcastData.asScala.toMap
}
override def map(value: String): String = {
lookup.getOrElse(value, "unknown")
}
})
.withBroadcastSet(lookupData, "lookup")

The remaining declarations describe the supporting interfaces and types referenced throughout this section.

trait InputFormat[T, InputSplit] {
/**
* Configures the input format
* @param parameters Configuration parameters
*/
def configure(parameters: Configuration): Unit
/**
* Creates input splits for parallel reading
* @param minNumSplits Minimum number of splits
* @return Array of input splits
*/
def createInputSplits(minNumSplits: Int): Array[InputSplit]
/**
* Opens an input split for reading
* @param split Input split to open
*/
def open(split: InputSplit): Unit
/**
* Checks whether the end of the input has been reached
* @return True if no more records are available
*/
def reachedEnd(): Boolean
/**
* Reads the next record
* @param reuse Object to reuse for the record
* @return Next record or null if end reached
*/
def nextRecord(reuse: T): T
/**
* Closes the input format
*/
def close(): Unit
}
abstract class FileInputFormat[T] extends InputFormat[T, FileInputSplit] {
/**
* Sets the path to read from
* @param filePath File path
*/
def setFilePath(filePath: Path): Unit
/**
* Sets file paths to read from
* @param filePaths Array of file paths
*/
def setFilePaths(filePaths: Path*): Unit
}
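File paths can also be set directly on a format instance; a brief sketch using Flink's TextInputFormat (paths are illustrative):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
val format = new TextInputFormat(new Path("hdfs://path/a"))
// One format instance can read several files or directories
format.setFilePaths(new Path("hdfs://path/a"), new Path("hdfs://path/b"))
val multiPath = env.createInput(format)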
class DataSink[T] {
/**
* Sets the parallelism for this sink
* @param parallelism Degree of parallelism
* @return DataSink with specified parallelism
*/
def setParallelism(parallelism: Int): DataSink[T]
/**
* Gets the parallelism of this sink
* @return Current parallelism setting
*/
def getParallelism: Int
/**
* Sets the name for this sink
* @param name Sink name
* @return DataSink with specified name
*/
def name(name: String): DataSink[T]
}
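Sinks returned by the write methods can be tuned before the job runs; for example, forcing a single output file (path is illustrative):

// One parallel subtask produces one output file instead of a file per subtask
results
  .writeAsText("hdfs://path/to/single-output.txt")
  .setParallelism(1)
  .name("single-file sink")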
trait SplittableIterator[T] extends Iterator[T] {
/**
* Splits the iterator into multiple iterators
* @param numPartitions Number of partitions
* @return Array of split iterators
*/
def split(numPartitions: Int): Array[Iterator[T]]
/**
* Gets the maximum number of splits
* @return Maximum number of splits
*/
def getMaximumNumberOfSplits: Int
}
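A SplittableIterator exists so a source can be partitioned across parallel subtasks; a quick sketch with Flink's NumberSequenceIterator:

import org.apache.flink.util.NumberSequenceIterator
val ranged = new NumberSequenceIterator(1L, 100L)
// Two iterators, each covering half of the range
val halves = ranged.split(2)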
class StringValue extends Comparable[StringValue] {
/**
* Creates StringValue from string
* @param value String value
*/
def this(value: String)
/**
* Gets the string value
* @return String value
*/
def getValue: String
/**
* Sets the string value
* @param value String value
*/
def setValue(value: String): Unit
}
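StringValue is Flink's mutable string wrapper; a short sketch of the reuse pattern it enables:

import org.apache.flink.types.StringValue
val sv = new StringValue("first")
println(sv.getValue)  // prints "first"
sv.setValue("second") // mutates in place instead of allocating a new wrapper
println(sv.getValue)  // prints "second"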