The Flink Scala Table API provides a comprehensive type system with automatic type inference through macros and factory methods for all Scala and Java types.
The `Types` object is the main entry point for creating type information; it provides factory methods and constants for common types.
object Types {
// Generic type creation using macros
def of[T: TypeInformation]: TypeInformation[T]
// Scala-specific type constants
val NOTHING: TypeInformation[Nothing]
val UNIT: TypeInformation[Unit]
// Primitive type constants
val STRING: TypeInformation[String]
val BYTE: TypeInformation[java.lang.Byte]
val BOOLEAN: TypeInformation[java.lang.Boolean]
val SHORT: TypeInformation[java.lang.Short]
val INT: TypeInformation[java.lang.Integer]
val LONG: TypeInformation[java.lang.Long]
val FLOAT: TypeInformation[java.lang.Float]
val DOUBLE: TypeInformation[java.lang.Double]
val CHAR: TypeInformation[java.lang.Character]
// Java numeric types
val JAVA_BIG_DEC: TypeInformation[java.math.BigDecimal]
val JAVA_BIG_INT: TypeInformation[java.math.BigInteger]
// Date and time types
val SQL_DATE: TypeInformation[java.sql.Date]
val SQL_TIME: TypeInformation[java.sql.Time]
val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
val LOCAL_DATE: TypeInformation[java.time.LocalDate]
val LOCAL_TIME: TypeInformation[java.time.LocalTime]
val LOCAL_DATE_TIME: TypeInformation[java.time.LocalDateTime]
val INSTANT: TypeInformation[java.time.Instant]
}

object Types {
// Row with default field names (f0, f1, f2, ...)
def ROW(types: TypeInformation[_]*): TypeInformation[Row]
// Row with custom field names
def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
}

Usage examples:
// Anonymous row
val rowType = Types.ROW(Types.STRING, Types.INT, Types.DOUBLE)
// Named row
val namedRowType = Types.ROW(
Array("name", "age", "salary"),
Array(Types.STRING, Types.INT, Types.DOUBLE)
)

object Types {
// Automatic POJO analysis
def POJO[T](pojoClass: Class[T]): TypeInformation[T]
// Manual POJO field specification
def POJO[T](pojoClass: Class[T], fields: Map[String, TypeInformation[_]]): TypeInformation[T]
}

Usage examples:
// POJO-style class (mutable fields, default constructor)
class User {
var id: Int = _
var name: String = _
var email: String = _
}
val userType = Types.POJO(classOf[User])
// Manual field specification
val manualUserType = Types.POJO(classOf[User], Map(
"id" -> Types.INT,
"name" -> Types.STRING,
"email" -> Types.STRING
))

object Types {
// Kryo-based generic serialization
def GENERIC[T](genericClass: Class[T]): TypeInformation[T]
}

object Types {
// Case class type information
def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
// Tuple type information (alias for CASE_CLASS)
def TUPLE[T: TypeInformation]: TypeInformation[T]
// Option type
def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
// Either type
def EITHER[A, B](
leftType: TypeInformation[A],
rightType: TypeInformation[B]
): TypeInformation[Either[A, B]]
// Try type
def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]
// Scala enumeration
def ENUMERATION[E <: Enumeration](
enum: E,
valueClass: Class[E#Value]
): TypeInformation[E#Value]
// Traversable collections
def TRAVERSABLE[T: TypeInformation]: TypeInformation[T]
}

Usage examples:
// Case class
case class Person(name: String, age: Int)
val personType = Types.CASE_CLASS[Person]
// Option type
val optionalStringType = Types.OPTION(Types.STRING)
// Either type
val eitherType = Types.EITHER(Types.STRING, Types.INT)
// Try type
val tryIntType = Types.TRY(Types.INT)
// Collection type
val listType = Types.TRAVERSABLE[List[String]]

object Types {
// Primitive arrays (no null elements)
def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_]
// Object arrays (nullable elements)
def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]]
}

Usage examples:
// Primitive array
val intArrayType = Types.PRIMITIVE_ARRAY(Types.INT)
// Object array
val stringArrayType = Types.OBJECT_ARRAY(Types.STRING)

The `ImplicitTypeConversions` trait provides automatic type inference through Scala macros and implicit values.
trait ImplicitTypeConversions {
// Macro-based automatic type inference
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
// Nothing type fallback
implicit val scalaNothingTypeInfo: TypeInformation[Nothing]
// Specialized Tuple2 creation
def createTuple2TypeInformation[T1, T2](
t1: TypeInformation[T1],
t2: TypeInformation[T2]
): TypeInformation[(T1, T2)]
}

The type system uses Scala macros for compile-time type information generation:
object TypeUtils {
// Main macro implementation
def createTypeInfo[T: c.WeakTypeTag](c: Context): c.Expr[TypeInformation[T]]
}

This enables automatic type inference:
// Automatic inference
case class Product(id: Int, name: String, price: Double)
val productType: TypeInformation[Product] = implicitly[TypeInformation[Product]]
// Or using Types.of
val productType2 = Types.of[Product]

All type information classes extend the base `TypeInformation[T]` abstract class:
abstract class TypeInformation[T] {
def getTypeClass: Class[T]
def isBasicType: Boolean
def isTupleType: Boolean
def getArity: Int
def getTotalFields: Int
def isKeyType: Boolean
def createSerializer(config: SerializerConfig): TypeSerializer[T]
def equals(obj: Any): Boolean
def hashCode(): Int
def toString: String
}

For types that support comparison and hashing:
trait AtomicType[T] extends TypeInformation[T] {
def createComparator(sortOrderAscending: Boolean, executionConfig: ExecutionConfig): TypeComparator[T]
}

For types with multiple fields:
abstract class CompositeType[T] extends TypeInformation[T] {
def getFieldNames: Array[String]
def getFieldIndex(fieldName: String): Int
def getFieldTypes: Array[TypeInformation[_]]
def getTypeAt[X](fieldExpression: String): TypeInformation[X]
def getFlatFields(fieldExpression: String): List[FlatFieldDescriptor]
}

import org.apache.flink.table.api._
// Case class with automatic inference
case class Order(id: Int, product: String, amount: Double, date: LocalDate)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
// Type information automatically inferred from case class
val orders = env.fromElements(
Order(1, "laptop", 999.99, LocalDate.now()),
Order(2, "mouse", 29.99, LocalDate.now())
)
val ordersTable = tEnv.fromDataStream(orders) // Type information is inferred automatically

// When automatic inference fails or explicit control is needed
val rowType = Types.ROW(
Array("id", "name", "email", "age"),
Array(Types.INT, Types.STRING, Types.STRING, Types.INT)
)
// Create table with explicit schema
val schema = Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("email", DataTypes.STRING())
.column("age", DataTypes.INT())
.build()

case class UserProfile(
id: Int,
name: String,
email: Option[String], // Optional email
status: Either[String, Int] // Error message or status code
)
// Type information automatically handles Option and Either
val profileType = Types.of[UserProfile]

case class ShoppingCart(
userId: Int,
items: List[String], // List of product names
quantities: Array[Int], // Array of quantities
metadata: Map[String, String] // Additional metadata
)
// Nested collection types supported
val cartType = Types.of[ShoppingCart]

// Compile-time type checking
val personType = Types.of[Person] // Verified at compile time
val wrongType = Types.of[NonExistentClass] // Compilation error

// Generic types preserved through transformations
val optionalUsers: DataStream[Option[User]] = ...
val usersTable = tEnv.fromDataStream(optionalUsers) // Option[User] type preserved

case class Product(id: Int, name: String, price: Double)
val productsTable = tEnv.fromDataStream(products)
val result = productsTable.select(
$"id", // Type: Int
$"name", // Type: String
$"price" * 1.1 // Type: Double
)

`InvalidTypesException` is thrown when type information cannot be generated:
try {
val problematicType = Types.POJO(classOf[SomeComplexClass])
} catch {
case e: InvalidTypesException =>
// Handle type analysis failure
println(s"Cannot create type info: ${e.getMessage}")
}

When macro-based inference fails:
// Fallback to manual specification
implicit val myComplexTypeInfo: TypeInformation[MyComplexType] = {
// Custom type information implementation
Types.GENERIC(classOf[MyComplexType])
}