Comprehensive type information system and Scala-specific serialization support. The type system provides automatic type inference and specialized serializers for Scala's rich type hierarchy.
Macro-based automatic type information generation for seamless type safety.
/**
* Package-level entry points for obtaining TypeInformation in Scala code.
* This is a signature listing: apart from the macro-backed implicit, members are shown without bodies.
*/
package object scala {
/**
* Automatic type information generation via Scala macros
* Provides type information for any type T at compile time.
* Declared as an implicit def, so it is a candidate wherever an implicit
* TypeInformation[T] is required; the right-hand side delegates to the
* macro TypeUtils.createTypeInfo[T].
* @tparam T type for which type information is requested
* @return TypeInformation[T] for the specified type
*/
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
/**
* Special case type information for Nothing type, exposed as an implicit value
* @return TypeInformation for Nothing
*/
implicit val scalaNothingTypeInfo: TypeInformation[Nothing]
/**
* Creates type information for Tuple2 types from explicitly supplied element type information.
* Unlike createTypeInformation this method is not implicit and must be called directly.
* @tparam T1 type of the first tuple element
* @tparam T2 type of the second tuple element
* @param t1 Type information for first element
* @param t2 Type information for second element
* @return TypeInformation for (T1, T2) tuple
*/
def createTuple2TypeInformation[T1, T2](
t1: TypeInformation[T1],
t2: TypeInformation[T2]
): TypeInformation[(T1, T2)]
}Central access point for Scala-specific type information with extensive built-in type support.
/**
* Predefined TypeInformation constants plus the generic `of` accessor.
* Signature listing only; no initializers or bodies are shown.
*/
object Types {
// Basic type constants
// NOTE: the numeric, boolean and character constants below are typed with the boxed
// java.lang classes (e.g. INT is TypeInformation[java.lang.Integer], not TypeInformation[Int]).
val NOTHING: TypeInformation[Nothing]
val UNIT: TypeInformation[Unit]
val STRING: TypeInformation[String]
val BYTE: TypeInformation[java.lang.Byte]
val BOOLEAN: TypeInformation[java.lang.Boolean]
val SHORT: TypeInformation[java.lang.Short]
val INT: TypeInformation[java.lang.Integer]
val LONG: TypeInformation[java.lang.Long]
val FLOAT: TypeInformation[java.lang.Float]
val DOUBLE: TypeInformation[java.lang.Double]
val CHAR: TypeInformation[java.lang.Character]
// BigDecimal and BigInteger types (java.math)
val JAVA_BIG_DEC: TypeInformation[java.math.BigDecimal]
val JAVA_BIG_INT: TypeInformation[java.math.BigInteger]
// Date and time types: java.sql first, then java.time
val SQL_DATE: TypeInformation[java.sql.Date]
val SQL_TIME: TypeInformation[java.sql.Time]
val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
val LOCAL_DATE: TypeInformation[java.time.LocalDate]
val LOCAL_TIME: TypeInformation[java.time.LocalTime]
val LOCAL_DATE_TIME: TypeInformation[java.time.LocalDateTime]
val INSTANT: TypeInformation[java.time.Instant]
/**
* Creates type information using implicit TypeInformation.
* The context bound `T: TypeInformation` means an implicit TypeInformation[T]
* must be available at the call site.
* @tparam T type whose type information is requested
* @return TypeInformation for type T
*/
def of[T: TypeInformation]: TypeInformation[T]
}Methods for creating type information for complex data structures.
/**
* Factory methods on Types for rows, POJOs, generic (Kryo) types, case classes and tuples.
* Signature listing only; no bodies are shown.
*/
object Types {
/**
* Creates type information for Row types
* @param types Type information for each field (varargs, one entry per field)
* @return TypeInformation for Row
*/
def ROW(types: TypeInformation[_]*): TypeInformation[Row]
/**
* Creates type information for Row with field names
* @param fieldNames Array of field names
* @param types Array of field type information
* @return TypeInformation for named Row
*/
// NOTE(review): fieldNames and types are presumably expected to have the same length - confirm.
def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row]
/**
* Creates type information for POJO classes
* @tparam T the POJO type
* @param pojoClass POJO class
* @return TypeInformation for POJO
*/
def POJO[T](pojoClass: Class[T]): TypeInformation[T]
/**
* Creates type information for POJO with field mapping
* @tparam T the POJO type
* @param pojoClass POJO class
* @param fields Map of field names to type information
* @return TypeInformation for POJO with field mapping
*/
def POJO[T](pojoClass: Class[T], fields: Map[String, TypeInformation[_]]): TypeInformation[T]
/**
* Creates generic type information using Kryo serialization
* @tparam T the type to be handled generically
* @param genericClass Class for generic serialization
* @return TypeInformation using Kryo
*/
def GENERIC[T](genericClass: Class[T]): TypeInformation[T]
/**
* Creates type information for case classes.
* Takes no value arguments: the context bound requires an implicit TypeInformation[T]
* at the call site. The signature itself does not restrict T to case classes.
* @tparam T the case class type
* @return TypeInformation for case class T
*/
def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
/**
* Creates type information for tuple types.
* Takes no value arguments: the context bound requires an implicit TypeInformation[T]
* at the call site. The signature itself does not restrict T to tuples.
* @tparam T the tuple type
* @return TypeInformation for tuple T
*/
def TUPLE[T: TypeInformation]: TypeInformation[T]
}Type information for primitive and object arrays.
/**
* Factory methods on Types for array type information.
* Signature listing only; no bodies are shown.
*/
object Types {
/**
* Creates type information for primitive arrays.
* Both the parameter and the result are typed with a wildcard (TypeInformation[_]),
* so the element type is not tracked statically by this signature.
* @param elementType Type information for array elements
* @return TypeInformation for primitive array
*/
def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_]
/**
* Creates type information for object arrays
* @tparam E element type; bounded by AnyRef, i.e. reference types only
* @param elementType Type information for array elements
* @return TypeInformation for object array (Array[E])
*/
def OBJECT_ARRAY[E <: AnyRef](elementType: TypeInformation[E]): TypeInformation[Array[E]]
}Specialized type information for Scala's collection types and monads.
/**
* Factory methods on Types for Either, Option, Try, Enumeration values and Traversable collections.
* Signature listing only; no bodies are shown.
*/
object Types {
/**
* Creates type information for Either types
* @tparam A type of the Left value
* @tparam B type of the Right value
* @param leftType Type information for Left value
* @param rightType Type information for Right value
* @return TypeInformation for Either[A, B]
*/
def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
/**
* Creates type information for Option types
* @tparam A type of the wrapped value
* @tparam T the concrete option type, bounded by Option[A]; it appears only in the
*           result type, so it is fixed by the expected type or an explicit type argument
* @param valueType Type information for Option value
* @return TypeInformation for T, a subtype of Option[A]
*/
def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
/**
* Creates type information for Try types
* @tparam A type of the wrapped value
* @tparam T the concrete try type, bounded by Try[A]; it appears only in the
*           result type, so it is fixed by the expected type or an explicit type argument
* @param valueType Type information for Try value
* @return TypeInformation for T, a subtype of Try[A]
*/
def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]
/**
* Creates type information for Enumeration types
* @tparam E the Enumeration type
* @param enum Enumeration instance
* @param valueClass Class for enumeration values
* @return TypeInformation for enumeration values (E#Value)
*/
def ENUMERATION[E <: Enumeration](enum: E, valueClass: Class[E#Value]): TypeInformation[E#Value]
/**
* Creates type information for Traversable collections.
* T is the collection type itself, not its element type: the result is TypeInformation[T].
* The context bound requires an implicit TypeInformation[T] at the call site.
* @tparam T the collection type
* @return TypeInformation for the collection type T
*/
def TRAVERSABLE[T: TypeInformation]: TypeInformation[T]
}Usage Examples:
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
// Environment used by the examples below
val env = ExecutionEnvironment.getExecutionEnvironment
// Automatic type information (most common usage)
val data = env.fromElements(1, 2, 3, 4, 5) // TypeInformation[Int] inferred
// Explicit type information using Types object
val stringType: TypeInformation[String] = Types.STRING
val intType: TypeInformation[Int] = Types.of[Int]
// Complex types
case class Person(name: String, age: Int)
val personType: TypeInformation[Person] = Types.CASE_CLASS[Person]
// Option types
val optionStringType: TypeInformation[Option[String]] = Types.OPTION(Types.STRING)
// Either types
// Types.INT is declared as TypeInformation[java.lang.Integer] and TypeInformation is
// invariant, so it cannot be used where TypeInformation[Int] is expected; Types.of[Int]
// yields the Scala Int type information instead.
val eitherType: TypeInformation[Either[String, Int]] = Types.EITHER(Types.STRING, Types.of[Int])
// Array types
val stringArrayType: TypeInformation[Array[String]] = Types.OBJECT_ARRAY(Types.STRING)
Specialized type information for Scala case classes with field-level access.
/**
* Type information for Scala case classes, declared as a CompositeType over a Product type.
* Signature listing only; all members are shown without bodies.
* @tparam T the case class type (bounded by Product)
*/
abstract class CaseClassTypeInfo[T <: Product] extends CompositeType[T] {
/**
* Gets generic type parameters
* @return Map (java.util.Map) of generic parameter names to type information
*/
def getGenericParameters: java.util.Map[String, TypeInformation[_]]
/**
* Gets field indices for specified field names
* @param fields Array of field names
* @return Array of field indices
*/
// NOTE(review): behavior for a name that is not a field is not visible here - confirm.
def getFieldIndices(fields: Array[String]): Array[Int]
/**
* Gets flat field descriptors for field expression.
* Returns Unit: results are delivered by adding to the supplied `result` list.
* @param fieldExpression Field expression string
* @param offset Starting offset
* @param result List to collect field descriptors
*/
def getFlatFields(fieldExpression: String, offset: Int, result: java.util.List[FlatFieldDescriptor]): Unit
/**
* Gets type information for field at expression.
* This is the String-expression variant; the result type X is chosen by the caller.
* @tparam X expected type of the addressed field
* @param fieldExpression Field expression
* @return TypeInformation for field
*/
def getTypeAt[X](fieldExpression: String): TypeInformation[X]
/**
* Gets all field names
* @return Array of field names
*/
def getFieldNames: Array[String]
/**
* Gets field index by name
* @param fieldName Field name
* @return Field index
*/
def getFieldIndex(fieldName: String): Int
/**
* Creates type comparator builder
* @return TypeComparatorBuilder for this type
*/
def createTypeComparatorBuilder(): TypeComparatorBuilder[T]
}Type information and serializers for specific Scala types.
// Listing of the Scala-specific TypeInformation / TypeSerializer / TypeComparator classes.
// Only class headers are shown; bodies are omitted in this listing.
// Option serialization support: type info, serializer and comparator for Option[A]
class OptionTypeInfo[A](valueTypeInfo: TypeInformation[A]) extends TypeInformation[Option[A]]
class OptionSerializer[A](valueSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]]
class OptionTypeComparator[A](ascending: Boolean, valueComparator: TypeComparator[A]) extends TypeComparator[Option[A]]
// Either serialization support: type info and serializer for Either[A, B] (no comparator listed)
class EitherTypeInfo[A, B](leftTypeInfo: TypeInformation[A], rightTypeInfo: TypeInformation[B]) extends TypeInformation[Either[A, B]]
class EitherSerializer[A, B](leftSerializer: TypeSerializer[A], rightSerializer: TypeSerializer[B]) extends TypeSerializer[Either[A, B]]
// Try serialization support: type info and serializer for Try[A] (no comparator listed)
class TryTypeInfo[A](valueTypeInfo: TypeInformation[A]) extends TypeInformation[Try[A]]
class TrySerializer[A](valueSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]]
// Traversable collections support: CC is the collection type constructor
// (bounded by TraversableOnce), T is the element type
class TraversableTypeInfo[CC[X] <: TraversableOnce[X], T](clazz: Class[CC[T]], elementTypeInfo: TypeInformation[T]) extends TypeInformation[CC[T]]
class TraversableSerializer[CC[X] <: TraversableOnce[X], T](clazz: Class[CC[T]], elementSerializer: TypeSerializer[T]) extends TypeSerializer[CC[T]]
// Enumeration support: type info, serializer and comparator for values (E#Value) of an Enumeration E
class EnumValueTypeInfo[E <: Enumeration](enum: E, clazz: Class[E#Value]) extends TypeInformation[E#Value]
class EnumValueSerializer[E <: Enumeration](enum: E, clazz: Class[E#Value]) extends TypeSerializer[E#Value]
class EnumValueComparator[E <: Enumeration](ascending: Boolean, enum: E) extends TypeComparator[E#Value]
// Special types: Unit and Nothing
class UnitTypeInfo extends TypeInformation[Unit]
class UnitSerializer extends TypeSerializer[Unit]
class ScalaNothingTypeInfo extends TypeInformation[Nothing]
class NothingSerializer extends TypeSerializer[Nothing]Methods for registering custom types and serializers with the type system.
/**
* Type and serializer registration methods of ExecutionEnvironment.
* Signature listing only; all methods return Unit and are shown without bodies.
*/
class ExecutionEnvironment {
/**
* Registers a type with a Kryo serializer instance.
* The type bound requires the serializer instance to be both a Kryo Serializer
* and Serializable.
* @tparam T serializer type, bounded by Serializer[_] with Serializable
* @param clazz Class to register
* @param serializer Serializer instance
*/
def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](clazz: Class[_], serializer: T): Unit
/**
* Registers a type with a Kryo serializer class.
* Overload taking the serializer's class rather than an instance.
* @param clazz Class to register
* @param serializer Serializer class
*/
def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]): Unit
/**
* Adds a default Kryo serializer for a type
* @param clazz Class to register
* @param serializer Serializer class
*/
def addDefaultKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]): Unit
/**
* Registers a type with Kryo (uses default serialization)
* @param typeClass Class to register
*/
// NOTE(review): confirm against the Flink ExecutionEnvironment docs whether this
// registration is Kryo-only or applies to the serialization stack in general.
def registerType(typeClass: Class[_]): Unit
}Usage Examples:
import com.esotericsoftware.kryo.Serializer
// Register custom type with custom serializer
class MyData(val value: String)
// The instance overload of registerTypeWithKryoSerializer is bounded by
// T <: Serializer[_] with Serializable, so the serializer must mix in Serializable.
class MyDataSerializer extends Serializer[MyData] with Serializable {
  // Implementation
}
val env = ExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[MyData], new MyDataSerializer)
// Register type with default Kryo serialization
env.registerType(classOf[MyData])
Utility functions for working with type information.
/**
* Holder of the macro used to derive TypeInformation at compile time.
* Signature listing only: the macro implementation is elided here.
*/
object TypeUtils {
/**
* Macro for creating type information at compile time
* @tparam T type for which type information is generated
* @return TypeInformation for type T
*/
def createTypeInfo[T]: TypeInformation[T] = macro // Implementation in macro (elided in this listing)
}
/**
* Internal helpers of the scala package object. All members are private[flink],
* i.e. accessible only from within the enclosing flink package.
* Signature listing only; no bodies are shown.
*/
package object scala {
/**
* Wraps Java DataSet with Scala operations
* @tparam R element type; the context bound requires an implicit ClassTag[R]
* @param set Java DataSet to wrap
* @return Scala DataSet with type safety
*/
private[flink] def wrap[R: ClassTag](set: JavaDataSet[R]): DataSet[R]
/**
* Gets explicit type information if available, otherwise uses provided type info
* @tparam T type described by the type information
* @param funcOrInputFormat Function or input format that might provide type info
* @param typeInfo Fallback type information
* @return Explicit or fallback type information
*/
private[flink] def explicitFirst[T](funcOrInputFormat: AnyRef, typeInfo: TypeInformation[T]): TypeInformation[T]
/**
* Converts field names to field indices for type information
* @param typeInfo Type information containing field mapping
* @param fields Array of field names
* @return Array of field indices
*/
// NOTE(review): behavior for a typeInfo without named fields, or for unknown names,
// is not visible in this listing - confirm.
private[flink] def fieldNames2Indices(typeInfo: TypeInformation[_], fields: Array[String]): Array[Int]
}trait TypeInformation[T] {
// Core description of a type T: its class, serializer factory, field counts and
// classification predicates. Declared invariant in T. Members are listed without bodies.
/**
* Gets the class represented by this type information
* @return Class[T] for this type
*/
def getTypeClass: Class[T]
/**
* Creates a serializer for this type
* @param config Execution configuration
* @return TypeSerializer for this type
*/
def createSerializer(config: ExecutionConfig): TypeSerializer[T]
/**
* Gets total number of fields (for composite types)
* @return Number of fields
*/
def getTotalFields: Int
/**
* Gets arity (number of direct fields)
* @return Arity of this type
*/
def getArity: Int
/**
* Checks if this is a basic type
* @return True if basic type
*/
def isBasicType: Boolean
/**
* Checks if this is a tuple type
* @return True if tuple type
*/
def isTupleType: Boolean
/**
* Checks if this type can be used as a key
* @return True if valid key type
*/
def isKeyType: Boolean
}
/**
* Serializer contract for values of type T: copying, serialized length, and
* writing to / reading from data views. Members are listed without bodies.
* copy and deserialize each come in two overloads: one without and one with a
* caller-supplied `reuse` object.
* @tparam T type handled by this serializer
*/
abstract class TypeSerializer[T] {
/**
* Creates a copy of the given element
* @param from Element to copy
* @return Copy of the element
*/
def copy(from: T): T
/**
* Creates a copy of the given element to the target object
* @param from Source element
* @param reuse Target object to copy to
* @return Copy of the element
*/
// NOTE(review): whether the returned value is always the `reuse` instance is not
// visible in this listing - callers should use the return value.
def copy(from: T, reuse: T): T
/**
* Gets the length of serialized data
* @return Length in bytes, or -1 if variable length
*/
def getLength: Int
/**
* Serializes the given record to the target stream
* @param record Record to serialize
* @param target Target data output stream
*/
def serialize(record: T, target: DataOutputView): Unit
/**
* Deserializes a record from the source stream
* @param source Source data input stream
* @return Deserialized record
*/
def deserialize(source: DataInputView): T
/**
* Deserializes a record from the source stream into reuse object
* @param reuse Object to deserialize into
* @param source Source data input stream
* @return Deserialized record
*/
// NOTE(review): as with copy(from, reuse), use the return value rather than assuming
// `reuse` was populated in place - confirm against the implementation.
def deserialize(reuse: T, source: DataInputView): T
}
/**
* TypeInformation for types made up of named, indexable fields.
* Members are listed without bodies.
* @tparam T the composite type
*/
abstract class CompositeType[T] extends TypeInformation[T] {
/**
* Gets type information for field at index
* @param pos Field index
* @return TypeInformation for field (element type not tracked statically: TypeInformation[_])
*/
def getTypeAt(pos: Int): TypeInformation[_]
/**
* Gets field names
* @return Array of field names
*/
def getFieldNames: Array[String]
/**
* Gets field index by name
* @param fieldName Field name
* @return Field index, or -1 if not found
*/
def getFieldIndex(fieldName: String): Int
}