or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

dataset-operations.mdexecution-environment.mdgrouped-dataset-operations.mdindex.mdjoin-operations.mdtype-system.mdutility-functions.md
tile.json

type-system.mddocs/

Type System

The Flink Scala API provides a comprehensive type system that automatically generates TypeInformation for Scala types, ensuring type safety and efficient serialization.

Type Information Basics

import org.apache.flink.api.common.typeinfo.TypeInformation

// Implicit type information generation (macro-based)
// The right-hand side is a macro call, so the TypeInformation[T] instance is
// produced by TypeUtils.createTypeInfo when the implicit is expanded by the compiler.
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]

// Explicit type information for Nothing (compiler workaround)
// A dedicated, non-macro instance backed by ScalaNothingTypeInfo.
implicit val scalaNothingTypeInfo: TypeInformation[Nothing] = new ScalaNothingTypeInfo()

Types Object

The Types object provides factory methods and constants for common TypeInformation instances.

Basic Type Constants

// Signature listing of the TypeInformation constants exposed by Types
// (declarations only; the values themselves are not shown here).
object Types {
  // Primitive types
  // (this group also lists Nothing, Unit and String alongside the Scala value types)
  val NOTHING: TypeInformation[Nothing]
  val UNIT: TypeInformation[Unit]  
  val STRING: TypeInformation[String]
  val BYTE: TypeInformation[Byte]
  val BOOLEAN: TypeInformation[Boolean]
  val SHORT: TypeInformation[Short]
  val INT: TypeInformation[Int]
  val LONG: TypeInformation[Long]
  val FLOAT: TypeInformation[Float]
  val DOUBLE: TypeInformation[Double]
  val CHAR: TypeInformation[Char]
  
  // Date and time types
  // java.sql.* variants first, then the java.time.* variants
  val SQL_DATE: TypeInformation[java.sql.Date]
  val SQL_TIME: TypeInformation[java.sql.Time] 
  val SQL_TIMESTAMP: TypeInformation[java.sql.Timestamp]
  val LOCAL_DATE: TypeInformation[java.time.LocalDate]
  val LOCAL_TIME: TypeInformation[java.time.LocalTime]
  val LOCAL_DATE_TIME: TypeInformation[java.time.LocalDateTime]
  val INSTANT: TypeInformation[java.time.Instant]
}

Usage Examples

import org.apache.flink.api.scala.typeutils.Types
import scala.reflect.ClassTag

// Using type constants
val stringDataSet: DataSet[String] = env.fromElements("hello", "world")
val typeInfo: TypeInformation[String] = Types.STRING

// Explicit type specification when needed.
// fromCollection is declared with the context bounds [T: ClassTag : TypeInformation],
// so its implicit parameter list is (ClassTag[T], TypeInformation[T]) -- the ClassTag
// must be passed first and the TypeInformation second.
val numbers = env.fromCollection(List(1, 2, 3))(implicitly[ClassTag[Int]], Types.INT)

Factory Methods

// Signature listing of the factory methods on Types (declarations only).
object Types {
  // Returns the TypeInformation required by the context bound on T
  def of[T: TypeInformation]: TypeInformation[T]
  // Row type information built from the per-field type information, in order
  def ROW(types: TypeInformation[_]*): TypeInformation[Row]
  // Type information for the given POJO class
  def POJO[T](pojoClass: Class[T]): TypeInformation[T]
  // Case class and tuple variants: both take T's TypeInformation through the context bound
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
  def TUPLE[T: TypeInformation]: TypeInformation[T]
  // Either type information from the type information of its left and right sides
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
  // T appears only in the result type, so callers supply both type arguments explicitly
  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
  def TRY[A, T <: Try[A]](valueType: TypeInformation[A]): TypeInformation[T]
  // Enumeration value type information from the enumeration object and its value class
  def ENUMERATION[E <: Enumeration](enum: E, valueClass: Class[E#Value]): TypeInformation[E#Value]
}

Factory Method Examples

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.types.Row

// Row types
// A Row carries no static field types, so they are declared explicitly here.
// The value is implicit (and explicitly typed) so that fromElements below actually
// receives it; as a plain val it would be defined but never used.
implicit val rowType: TypeInformation[Row] = Types.ROW(Types.STRING, Types.INT, Types.DOUBLE)
val rowData: DataSet[Row] = env.fromElements(
  Row.of("Alice", Int.box(25), Double.box(1000.0)),
  Row.of("Bob", Int.box(30), Double.box(1500.0))
)

// POJO types
// Person has a parameterless primary constructor and mutable fields; the
// auxiliary constructor is a convenience that delegates to it and then assigns.
class Person {
  var name: String = _
  var age: Int = _
  def this(name: String, age: Int) = {
    this()
    this.name = name
    this.age = age
  }
}

// Type information is requested from the runtime class
val pojoType = Types.POJO(classOf[Person])

// Case class types (usually inferred automatically)
case class Employee(name: String, salary: Double)
val caseClassType = Types.CASE_CLASS[Employee]

// Option types
// Both type arguments are spelled out: the second (the Option type itself)
// only occurs in OPTION's result type.
val optionType = Types.OPTION[String, Option[String]](Types.STRING)
val optionalData: DataSet[Option[String]] = env.fromElements(Some("hello"), None, Some("world"))

// Either types
val eitherType = Types.EITHER[String, Int](Types.STRING, Types.INT)
val eitherData: DataSet[Either[String, Int]] = env.fromElements(Left("error"), Right(42))

// Try types  
// As with OPTION, TRY is called with both type arguments given explicitly.
import scala.util.{Try, Success, Failure}
val tryType = Types.TRY[Int, Try[Int]](Types.INT)
val tryData: DataSet[Try[Int]] = env.fromElements(Success(42), Failure(new Exception("error")))

Scala-Specific TypeInformation

Case Class Type Information

// Signature sketch of the composite type information used for case classes:
// it is constructed from the runtime class, the type information of the type
// parameters, and the type information and names of the fields.
class CaseClassTypeInfo[T](
  clazz: Class[T],
  typeParams: Array[TypeInformation[_]],
  fieldTypes: Seq[TypeInformation[_]],
  fieldNames: Array[String]
) extends CompositeType[T]

Case classes are automatically supported with field-level access:

case class Person(name: String, age: Int, email: Option[String])

val people = env.fromElements(
  Person(name = "Alice", age = 25, email = Some("alice@example.com")),
  Person(name = "Bob", age = 30, email = None)
)

// Read a field through its accessor
val names = people.map(person => person.name)

// Key by zero-based field position: index 1 is the age field
val groupedByAge = people.groupBy(1)

// No explicit type information was supplied above; inspect what was generated
val typeInfo = people.getType
println(s"Type: $typeInfo")

Tuple Type Information

// Scala tuples are supported up to Tuple22
val tuples = env.fromElements(("Alice", 25, true), ("Bob", 30, false))

// Elements are read through the positional accessors
val names = tuples.map(t => t._1)
val ages = tuples.map(t => t._2)

// Key by zero-based position: index 1 is the second element (age)
val groupedByAge = tuples.groupBy(1)

Option Type Information

// Signature sketch: type information for Option[T], built from the type information of T
class OptionTypeInfo[T](valueTypeInfo: TypeInformation[T]) extends TypeInformation[Option[T]]
// Usage: a DataSet mixing present (Some) and absent (None) values
val optionalStrings: DataSet[Option[String]] = env.fromElements(
  Some("hello"), 
  None, 
  Some("world")
)

// Working with Option types
// .get is only reached for elements that passed the isDefined filter
val presentValues = optionalStrings.filter(_.isDefined).map(_.get)
// Alternatively, substitute a default for absent values
val defaultedValues = optionalStrings.map(_.getOrElse("N/A"))

Either Type Information

// Signature sketch: type information for Either[A, B], built from the type
// information of the left and right sides
class EitherTypeInfo[A, B](
  leftTypeInfo: TypeInformation[A], 
  rightTypeInfo: TypeInformation[B]
) extends TypeInformation[Either[A, B]]
// Usage: Left carries an error message, Right carries a result value
val results: DataSet[Either[String, Int]] = env.fromElements(
  Left("Error: Invalid input"),
  Right(42),
  Left("Error: Network timeout"),
  Right(100)
)

// Process Either values
// Each projection's .get is only reached after the matching isLeft / isRight filter
val errors = results.filter(_.isLeft).map(_.left.get)
val values = results.filter(_.isRight).map(_.right.get)

Try Type Information

// Signature sketch: type information for Try[T], built from the type information of T
class TryTypeInfo[T](valueTypeInfo: TypeInformation[T]) extends TypeInformation[Try[T]]
import scala.util.{Try, Success, Failure}

// Usage: a DataSet mixing successful values and a captured exception
val attempts: DataSet[Try[Int]] = env.fromElements(
  Success(42),
  Failure(new NumberFormatException("Invalid number")),
  Success(100)
)

// Process Try values
// .get and .failed.get are only reached after the matching isSuccess / isFailure filter
val successful = attempts.filter(_.isSuccess).map(_.get)
val failures = attempts.filter(_.isFailure).map(_.failed.get.getMessage)

Collection Type Information

// Signature sketch: type information for a collection C[T], built from the
// type information of its elements
class TraversableTypeInfo[T, C[_] <: TraversableOnce[_]](
  elementTypeInfo: TypeInformation[T]
) extends TypeInformation[C[T]]
// Scala collections are supported
val listData: DataSet[List[String]] = env.fromElements(
  List("a", "b", "c"),
  List("x", "y", "z")
)

val setData: DataSet[Set[Int]] = env.fromElements(
  Set(1, 2, 3),
  Set(4, 5, 6)
)

// Working with collections
// flatMap(identity) emits the elements of each list; map(_.size) emits one length per list
val flattened = listData.flatMap(identity)
val sizes = listData.map(_.size)

Enumeration Type Information

// Signature sketch: type information for the values of an Enumeration, built
// from the enumeration object and the runtime class of its values
class EnumValueTypeInfo[E <: Enumeration](
  enum: E, 
  valueClass: Class[E#Value]
) extends TypeInformation[E#Value]
object Status extends Enumeration {
  type Status = Value
  val ACTIVE, INACTIVE, PENDING = Value
}

import Status._

// Explicit type information for enumerations.
// The implicit is given an explicit type so that its type does not depend on
// inference and it is reliably found during implicit search.
implicit val statusTypeInfo: TypeInformation[Status.Value] =
  Types.ENUMERATION(Status, classOf[Status.Value])

val statuses: DataSet[Status] = env.fromElements(ACTIVE, INACTIVE, PENDING)

// Working with enumerations
val activeOnly = statuses.filter(_ == ACTIVE)
val statusNames = statuses.map(_.toString)

Custom Type Information

Registering Custom Types

// For custom classes, you may need to register them with Kryo
// All registrations below go through the environment's config object.
val env = ExecutionEnvironment.getExecutionEnvironment
val config = env.getConfig

// Register custom types
// First call registers the class alone; the second pairs it with a serializer class.
config.registerKryoType(classOf[MyCustomClass])
config.registerTypeWithKryoSerializer(classOf[MyCustomClass], classOf[MyCustomClassSerializer])

// Add default Kryo serializers
config.addDefaultKryoSerializer(classOf[DateTime], classOf[DateTimeSerializer])

Creating Custom TypeInformation

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer

// Hand-written type information describing MyCustomClass as a single,
// non-basic, non-tuple field that may be used as a key.
// NOTE(review): SerializerConfig is not imported in this snippet -- confirm the
// import and the createSerializer overload required by the targeted Flink version.
class MyCustomTypeInfo extends TypeInformation[MyCustomClass] {
  // Runtime class and field counts
  override def getTypeClass: Class[MyCustomClass] = classOf[MyCustomClass]
  override def getArity: Int = 1
  override def getTotalFields: Int = 1

  // Classification flags
  override def isBasicType: Boolean = false
  override def isTupleType: Boolean = false
  override def isKeyType: Boolean = true

  // The config argument is not consulted; a fresh serializer is returned each time
  override def createSerializer(config: SerializerConfig): TypeSerializer[MyCustomClass] =
    new MyCustomTypeSerializer()

  // Equality is purely type-based: every MyCustomTypeInfo equals every other one
  override def canEqual(obj: Any): Boolean = obj match {
    case _: MyCustomTypeInfo => true
    case _ => false
  }
  override def equals(obj: Any): Boolean = obj match {
    case _: MyCustomTypeInfo => true
    case _ => false
  }
  override def hashCode(): Int = classOf[MyCustomTypeInfo].hashCode()

  override def toString: String = "MyCustomTypeInfo"
}

// Expose the instance implicitly so it is picked up wherever a
// TypeInformation[MyCustomClass] is required
implicit val myCustomTypeInfo: TypeInformation[MyCustomClass] = new MyCustomTypeInfo()

Type Information Utilities

Field Access

// Package object utilities
// Signature only: maps the given field names to field positions within typeInfo.
def fieldNames2Indices(typeInfo: TypeInformation[_], fields: Array[String]): Array[Int]
// Example record with fields at positions 0 (id), 1 (name) and 2 (value)
case class Record(id: Int, name: String, value: Double)

val records = env.fromElements(Record(1, "A", 1.0), Record(2, "B", 2.0))
val typeInfo = records.getType

// Convert field names to indices (internal utility)
val indices = fieldNames2Indices(typeInfo, Array("name", "value"))
// Result: Array(1, 2) - positions of name and value fields

Tuple Type Creation

// Signature only: builds the type information of a 2-tuple from the type
// information of its two elements.
def createTuple2TypeInformation[T1, T2](
  t1: TypeInformation[T1], 
  t2: TypeInformation[T2]
): TypeInformation[(T1, T2)]
// Create tuple type information manually
val tupleType = createTuple2TypeInformation(Types.STRING, Types.INT)
// fromCollection is declared with the context bounds [T: ClassTag : TypeInformation],
// so the explicit implicit arguments are (ClassTag, TypeInformation), in that order.
val tupleData: DataSet[(String, Int)] = env.fromCollection(
  List(("Alice", 25), ("Bob", 30))
)(implicitly[ClassTag[(String, Int)]], tupleType)

Serialization

Scala-Specific Serializers

// Case class serializer for 2-tuples
// Signature sketch: takes the tuple's runtime class and one serializer per field.
class Tuple2CaseClassSerializer[T1, T2](
  clazz: Class[(T1, T2)],
  fieldSerializers: Array[TypeSerializer[_]]
) extends ScalaCaseClassSerializer[(T1, T2)]

// Base class for Scala case class serialization
// Signature sketch: same constructor shape as above, generalised to any T.
abstract class ScalaCaseClassSerializer[T](
  clazz: Class[T],
  fieldSerializers: Array[TypeSerializer[_]]
) extends CaseClassSerializer[T]

Type Conversion

Java Interoperability

import org.apache.flink.api.java.{DataSet => JavaDataSet}

// Convert between Scala and Java DataSets
// javaSet yields the Java DataSet corresponding to the Scala one.
val scalaDataSet: DataSet[String] = env.fromElements("hello", "world")
val javaDataSet: JavaDataSet[String] = scalaDataSet.javaSet

// Wrap Java DataSet as Scala DataSet (internal)
// NOTE(review): wrap is not defined or imported in this snippet -- confirm where it comes from.
val wrappedScalaDataSet = wrap(javaDataSet)

Common Type Patterns

Working with Complex Types

// Nested case classes
// Person embeds an Address, so Person's fields form a two-level structure.
case class Address(street: String, city: String, zipCode: String)
case class Person(name: String, age: Int, address: Address)

val people = env.fromElements(
  Person("Alice", 25, Address("123 Main St", "NYC", "10001")),
  Person("Bob", 30, Address("456 Oak Ave", "LA", "90210"))
)

// Nested field access
// Both calls reach through Person.address; the second uses the nested field as the grouping key.
val cities = people.map(_.address.city)
val zipcodes = people.groupBy(_.address.zipCode)

Generic Types

// A case class parameterised over the type of its payload
case class Container[T](id: String, value: T)

// The context bounds oblige the caller to supply a TypeInformation and a ClassTag for T
def processContainer[T: TypeInformation: ClassTag](
  containers: DataSet[Container[T]]
): DataSet[T] =
  containers.map(container => container.value)

val stringContainers = env.fromElements(Container("1", "hello"), Container("2", "world"))

val values = processContainer(stringContainers)

Performance Considerations

Type Information Caching

// Type information is generated once per type and cached
// Avoid creating multiple DataSets with explicit type information when not needed

// Good - lets macro generate type information
val data1 = env.fromElements(1, 2, 3)

// Unnecessary - macro would generate the same type info.
// fromElements is declared with the context bounds [T: ClassTag : TypeInformation],
// so when the implicits are passed explicitly the ClassTag comes first.
val data2 = env.fromElements(1, 2, 3)(ClassTag.Int, Types.INT)

Serialization Efficiency

// Case classes are efficiently serialized
case class EfficientRecord(id: Long, value: String)  // Good

// POJOs require more overhead
// Same two fields as EfficientRecord, written as a mutable class for comparison.
class LessEfficientRecord {  // Less efficient
  var id: Long = _
  var value: String = _
}

// Tuples are very efficient for simple data
val tupleData = env.fromElements((1L, "value"))  // Very efficient