Apache Flink Table API for SQL-like operations on streaming and batch data
The Flink Table API provides pluggable interfaces for integrating external data systems through table sources and sinks. Sources read data into tables, while sinks write table results to external systems.
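The typical workflow is to register a source, derive a result table with relational operations, and emit it into a registered sink. A minimal sketch of that flow, assuming a Flink 1.x Scala streaming environment and the KafkaStreamSource and PrintAppendSink example classes defined later in this document:
// End-to-end flow: register a source, transform it, write to a sink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
tEnv.registerTableSource("Events",
  new KafkaStreamSource("events", Array("timestamp", "message", "value"),
    Array(Types.LONG, Types.STRING, Types.INT)))
tEnv.registerTableSink("Console", Array("message"), Array(Types.STRING), new PrintAppendSink())
tEnv.scan("Events")
  .select('message)
  .insertInto("Console")
env.execute("table-pipeline")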
Base interfaces for reading data from external systems into Flink tables.
/**
* Base interface for table sources
* @tparam T Type of records produced by the source
*/
trait TableSource[T] {
/**
* Gets the return type of the source
* @return Type information for produced records
*/
def getReturnType: TypeInformation[T]
/**
* Gets the schema of the produced table
* @return TableSchema describing field names and types
*/
def getTableSchema: TableSchema
/**
* Returns a string explanation of the source
* @return Description of the source for debugging
*/
def explainSource(): String
}
Sources for batch processing that integrate with DataSet API.
/**
* Table source for batch processing
* @tparam T Type of records produced by the source
*/
trait BatchTableSource[T] extends TableSource[T] {
/**
* Creates a DataSet from the source
* @param execEnv Batch execution environment
* @return DataSet containing the source data
*/
def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
Usage Examples:
// Custom CSV batch source
class CsvBatchSource(
filePath: String,
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]]
) extends BatchTableSource[Row] {
override def getReturnType: TypeInformation[Row] = {
Types.ROW(fieldNames, fieldTypes)
}
override def getTableSchema: TableSchema = {
new TableSchema(fieldNames, fieldTypes)
}
override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
execEnv.readTextFile(filePath)
.map(line => {
val fields = line.split(",")
Row.of(fields: _*)
})
}
override def explainSource(): String = s"CsvBatchSource($filePath)"
}
// Register and use batch source
val csvSource = new CsvBatchSource(
"/path/to/data.csv",
Array("id", "name", "age"),
Array(Types.LONG, Types.STRING, Types.INT)
)
tEnv.registerTableSource("CsvData", csvSource)
val table = tEnv.scan("CsvData")
Sources for stream processing that integrate with DataStream API.
/**
* Table source for stream processing
* @tparam T Type of records produced by the source
*/
trait StreamTableSource[T] extends TableSource[T] {
/**
* Creates a DataStream from the source
* @param execEnv Stream execution environment
* @return DataStream containing the source data
*/
def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
Usage Examples:
// Custom Kafka-like stream source
class KafkaStreamSource(
topic: String,
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]]
) extends StreamTableSource[Row] {
override def getReturnType: TypeInformation[Row] = {
Types.ROW(fieldNames, fieldTypes)
}
override def getTableSchema: TableSchema = {
new TableSchema(fieldNames, fieldTypes)
}
override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
// Simulate Kafka consumer
execEnv.addSource(new SourceFunction[Row] {
@volatile var running = true // cancel() is called from a different thread
override def run(ctx: SourceContext[Row]): Unit = {
while (running) {
// Emit sample data
ctx.collect(Row.of(System.currentTimeMillis(), "sample_data", 42))
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
running = false
}
})
}
override def explainSource(): String = s"KafkaStreamSource($topic)"
}
// Register and use stream source
val kafkaSource = new KafkaStreamSource(
"events",
Array("timestamp", "message", "value"),
Array(Types.LONG, Types.STRING, Types.INT)
)
tEnv.registerTableSource("KafkaEvents", kafkaSource)
val eventTable = tEnv.scan("KafkaEvents")
Enhanced source interfaces supporting optimization features.
/**
* Source that supports projection pushdown
*/
trait ProjectableTableSource[T] extends TableSource[T] {
/**
* Creates a new source with projected fields
* @param fields Array of projected field indices
* @return New source instance with projection applied
*/
def projectFields(fields: Array[Int]): TableSource[T]
/**
* Checks if projection pushdown is supported
* @return True if projection is supported
*/
def supportsProjection: Boolean = true
}
/**
* Source that supports filter pushdown
*/
trait FilterableTableSource[T] extends TableSource[T] {
/**
* Creates a new source with pushed-down filters
* @param predicates Array of filter expressions
* @return New source instance with filters applied
*/
def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
/**
* Checks if filter pushdown is supported
* @return True if filtering is supported
*/
def supportsFiltering: Boolean = true
}
/**
* Source with custom field mapping
*/
trait DefinedFieldMapping {
/**
* Defines mapping from physical to logical fields
* @return Map from logical field name to physical field name
*/
def getFieldMapping: java.util.Map[String, String]
}
/**
* Source that defines rowtime attributes for event time
*/
trait DefinedRowtimeAttributes {
/**
* Gets rowtime attribute descriptors
* @return List of rowtime attribute descriptors
*/
def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor]
}
/**
* Source that defines processing time attribute
*/
trait DefinedProctimeAttribute {
/**
* Gets the processing time attribute name
* @return Processing time attribute name, null if none
*/
def getProctimeAttribute: String
}
Usage Examples:
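The time attribute and field mapping interfaces are typically mixed into a stream source. Below is a sketch of such a source, assuming the ExistingField timestamp extractor, the AscendingTimestamps watermark strategy, and the Table API Scala expression DSL (Tumble, 'field syntax) are available; class and field names are illustrative:
// Stream source exposing an event-time attribute derived from the physical
// "ts" field, an appended processing-time attribute, and a field mapping
class TimedEventSource extends StreamTableSource[Row]
  with DefinedRowtimeAttributes
  with DefinedProctimeAttribute
  with DefinedFieldMapping {
  // Logical schema: rowtime and proctime attributes plus a message field
  override def getTableSchema: TableSchema =
    new TableSchema(
      Array("eventTime", "message", "procTime"),
      Array(Types.SQL_TIMESTAMP, Types.STRING, Types.SQL_TIMESTAMP))
  // Physical records produced by getDataStream
  override def getReturnType: TypeInformation[Row] =
    Types.ROW(Array("ts", "msg"), Array(Types.LONG, Types.STRING))
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
    execEnv.fromCollection(Seq(
      Row.of(System.currentTimeMillis(), "hello"),
      Row.of(System.currentTimeMillis() + 1000L, "world")))
  // "eventTime" is extracted from the physical long "ts" field and used as rowtime
  override def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor] =
    java.util.Collections.singletonList(
      RowtimeAttributeDescriptor("eventTime", new ExistingField("ts"), new AscendingTimestamps))
  // An additional processing-time attribute appended to the schema
  override def getProctimeAttribute: String = "procTime"
  // Maps the logical "message" field to the physical "msg" field
  override def getFieldMapping: java.util.Map[String, String] =
    java.util.Collections.singletonMap("message", "msg")
  override def explainSource(): String = "TimedEventSource"
}
// Register the source and query it with an event-time window on the rowtime attribute
tEnv.registerTableSource("TimedEvents", new TimedEventSource)
tEnv.scan("TimedEvents")
  .window(Tumble over 10.seconds on 'eventTime as 'w)
  .groupBy('w)
  .select('w.start, 'message.count)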
// Advanced source with projection and filtering
import scala.collection.JavaConverters._ // needed for predicates.asScala below
class OptimizedCsvSource(
filePath: String,
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]]
) extends BatchTableSource[Row]
with ProjectableTableSource[Row]
with FilterableTableSource[Row] {
private var projectedFields: Option[Array[Int]] = None
private var filters: List[Expression] = List.empty
override def projectFields(fields: Array[Int]): TableSource[Row] = {
val newSource = new OptimizedCsvSource(filePath, fieldNames, fieldTypes)
newSource.projectedFields = Some(fields)
newSource.filters = this.filters
newSource
}
override def applyPredicate(predicates: java.util.List[Expression]): TableSource[Row] = {
val newSource = new OptimizedCsvSource(filePath, fieldNames, fieldTypes)
newSource.projectedFields = this.projectedFields
newSource.filters = predicates.asScala.toList
newSource
}
override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
var dataSet = execEnv.readTextFile(filePath)
.map(line => {
val fields = line.split(",")
Row.of(fields: _*)
})
// Apply filters if present
filters.foreach { filter =>
// Apply filter logic (simplified)
dataSet = dataSet.filter(row => evaluateFilter(row, filter))
}
// Apply projection if present
projectedFields match {
case Some(fields) => dataSet.map(row => projectRow(row, fields))
case None => dataSet
}
}
private def evaluateFilter(row: Row, filter: Expression): Boolean = {
// Simplified filter evaluation
true
}
private def projectRow(row: Row, fields: Array[Int]): Row = {
Row.of(fields.map(row.getField): _*)
}
// Other required methods...
override def getReturnType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
override def explainSource(): String = s"OptimizedCsvSource($filePath)"
}
Base interfaces for writing table results to external systems.
/**
* Base interface for table sinks
* @tparam T Type of records consumed by the sink
*/
trait TableSink[T] {
/**
* Gets the expected input type
* @return Type information for consumed records
*/
def getOutputType: TypeInformation[T]
/**
* Configures the sink with field information
* @param fieldNames Array of field names
* @param fieldTypes Array of field types
* @return Configured sink instance
*/
def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}
Sinks for batch processing that integrate with DataSet API.
/**
* Table sink for batch processing
* @tparam T Type of records consumed by the sink
*/
trait BatchTableSink[T] extends TableSink[T] {
/**
* Emits the DataSet to the sink
* @param dataSet DataSet to write
* @param execEnv Batch execution environment
*/
def emitDataSet(dataSet: DataSet[T], execEnv: ExecutionEnvironment): Unit
}
Usage Examples:
// Custom CSV batch sink
class CsvBatchSink(outputPath: String) extends BatchTableSink[Row] {
private var fieldNames: Array[String] = _
private var fieldTypes: Array[TypeInformation[_]] = _
override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
val newSink = new CsvBatchSink(outputPath)
newSink.fieldNames = fieldNames
newSink.fieldTypes = fieldTypes
newSink
}
override def getOutputType: TypeInformation[Row] = {
Types.ROW(fieldNames, fieldTypes)
}
override def emitDataSet(dataSet: DataSet[Row], execEnv: ExecutionEnvironment): Unit = {
dataSet
.map(row => (0 until row.getArity).map(row.getField).mkString(","))
.writeAsText(outputPath)
}
}
// Register and use batch sink
val csvSink = new CsvBatchSink("/path/to/output.csv")
val fieldNames = Array("id", "name", "age")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.LONG, Types.STRING, Types.INT)
tEnv.registerTableSink("CsvOutput", fieldNames, fieldTypes, csvSink)
table.insertInto("CsvOutput")
Sinks for stream processing with different consistency guarantees.
/**
* Append-only stream sink for insert-only tables
* @tparam T Type of records consumed by the sink
*/
trait AppendStreamTableSink[T] extends TableSink[T] {
/**
* Emits the DataStream to the sink
* @param dataStream DataStream to write
* @param execEnv Stream execution environment
*/
def emitDataStream(dataStream: DataStream[T], execEnv: StreamExecutionEnvironment): Unit
}
/**
* Retract stream sink for tables with updates and deletes
* @tparam T Type of records consumed by the sink
*/
trait RetractStreamTableSink[T] extends TableSink[T] {
/**
* Emits the retract DataStream to the sink
* @param dataStream DataStream of (Boolean, T) where Boolean indicates add/retract
* @param execEnv Stream execution environment
*/
def emitDataStream(dataStream: DataStream[(Boolean, T)], execEnv: StreamExecutionEnvironment): Unit
}
/**
* Upsert stream sink for tables with primary keys
* @tparam T Type of records consumed by the sink
*/
trait UpsertStreamTableSink[T] extends TableSink[T] {
/**
* Gets the primary key fields for upsert operations
* @return Array of primary key field names
*/
def getKeys: Array[String]
/**
* Indicates if the sink expects upsert or retract stream
* @return True for upsert stream, false for retract stream
*/
def isUpsertMode: Boolean
/**
* Emits the upsert DataStream to the sink
* @param dataStream DataStream of (Boolean, T) for upsert/delete operations
* @param execEnv Stream execution environment
*/
def emitDataStream(dataStream: DataStream[(Boolean, T)], execEnv: StreamExecutionEnvironment): Unit
}
Usage Examples:
// Append-only stream sink
class PrintAppendSink extends AppendStreamTableSink[Row] {
private var fieldNames: Array[String] = _
private var fieldTypes: Array[TypeInformation[_]] = _
override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
val newSink = new PrintAppendSink()
newSink.fieldNames = fieldNames
newSink.fieldTypes = fieldTypes
newSink
}
override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
override def emitDataStream(dataStream: DataStream[Row], execEnv: StreamExecutionEnvironment): Unit = {
dataStream.print()
}
}
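Registering and using an append sink mirrors the batch case. A short sketch, assuming the "KafkaEvents" source registered earlier; the registration name is illustrative:
// Register the append sink and emit an insert-only result into it
val printSink = new PrintAppendSink()
tEnv.registerTableSink("Printed", Array("message"), Array(Types.STRING), printSink)
tEnv.scan("KafkaEvents").select('message).insertInto("Printed")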
// Retract stream sink for aggregated results
class DatabaseRetractSink(jdbcUrl: String) extends RetractStreamTableSink[Row] {
private var fieldNames: Array[String] = _
private var fieldTypes: Array[TypeInformation[_]] = _
override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
val newSink = new DatabaseRetractSink(jdbcUrl)
newSink.fieldNames = fieldNames
newSink.fieldTypes = fieldTypes
newSink
}
override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
override def emitDataStream(dataStream: DataStream[(Boolean, Row)], execEnv: StreamExecutionEnvironment): Unit = {
dataStream.addSink(new SinkFunction[(Boolean, Row)] {
override def invoke(value: (Boolean, Row)): Unit = {
val (isAdd, row) = value
if (isAdd) {
// Insert or update row in database
insertOrUpdate(row)
} else {
// Delete row from database
delete(row)
}
}
})
}
private def insertOrUpdate(row: Row): Unit = {
// Database insertion/update logic
}
private def delete(row: Row): Unit = {
// Database deletion logic
}
}
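A continuously updating aggregation cannot be emitted to an append-only sink; it needs a retract (or upsert) sink. A sketch wiring the retract sink above to a group-by aggregation (the JDBC URL and registration names are illustrative):
// Group-by aggregation emits updates and deletes, so route it to the retract sink
val dbSink = new DatabaseRetractSink("jdbc:postgresql://localhost/analytics")
tEnv.registerTableSink("MessageCounts", Array("message", "cnt"), Array(Types.STRING, Types.LONG), dbSink)
tEnv.scan("KafkaEvents")
  .groupBy('message)
  .select('message, 'value.count as 'cnt)
  .insertInto("MessageCounts")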
// Upsert stream sink with primary key
class KafkaUpsertSink(topic: String, keyFields: Array[String]) extends UpsertStreamTableSink[Row] {
private var fieldNames: Array[String] = _
private var fieldTypes: Array[TypeInformation[_]] = _
override def getKeys: Array[String] = keyFields
override def isUpsertMode: Boolean = true
override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
val newSink = new KafkaUpsertSink(topic, keyFields)
newSink.fieldNames = fieldNames
newSink.fieldTypes = fieldTypes
newSink
}
override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
override def emitDataStream(dataStream: DataStream[(Boolean, Row)], execEnv: StreamExecutionEnvironment): Unit = {
dataStream.addSink(new SinkFunction[(Boolean, Row)] {
override def invoke(value: (Boolean, Row)): Unit = {
val (isUpsert, row) = value
val key = extractKey(row)
val message = if (isUpsert) serializeRow(row) else null // null for delete
sendToKafka(topic, key, message)
}
})
}
private def extractKey(row: Row): String = {
// Extract key fields from row
keyFields.map(field => row.getField(fieldNames.indexOf(field))).mkString("|")
}
private def serializeRow(row: Row): String = {
// Serialize row to JSON or other format
(0 until row.getArity).map(row.getField).mkString(",")
}
private def sendToKafka(topic: String, key: String, message: String): Unit = {
// Send to Kafka
}
}
Methods for registering sources and sinks with the table environment.
// TableEnvironment methods for source/sink registration
def registerTableSource(name: String, tableSource: TableSource[_]): Unit
def registerTableSink(name: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]], tableSink: TableSink[_]): Unit
// Creating tables from sources
def fromTableSource(source: TableSource[_]): Table
Usage Examples:
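fromTableSource builds a Table directly from a source instance without registering it under a name first. A small sketch reusing the CsvBatchSource defined earlier:
// Create a Table straight from a source instance
val directTable = tEnv.fromTableSource(
  new CsvBatchSource("/input/data.csv", Array("id", "name"), Array(Types.LONG, Types.STRING)))
val names = directTable.select('name)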
// Register multiple sources and sinks
val csvSource = new CsvBatchSource("/input/data.csv", Array("id", "name"), Array(Types.LONG, Types.STRING))
val kafkaSource = new KafkaStreamSource("events", Array("timestamp", "message"), Array(Types.LONG, Types.STRING))
val printSink = new PrintAppendSink()
tEnv.registerTableSource("CsvInput", csvSource)
tEnv.registerTableSource("KafkaInput", kafkaSource)
tEnv.registerTableSink("PrintOutput", Array("result"), Array(Types.STRING), printSink)
// Use registered sources and sinks
// (note: batch and stream sources must be registered with a batch or stream
// table environment respectively, and union requires matching schemas)
val csvTable = tEnv.scan("CsvInput")
val kafkaTable = tEnv.scan("KafkaInput")
val result = csvTable.select('name).union(kafkaTable.select('message))
result.insertInto("PrintOutput")
trait TableSource[T]
trait BatchTableSource[T] extends TableSource[T]
trait StreamTableSource[T] extends TableSource[T]
trait ProjectableTableSource[T] extends TableSource[T]
trait FilterableTableSource[T] extends TableSource[T]
trait DefinedFieldMapping
trait DefinedRowtimeAttributes
trait DefinedProctimeAttribute
trait TableSink[T]
trait BatchTableSink[T] extends TableSink[T]
trait AppendStreamTableSink[T] extends TableSink[T]
trait RetractStreamTableSink[T] extends TableSink[T]
trait UpsertStreamTableSink[T] extends TableSink[T]
case class RowtimeAttributeDescriptor(attributeName: String, timestampExtractor: TimestampExtractor, watermarkStrategy: WatermarkStrategy)
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-table-2-11