CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-table-2-11

Apache Flink Table API for SQL-like operations on streaming and batch data

Pending
Overview
Eval results
Files

docs/type-system.md

Type System

The Flink Table API provides a rich type system supporting primitive types, complex types, and temporal types. The Types object serves as the main factory for creating type information used throughout the API.

Capabilities

Primitive Types

Basic data types for common values.

/**
 * Built-in type constants of the Table API (this excerpt lists the primitive types).
 * The type parameter of each constant is the JVM class that holds values of that SQL type.
 */
object Types {
  /**
   * STRING/VARCHAR type for text data, held as java.lang.String
   */
  val STRING: TypeInformation[String]
  
  /**
   * BOOLEAN type for true/false values, held as java.lang.Boolean
   */
  val BOOLEAN: TypeInformation[java.lang.Boolean]
  
  /**
   * TINYINT type for 8-bit signed integers (-128 to 127), held as java.lang.Byte
   */
  val BYTE: TypeInformation[java.lang.Byte]
  
  /**
   * SMALLINT type for 16-bit signed integers (-32,768 to 32,767), held as java.lang.Short
   */
  val SHORT: TypeInformation[java.lang.Short]
  
  /**
   * INT/INTEGER type for 32-bit signed integers, held as java.lang.Integer
   */
  val INT: TypeInformation[java.lang.Integer]
  
  /**
   * BIGINT type for 64-bit signed integers, held as java.lang.Long
   */
  val LONG: TypeInformation[java.lang.Long]
  
  /**
   * FLOAT/REAL type for single-precision (32-bit) floating point, held as java.lang.Float
   */
  val FLOAT: TypeInformation[java.lang.Float]
  
  /**
   * DOUBLE type for double-precision (64-bit) floating point, held as java.lang.Double
   */
  val DOUBLE: TypeInformation[java.lang.Double]
  
  /**
   * DECIMAL type for exact, arbitrary-precision decimal numbers, held as java.math.BigDecimal
   */
  val DECIMAL: TypeInformation[java.math.BigDecimal]
}

Usage Examples:

import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.CsvTableSource

// Using primitive types in schema definition.
// The constructor's parameter type gives Array(...) the expected element type
// TypeInformation[_], so mixed element types are accepted here.
val schema = new TableSchema(
  Array("name", "age", "salary", "active"),
  Array(Types.STRING, Types.INT, Types.DOUBLE, Types.BOOLEAN)
)

// Type information for table sources: field names and types are aligned by position
val csvSource = new CsvTableSource(
  "/path/to/data.csv",
  Array("id", "name", "score"),
  Array(Types.LONG, Types.STRING, Types.DOUBLE)
)

Date and Time Types

Temporal types for handling date, time, and timestamp data.

/**
 * Built-in temporal type constants of the Table API (this excerpt lists date/time and interval types).
 */
object Types {
  /**
   * DATE type for calendar dates (year, month, day), held as java.sql.Date
   */
  val SQL_DATE: TypeInformation[java.sql.Date]
  
  /**
   * TIME type for time of day (hour, minute, second), held as java.sql.Time
   */
  val SQL_TIME: TypeInformation[java.sql.Time]
  
  /**
   * TIMESTAMP type for date and time with millisecond precision, held as java.sql.Timestamp
   */
  val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
  
  /**
   * Year-month INTERVAL type, held as a count of months (java.lang.Integer)
   */
  val INTERVAL_MONTHS: TypeInformation[java.lang.Integer]
  
  /**
   * Day-time INTERVAL type, held as a count of milliseconds (java.lang.Long)
   */
  val INTERVAL_MILLIS: TypeInformation[java.lang.Long]
}

Usage Examples:

// Event table with timestamps; "duration" is a day-time interval held as milliseconds
val eventSchema = new TableSchema(
  Array("event_id", "event_time", "event_date", "duration"),
  Array(Types.LONG, Types.SQL_TIMESTAMP, Types.SQL_DATE, Types.INTERVAL_MILLIS)
)

// Using in SQL queries (assumes a table named "Events" with this schema is registered in tEnv).
// DATE_FORMAT takes MySQL-style format specifiers (%Y, %m, %d), not Java date patterns.
// DATE is a reserved SQL keyword, so the formatted column gets a non-reserved alias.
val events = tEnv.sqlQuery("""
  SELECT event_id,
         DATE_FORMAT(event_time, '%Y-%m-%d') AS event_day,
         event_time + INTERVAL '1' HOUR AS next_hour
  FROM Events
""")

Complex Types

Structured types for handling nested and collection data.

/**
 * Factory methods of the Table API for composite and collection types.
 */
object Types {
  /**
   * ROW type with anonymous fields (field names: f0, f1, f2, ...)
   * @param types Field types in order
   * @return Row type information
   */
  def ROW(types: TypeInformation[_]*): TypeInformation[Row]
  
  /**
   * ROW type with named fields.
   * Because ROW is overloaded, Scala callers should write the second argument as
   * Array[TypeInformation[_]](...) so that overload resolution selects this variant.
   * @param fieldNames Field names array
   * @param types Field types array, aligned by position with fieldNames
   * @return Named row type information
   */
  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
  
  /**
   * Array type whose elements are JVM primitives (e.g. int[] rather than Integer[])
   * NOTE(review): presumably only the primitive-backed constants (BOOLEAN, BYTE, SHORT,
   * INT, LONG, FLOAT, DOUBLE) are accepted as element types — confirm before relying on others.
   * @param elementType Type of array elements
   * @return Primitive array type information (result type is not statically specific)
   */
  def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_]
  
  /**
   * Array type for object (reference) elements
   * @tparam E Element class
   * @param elementType Type of array elements
   * @return Object array type information for Array[E]
   */
  def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]]
  
  /**
   * Map type for key-value pairs, held as java.util.Map
   * @tparam K Key class
   * @tparam V Value class
   * @param keyType Type of map keys
   * @param valueType Type of map values
   * @return Map type information
   */
  def MAP[K, V](keyType: TypeInformation[K], valueType: TypeInformation[V]): TypeInformation[java.util.Map[K, V]]
  
  /**
   * Multiset type, held as a java.util.Map from each element to its occurrence count
   * @tparam E Element class
   * @param elementType Type of multiset elements
   * @return Multiset type information
   */
  def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[java.util.Map[E, java.lang.Integer]]
}

Usage Examples:

import org.apache.flink.api.common.typeinfo.TypeInformation

// Row type with named fields.
// Types.ROW is overloaded, so Scala types its arguments without an expected type. The
// field-type array must be written as Array[TypeInformation[_]](...) to match the
// named-field variant, because Array is invariant and e.g. Array[TypeInformation[String]] does not match.
val personType = Types.ROW(
  Array("name", "age", "address"),
  Array[TypeInformation[_]](Types.STRING, Types.INT, Types.STRING)
)

// Nested row type: a ROW used as the type of another ROW's field
val addressType = Types.ROW(
  Array("street", "city", "zipCode"),
  Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING)
)

val personWithAddressType = Types.ROW(
  Array("name", "age", "address"),
  Array[TypeInformation[_]](Types.STRING, Types.INT, addressType)
)

// Collection types
val stringArrayType = Types.OBJECT_ARRAY(Types.STRING)   // Array[String]
val intMapType = Types.MAP(Types.STRING, Types.INT)      // java.util.Map[String, Integer]
val tagMultisetType = Types.MULTISET(Types.STRING)       // java.util.Map[String, Integer] (element -> count)

// Complex nested structure: an array of anonymous-order item rows plus a nested person row
val orderType = Types.ROW(
  Array("orderId", "items", "customerInfo", "metadata"),
  Array[TypeInformation[_]](
    Types.LONG,
    Types.OBJECT_ARRAY(
      Types.ROW(Array("name", "quantity"), Array[TypeInformation[_]](Types.STRING, Types.INT))),
    personType,
    Types.MAP(Types.STRING, Types.STRING)
  )
)

Schema Management

Define and manage table schemas with type information.

/**
 * Represents the schema of a table: an ordered list of column names with their types.
 *
 * @param columnNames Array of column names
 * @param columnTypes Array of column types, aligned by position with columnNames
 *                    (the two arrays are expected to have equal length and unique names)
 */
class TableSchema(columnNames: Array[String], columnTypes: Array[TypeInformation[_]]) {
  
  /**
   * Gets the column names of the schema
   * @return Array of column names, in schema order
   */
  def getColumnNames: Array[String]
  
  /**
   * Gets all type information as an array
   * @return Array of column type information, in schema order
   */
  def getTypes: Array[TypeInformation[_]]
  
  /**
   * Gets the number of columns in the schema
   * @return Column count
   */
  def getColumnCount: Int
  
  /**
   * Gets the type information for a specific column by name
   * @param columnName Name of the column
   * @return Some(type) if a column with that name exists, otherwise None
   */
  def getType(columnName: String): Option[TypeInformation[_]]
  
  /**
   * Gets the type information for a specific column by index
   * @param columnIndex Index of the column (0-based)
   * @return Some(type) if the index is within range, otherwise None
   */
  def getType(columnIndex: Int): Option[TypeInformation[_]]
  
  /**
   * Gets the column name for a specific column index
   * @param columnIndex Index of the column (0-based)
   * @return Some(name) if the index is within range, otherwise None
   */
  def getColumnName(columnIndex: Int): Option[String]
  
  /**
   * Returns a schema in which time-attribute columns are replaced by regular TIMESTAMP types
   * @return TableSchema without time attributes
   */
  def withoutTimeAttributes: TableSchema
  
  /**
   * Creates a copy of this TableSchema
   * @return New TableSchema instance with the same column names and types
   */
  def copy: TableSchema
}

/**
 * Factory methods for creating TableSchema instances
 */
object TableSchema {
  /**
   * Creates a TableSchema from TypeInformation.
   * For a composite type such as a ROW, each field becomes a column (anonymous ROW
   * fields are named f0, f1, ...). NOTE(review): behavior for atomic types not shown here — confirm.
   * @param typeInfo Type information to convert
   * @return TableSchema created from the type information
   */
  def fromTypeInfo(typeInfo: TypeInformation[_]): TableSchema
  
  /**
   * Creates a new TableSchemaBuilder for fluent schema construction
   * @return New TableSchemaBuilder instance
   */
  def builder(): TableSchemaBuilder
}

/**
 * Builder for constructing TableSchema instances field by field; columns keep the order
 * in which field(...) was called.
 */
class TableSchemaBuilder {
  /**
   * Appends a field to the schema being built
   * @param name Field name
   * @param tpe Field type information
   * @return This builder for method chaining
   */
  def field(name: String, tpe: TypeInformation[_]): TableSchemaBuilder
  
  /**
   * Builds the TableSchema from the added fields
   * @return New TableSchema instance
   */
  def build(): TableSchema
}

Usage Examples:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.CsvTableSource

// Create schema for user table
val userSchema = new TableSchema(
  Array("userId", "userName", "profile", "tags", "settings"),
  Array(
    Types.LONG,
    Types.STRING,
    // Types.ROW is overloaded: type the field-type array explicitly so Scala picks the named variant
    Types.ROW(Array("email", "phone"), Array[TypeInformation[_]](Types.STRING, Types.STRING)),
    Types.OBJECT_ARRAY(Types.STRING),
    Types.MAP(Types.STRING, Types.STRING)
  )
)

// Schema inspection
val columnNames = userSchema.getColumnNames         // Array("userId", "userName", ...)
val columnCount = userSchema.getColumnCount         // 5
val userIdType = userSchema.getType("userId")       // Some(Types.LONG)
val firstColumnName = userSchema.getColumnName(0)   // Some("userId")
val allTypes = userSchema.getTypes                  // Array of TypeInformation

// Builder pattern usage
val builtSchema = TableSchema.builder()
  .field("id", Types.LONG)
  .field("name", Types.STRING)
  .field("score", Types.DOUBLE)
  .build()

// Create schema from TypeInformation (anonymous ROW fields become columns f0, f1)
val schemaFromType = TableSchema.fromTypeInfo(Types.ROW(Types.STRING, Types.INT))

// Create copy and remove time attributes
val schemaCopy = userSchema.copy
val withoutTime = userSchema.withoutTimeAttributes

// Use a schema in a table source. CsvTableSource reads flat fields only, so use the
// flat builtSchema here; the nested userSchema (ROW/ARRAY/MAP columns) is not CSV-readable.
val csvScoreSource = new CsvTableSource(
  "/path/to/scores.csv",
  builtSchema.getColumnNames,
  builtSchema.getTypes
)

Working with Rows

Row is the generic container for structured records; the field types of a row are described separately by a ROW type (see Types.ROW).

/**
 * Generic row for representing structured data (org.apache.flink.types.Row).
 *
 * Row is a mutable class, not a Scala case class. It is created with a fixed arity,
 * with all fields initially null, and then filled with setField. Alternatively, it can
 * be built in one step with Row.of. Fields are untyped; the row's field types are
 * described separately by a TypeInformation[Row] such as Types.ROW(...).
 *
 * @param arity Number of fields in the row
 */
class Row(arity: Int) {
  /**
   * Gets the field value at the specified position
   * @param pos Field position (0-based)
   * @return Field value (null if the field was never set)
   */
  def getField(pos: Int): Any
  
  /**
   * Gets the number of fields in the row
   * @return Field count
   */
  def getArity: Int
  
  /**
   * Sets the field value at the specified position (mutates this row)
   * @param pos Field position (0-based)
   * @param value New field value
   */
  def setField(pos: Int, value: Any): Unit
}

object Row {
  /**
   * Creates a new row whose arity equals the number of values, filled in order
   * @param values Field values in order
   * @return New Row instance
   */
  def of(values: Any*): Row
}

Usage Examples:

import scala.collection.JavaConverters._        // provides .asJava for the MAP field
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._            // Scala DataSet API (fromElements, ClassTag/TypeInformation implicits)
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._      // toTable and 'symbol field expressions
import org.apache.flink.types.Row

// Creating rows programmatically
val userRow = Row.of(
  1L,                                               // userId: LONG
  "john_doe",                                       // userName: STRING
  Row.of("john@example.com", "555-1234"),           // profile: ROW
  Array("admin", "user"),                           // tags: ARRAY
  Map("theme" -> "dark", "lang" -> "en").asJava     // settings: MAP
)

// Accessing row fields: getField is untyped, so cast to the expected type
val userId = userRow.getField(0).asInstanceOf[Long]
val profile = userRow.getField(2).asInstanceOf[Row]
val email = profile.getField(0).asInstanceOf[String]

// Flink cannot infer field types from a Row instance. Without an explicit row type, the
// Scala API treats Row as a generic type, and toTable cannot map it to the five fields below.
// The field names/types reuse the userSchema defined in the Schema Management section.
implicit val userRowType: TypeInformation[Row] =
  Types.ROW(userSchema.getColumnNames, userSchema.getTypes)

// Using in table transformations
val dataSet = env.fromElements(userRow)
val table = dataSet.toTable(tEnv, 'userId, 'userName, 'profile, 'tags, 'settings)

Custom Types

Flink needs type information for your own data types; the Scala API derives it at compile time for types such as case classes.

/**
 * Type information describing a data type to Flink
 * (org.apache.flink.api.common.typeinfo.TypeInformation).
 *
 * This is a serializable abstract class, not a trait. Instances for your own types are
 * normally produced by Flink's type extraction (createTypeInformation in the Scala API)
 * rather than by implementing this class directly.
 *
 * @tparam T The class of values described by this type information
 */
abstract class TypeInformation[T] extends java.io.Serializable {
  /**
   * Gets the class of the described type
   * @return Java class
   */
  def getTypeClass: Class[T]
  
  /**
   * Checks if the type is a basic type
   * @return True if basic type, false otherwise
   */
  def isBasicType: Boolean
  
  /**
   * Gets the number of direct fields of this type (nested fields are not counted)
   * @return Arity of the type
   */
  def getArity: Int
  
  /**
   * Gets the total number of logical fields, including nested fields of composite types
   * @return Total field count; 1 for basic (non-composite) types
   */
  def getTotalFields: Int
}

Usage Examples:

import org.apache.flink.api.scala._         // createTypeInformation and Scala DataSet API implicits
import org.apache.flink.table.api.scala._   // toTable and 'symbol field expressions

// Custom Scala case class type (Flink analyzes it as a composite type with one field per parameter)
case class Customer(id: Long, name: String, email: String, age: Int)

// Create type information for the custom type (derived at compile time by the Scala API)
val customerTypeInfo = createTypeInformation[Customer]

// Use in table operations
val customers: DataSet[Customer] = env.fromElements(
  Customer(1, "Alice", "alice@example.com", 25),
  Customer(2, "Bob", "bob@example.com", 30)
)

// Field references map to the case class parameters by position
val customerTable = customers.toTable(tEnv, 'id, 'name, 'email, 'age)

Type Compatibility and Conversion

Compatible numeric types are combined automatically in arithmetic expressions; other conversions are expressed with an explicit `cast(...)` to the target type.

// Explicit casts with cast(...) to a target type.
// Casting a string that does not contain a valid number to INT may fail at runtime.
val stringToInt = table.select('stringField.cast(Types.INT))
val timestampToString = table.select('timestampField.cast(Types.STRING))

// A cast result can be used directly in further expressions
val typedExpression = 'age.cast(Types.DOUBLE) * 1.5

Types

object Types                                  // org.apache.flink.table.api.Types
class TableSchema                             // org.apache.flink.table.api.TableSchema
class Row                                     // org.apache.flink.types.Row (mutable, not a case class)
abstract class TypeInformation[T]             // org.apache.flink.api.common.typeinfo.TypeInformation

// Built-in type constants (members of Types)
val STRING: TypeInformation[String]
val BOOLEAN: TypeInformation[java.lang.Boolean]
val BYTE: TypeInformation[java.lang.Byte]
val SHORT: TypeInformation[java.lang.Short]
val INT: TypeInformation[java.lang.Integer]
val LONG: TypeInformation[java.lang.Long]
val FLOAT: TypeInformation[java.lang.Float]
val DOUBLE: TypeInformation[java.lang.Double]
val DECIMAL: TypeInformation[java.math.BigDecimal]
val SQL_DATE: TypeInformation[java.sql.Date]
val SQL_TIME: TypeInformation[java.sql.Time]
val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
val INTERVAL_MONTHS: TypeInformation[java.lang.Integer]
val INTERVAL_MILLIS: TypeInformation[java.lang.Long]

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-table-2-11

docs

index.md

sources-sinks.md

sql-integration.md

table-environment.md

table-operations.md

type-system.md

user-defined-functions.md

window-operations.md

tile.json