CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-2-11

Apache Flink Table API for SQL-like operations on streaming and batch data

Pending
Overview
Eval results
Files

docs/sources-sinks.md

Sources and Sinks

The Flink Table API provides pluggable interfaces for integrating external data systems through table sources and sinks. Sources read data into tables, while sinks write table results to external systems.

Capabilities

Table Sources

Base interfaces for reading data from external systems into Flink tables.

/**
 * Base interface for table sources.
 *
 * A table source reads data from an external system and exposes it as a
 * table with a well-defined schema.
 *
 * @tparam T Type of records produced by the source
 */
trait TableSource[T] {
  /**
   * Gets the return type of the source
   * @return Type information for produced records
   */
  def getReturnType: TypeInformation[T]
  
  /**
   * Gets the schema of the produced table
   * @return TableSchema describing field names and types
   */
  def getTableSchema: TableSchema
  
  /**
   * Returns a string explanation of the source (used in query plans)
   * @return Description of the source for debugging
   */
  def explainSource(): String
}

Batch Table Sources

Sources for batch processing that integrate with the DataSet API.

/**
 * Table source for batch processing.
 *
 * Bridges a [[TableSource]] into the DataSet API.
 *
 * @tparam T Type of records produced by the source
 */
trait BatchTableSource[T] extends TableSource[T] {
  /**
   * Creates a DataSet from the source
   * @param execEnv Batch execution environment
   * @return DataSet containing the source data
   */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}

Usage Examples:

// Custom CSV batch source that converts each column to its declared type,
// so the produced rows actually agree with getReturnType.
class CsvBatchSource(
  filePath: String,
  fieldNames: Array[String], 
  fieldTypes: Array[TypeInformation[_]]
) extends BatchTableSource[Row] {
  
  override def getReturnType: TypeInformation[Row] = {
    Types.ROW(fieldNames, fieldTypes)
  }
  
  override def getTableSchema: TableSchema = {
    new TableSchema(fieldNames, fieldTypes)
  }
  
  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    // Capture only the types array; capturing `this` would drag the whole
    // source into the serialized map closure.
    val types = fieldTypes
    execEnv.readTextFile(filePath)
      .map(line => {
        // BUG FIX: limit -1 keeps trailing empty fields so row arity is
        // stable; the default split drops them.
        val fields = line.split(",", -1)
        // BUG FIX: the original emitted raw strings for every column,
        // contradicting a schema declaring LONG/INT fields. Convert each
        // token to the JVM value matching its declared type.
        val values = fields.zip(types).map { case (raw, tpe) =>
          val converted: AnyRef =
            if (tpe == Types.LONG) java.lang.Long.valueOf(raw.trim)
            else if (tpe == Types.INT) java.lang.Integer.valueOf(raw.trim)
            else if (tpe == Types.DOUBLE) java.lang.Double.valueOf(raw.trim)
            else if (tpe == Types.BOOLEAN) java.lang.Boolean.valueOf(raw.trim)
            else raw // STRING and any unhandled types pass through unchanged
          converted
        }
        Row.of(values: _*)
      })
  }
  
  override def explainSource(): String = s"CsvBatchSource($filePath)"
}

// Register and use batch source.
// Field names and types are positional: id -> LONG, name -> STRING, age -> INT.
val csvSource = new CsvBatchSource(
  "/path/to/data.csv",
  Array("id", "name", "age"),
  Array(Types.LONG, Types.STRING, Types.INT)
)

// Registration makes the source addressable by name; scan() creates a Table
// that reads from it when the job executes.
tEnv.registerTableSource("CsvData", csvSource)
val table = tEnv.scan("CsvData")

Stream Table Sources

Sources for stream processing that integrate with the DataStream API.

/**
 * Table source for stream processing.
 *
 * Bridges a [[TableSource]] into the DataStream API.
 *
 * @tparam T Type of records produced by the source
 */
trait StreamTableSource[T] extends TableSource[T] {
  /**
   * Creates a DataStream from the source
   * @param execEnv Stream execution environment
   * @return DataStream containing the source data
   */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}

Usage Examples:

// Custom Kafka-like stream source emitting one sample record per second.
class KafkaStreamSource(
  topic: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends StreamTableSource[Row] {
  
  override def getReturnType: TypeInformation[Row] = {
    Types.ROW(fieldNames, fieldTypes)
  }
  
  override def getTableSchema: TableSchema = {
    new TableSchema(fieldNames, fieldTypes)
  }
  
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // Simulate Kafka consumer
    execEnv.addSource(new SourceFunction[Row] {
      // BUG FIX: cancel() is invoked from a different thread than run(), so
      // the flag must be volatile for the write to be visible to the loop.
      @volatile var running = true
      
      override def run(ctx: SourceContext[Row]): Unit = {
        while (running) {
          // Emit sample data
          ctx.collect(Row.of(System.currentTimeMillis(), "sample_data", 42))
          try {
            Thread.sleep(1000)
          } catch {
            case _: InterruptedException =>
              // The framework may interrupt the source thread on cancellation:
              // restore the interrupt flag and leave the loop promptly.
              Thread.currentThread().interrupt()
              running = false
          }
        }
      }
      
      override def cancel(): Unit = {
        running = false
      }
    })
  }
  
  override def explainSource(): String = s"KafkaStreamSource($topic)"
}

// Register and use stream source.
// Positional schema: timestamp -> LONG, message -> STRING, value -> INT.
val kafkaSource = new KafkaStreamSource(
  "events",
  Array("timestamp", "message", "value"),
  Array(Types.LONG, Types.STRING, Types.INT)
)

// Queries refer to the stream via the registered name "KafkaEvents".
tEnv.registerTableSource("KafkaEvents", kafkaSource)
val eventTable = tEnv.scan("KafkaEvents")

Advanced Source Capabilities

Enhanced source interfaces supporting optimization features.

/**
 * Source that supports projection pushdown.
 *
 * The planner may call projectFields so the source reads only the columns a
 * query actually needs.
 */
trait ProjectableTableSource[T] extends TableSource[T] {
  /**
   * Creates a new source with projected fields
   * @param fields Array of projected field indices
   * @return New source instance with projection applied
   */
  def projectFields(fields: Array[Int]): TableSource[T]
  
  /**
   * Checks if projection pushdown is supported
   * @return True if projection is supported
   */
  def supportsProjection: Boolean = true
}

/**
 * Source that supports filter pushdown.
 *
 * The planner may push WHERE predicates into the source so rows are dropped
 * before entering the pipeline.
 */
trait FilterableTableSource[T] extends TableSource[T] {
  /**
   * Creates a new source with pushed-down filters
   * @param predicates Array of filter expressions
   * @return New source instance with filters applied
   */
  def applyPredicate(predicates: java.util.List[Expression]): TableSource[T]
  
  /**
   * Checks if filter pushdown is supported
   * @return True if filtering is supported
   */
  def supportsFiltering: Boolean = true
}

/**
 * Source with custom field mapping.
 *
 * BUG FIX: a trait cannot extend the existential type `TableSource[_]`
 * (class type required). A self-type expresses the same constraint — this
 * trait may only be mixed into a [[TableSource]] — and compiles.
 */
trait DefinedFieldMapping { this: TableSource[_] =>
  /**
   * Defines mapping from physical to logical fields
   * @return Map from logical field name to physical field name
   */
  def getFieldMapping: java.util.Map[String, String]
}

/**
 * Source that defines rowtime attributes for event time.
 *
 * BUG FIX: `extends TableSource[_]` does not compile (existential type as a
 * parent); a self-type keeps the "must be a TableSource" constraint.
 */
trait DefinedRowtimeAttributes { this: TableSource[_] =>
  /**
   * Gets rowtime attribute descriptors
   * @return List of rowtime attribute descriptors
   */
  def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor]
}

/**
 * Source that defines a processing time attribute.
 *
 * BUG FIX: `extends TableSource[_]` does not compile (existential type as a
 * parent); a self-type keeps the "must be a TableSource" constraint.
 */
trait DefinedProctimeAttribute { this: TableSource[_] =>
  /**
   * Gets the processing time attribute name
   * @return Processing time attribute name, null if none
   */
  def getProctimeAttribute: String
}

Usage Examples:

// Advanced source with projection and filtering pushdown.
class OptimizedCsvSource(
  filePath: String,
  fieldNames: Array[String],
  fieldTypes: Array[TypeInformation[_]]
) extends BatchTableSource[Row] 
  with ProjectableTableSource[Row] 
  with FilterableTableSource[Row] {
  
  // Projection indices chosen by the planner, if pushdown occurred.
  private var projectedFields: Option[Array[Int]] = None
  // Predicates pushed down by the planner (evaluation simplified below).
  private var filters: List[Expression] = List.empty
  
  /** Returns a copy of this source restricted to the given field indices. */
  override def projectFields(fields: Array[Int]): TableSource[Row] = {
    val newSource = new OptimizedCsvSource(filePath, fieldNames, fieldTypes)
    newSource.projectedFields = Some(fields)
    newSource.filters = this.filters
    newSource
  }
  
  /** Returns a copy of this source with the given predicates applied. */
  override def applyPredicate(predicates: java.util.List[Expression]): TableSource[Row] = {
    val newSource = new OptimizedCsvSource(filePath, fieldNames, fieldTypes) 
    newSource.projectedFields = this.projectedFields
    // NOTE(review): assumes `import scala.collection.JavaConverters._` is in
    // scope at file level — confirm against the surrounding file.
    newSource.filters = predicates.asScala.toList
    newSource
  }
  
  // Field names after projection; the full array when none was applied.
  private def producedNames: Array[String] =
    projectedFields.fold(fieldNames)(idxs => idxs.map(i => fieldNames(i)))
  
  // Field types after projection; the full array when none was applied.
  private def producedTypes: Array[TypeInformation[_]] =
    projectedFields.fold(fieldTypes)(idxs => idxs.map(i => fieldTypes(i)))
  
  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    var dataSet = execEnv.readTextFile(filePath)
      .map(line => {
        // BUG FIX: limit -1 keeps trailing empty fields so the row arity is
        // stable even when the last CSV columns are empty.
        val fields = line.split(",", -1)
        Row.of(fields: _*)
      })
    
    // Apply filters if present
    filters.foreach { filter =>
      // Apply filter logic (simplified)
      dataSet = dataSet.filter(row => evaluateFilter(row, filter))
    }
    
    // Apply projection if present
    projectedFields match {
      case Some(fields) => dataSet.map(row => projectRow(row, fields))
      case None => dataSet
    }
  }
  
  private def evaluateFilter(row: Row, filter: Expression): Boolean = {
    // Simplified filter evaluation
    true
  }
  
  private def projectRow(row: Row, fields: Array[Int]): Row = {
    Row.of(fields.map(row.getField): _*)
  }
  
  // BUG FIX: the return type must describe the *projected* record; the
  // original always reported the full schema, so after projection pushdown
  // the emitted rows disagreed with the declared type. The table schema
  // intentionally remains the full, unprojected one.
  override def getReturnType: TypeInformation[Row] = Types.ROW(producedNames, producedTypes)
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
  // Include pushed-down state so plans with/without pushdown are told apart.
  override def explainSource(): String =
    s"OptimizedCsvSource($filePath, projected=${projectedFields.isDefined}, filters=${filters.size})"
}

Table Sinks

Base interfaces for writing table results to external systems.

/**
 * Base interface for table sinks.
 *
 * A table sink writes the rows of a table to an external system.
 *
 * @tparam T Type of records consumed by the sink
 */
trait TableSink[T] {
  /**
   * Gets the expected input type (the type of records this sink consumes)
   * @return Type information for consumed records
   */
  def getOutputType: TypeInformation[T]
  
  /**
   * Configures the sink with field information
   * @param fieldNames Array of field names
   * @param fieldTypes Array of field types
   * @return Configured sink instance
   */
  def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[T]
}

Batch Table Sinks

Sinks for batch processing that integrate with the DataSet API.

/**
 * Table sink for batch processing.
 *
 * Bridges a [[TableSink]] into the DataSet API.
 *
 * @tparam T Type of records consumed by the sink
 */
trait BatchTableSink[T] extends TableSink[T] {
  /**
   * Emits the DataSet to the sink
   * @param dataSet DataSet to write
   * @param execEnv Batch execution environment
   */
  def emitDataSet(dataSet: DataSet[T], execEnv: ExecutionEnvironment): Unit
}

Usage Examples:

// Custom CSV batch sink: renders each row as one comma-separated text line.
class CsvBatchSink(outputPath: String) extends BatchTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _
  
  /** Returns a configured copy; the receiver itself is left untouched. */
  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val configured = new CsvBatchSink(outputPath)
    configured.fieldNames = fieldNames
    configured.fieldTypes = fieldTypes
    configured
  }
  
  /** Row type assembled from the metadata supplied via configure(). */
  override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
  
  /** Writes every row of the DataSet as a comma-joined line of text. */
  override def emitDataSet(dataSet: DataSet[Row], execEnv: ExecutionEnvironment): Unit = {
    val toCsvLine = (row: Row) => {
      val cells = for (i <- 0 until row.getArity) yield row.getField(i)
      cells.mkString(",")
    }
    dataSet.map(toCsvLine).writeAsText(outputPath)
  }
}

// Register and use batch sink.
val csvSink = new CsvBatchSink("/path/to/output.csv")
// fieldNames/fieldTypes describe the schema of the table being written;
// they are assumed to be defined earlier in the program.
tEnv.registerTableSink("CsvOutput", fieldNames, fieldTypes, csvSink)
table.insertInto("CsvOutput")

Stream Table Sinks

Sinks for stream processing with different consistency guarantees.

/**
 * Append-only stream sink for insert-only tables.
 *
 * Suitable only for tables that never produce updates or deletes.
 *
 * @tparam T Type of records consumed by the sink
 */
trait AppendStreamTableSink[T] extends TableSink[T] {
  /**
   * Emits the DataStream to the sink
   * @param dataStream DataStream to write
   * @param execEnv Stream execution environment
   */
  def emitDataStream(dataStream: DataStream[T], execEnv: StreamExecutionEnvironment): Unit
}

/**
 * Retract stream sink for tables with updates and deletes.
 *
 * @tparam T Type of records consumed by the sink
 */
trait RetractStreamTableSink[T] extends TableSink[T] {
  /**
   * Emits the retract DataStream to the sink
   * @param dataStream DataStream of (Boolean, T) where true marks an add and
   *                   false marks a retraction of a previously emitted record
   * @param execEnv Stream execution environment
   */
  def emitDataStream(dataStream: DataStream[(Boolean, T)], execEnv: StreamExecutionEnvironment): Unit
}

/**
 * Upsert stream sink for tables with primary keys.
 *
 * @tparam T Type of records consumed by the sink  
 */
trait UpsertStreamTableSink[T] extends TableSink[T] {
  /**
   * Gets the primary key fields for upsert operations
   * @return Array of primary key field names
   */
  def getKeys: Array[String]
  
  /**
   * Indicates if the sink expects upsert or retract stream
   * @return True for upsert stream, false for retract stream
   */
  def isUpsertMode: Boolean
  
  /**
   * Emits the upsert DataStream to the sink
   * @param dataStream DataStream of (Boolean, T) for upsert/delete operations
   * @param execEnv Stream execution environment
   */
  def emitDataStream(dataStream: DataStream[(Boolean, T)], execEnv: StreamExecutionEnvironment): Unit
}

Usage Examples:

// Append-only stream sink that prints every record to stdout.
class PrintAppendSink extends AppendStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _
  
  /** Produces a fresh sink instance carrying the supplied field metadata. */
  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val sink = new PrintAppendSink()
    sink.fieldNames = fieldNames
    sink.fieldTypes = fieldTypes
    sink
  }
  
  /** Row type derived from the configured names and types. */
  override def getOutputType: TypeInformation[Row] = {
    Types.ROW(fieldNames, fieldTypes)
  }
  
  /** Forwards the stream to the built-in print() utility. */
  override def emitDataStream(dataStream: DataStream[Row], execEnv: StreamExecutionEnvironment): Unit =
    dataStream.print()
}

// Retract stream sink for aggregated results: adds upsert a row, retracts
// delete it from the target database.
class DatabaseRetractSink(jdbcUrl: String) extends RetractStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _
  
  /** Returns a configured copy of this sink for the given schema. */
  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val configured = new DatabaseRetractSink(jdbcUrl)
    configured.fieldNames = fieldNames
    configured.fieldTypes = fieldTypes
    configured
  }
  
  /** Row type derived from the configured names and types. */
  override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
  
  /** Routes each change message to an upsert or a delete on the database. */
  override def emitDataStream(dataStream: DataStream[(Boolean, Row)], execEnv: StreamExecutionEnvironment): Unit = {
    dataStream.addSink(new SinkFunction[(Boolean, Row)] {
      override def invoke(value: (Boolean, Row)): Unit = value match {
        case (true, row)  => insertOrUpdate(row) // add message
        case (false, row) => delete(row)         // retract message
      }
    })
  }
  
  private def insertOrUpdate(row: Row): Unit = {
    // Database insertion/update logic
  }
  
  private def delete(row: Row): Unit = {
    // Database deletion logic
  }
}

// Upsert stream sink with primary key: upserts publish a value, deletes
// publish a null tombstone under the same key.
class KafkaUpsertSink(topic: String, keyFields: Array[String]) extends UpsertStreamTableSink[Row] {
  private var fieldNames: Array[String] = _
  private var fieldTypes: Array[TypeInformation[_]] = _
  
  /** Primary key fields driving the Kafka message key. */
  override def getKeys: Array[String] = keyFields
  /** This sink always consumes an upsert (not retract) stream. */
  override def isUpsertMode: Boolean = true
  
  /** Returns a configured copy of this sink for the given schema. */
  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val newSink = new KafkaUpsertSink(topic, keyFields)
    newSink.fieldNames = fieldNames
    newSink.fieldTypes = fieldTypes
    newSink
  }
  
  override def getOutputType: TypeInformation[Row] = Types.ROW(fieldNames, fieldTypes)
  
  override def emitDataStream(dataStream: DataStream[(Boolean, Row)], execEnv: StreamExecutionEnvironment): Unit = {
    dataStream.addSink(new SinkFunction[(Boolean, Row)] {
      override def invoke(value: (Boolean, Row)): Unit = {
        val (isUpsert, row) = value
        val key = extractKey(row)
        val message = if (isUpsert) serializeRow(row) else null // null for delete
        sendToKafka(topic, key, message)
      }
    })
  }
  
  private def extractKey(row: Row): String = {
    // BUG FIX: a key field absent from fieldNames used to yield index -1,
    // making row.getField(-1) fail with an opaque runtime exception per
    // record. Fail fast with a descriptive message instead.
    keyFields.map { field =>
      val idx = fieldNames.indexOf(field)
      require(idx >= 0, s"Key field '$field' is not among configured fields: ${fieldNames.mkString(", ")}")
      row.getField(idx)
    }.mkString("|")
  }
  
  private def serializeRow(row: Row): String = {
    // Serialize row to JSON or other format
    (0 until row.getArity).map(row.getField).mkString(",")
  }
  
  private def sendToKafka(topic: String, key: String, message: String): Unit = {
    // Send to Kafka
  }
}

Source and Sink Registration

Methods for registering sources and sinks with the table environment.

// TableEnvironment methods for source/sink registration

// Registers a source under a name usable from scan() and SQL queries.
def registerTableSource(name: String, tableSource: TableSource[_]): Unit
// Registers a sink with an explicit schema; insertInto() targets the name.
def registerTableSink(name: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]], tableSink: TableSink[_]): Unit

// Creating tables from sources without prior registration
def fromTableSource(source: TableSource[_]): Table

Usage Examples:

// Register multiple sources and sinks
val csvSource = new CsvBatchSource("/input/data.csv", Array("id", "name"), Array(Types.LONG, Types.STRING))
val kafkaSource = new KafkaStreamSource("events", Array("timestamp", "message"), Array(Types.LONG, Types.STRING))
val printSink = new PrintAppendSink()

tEnv.registerTableSource("CsvInput", csvSource)
tEnv.registerTableSource("KafkaInput", kafkaSource)
tEnv.registerTableSink("PrintOutput", Array("result"), Array(Types.STRING), printSink)

// Use registered sources and sinks
val csvTable = tEnv.scan("CsvInput")
val kafkaTable = tEnv.scan("KafkaInput")
// NOTE(review): union requires both inputs to share a schema — confirm the
// CSV table's columns line up with the single casted column selected here.
val result = csvTable.union(kafkaTable.select('message.cast(Types.STRING)))
result.insertInto("PrintOutput")

Types

trait TableSource[T]
trait BatchTableSource[T] extends TableSource[T]
trait StreamTableSource[T] extends TableSource[T]
trait ProjectableTableSource[T] extends TableSource[T]
trait FilterableTableSource[T] extends TableSource[T]
trait DefinedFieldMapping { this: TableSource[_] => }
trait DefinedRowtimeAttributes { this: TableSource[_] => }
trait DefinedProctimeAttribute { this: TableSource[_] => }

trait TableSink[T]
trait BatchTableSink[T] extends TableSink[T]
trait AppendStreamTableSink[T] extends TableSink[T]
trait RetractStreamTableSink[T] extends TableSink[T]  
trait UpsertStreamTableSink[T] extends TableSink[T]

case class RowtimeAttributeDescriptor(attributeName: String, timestampExtractor: TimestampExtractor, watermarkStrategy: WatermarkStrategy)

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-2-11

docs

index.md

sources-sinks.md

sql-integration.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

window-operations.md

tile.json