The Flink Scala API provides a comprehensive type system that automatically generates TypeInformation for Scala types, ensuring type safety and efficient serialization.
import org.apache.flink.api.common.typeinfo.TypeInformation
// Implicit type information generation (macro-based)
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
// Explicit type information for Nothing (compiler workaround)
implicit val scalaNothingTypeInfo: TypeInformation[Nothing] = new ScalaNothingTypeInfo()

The Types object provides factory methods and constants for common TypeInformation instances.
object Types {
// Primitive types
val NOTHING: TypeInformation[Nothing]
val UNIT: TypeInformation[Unit]
val STRING: TypeInformation[String]
val BYTE: TypeInformation[Byte]
val BOOLEAN: TypeInformation[Boolean]
val SHORT: TypeInformation[Short]
val INT: TypeInformation[Int]
val LONG: TypeInformation[Long]
val FLOAT: TypeInformation[Float]
val DOUBLE: TypeInformation[Double]
val CHAR: TypeInformation[Char]
// Date and time types
val SQL_DATE: TypeInformation[java.sql.Date]
val SQL_TIME: TypeInformation[java.sql.Time]
val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
val LOCAL_DATE: TypeInformation[java.time.LocalDate]
val LOCAL_TIME: TypeInformation[java.time.LocalTime]
val LOCAL_DATE_TIME: TypeInformation[java.time.LocalDateTime]
val INSTANT: TypeInformation[java.time.Instant]
}

import org.apache.flink.api.scala.typeutils.Types
// Using type constants
val stringDataSet: DataSet[String] = env.fromElements("hello", "world")
val typeInfo: TypeInformation[String] = Types.STRING
// Explicit type specification when needed.
// The batch `fromCollection` declares its context bounds as [T: ClassTag: TypeInformation],
// so when the implicits are passed by hand the ClassTag comes first.
val numbers = env.fromCollection(List(1, 2, 3))(implicitly[ClassTag[Int]], Types.INT)

// Factory methods for generic and composite types
object Types {
  def of[T: TypeInformation]: TypeInformation[T]
  def ROW(types: TypeInformation[_]*): TypeInformation[Row]
  def POJO[T](pojoClass: Class[T]): TypeInformation[T]
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
  def TUPLE[T: TypeInformation]: TypeInformation[T]
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
  def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]
  def ENUMERATION[E <: Enumeration](enum: E, valueClass: Class[E#Value]): TypeInformation[E#Value]
}

import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.types.Row
// Row types
val rowType = Types.ROW(Types.STRING, Types.INT, Types.DOUBLE)
val rowData: DataSet[Row] = env.fromElements(
Row.of("Alice", Int.box(25), Double.box(1000.0)),
Row.of("Bob", Int.box(30), Double.box(1500.0))
)
// POJO types
// Mutable class with a no-argument primary constructor; used below as the POJO example.
class Person {
// Fields start at their default values (`_`) until assigned.
var name: String = _
var age: Int = _
// Convenience constructor: delegates to the no-argument constructor, then assigns both fields.
def this(name: String, age: Int) = {
this()
this.name = name
this.age = age
}
}
val pojoType = Types.POJO(classOf[Person])
// Case class types (usually inferred automatically)
case class Employee(name: String, salary: Double)
val caseClassType = Types.CASE_CLASS[Employee]
// Option types
val optionType = Types.OPTION[String, Option[String]](Types.STRING)
val optionalData: DataSet[Option[String]] = env.fromElements(Some("hello"), None, Some("world"))
// Either types
val eitherType = Types.EITHER[String, Int](Types.STRING, Types.INT)
val eitherData: DataSet[Either[String, Int]] = env.fromElements(Left("error"), Right(42))
// Try types
import scala.util.{Try, Success, Failure}
val tryType = Types.TRY[Int, Try[Int]](Types.INT)
val tryData: DataSet[Try[Int]] = env.fromElements(Success(42), Failure(new Exception("error")))

class CaseClassTypeInfo[T](
clazz: Class[T],
typeParams: Array[TypeInformation[_]],
fieldTypes: Seq[TypeInformation[_]],
fieldNames: Array[String]
) extends CompositeType[T]

Case classes are automatically supported with field-level access:
case class Person(name: String, age: Int, email: Option[String])
val people = env.fromElements(
Person("Alice", 25, Some("alice@example.com")),
Person("Bob", 30, None)
)
// Field access by name
val names = people.map(_.name)
// Field access by position (for grouping/sorting)
val groupedByAge = people.groupBy(1) // Group by age field
// Type information is automatically generated
val typeInfo = people.getType
println(s"Type: ${typeInfo}")

// Tuples up to Tuple22 are supported
val tuples = env.fromElements(
("Alice", 25, true),
("Bob", 30, false)
)
// Access by position
val names = tuples.map(_._1)
val ages = tuples.map(_._2)
// Grouping by tuple elements
val groupedByAge = tuples.groupBy(1) // Group by second element (age)

class OptionTypeInfo[T](valueTypeInfo: TypeInformation[T]) extends TypeInformation[Option[T]]

val optionalStrings: DataSet[Option[String]] = env.fromElements(
Some("hello"),
None,
Some("world")
)
// Working with Option types
val presentValues = optionalStrings.filter(_.isDefined).map(_.get)
val defaultedValues = optionalStrings.map(_.getOrElse("N/A"))

class EitherTypeInfo[A, B](
leftTypeInfo: TypeInformation[A],
rightTypeInfo: TypeInformation[B]
) extends TypeInformation[Either[A, B]]

val results: DataSet[Either[String, Int]] = env.fromElements(
Left("Error: Invalid input"),
Right(42),
Left("Error: Network timeout"),
Right(100)
)
// Process Either values
val errors = results.filter(_.isLeft).map(_.left.get)
val values = results.filter(_.isRight).map(_.right.get)

class TryTypeInfo[T](valueTypeInfo: TypeInformation[T]) extends TypeInformation[Try[T]]

import scala.util.{Try, Success, Failure}
val attempts: DataSet[Try[Int]] = env.fromElements(
Success(42),
Failure(new NumberFormatException("Invalid number")),
Success(100)
)
// Process Try values
val successful = attempts.filter(_.isSuccess).map(_.get)
val failures = attempts.filter(_.isFailure).map(_.failed.get.getMessage)

class TraversableTypeInfo[T, C[_] <: TraversableOnce[_]](
elementTypeInfo: TypeInformation[T]
) extends TypeInformation[C[T]]

// Scala collections are supported
val listData: DataSet[List[String]] = env.fromElements(
List("a", "b", "c"),
List("x", "y", "z")
)
val setData: DataSet[Set[Int]] = env.fromElements(
Set(1, 2, 3),
Set(4, 5, 6)
)
// Working with collections
val flattened = listData.flatMap(identity)
val sizes = listData.map(_.size)

class EnumValueTypeInfo[E <: Enumeration](
enum: E,
valueClass: Class[E#Value]
) extends TypeInformation[E#Value]

object Status extends Enumeration {
type Status = Value
val ACTIVE, INACTIVE, PENDING = Value
}
import Status._
// Explicit type information for enumerations.
// The implicit carries an explicit type annotation so that implicit resolution does not
// depend on inferring the type of the right-hand side.
implicit val statusTypeInfo: TypeInformation[Status.Value] =
  Types.ENUMERATION(Status, classOf[Status.Value])
val statuses: DataSet[Status] = env.fromElements(ACTIVE, INACTIVE, PENDING)
// Working with enumerations
val activeOnly = statuses.filter(_ == ACTIVE)
val statusNames = statuses.map(_.toString)

// For custom classes, you may need to register them with Kryo
val env = ExecutionEnvironment.getExecutionEnvironment
val config = env.getConfig
// Register custom types
config.registerKryoType(classOf[MyCustomClass])
config.registerTypeWithKryoSerializer(classOf[MyCustomClass], classOf[MyCustomClassSerializer])
// Add default Kryo serializers
config.addDefaultKryoSerializer(classOf[DateTime], classOf[DateTimeSerializer])

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
// Needed for the `createSerializer(config: SerializerConfig)` override below.
import org.apache.flink.api.common.serialization.SerializerConfig

/**
 * Hand-written [[TypeInformation]] for `MyCustomClass`.
 *
 * Reports the type as a single-field, non-basic, non-tuple type that may be used as a key,
 * and hands out a `MyCustomTypeSerializer` for serialization. All instances of this class
 * compare equal to one another and share the same hash code.
 */
class MyCustomTypeInfo extends TypeInformation[MyCustomClass] {

  // --- Shape of the described type ---
  override def isBasicType: Boolean = false
  override def isTupleType: Boolean = false
  override def isKeyType: Boolean = true
  override def getArity: Int = 1
  override def getTotalFields: Int = 1
  override def getTypeClass: Class[MyCustomClass] = classOf[MyCustomClass]

  // --- Serialization ---
  /** Returns a fresh `MyCustomTypeSerializer`; the supplied `config` is not consulted. */
  override def createSerializer(config: SerializerConfig): TypeSerializer[MyCustomClass] =
    new MyCustomTypeSerializer()

  // --- Identity: the class carries no state, so any two instances are interchangeable ---
  override def canEqual(obj: Any): Boolean = obj match {
    case _: MyCustomTypeInfo => true
    case _                   => false
  }

  override def equals(obj: Any): Boolean = obj match {
    case _: MyCustomTypeInfo => true
    case _                   => false
  }

  override def hashCode(): Int = classOf[MyCustomTypeInfo].hashCode()

  override def toString: String = "MyCustomTypeInfo"
}
// Make it implicitly available
implicit val myCustomTypeInfo: TypeInformation[MyCustomClass] = new MyCustomTypeInfo()

// Package object utilities
def fieldNames2Indices(typeInfo: TypeInformation[_], fields: Array[String]): Array[Int]

case class Record(id: Int, name: String, value: Double)
val records = env.fromElements(Record(1, "A", 1.0), Record(2, "B", 2.0))
val typeInfo = records.getType
// Convert field names to indices (internal utility)
val indices = fieldNames2Indices(typeInfo, Array("name", "value"))
// Result: Array(1, 2) - positions of name and value fields

def createTuple2TypeInformation[T1, T2](
    t1: TypeInformation[T1],
    t2: TypeInformation[T2]
): TypeInformation[(T1, T2)]

// Create tuple type information manually.
// The batch `fromCollection` declares its context bounds as [T: ClassTag: TypeInformation],
// so when the implicits are passed by hand the ClassTag comes first.
val tupleType = createTuple2TypeInformation(Types.STRING, Types.INT)
val tupleData: DataSet[(String, Int)] = env.fromCollection(
  List(("Alice", 25), ("Bob", 30))
)(implicitly[ClassTag[(String, Int)]], tupleType)

// Case class serializer for 2-tuples
class Tuple2CaseClassSerializer[T1, T2](
clazz: Class[(T1, T2)],
fieldSerializers: Array[TypeSerializer[_]]
) extends ScalaCaseClassSerializer[(T1, T2)]
// Base class for Scala case class serialization
abstract class ScalaCaseClassSerializer[T](
clazz: Class[T],
fieldSerializers: Array[TypeSerializer[_]]
) extends CaseClassSerializer[T]

import org.apache.flink.api.java.{DataSet => JavaDataSet}
// Convert between Scala and Java DataSets
val scalaDataSet: DataSet[String] = env.fromElements("hello", "world")
val javaDataSet: JavaDataSet[String] = scalaDataSet.javaSet
// Wrap Java DataSet as Scala DataSet (internal)
val wrappedScalaDataSet = wrap(javaDataSet)

// Nested case classes
case class Address(street: String, city: String, zipCode: String)
case class Person(name: String, age: Int, address: Address)
val people = env.fromElements(
Person("Alice", 25, Address("123 Main St", "NYC", "10001")),
Person("Bob", 30, Address("456 Oak Ave", "LA", "90210"))
)
// Nested field access
val cities = people.map(_.address.city)
val zipcodes = people.groupBy(_.address.zipCode)

// Generic case classes
case class Container[T](id: String, value: T)
// Type information must be available for T
/**
 * Unwraps every container in the data set, keeping only its payload.
 *
 * @tparam T payload type; a `TypeInformation` and a `ClassTag` for it must be in implicit scope
 * @param containers data set of containers to unwrap
 * @return data set holding the `value` of each container
 */
def processContainer[T: TypeInformation: ClassTag](
    containers: DataSet[Container[T]]
): DataSet[T] =
  containers.map(container => container.value)
val stringContainers = env.fromElements(
Container("1", "hello"),
Container("2", "world")
)
val values = processContainer(stringContainers)

// Type information is generated once per type and cached
// Avoid creating multiple DataSets with explicit type information when not needed
// Good - lets macro generate type information
val data1 = env.fromElements(1, 2, 3)
// Unnecessary - macro would generate the same type info.
// The batch `fromElements` declares its context bounds as [T: ClassTag: TypeInformation],
// so when the implicits are passed by hand the ClassTag comes first.
val data2 = env.fromElements(1, 2, 3)(ClassTag.Int, Types.INT)

// Case classes are efficiently serialized
case class EfficientRecord(id: Long, value: String) // Good
// POJOs require more overhead
// Plain class with two mutable fields and a no-argument constructor (the POJO-style counterpart
// of EfficientRecord).
class LessEfficientRecord { // Less efficient
// Both fields start at their default values (`_`) until assigned.
var id: Long = _
var value: String = _
}
// Tuples are very efficient for simple data
val tupleData = env.fromElements((1L, "value")) // Very efficient