or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

data-sources-sinks.md, execution-environment.md, grouping-aggregation.md, hadoop-integration.md, index.md, iterations.md, joins-cogroups.md, transformations.md, type-system.md
tile.json

docs/type-system.md

Type System

Apache Flink Scala API provides a comprehensive type system with automatic type information generation for Scala types, ensuring type safety and efficient serialization for distributed computing.

Core Type Information

TypeInformation Generation

// Implicit type information generation via macro.
// Whenever an implicit TypeInformation[T] is required (for example by the
// fromElements/map calls in the usage examples), this macro is expanded at
// compile time, delegating to TypeUtils.createTypeInfo[T].
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]

// Explicit type information for Nothing type.
// Provided as a dedicated implicit value rather than through the macro
// (NOTE(review): presumably because the macro does not handle Nothing — confirm).
implicit val scalaNothingTypeInfo: TypeInformation[Nothing] = new ScalaNothingTypeInfo()

Base Type Information Classes

// Abstract base class for all type information.
// A TypeInformation[T] describes the type T and is the factory for the
// TypeSerializer[T] used to (de)serialize values of T (see createSerializer).
abstract class TypeInformation[T] {
  // Runtime class of T.
  def getTypeClass: Class[T]
  def isBasicType: Boolean
  def isTupleType: Boolean
  // NOTE(review): per the Flink Javadoc, getArity counts fields without nesting
  // and getTotalFields counts logical fields including nested ones — confirm
  // for the documented Flink version.
  def getArity: Int
  def getTotalFields: Int
  // Builds the serializer for T; the ExecutionConfig lets the serializer take
  // job configuration into account (NOTE(review): e.g. Kryo registrations — confirm).
  def createSerializer(config: ExecutionConfig): TypeSerializer[T]
}

// Type information for case classes.
// Holds one TypeInformation per field together with the field names
// (NOTE(review): presumably parallel arrays in constructor order — confirm).
class CaseClassTypeInfo[T](
  clazz: Class[T],
  fieldTypes: Array[TypeInformation[_]],
  fieldNames: Array[String]
) extends TypeInformation[T]

Scala-Specific Type Information

Option Type Support

// TypeInformation for Option[T], built from the TypeInformation of the
// wrapped element type T.
class OptionTypeInfo[T](
  elemType: TypeInformation[T]
) extends TypeInformation[Option[T]]

// Comparator for Option[T]; it is given the comparator of T, presumably used
// to compare the contents of Some values (NOTE(review): ordering of None
// relative to Some — confirm).
class OptionTypeComparator[T](
  elemComparator: TypeComparator[T]
) extends TypeComparator[Option[T]]

// Serializer for Option[T]; the wrapped value of a Some is handled by the
// element serializer.
class OptionSerializer[T](
  elemSerializer: TypeSerializer[T]
) extends TypeSerializer[Option[T]]

Either Type Support

// TypeInformation for Either[A, B]: leftType describes Left values,
// rightType describes Right values.
class EitherTypeInfo[A, B](
  leftType: TypeInformation[A],
  rightType: TypeInformation[B]
) extends TypeInformation[Either[A, B]]

// Serializer for Either[A, B], composed of one serializer per side.
class EitherSerializer[A, B](
  leftSerializer: TypeSerializer[A],
  rightSerializer: TypeSerializer[B]
) extends TypeSerializer[Either[A, B]]

Try Type Support

// TypeInformation for Try[T]: elemType describes the value of a Success,
// throwableTypeInfo describes the Throwable carried by a Failure.
class TryTypeInfo[T](
  elemType: TypeInformation[T],
  throwableTypeInfo: TypeInformation[Throwable]
) extends TypeInformation[Try[T]]

// Serializer counterpart: elemSerializer for Success values,
// throwableSerializer for the Throwable inside a Failure.
class TrySerializer[T](
  elemSerializer: TypeSerializer[T],
  throwableSerializer: TypeSerializer[Throwable]
) extends TypeSerializer[Try[T]]

Collection Type Support

// Type information and serializer for Scala collections.
// The collection type T and its element type E are distinct type parameters.
// The element's TypeInformation/serializer handles each contained value, while
// the resulting TypeInformation[T] / TypeSerializer[T] describe the whole
// collection (e.g. T = List[String], E = String).
// NOTE(review): Flink's actual constructors may order or name these parameters
// differently — confirm against the documented Flink version.
class TraversableTypeInfo[T <: TraversableOnce[E], E](
  elemType: TypeInformation[E],
  traversableClass: Class[_]
) extends TypeInformation[T]

class TraversableSerializer[T <: TraversableOnce[E], E](
  elemSerializer: TypeSerializer[E],
  traversableClass: Class[_]
) extends TypeSerializer[T]

Unit Type Support

// Type information and serializer for Unit, whose only value is ().
class UnitTypeInfo extends TypeInformation[Unit]
class UnitSerializer extends TypeSerializer[Unit]

Enum Value Support

// Type information for values of a scala.Enumeration. It is constructed from
// the Enumeration object itself; E#Value is the type of that enumeration's values.
class EnumValueTypeInfo[E <: Enumeration](
  enum: E
) extends TypeInformation[E#Value]

// Serializer for values of the given Enumeration.
class EnumValueSerializer[E <: Enumeration](
  enum: E
) extends TypeSerializer[E#Value]

// Comparator for values of the given Enumeration, which lets enum values be
// compared (NOTE(review): presumably by their ids — confirm).
class EnumValueComparator[E <: Enumeration](
  enum: E
) extends TypeComparator[E#Value]

Serialization System

Base Serializer Classes

// Abstract base class for serializers.
// A TypeSerializer[T] creates and copies values of T and writes/reads them
// to/from Flink's binary views (DataOutputView / DataInputView).
abstract class TypeSerializer[T] {
  // Returns a serializer for the same type (NOTE(review): in Flink this gives
  // each user its own instance when the serializer is stateful — confirm).
  def duplicate(): TypeSerializer[T]
  def createInstance(): T
  // Copy variants; the second may write into `reuse` instead of allocating
  // (presumably only when T is mutable — confirm).
  def copy(from: T): T
  def copy(from: T, reuse: T): T
  // Serialized length (NOTE(review): Flink returns -1 for variable-length types — confirm).
  def getLength: Int
  def serialize(record: T, target: DataOutputView): Unit
  // Deserialization variants mirroring copy; the second may fill and return `reuse`.
  def deserialize(source: DataInputView): T
  def deserialize(reuse: T, source: DataInputView): T
}

// Case class serialization: one serializer per field of clazz
// (NOTE(review): presumably in constructor order — confirm).
class CaseClassSerializer[T](
  clazz: Class[T],
  fieldSerializers: Array[TypeSerializer[_]]
) extends TypeSerializer[T]

Specialized Serializers

// Serializer for Nothing type. Nothing has no values, so this serializer
// exists only to satisfy the type system.
class NothingSerializer extends TypeSerializer[Nothing]

// Tuple serialization helper: a CaseClassSerializer specialised to Scala's
// Tuple2. CaseClassSerializer's constructor takes (clazz, fieldSerializers),
// so both arguments must be forwarded in the extends clause.
class Tuple2CaseClassSerializer[T1, T2](
  clazz: Class[(T1, T2)],
  fieldSerializers: Array[TypeSerializer[_]]
) extends CaseClassSerializer[(T1, T2)](clazz, fieldSerializers)

Type Utilities

TypeUtils Object

object TypeUtils {
  // Macro for creating type information; delegates to the
  // TypeInformationGen.mkTypeInfo macro implementation.
  def createTypeInfo[T]: TypeInformation[T] = macro TypeInformationGen.mkTypeInfo[T]
  
  // Helper methods for type analysis
  // Combines the TypeInformation of both components into the TypeInformation
  // of the pair (T1, T2). The body is omitted in this summary.
  def createTuple2TypeInformation[T1, T2](
    t1: TypeInformation[T1],
    t2: TypeInformation[T2]
  ): TypeInformation[(T1, T2)]
}

Code Generation Support

// Macro-based code generation for type information.
// mkTypeInfo is a macro implementation: it receives the compiler's macro
// Context `c` (plus a WeakTypeTag for T via the context bound) and returns the
// expression that constructs the TypeInformation[T].
object TypeInformationGen {
  def mkTypeInfo[T: c.WeakTypeTag](c: Context): c.Expr[TypeInformation[T]]
}

// Type analysis utilities.
// NOTE(review): `c` is not declared in these objects as shown; presumably the
// real helpers live in a component that has the macro Context in scope — confirm.
object TypeAnalyzer {
  def getParameterType(tpe: c.Type, clazz: Class[_]): c.Type
}

// AST generation utilities: builds a method-call tree `receiver.method(args...)`.
object TreeGen {
  def mkMethodCall(receiver: c.Tree, method: String, args: c.Tree*): c.Tree
}

Usage Examples

Automatic Type Information

import org.apache.flink.api.scala._

// A plain case class: no serializer or TypeInformation is written by hand.
case class Person(name: String, age: Int, city: String)

val env = ExecutionEnvironment.getExecutionEnvironment

// The macro brought in by the wildcard import derives TypeInformation[Person] here...
val people: DataSet[Person] = env.fromElements(
  Person("Alice", 25, "New York"),
  Person("Bob", 30, "London")
)

// ...and TypeInformation[String] for the result of this projection.
val names: DataSet[String] = people.map(person => person.name)

Working with Option Types

import org.apache.flink.api.scala._

// Option-valued fields are supported inside case classes.
case class PersonWithMiddleName(
  firstName: String,
  middleName: Option[String],
  lastName: String
)

val env = ExecutionEnvironment.getExecutionEnvironment

val people = env.fromElements(
  PersonWithMiddleName("John", Some("David"), "Smith"),
  PersonWithMiddleName("Jane", None, "Doe")
)

// Keep only the people whose middle name is present
val withMiddleNames = people.filter(person => person.middleName.nonEmpty)

// Project the middle name, substituting "N/A" when it is absent
val middleNames = people.map(person => person.middleName.getOrElse("N/A"))

Working with Either Types

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Either for error handling
case class ProcessingResult(
  id: String,
  result: Either[String, Int] // Left = error message, Right = success value
)

val results = env.fromElements(
  ProcessingResult("task1", Right(42)),
  ProcessingResult("task2", Left("Division by zero")),
  ProcessingResult("task3", Right(100))
)

// Separate successful and failed results.
// Matching inside flatMap selects a side and extracts its value in one total
// step. This avoids partial `.right.get` / `.left.get` calls (whose projections
// are deprecated in Scala 2.13) that are only safe because of a preceding filter.
val successful = results.flatMap { r =>
  r.result match {
    case Right(value) => Some((r.id, value))
    case Left(_)      => None
  }
}
val failed = results.flatMap { r =>
  r.result match {
    case Left(error) => Some((r.id, error))
    case Right(_)    => None
  }
}

Working with Try Types

import org.apache.flink.api.scala._
import scala.util.{Try, Success, Failure}

val env = ExecutionEnvironment.getExecutionEnvironment

case class SafeOperation(
  input: String,
  result: Try[Int]
)

val operations = env.fromElements(
  SafeOperation("42", Try("42".toInt)),
  SafeOperation("abc", Try("abc".toInt)), // This will be a Failure
  SafeOperation("100", Try("100".toInt))
)

// Process successful operations
val successResults = operations.flatMap { op =>
  op.result match {
    case Success(value) => Some((op.input, value))
    case Failure(_) => None
  }
}

// Count failures. Filtering the original records directly avoids an extra map
// operator that would only produce intermediate Try[Int] records.
// Note: count() is an action — calling it triggers execution of the job.
val failureCount = operations.filter(_.result.isFailure).count()

Working with Collections

import org.apache.flink.api.scala._

// A List-valued field inside a case class.
case class PersonWithHobbies(
  name: String,
  hobbies: List[String]
)

val env = ExecutionEnvironment.getExecutionEnvironment

val people = env.fromElements(
  PersonWithHobbies("Alice", List("reading", "swimming", "coding")),
  PersonWithHobbies("Bob", List("gaming", "cooking")),
  PersonWithHobbies("Charlie", List.empty[String])
)

// One output record per hobby, across all people
val allHobbies = people.flatMap(person => person.hobbies)

// Keep people who list more than one hobby
val socialPeople = people.filter(person => person.hobbies.size > 1)

Custom Case Class Serialization

import org.apache.flink.api.scala._

// Complex nested case class: a case class field plus Option and List fields
case class Address(street: String, city: String, zipCode: String)
case class Employee(
  id: Long,
  name: String,
  address: Address,
  salary: Option[Double],
  skills: List[String]
)

val env = ExecutionEnvironment.getExecutionEnvironment

val employees = env.fromElements(
  Employee(
    id = 1L,
    name = "Alice Johnson",
    address = Address(street = "123 Main St", city = "New York", zipCode = "10001"),
    salary = Some(75000.0),
    skills = List("Scala", "Flink", "Kafka")
  ),
  Employee(
    id = 2L,
    name = "Bob Smith",
    address = Address(street = "456 Oak Ave", city = "San Francisco", zipCode = "94102"),
    salary = None,
    skills = List("Java", "Spring")
  )
)

// TypeInformation for Employee and its nested field types is derived automatically
val highSkilled = employees.filter(employee => employee.skills.size >= 3)

Working with Enums

import org.apache.flink.api.scala._

// A scala.Enumeration used as a case class field
object Status extends Enumeration {
  type Status = Value
  val Pending, Active, Inactive, Deleted = Value
}

case class User(id: Long, name: String, status: Status.Status)

val env = ExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(id = 1L, name = "Alice", status = Status.Active),
  User(id = 2L, name = "Bob", status = Status.Pending),
  User(id = 3L, name = "Charlie", status = Status.Inactive)
)

// Keep only active users
val activeUsers = users.filter(user => user.status == Status.Active)

// Group by the status name (e.g. "Active")
val usersByStatus = users.groupBy(user => user.status.toString)

Explicit Type Information

import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import scala.reflect.ClassTag

val env = ExecutionEnvironment.getExecutionEnvironment

// Sometimes you need explicit type information.
// Inside a generic function the element type T is abstract, so the macro from
// `import org.apache.flink.api.scala._` cannot derive TypeInformation[T] by itself.
// The context bounds make the caller supply TypeInformation[T] and ClassTag[T]
// at the call site, where T is concrete (here T = Int). This particular body
// only maps to String, but the bounds become mandatory as soon as the function
// builds a DataSet whose element type mentions T, e.g. DataSet[(T, String)].
def processGenericData[T: TypeInformation: ClassTag](data: DataSet[T]): DataSet[String] = {
  data.map(_.toString)
}

val numbers = env.fromElements(1, 2, 3, 4, 5)
val strings = processGenericData(numbers)

Best Practices

Type Safety Guidelines

  1. Use Case Classes: Preferred over tuples for better type safety and readability
  2. Leverage Automatic Type Generation: Let macros handle TypeInformation creation
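  3. Handle Option Types Properly: Model possibly-missing values as Option instead of null, and process them with map/flatMap/filter/getOrElse rather than Option.get
  4. Use Either for Error Handling: Recoverable errors become ordinary data that can be filtered, counted, or routed, whereas an exception thrown inside a user function fails the task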
  5. Minimize Custom Serializers: Use built-in serialization when possible

Performance Considerations

  1. Avoid Deep Nesting: Complex nested structures can impact serialization performance
  2. Use Primitive Types When Possible: Better serialization performance than wrapper types
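  3. Use Kryo Deliberately: Types that Flink's type system cannot analyze fall back to Kryo as generic types, which is usually slower than the built-in serializers; reserve Kryo (or a custom serializer) for types the default serialization cannot handle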
  4. Monitor Serialization Overhead: Profile serialization costs in performance-critical applications