Type System and Type Information

The Flink Scala Table API provides a type system with two ways to obtain type information: automatic, macro-based inference for Scala types, and explicit factory methods and constants for common Scala and Java types.

Types Object

The main entry point for type information creation, providing factory methods and constants for common types.

object Types {
  // Generic type creation using macros
  def of[T: TypeInformation]: TypeInformation[T]
  
  // Scala-specific type constants
  val NOTHING: TypeInformation[Nothing]
  val UNIT: TypeInformation[Unit]
  
  // Primitive type constants
  val STRING: TypeInformation[String]
  val BYTE: TypeInformation[java.lang.Byte]
  val BOOLEAN: TypeInformation[java.lang.Boolean]
  val SHORT: TypeInformation[java.lang.Short]
  val INT: TypeInformation[java.lang.Integer]
  val LONG: TypeInformation[java.lang.Long]
  val FLOAT: TypeInformation[java.lang.Float]
  val DOUBLE: TypeInformation[java.lang.Double]
  val CHAR: TypeInformation[java.lang.Character]
  
  // Java numeric types
  val JAVA_BIG_DEC: TypeInformation[java.math.BigDecimal]
  val JAVA_BIG_INT: TypeInformation[java.math.BigInteger]
  
  // Date and time types
  val SQL_DATE: TypeInformation[java.sql.Date]
  val SQL_TIME: TypeInformation[java.sql.Time]
  val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
  val LOCAL_DATE: TypeInformation[java.time.LocalDate]
  val LOCAL_TIME: TypeInformation[java.time.LocalTime]
  val LOCAL_DATE_TIME: TypeInformation[java.time.LocalDateTime]
  val INSTANT: TypeInformation[java.time.Instant]
}
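As a quick illustration of what these constants carry, the following sketch inspects two of them. It assumes that `Types` resolves to `org.apache.flink.api.scala.typeutils.Types` from the flink-scala module; if your Flink version exposes the object under a different package, adjust the import. The object name `TypeConstantsSketch` is made up for this example.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.Types // assumed import path

object TypeConstantsSketch {
  // The "primitive" constants are typed against the boxed Java classes.
  val intInfo: TypeInformation[java.lang.Integer] = Types.INT
  val stringInfo: TypeInformation[String] = Types.STRING

  def main(args: Array[String]): Unit = {
    println(intInfo.getTypeClass)    // runtime class described by the constant
    println(stringInfo.getTypeClass)
    println(intInfo.isBasicType)     // basic types are single-field, atomic values
    println(intInfo.getArity)
  }
}
```

Because the constants are typed against boxed classes, a value such as `Types.INT` is a `TypeInformation[java.lang.Integer]`, not a `TypeInformation[Int]`.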

Factory Methods

Row Types

object Types {
  // Row with default field names (f0, f1, f2, ...)
  def ROW(types: TypeInformation[_]*): TypeInformation[Row]
  
  // Row with custom field names
  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
}

Usage examples:

// Anonymous row
val rowType = Types.ROW(Types.STRING, Types.INT, Types.DOUBLE)

// Named row (the explicit element type helps overload resolution pick this variant)
val namedRowType = Types.ROW(
  Array("name", "age", "salary"),
  Array[TypeInformation[_]](Types.STRING, Types.INT, Types.DOUBLE)
)
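To see the difference between the two `ROW` variants, the sketch below reads the field names back from the resulting type information. It assumes the flink-scala `Types` object and that `ROW` returns a `RowTypeInfo` at runtime, which is why the helper falls back to an empty sequence otherwise. `RowTypeSketch` and `fieldNames` are hypothetical names.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.typeutils.Types // assumed import path
import org.apache.flink.types.Row

object RowTypeSketch {
  // Positional variant: field names default to f0, f1, f2.
  val anonymous: TypeInformation[Row] =
    Types.ROW(Types.STRING, Types.INT, Types.DOUBLE)

  // Named variant: names and types are passed as parallel arrays.
  val named: TypeInformation[Row] = Types.ROW(
    Array("name", "age", "salary"),
    Array[TypeInformation[_]](Types.STRING, Types.INT, Types.DOUBLE)
  )

  // Hypothetical helper: read the field names if the value is a RowTypeInfo.
  def fieldNames(info: TypeInformation[Row]): Seq[String] = info match {
    case row: RowTypeInfo => row.getFieldNames.toSeq
    case _                => Seq.empty
  }

  def main(args: Array[String]): Unit = {
    println(fieldNames(anonymous).mkString(", "))
    println(fieldNames(named).mkString(", "))
  }
}
```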

POJO Types

object Types {
  // Automatic POJO analysis
  def POJO[T](pojoClass: Class[T]): TypeInformation[T]
  
  // Manual POJO field specification
  def POJO[T](pojoClass: Class[T], fields: Map[String, TypeInformation[_]]): TypeInformation[T]
}

Usage examples:

// POJO-style class: public no-argument constructor and mutable fields
class User {
  var id: Int = _
  var name: String = _
  var email: String = _
}

val userType = Types.POJO(classOf[User])

// Manual field specification
val manualUserType = Types.POJO(classOf[User], Map(
  "id" -> Types.INT,
  "name" -> Types.STRING,
  "email" -> Types.STRING
))

Generic Types

object Types {
  // Kryo-based generic serialization
  def GENERIC[T](genericClass: Class[T]): TypeInformation[T]
}

Scala-Specific Types

object Types {
  // Case class type information
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
  
  // Tuple type information (alias for CASE_CLASS)
  def TUPLE[T: TypeInformation]: TypeInformation[T]
  
  // Option type
  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
  
  // Either type
  def EITHER[A, B](
    leftType: TypeInformation[A], 
    rightType: TypeInformation[B]
  ): TypeInformation[Either[A, B]]
  
  // Try type
  def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]
  
  // Scala enumeration
  def ENUMERATION[E <: Enumeration](
    enum: E, 
    valueClass: Class[E#Value]
  ): TypeInformation[E#Value]
  
  // Traversable collections
  def TRAVERSABLE[T: TypeInformation]: TypeInformation[T]
}

Usage examples:

// Case class
case class Person(name: String, age: Int)
val personType = Types.CASE_CLASS[Person]

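// Option type (annotate the result so that T is inferred as Option[String])
val optionalStringType: TypeInformation[Option[String]] = Types.OPTION(Types.STRING)

// Either type  
val eitherType = Types.EITHER(Types.STRING, Types.INT)

// Try type (requires import scala.util.Try; Types.INT describes java.lang.Integer)
val tryIntType: TypeInformation[Try[java.lang.Integer]] = Types.TRY(Types.INT)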

// Collection type
val listType = Types.TRAVERSABLE[List[String]]

Array Types

object Types {
  // Primitive arrays (no null elements)
  def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_]
  
  // Object arrays (nullable elements)
  def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]]
}

Usage examples:

// Primitive array
val intArrayType = Types.PRIMITIVE_ARRAY(Types.INT)

// Object array
val stringArrayType = Types.OBJECT_ARRAY(Types.STRING)
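The practical difference between the two factories shows up in the runtime class each result describes. The following sketch checks this, assuming the flink-scala `Types` object; `ArrayTypeSketch` is a made-up name.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.Types // assumed import path

object ArrayTypeSketch {
  // PRIMITIVE_ARRAY returns an existential type, so the element type is not tracked statically.
  val intArray: TypeInformation[_] = Types.PRIMITIVE_ARRAY(Types.INT)

  // OBJECT_ARRAY keeps the element type in the static result type.
  val stringArray: TypeInformation[Array[String]] = Types.OBJECT_ARRAY(Types.STRING)

  def main(args: Array[String]): Unit = {
    // Array[Int] is the JVM primitive array int[].
    println(intArray.getTypeClass == classOf[Array[Int]])
    println(stringArray.getTypeClass == classOf[Array[String]])
  }
}
```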

ImplicitTypeConversions Trait

Provides automatic type inference through Scala macros and implicit values.

trait ImplicitTypeConversions {
  // Macro-based automatic type inference
  implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
  
  // Nothing type fallback
  implicit val scalaNothingTypeInfo: TypeInformation[Nothing]
  
  // Specialized Tuple2 creation
  def createTuple2TypeInformation[T1, T2](
    t1: TypeInformation[T1],
    t2: TypeInformation[T2]
  ): TypeInformation[(T1, T2)]
}

Macro-Based Type Inference

The type system uses Scala macros for compile-time type information generation:

object TypeUtils {
  // Main macro implementation
  def createTypeInfo[T: c.WeakTypeTag](c: Context): c.Expr[TypeInformation[T]]
}

This enables automatic type inference:

// Automatic inference
case class Product(id: Int, name: String, price: Double)
val productType: TypeInformation[Product] = implicitly[TypeInformation[Product]]

// Or using Types.of
val productType2 = Types.of[Product]
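A slightly fuller sketch of the same idea is shown here. It assumes the implicit macro is brought into scope through `org.apache.flink.api.scala._` from flink-scala, and that the derived value is a composite type whose field names can be read back. The case class is declared at object level rather than inside a method, and `MacroInferenceSketch` is a hypothetical name.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.scala._ // assumed source of the implicit macro
import org.apache.flink.api.scala.typeutils.Types

object MacroInferenceSketch {
  case class Product(id: Int, name: String, price: Double)

  // Both lines resolve the same implicit, which the macro generates at compile time.
  val viaImplicit: TypeInformation[Product] = implicitly[TypeInformation[Product]]
  val viaTypes: TypeInformation[Product] = Types.of[Product]

  // Read the field names when the derived value is a composite type.
  def fieldNames(info: TypeInformation[Product]): Seq[String] = info match {
    case composite: CompositeType[_] => composite.getFieldNames.toSeq
    case _                           => Seq.empty
  }

  def main(args: Array[String]): Unit = {
    println(fieldNames(viaImplicit).mkString(", "))
    println(viaTypes.getArity)
  }
}
```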

Type Information Hierarchy

Base TypeInformation Interface

All type information classes implement the base TypeInformation[T] interface:

abstract class TypeInformation[T] {
  def getTypeClass: Class[T]
  def isBasicType: Boolean
  def isTupleType: Boolean
  def getArity: Int
  def getTotalFields: Int
  def isKeyType: Boolean
  def createSerializer(config: SerializerConfig): TypeSerializer[T]
  def equals(obj: Any): Boolean
  def hashCode(): Int
  def toString: String
}

AtomicType Interface

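For types that are treated as one indivisible unit and can be compared as a whole. In Flink this is a standalone interface that atomic TypeInformation subclasses implement in addition to extending TypeInformation:

trait AtomicType[T] {
  def createComparator(sortOrderAscending: Boolean, executionConfig: ExecutionConfig): TypeComparator[T]
}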

CompositeType Interface

For types with multiple fields:

abstract class CompositeType[T] extends TypeInformation[T] {
  def getFieldNames: Array[String]
  def getFieldIndex(fieldName: String): Int
  def getFieldTypes: Array[TypeInformation[_]]
  def getTypeAt[X](fieldExpression: String): TypeInformation[X]
  def getFlatFields(fieldExpression: String): java.util.List[FlatFieldDescriptor]
}
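The lookup methods above can be exercised on a named row type. This is a minimal sketch that assumes the flink-scala `Types` object and relies on row type information being a `CompositeType` at runtime, hence the cast. `CompositeTypeSketch` is a made-up name.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.scala.typeutils.Types // assumed import path
import org.apache.flink.types.Row

object CompositeTypeSketch {
  val userRow: TypeInformation[Row] = Types.ROW(
    Array("id", "name"),
    Array[TypeInformation[_]](Types.INT, Types.STRING)
  )

  // Assumption: the value returned by ROW is a CompositeType.
  val composite: CompositeType[Row] = userRow.asInstanceOf[CompositeType[Row]]

  // Look a field up by name, then fetch its type information.
  val nameIndex: Int = composite.getFieldIndex("name")
  val nameType: TypeInformation[String] = composite.getTypeAt[String]("name")

  def main(args: Array[String]): Unit = {
    println(s"name is field #$nameIndex with type $nameType")
  }
}
```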

Usage Patterns

Automatic Type Inference

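import java.time.LocalDate
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._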

// Case class with automatic inference  
case class Order(id: Int, product: String, amount: Double, date: LocalDate)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Type information automatically inferred from case class
val orders = env.fromElements(
  Order(1, "laptop", 999.99, LocalDate.now()),
  Order(2, "mouse", 29.99, LocalDate.now())
)
val ordersTable = tEnv.fromDataStream(orders)  // Type info automatic

Manual Type Specification

// When automatic inference fails or explicit control is needed
val rowType = Types.ROW(
  Array("id", "name", "email", "age"),
  Array[TypeInformation[_]](Types.INT, Types.STRING, Types.STRING, Types.INT)
)

// Equivalent explicit schema for the Table API
val schema = Schema.newBuilder()
  .column("id", DataTypes.INT())
  .column("name", DataTypes.STRING())
  .column("email", DataTypes.STRING())
  .column("age", DataTypes.INT())
  .build()

Option and Either Types

case class UserProfile(
  id: Int,
  name: String, 
  email: Option[String],           // Optional email
  status: Either[String, Int]      // Error message or status code
)

// Type information automatically handles Option and Either
val profileType = Types.of[UserProfile]
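To check how the derived type information treats the `Option` and `Either` fields, the sketch below looks up their per-field type information. It assumes the macro comes from `org.apache.flink.api.scala._` and that the derived value is a `CompositeType`, hence the cast. `OptionEitherSketch` is a hypothetical name.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.scala._ // assumed source of the implicit macro
import org.apache.flink.api.scala.typeutils.Types

object OptionEitherSketch {
  case class UserProfile(
    id: Int,
    name: String,
    email: Option[String],
    status: Either[String, Int]
  )

  val profileType: TypeInformation[UserProfile] = Types.of[UserProfile]

  // Assumption: case class type information is a CompositeType.
  val composite: CompositeType[UserProfile] =
    profileType.asInstanceOf[CompositeType[UserProfile]]

  // Per-field type information for the Option and Either fields.
  val emailInfo: TypeInformation[Option[String]] =
    composite.getTypeAt[Option[String]]("email")
  val statusInfo: TypeInformation[Either[String, Int]] =
    composite.getTypeAt[Either[String, Int]]("status")

  def main(args: Array[String]): Unit = {
    println(emailInfo)
    println(statusInfo)
  }
}
```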

Collection Types

case class ShoppingCart(
  userId: Int,
  items: List[String],             // List of product names
  quantities: Array[Int],          // Array of quantities
  metadata: Map[String, String]    // Additional metadata
)

// Nested collection types supported
val cartType = Types.of[ShoppingCart]

Type Safety Features

Compile-Time Verification

// Compile-time type checking
val personType = Types.of[Person]  // Verified at compile time
val wrongType = Types.of[NonExistentClass]  // Compilation error

Generic Type Preservation

// Generic types preserved through transformations
val optionalUsers: DataStream[Option[User]] = ...
val usersTable = tEnv.fromDataStream(optionalUsers)  // Option[User] type preserved

Field Access Type Safety

case class Product(id: Int, name: String, price: Double)

val productsTable = tEnv.fromDataStream(products)
val result = productsTable.select(
  $"id",           // Type: Int
  $"name",         // Type: String  
  $"price" * 1.1   // Type: Double
)

Performance Considerations

  • Macro Expansion: Type information is generated at compile time, so no reflective type analysis is needed at runtime
  • Serialization Efficiency: Specialized serializers are used for each supported Scala type
  • Memory Layout: Optimized memory layout for case classes and tuples
  • Kryo Fallback: Generic types fall back to Kryo serialization, which is typically slower than the specialized serializers
  • Caching: Type information instances are cached and reused

Error Handling

InvalidTypesException

Thrown when type information cannot be generated:

import org.apache.flink.api.common.functions.InvalidTypesException

try {
  val problematicType = Types.POJO(classOf[SomeComplexClass])
} catch {
  case e: InvalidTypesException => 
    // Handle type analysis failure
    println(s"Cannot create type info: ${e.getMessage}")
}

Type Inference Failures

When macro-based inference fails:

// Fallback to manual specification
implicit val myComplexTypeInfo: TypeInformation[MyComplexType] = {
  // Custom type information implementation
  Types.GENERIC(classOf[MyComplexType])
}
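A minimal sketch of this fallback is shown below. The class `LegacyPayload` is hypothetical and stands in for any type the macro cannot analyze. The example deliberately avoids importing the macro-providing package, so the explicit implicit is the only candidate, and it assumes the flink-scala `Types` object.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala.typeutils.Types // assumed import path

object GenericFallbackSketch {
  // Hypothetical type that does not qualify as a case class or POJO.
  class LegacyPayload(val bytes: Array[Byte])

  // Explicit implicit: every lookup for LegacyPayload now resolves to this value.
  implicit val legacyPayloadInfo: TypeInformation[LegacyPayload] =
    Types.GENERIC(classOf[LegacyPayload])

  // Types.of picks up the implicit declared above.
  val resolved: TypeInformation[LegacyPayload] = Types.of[LegacyPayload]

  def main(args: Array[String]): Unit = {
    println(resolved.isInstanceOf[GenericTypeInfo[_]])
  }
}
```

Declaring the implicit close to the type, for example in its companion object, keeps the fallback visible wherever the type is used. The trade-off is that values of this type are serialized generically rather than with a specialized serializer.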