or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

binary-operations.md
data-transformations.md
execution-environment.md
grouping-aggregation.md
index.md
input-output.md
partitioning-distribution.md
type-system.md
utilities.md
tile.json

docs/type-system.md

Type System and Serialization

Comprehensive type information system and Scala-specific serialization support. The type system provides automatic type inference and specialized serializers for Scala's rich type hierarchy.

Capabilities

Automatic Type Information

Macro-based automatic type information generation for seamless type safety.

package object scala {
  /**
   * Automatic type information generation via Scala macros
   * Provides type information for any type T at compile time.
   * Declared implicit, so it can satisfy an implicit TypeInformation[T]
   * requirement; expansion is delegated to TypeUtils.createTypeInfo.
   * @tparam T Type to generate type information for
   * @return TypeInformation[T] for the specified type
   */
  implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
  
  /**
   * Special case type information for Nothing type, exposed as an implicit value
   * @return TypeInformation for Nothing
   */
  implicit val scalaNothingTypeInfo: TypeInformation[Nothing]
  
  /**
   * Creates type information for Tuple2 types
   * @tparam T1 Type of the first tuple element
   * @tparam T2 Type of the second tuple element
   * @param t1 Type information for first element
   * @param t2 Type information for second element  
   * @return TypeInformation for (T1, T2) tuple
   */
  def createTuple2TypeInformation[T1, T2](
    t1: TypeInformation[T1], 
    t2: TypeInformation[T2]
  ): TypeInformation[(T1, T2)]
}

Types Object

Central access point for Scala-specific type information with extensive built-in type support.

object Types {
  // Basic type constants.
  // Note: BYTE through CHAR are typed on the java.lang boxed classes
  // (e.g. INT is TypeInformation[java.lang.Integer]), not on Scala's Int, Long, ...
  val NOTHING: TypeInformation[Nothing]
  val UNIT: TypeInformation[Unit]
  val STRING: TypeInformation[String]
  val BYTE: TypeInformation[java.lang.Byte]
  val BOOLEAN: TypeInformation[java.lang.Boolean]
  val SHORT: TypeInformation[java.lang.Short]
  val INT: TypeInformation[java.lang.Integer]
  val LONG: TypeInformation[java.lang.Long]
  val FLOAT: TypeInformation[java.lang.Float]
  val DOUBLE: TypeInformation[java.lang.Double]
  val CHAR: TypeInformation[java.lang.Character]
  
  // BigDecimal and BigInteger types (java.math)
  val JAVA_BIG_DEC: TypeInformation[java.math.BigDecimal]
  val JAVA_BIG_INT: TypeInformation[java.math.BigInteger]
  
  // Date and time types (java.sql and java.time)
  val SQL_DATE: TypeInformation[java.sql.Date]
  val SQL_TIME: TypeInformation[java.sql.Time]
  val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
  val LOCAL_DATE: TypeInformation[java.time.LocalDate]
  val LOCAL_TIME: TypeInformation[java.time.LocalTime]
  val LOCAL_DATE_TIME: TypeInformation[java.time.LocalDateTime]
  val INSTANT: TypeInformation[java.time.Instant]
  
  /**
   * Creates type information using implicit TypeInformation.
   * The context bound means an implicit TypeInformation[T] must be available
   * at the call site.
   * @tparam T Type to obtain type information for
   * @return TypeInformation for type T
   */
  def of[T: TypeInformation]: TypeInformation[T]
}

Factory Methods for Complex Types

Methods for creating type information for complex data structures.

object Types {
  /**
   * Creates type information for Row types
   * @param types Type information for each field (varargs, one entry per field)
   * @return TypeInformation for Row
   */
  def ROW(types: TypeInformation[_]*): TypeInformation[Row]
  
  /**
   * Creates type information for Row with field names
   * @param fieldNames Array of field names
   * @param types Array of field type information
   *              (presumably one entry per field name, in the same order — confirm)
   * @return TypeInformation for named Row
   */
  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
  
  /**
   * Creates type information for POJO classes
   * @tparam T POJO type
   * @param pojoClass POJO class
   * @return TypeInformation for POJO
   */
  def POJO[T](pojoClass: Class[T]): TypeInformation[T]
  
  /**
   * Creates type information for POJO with field mapping
   * @tparam T POJO type
   * @param pojoClass POJO class
   * @param fields Map of field names to type information
   * @return TypeInformation for POJO with field mapping
   */
  def POJO[T](pojoClass: Class[T], fields: Map[String, TypeInformation[_]]): TypeInformation[T]
  
  /**
   * Creates generic type information using Kryo serialization
   * @tparam T Type to be handled generically
   * @param genericClass Class for generic serialization
   * @return TypeInformation using Kryo
   */
  def GENERIC[T](genericClass: Class[T]): TypeInformation[T]
  
  /**
   * Creates type information for case classes.
   * Requires an implicit TypeInformation[T] at the call site (context bound).
   * @tparam T Case class type
   * @return TypeInformation for case class T
   */
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
  
  /**
   * Creates type information for tuple types.
   * Requires an implicit TypeInformation[T] at the call site (context bound).
   * @tparam T Tuple type
   * @return TypeInformation for tuple T
   */
  def TUPLE[T: TypeInformation]: TypeInformation[T]
}

Array Type Support

Type information for primitive and object arrays.

object Types {
  /**
   * Creates type information for primitive arrays.
   * Both the parameter and the result are typed as TypeInformation[_], so this
   * signature does not track the element or array type statically.
   * @param elementType Type information for array elements
   * @return TypeInformation for primitive array
   */
  def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_]
  
  /**
   * Creates type information for object arrays
   * @tparam E Element type; bounded by AnyRef, so it must be a reference type
   * @param elementType Type information for array elements
   * @return TypeInformation for object array (Array[E])
   */
  def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]]
}

Scala Collections Type Support

Specialized type information for Scala's collection types and monads.

object Types {
  /**
   * Creates type information for Either types
   * @tparam A Type of the Left value
   * @tparam B Type of the Right value
   * @param leftType Type information for Left value
   * @param rightType Type information for Right value
   * @return TypeInformation for Either[A, B]
   */
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
  
  /**
   * Creates type information for Option types
   * @tparam A Type of the wrapped value
   * @tparam T Result type; bounded by Option[A]
   * @param valueType Type information for Option value
   * @return TypeInformation for T, a subtype of Option[A]
   */
  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
  
  /**
   * Creates type information for Try types
   * @tparam A Type of the wrapped value
   * @tparam T Result type; bounded by Try[A]
   * @param valueType Type information for Try value
   * @return TypeInformation for T, a subtype of Try[A]
   */
  def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]
  
  /**
   * Creates type information for Enumeration types
   * @tparam E Enumeration type
   * @param enum Enumeration instance
   * @param valueClass Class for enumeration values
   * @return TypeInformation for enumeration values
   */
  def ENUMERATION[E <: Enumeration](enum: E, valueClass: Class[E#Value]): TypeInformation[E#Value]
  
  /**
   * Creates type information for Traversable collections.
   * Requires an implicit TypeInformation[T] at the call site (context bound).
   * @tparam T The traversable collection type itself (not its element type)
   * @return TypeInformation for the collection type T
   */
  def TRAVERSABLE[T: TypeInformation]: TypeInformation[T]
}

Usage Examples:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types

// Automatic type information (most common usage)
// `env` is an ExecutionEnvironment, e.g. ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements(1, 2, 3, 4, 5) // TypeInformation[Int] inferred

// Explicit type information using Types object
val stringType: TypeInformation[String] = Types.STRING
val intType: TypeInformation[Int] = Types.of[Int]

// Complex types
case class Person(name: String, age: Int)
val personType: TypeInformation[Person] = Types.CASE_CLASS[Person]

// Option types
val optionStringType: TypeInformation[Option[String]] = Types.OPTION(Types.STRING)

// Either types
// Types.INT is TypeInformation[java.lang.Integer], which does not conform to
// TypeInformation[Int]; Types.of[Int] supplies the type information for Scala's Int.
val eitherType: TypeInformation[Either[String, Int]] = Types.EITHER(Types.STRING, Types.of[Int])

// Array types
val stringArrayType: TypeInformation[Array[String]] = Types.OBJECT_ARRAY(Types.STRING)

Case Class Type Information

Specialized type information for Scala case classes with field-level access.

/**
 * Type information for Scala case classes, exposing field-level access.
 * Only the member signatures are listed here.
 * @tparam T Case class type; bounded by Product
 */
abstract class CaseClassTypeInfo[T <: Product] extends CompositeType[T] {
  /**
   * Gets generic type parameters
   * @return Map of generic parameter names to type information (a java.util.Map)
   */ 
  def getGenericParameters: java.util.Map[String, TypeInformation[_]]
  
  /**
   * Gets field indices for specified field names
   * @param fields Array of field names
   * @return Array of field indices
   */
  def getFieldIndices(fields: Array[String]): Array[Int]
  
  /**
   * Gets flat field descriptors for field expression.
   * Returns Unit; the descriptors are delivered through the `result` list.
   * @param fieldExpression Field expression string
   * @param offset Starting offset
   * @param result List to collect field descriptors
   */
  def getFlatFields(fieldExpression: String, offset: Int, result: java.util.List[FlatFieldDescriptor]): Unit
  
  /**
   * Gets type information for field at expression
   * @tparam X Type of the field, chosen by the caller
   * @param fieldExpression Field expression
   * @return TypeInformation for field
   */
  def getTypeAt[X](fieldExpression: String): TypeInformation[X]
  
  /**
   * Gets all field names
   * @return Array of field names
   */
  def getFieldNames: Array[String]
  
  /**
   * Gets field index by name
   * @param fieldName Field name
   * @return Field index
   */
  def getFieldIndex(fieldName: String): Int
  
  /**
   * Creates type comparator builder
   * @return TypeComparatorBuilder for this type
   */
  def createTypeComparatorBuilder(): TypeComparatorBuilder[T]
}

Specialized Serializers

Type information and serializers for specific Scala types.

// The declarations below list constructor signatures and supertypes only;
// class members are not shown.

// Option serialization support: type information, serializer and comparator for Option[A]
class OptionTypeInfo[A](valueTypeInfo: TypeInformation[A]) extends TypeInformation[Option[A]]
class OptionSerializer[A](valueSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]]
class OptionTypeComparator[A](ascending: Boolean, valueComparator: TypeComparator[A]) extends TypeComparator[Option[A]]

// Either serialization support: type information and serializer for Either[A, B]
class EitherTypeInfo[A, B](leftTypeInfo: TypeInformation[A], rightTypeInfo: TypeInformation[B]) extends TypeInformation[Either[A, B]]
class EitherSerializer[A, B](leftSerializer: TypeSerializer[A], rightSerializer: TypeSerializer[B]) extends TypeSerializer[Either[A, B]]

// Try serialization support: type information and serializer for Try[A]
class TryTypeInfo[A](valueTypeInfo: TypeInformation[A]) extends TypeInformation[Try[A]]
class TrySerializer[A](valueSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]]

// Traversable collections support: CC is the collection type constructor
// (bounded by TraversableOnce), T is the element type
class TraversableTypeInfo[CC[X] <: TraversableOnce[X], T](clazz: Class[CC[T]], elementTypeInfo: TypeInformation[T]) extends TypeInformation[CC[T]]
class TraversableSerializer[CC[X] <: TraversableOnce[X], T](clazz: Class[CC[T]], elementSerializer: TypeSerializer[T]) extends TypeSerializer[CC[T]]

// Enumeration support: type information, serializer and comparator for E#Value
class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value]) extends TypeInformation[E#Value]
class EnumValueSerializer[E <: Enumeration](enum: E, clazz: Class[E#Value]) extends TypeSerializer[E#Value]
class EnumValueComparator[E <: Enumeration](ascending: Boolean, enum: E) extends TypeComparator[E#Value]

// Special types: Unit and Nothing
class UnitTypeInfo extends TypeInformation[Unit]
class UnitSerializer extends TypeSerializer[Unit]
class ScalaNothingTypeInfo extends TypeInformation[Nothing]
class NothingSerializer extends TypeSerializer[Nothing]

Custom Type Registration

Methods for registering custom types and serializers with the type system.

class ExecutionEnvironment {
  /**
   * Registers a type with a Kryo serializer instance
   * @tparam T Serializer type; the bound requires it to be both a Kryo
   *           Serializer and Serializable
   * @param clazz Class to register
   * @param serializer Serializer instance
   */
  def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](clazz: Class[_], serializer: T): Unit
  
  /**
   * Registers a type with a Kryo serializer class
   * @param clazz Class to register  
   * @param serializer Serializer class (a subclass of Kryo's Serializer)
   */
  def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]): Unit
  
  /**
   * Adds a default Kryo serializer for a type
   * @param clazz Class to register
   * @param serializer Serializer class (a subclass of Kryo's Serializer)
   */
  def addDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]): Unit
  
  /**
   * Registers a type with Kryo (uses default serialization)
   * @param typeClass Class to register
   */
  def registerType(typeClass: Class[_]): Unit
}

Usage Examples:

import com.esotericsoftware.kryo.Serializer
import org.apache.flink.api.scala._

// Register custom type with custom serializer
class MyData(val value: String)
// The instance overload of registerTypeWithKryoSerializer requires
// T <: Serializer[_] with Serializable, so the serializer must mix in Serializable.
class MyDataSerializer extends Serializer[MyData] with Serializable {
  // Implementation (Kryo's abstract write and read methods go here)
}

val env = ExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[MyData], new MyDataSerializer)

// Register type with default Kryo serialization
env.registerType(classOf[MyData])

Type Utilities

Utility functions for working with type information.

object TypeUtils {
  /**
   * Macro for creating type information at compile time.
   * This is the macro named by the implicit createTypeInformation[T] in the
   * scala package object.
   * NOTE(review): the right-hand side below is a documentation placeholder and
   * is not valid Scala as written — confirm the real macro signature against
   * the library source.
   * @tparam T Type to create type information for
   * @return TypeInformation for type T
   */
  def createTypeInfo[T]: TypeInformation[T] = macro // Implementation in macro
}

// All members below are declared private[flink]: they are visible only inside
// the flink package and are not part of the user-facing API.
package object scala {
  /**
   * Wraps Java DataSet with Scala operations
   * @tparam R Element type; a ClassTag[R] must be available (context bound)
   * @param set Java DataSet to wrap
   * @return Scala DataSet with type safety
   */
  private[flink] def wrap[R: ClassTag](set: JavaDataSet[R]): DataSet[R]
  
  /**
   * Gets explicit type information if available, otherwise uses provided type info
   * @tparam T Type described by the type information
   * @param funcOrInputFormat Function or input format that might provide type info
   * @param typeInfo Fallback type information  
   * @return Explicit or fallback type information
   */
  private[flink] def explicitFirst[T](funcOrInputFormat: AnyRef, typeInfo: TypeInformation[T]): TypeInformation[T]
  
  /**
   * Converts field names to field indices for type information
   * @param typeInfo Type information containing field mapping
   * @param fields Array of field names
   * @return Array of field indices
   */
  private[flink] def fieldNames2Indices(typeInfo: TypeInformation[_], fields: Array[String]): Array[Int]
}

Types

/**
 * Describes a type T to the type system: its class, how to create a serializer
 * for it, and its structural properties (field counts, basic/tuple/key status).
 * Only the member signatures are listed here.
 * @tparam T The described type
 */
trait TypeInformation[T] {
  /**
   * Gets the class represented by this type information
   * @return Class[T] for this type
   */
  def getTypeClass: Class[T]
  
  /**
   * Creates a serializer for this type
   * @param config Execution configuration
   * @return TypeSerializer for this type
   */
  def createSerializer(config: ExecutionConfig): TypeSerializer[T]
  
  /**
   * Gets total number of fields (for composite types)
   * @return Number of fields
   */
  def getTotalFields: Int
  
  /**
   * Gets arity (number of direct fields)
   * @return Arity of this type
   */
  def getArity: Int
  
  /**
   * Checks if this is a basic type
   * @return True if basic type
   */
  def isBasicType: Boolean
  
  /**
   * Checks if this is a tuple type
   * @return True if tuple type
   */
  def isTupleType: Boolean
  
  /**
   * Checks if this type can be used as a key
   * @return True if valid key type
   */
  def isKeyType: Boolean
}

/**
 * Serializer contract for values of type T: copying, serializing to a
 * DataOutputView and deserializing from a DataInputView.
 * Only the member signatures are listed here.
 * @tparam T Type of the records handled by this serializer
 */
abstract class TypeSerializer[T] {
  /**
   * Creates a copy of the given element
   * @param from Element to copy
   * @return Copy of the element
   */
  def copy(from: T): T
  
  /**
   * Creates a copy of the given element to the target object
   * @param from Source element
   * @param reuse Target object to copy to
   * @return Copy of the element
   */
  def copy(from: T, reuse: T): T
  
  /**
   * Gets the length of serialized data
   * @return Length in bytes, or -1 if variable length
   */
  def getLength: Int
  
  /**
   * Serializes the given record to the target stream
   * @param record Record to serialize
   * @param target Target data output stream
   */
  def serialize(record: T, target: DataOutputView): Unit
  
  /**
   * Deserializes a record from the source stream
   * @param source Source data input stream
   * @return Deserialized record
   */
  def deserialize(source: DataInputView): T
  
  /**
   * Deserializes a record from the source stream into reuse object
   * @param reuse Object to deserialize into
   * @param source Source data input stream
   * @return Deserialized record
   */
  def deserialize(reuse: T, source: DataInputView): T
}

/**
 * Type information for types made up of fields, adding positional and
 * name-based field lookup on top of TypeInformation.
 * Only the member signatures are listed here.
 * @tparam T The described composite type
 */
abstract class CompositeType[T] extends TypeInformation[T] {
  /**
   * Gets type information for field at index
   * @param pos Field index
   * @return TypeInformation for field
   */
  def getTypeAt(pos: Int): TypeInformation[_]
  
  /**
   * Gets field names
   * @return Array of field names
   */
  def getFieldNames: Array[String]
  
  /**
   * Gets field index by name
   * @param fieldName Field name
   * @return Field index, or -1 if not found
   */
  def getFieldIndex(fieldName: String): Int
}