Runtime Serialization Support

Kryo configuration and specialized serializers for Scala types used during distributed Flink execution. Coverage includes collections, tuples, and special types such as Option, Either, and Try.

FlinkScalaKryoInstantiator

Main Kryo configuration class providing pre-configured Kryo instances optimized for Scala types.

class FlinkScalaKryoInstantiator extends KryoInstantiator {
  def newKryo: Kryo
}

The newKryo method creates a Kryo instance pre-registered with serializers for:

  • All Scala collection types (List, Vector, Set, Map, etc.)
  • Scala tuples (Tuple1 through Tuple22)
  • Scala-specific types (Option, Either, Try, Unit, Symbol)
  • Case classes and products
  • Enumeration values

Usage example:

import org.apache.flink.runtime.types.FlinkScalaKryoInstantiator

val kryoInstantiator = new FlinkScalaKryoInstantiator()
val kryo = kryoInstantiator.newKryo
// Kryo instance ready for efficient Scala type serialization

Core Serializers

CaseClassSerializer

Generic serializer for Scala case classes providing efficient field-by-field serialization.

class CaseClassSerializer[T <: Product](
  clazz: Class[T],
  scalaFieldSerializers: Array[TypeSerializer[_]],
  scalaFieldTypes: Array[TypeInformation[_]]
) extends TypeSerializer[T] {
  
  def duplicate(): TypeSerializer[T]
  def createInstance(): T
  def copy(from: T): T
  def copy(from: T, reuse: T): T
  def getLength: Int
  def serialize(value: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
  def deserialize(reuse: T, source: DataInputView): T
  def copy(source: DataInputView, target: DataOutputView): Unit
  def equals(obj: Any): Boolean
  def hashCode(): Int
  def snapshotConfiguration(): TypeSerializerSnapshot[T]
}

Features:

  • Field-level serialization for optimal performance
  • Null handling for optional fields
  • Version compatibility for schema evolution
  • Memory-efficient packed layouts
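The field-by-field approach can be sketched without Flink. The following is a minimal, hypothetical model rather than Flink's implementation. `FieldSer` stands in for a per-field `TypeSerializer`. `java.io.DataOutput`/`DataInput` stand in for `DataOutputView`/`DataInputView`, which extend those Java interfaces. `Person`, `toBytes`, and `fromBytes` are illustrative names only. Each field is written in declaration order with its own serializer and no per-field tags, and the instance is rebuilt from the decoded fields.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

// Hypothetical stand-in for a per-field TypeSerializer (illustration only).
trait FieldSer[T] {
  def write(v: T, out: DataOutput): Unit
  def read(in: DataInput): T
}

object IntSer extends FieldSer[Int] {
  def write(v: Int, out: DataOutput): Unit = out.writeInt(v)
  def read(in: DataInput): Int = in.readInt()
}

object StrSer extends FieldSer[String] {
  def write(v: String, out: DataOutput): Unit = out.writeUTF(v)
  def read(in: DataInput): String = in.readUTF()
}

// Writes each field of a Product in declaration order with its matching
// field serializer (no per-field tags), then rebuilds the instance on read.
class SimpleCaseClassSer[T <: Product](fieldSers: Array[FieldSer[Any]], build: Array[Any] => T) {
  def write(v: T, out: DataOutput): Unit =
    for (i <- fieldSers.indices) fieldSers(i).write(v.productElement(i), out)

  def read(in: DataInput): T = build(fieldSers.map(s => s.read(in)))
}

case class Person(id: Int, name: String)

val personSer = new SimpleCaseClassSer[Person](
  Array[FieldSer[Any]](IntSer.asInstanceOf[FieldSer[Any]], StrSer.asInstanceOf[FieldSer[Any]]),
  fields => Person(fields(0).asInstanceOf[Int], fields(1).asInstanceOf[String])
)

// Helpers introduced for this sketch: encode to bytes and decode from bytes.
def toBytes(write: DataOutput => Unit): Array[Byte] = {
  val buf = new ByteArrayOutputStream()
  write(new DataOutputStream(buf))
  buf.toByteArray
}

def fromBytes[A](bytes: Array[Byte])(read: DataInput => A): A =
  read(new DataInputStream(new ByteArrayInputStream(bytes)))
```

For `Person(7, "ada")` the record is 9 bytes: 4 for the `Int` plus 2 length bytes and 3 characters from `writeUTF`. No field names or type tags are stored.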

Tuple2CaseClassSerializer

Serializer specialized for Tuple2 types.

class Tuple2CaseClassSerializer[T1, T2](
  tupleClass: Class[(T1, T2)],
  fieldSerializers: Array[TypeSerializer[_]]
) extends CaseClassSerializer[(T1, T2)] {
  
  // Optimized serialization for two-element tuples
  override def serialize(value: (T1, T2), target: DataOutputView): Unit
  override def deserialize(source: DataInputView): (T1, T2)
  override def copy(from: (T1, T2)): (T1, T2)
}

OptionSerializer

Serializer for Scala Option types that encodes presence with a single flag byte.

class OptionSerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]] {
  
  def duplicate(): TypeSerializer[Option[A]]
  def createInstance(): Option[A] = None
  def copy(from: Option[A]): Option[A]
  def copy(from: Option[A], reuse: Option[A]): Option[A]
  def getLength: Int = -1  // Variable length
  def serialize(value: Option[A], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Option[A]
  def deserialize(reuse: Option[A], source: DataInputView): Option[A]
  def copy(source: DataInputView, target: DataOutputView): Unit
  def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]]
}

Serialization format:

  • 1 byte: presence flag (0 = None, 1 = Some)
  • If Some: serialized element value

EitherSerializer

Serializer for Scala Either types supporting both Left and Right values.

class EitherSerializer[A, B](
  leftSerializer: TypeSerializer[A],
  rightSerializer: TypeSerializer[B]
) extends TypeSerializer[Either[A, B]] {
  
  def duplicate(): TypeSerializer[Either[A, B]]
  def createInstance(): Either[A, B] = Left(leftSerializer.createInstance())
  def copy(from: Either[A, B]): Either[A, B]
  def getLength: Int = -1  // Variable length
  def serialize(value: Either[A, B], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Either[A, B]
  def snapshotConfiguration(): TypeSerializerSnapshot[Either[A, B]]
}

Serialization format:

  • 1 byte: discriminator (0 = Left, 1 = Right)
  • Serialized value using appropriate serializer

TrySerializer

Serializer for Scala Try types handling Success and Failure cases.

class TrySerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]] {
  
  def duplicate(): TypeSerializer[Try[A]]
  def createInstance(): Try[A] = Success(elementSerializer.createInstance())
  def copy(from: Try[A]): Try[A]
  def serialize(value: Try[A], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Try[A]
  def snapshotConfiguration(): TypeSerializerSnapshot[Try[A]]
}

Serialization format:

  • 1 byte: success flag (0 = Failure, 1 = Success)
  • If Success: serialized value
  • If Failure: serialized exception information

Collection Serializers

TraversableSerializer

Base serializer for Scala collections implementing TraversableOnce.

class TraversableSerializer[T <: TraversableOnce[E], E](
  clazz: Class[T],
  elementSerializer: TypeSerializer[E]
) extends TypeSerializer[T] {
  
  def duplicate(): TypeSerializer[T]
  def copy(from: T): T
  def serialize(value: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
  def snapshotConfiguration(): TypeSerializerSnapshot[T]
}

Serialization format:

  • 4 bytes: collection size
  • Element values serialized sequentially

Supported collection types:

  • List[T], Vector[T] (Scala Array[T] is covered by Flink's array type information rather than this serializer)
  • Set[T], HashSet[T], TreeSet[T]
  • Map[K,V], HashMap[K,V], TreeMap[K,V]
  • Seq[T], IndexedSeq[T], LinearSeq[T]
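The size-prefixed format can be sketched as follows. This is a hypothetical illustration rather than Flink's `TraversableSerializer`. In particular, it always rebuilds a `List`, whereas the real serializer reconstructs the original collection type. `writeSeq`, `readList`, `toBytes`, and `fromBytes` are names introduced here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

// 4-byte size prefix, then each element in iteration order.
def writeSeq[A](values: Seq[A], out: DataOutput)(writeElem: (A, DataOutput) => Unit): Unit = {
  out.writeInt(values.size)
  values.foreach(a => writeElem(a, out))
}

// Reads the size prefix, then exactly that many elements (rebuilt as a List here).
def readList[A](in: DataInput)(readElem: DataInput => A): List[A] = {
  val n = in.readInt()
  List.fill(n)(readElem(in))
}

// Helpers introduced for this sketch: encode to bytes and decode from bytes.
def toBytes(write: DataOutput => Unit): Array[Byte] = {
  val buf = new ByteArrayOutputStream()
  write(new DataOutputStream(buf))
  buf.toByteArray
}

def fromBytes[A](bytes: Array[Byte])(read: DataInput => A): A =
  read(new DataInputStream(new ByteArrayInputStream(bytes)))
```

Writing the size first lets the reader know how many elements to consume without a terminator. `Vector("a", "bc")` encodes to 4 + 3 + 4 = 11 bytes.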

KryoTraversableSerializer

Kryo-based fallback serializer for complex collections.

class KryoTraversableSerializer[T <: TraversableOnce[_]](
  clazz: Class[T],
  kryo: Kryo
) extends TypeSerializer[T] {
  
  def serialize(value: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
}

Used for collections that don't have specialized serializers.

Special Type Serializers

EnumValueSerializer

Serializer for Scala Enumeration values that writes each value's integer id (Enumeration#Value.id). By default ids count up from 0 in declaration order, but an enumeration can assign explicit ids.

class EnumValueSerializer[E <: Enumeration](
  enum: E,
  valueClass: Class[E#Value]
) extends TypeSerializer[E#Value] {
  
  def serialize(value: E#Value, target: DataOutputView): Unit
  def deserialize(source: DataInputView): E#Value
  def copy(from: E#Value): E#Value = from  // Enumeration values are immutable
  def getLength: Int = 4  // Fixed length (the id as an Int)
}

Serialization format:

  • 4 bytes: the value's id as an Int
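A hypothetical sketch of id-based encoding for a concrete enumeration. `Level`, `writeLevel`, `readLevel`, `toBytes`, and `fromBytes` are illustrative names. Explicit ids are used to show that the stored number is `id`, not the declaration position. Decoding uses `Enumeration.apply(id)`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object Level extends Enumeration {
  // Explicit ids: Low is declared first (position 0) but has id 10.
  val Low = Value(10)
  val High = Value(20)
}

// Fixed 4-byte encoding: the value's id.
def writeLevel(v: Level.Value, out: DataOutput): Unit = out.writeInt(v.id)

// Looks the value up again by id.
def readLevel(in: DataInput): Level.Value = Level(in.readInt())

// Helpers introduced for this sketch: encode to bytes and decode from bytes.
def toBytes(write: DataOutput => Unit): Array[Byte] = {
  val buf = new ByteArrayOutputStream()
  write(new DataOutputStream(buf))
  buf.toByteArray
}

def fromBytes[A](bytes: Array[Byte])(read: DataInput => A): A =
  read(new DataInputStream(new ByteArrayInputStream(bytes)))
```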

UnitSerializer

Serializer for Scala Unit type (no actual data serialized).

class UnitSerializer extends TypeSerializer[Unit] {
  def serialize(value: Unit, target: DataOutputView): Unit = {} // Nothing to serialize
  def deserialize(source: DataInputView): Unit = ()
  def copy(from: Unit): Unit = ()
  def getLength: Int = 0  // No bytes needed
  def createInstance(): Unit = ()
}

NothingSerializer

Serializer for the Scala Nothing type. Nothing has no values, so this serializer never handles real data; every method throws if it is invoked.

class NothingSerializer extends TypeSerializer[Nothing] {
  def serialize(value: Nothing, target: DataOutputView): Unit = 
    throw new RuntimeException("Cannot serialize Nothing")
  def deserialize(source: DataInputView): Nothing = 
    throw new RuntimeException("Cannot deserialize Nothing")
  def createInstance(): Nothing = 
    throw new RuntimeException("Cannot create Nothing instance")
}

Runtime Type Serializers

SymbolSerializer

Serializer for Scala Symbol objects with string interning.

class SymbolSerializer extends TypeSerializer[Symbol] {
  def serialize(value: Symbol, target: DataOutputView): Unit
  def deserialize(source: DataInputView): Symbol
  def copy(from: Symbol): Symbol = from  // Symbols are interned/immutable
}

SingletonSerializer

Generic serializer for Scala singleton objects.

class SingletonSerializer[T](instance: T) extends TypeSerializer[T] {
  def serialize(value: T, target: DataOutputView): Unit = {} // Nothing to serialize
  def deserialize(source: DataInputView): T = instance
  def copy(from: T): T = instance
  def createInstance(): T = instance
}

SomeSerializer

Specialized serializer for Some instances (part of Option serialization).

class SomeSerializer[T](elementSerializer: TypeSerializer[T]) extends TypeSerializer[Some[T]] {
  def serialize(value: Some[T], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Some[T]
  def copy(from: Some[T]): Some[T]
}

Tuple Serializers

Specialized serializers for all Scala tuple types:

class Tuple1Serializer[T1](s1: TypeSerializer[T1]) extends TypeSerializer[Tuple1[T1]]
class Tuple2Serializer[T1, T2](s1: TypeSerializer[T1], s2: TypeSerializer[T2]) extends TypeSerializer[(T1, T2)]
class Tuple3Serializer[T1, T2, T3](...) extends TypeSerializer[(T1, T2, T3)]
// ... up to Tuple22Serializer

Each tuple serializer:

  • Serializes elements in order
  • Uses fixed-length layout when possible
  • Supports efficient copy operations
  • Maintains type safety for each element
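A hypothetical sketch of a two-element tuple encoded element by element with no header. `writeTuple2`, `readTuple2`, `toBytes`, and `fromBytes` are illustrative names, not the serializers listed above. When both element types are fixed-length, the record is too. A `(Long, Double)` always takes 16 bytes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

// Writes the elements in order, each with its own serializer; no header or tags.
def writeTuple2[A, B](t: (A, B), out: DataOutput)(
    w1: (A, DataOutput) => Unit,
    w2: (B, DataOutput) => Unit): Unit = {
  w1(t._1, out)
  w2(t._2, out)
}

// Reads the elements back in the same order they were written.
def readTuple2[A, B](in: DataInput)(r1: DataInput => A, r2: DataInput => B): (A, B) = {
  val a = r1(in)
  val b = r2(in)
  (a, b)
}

// Helpers introduced for this sketch: encode to bytes and decode from bytes.
def toBytes(write: DataOutput => Unit): Array[Byte] = {
  val buf = new ByteArrayOutputStream()
  write(new DataOutputStream(buf))
  buf.toByteArray
}

def fromBytes[A](bytes: Array[Byte])(read: DataInput => A): A =
  read(new DataInputStream(new ByteArrayInputStream(bytes)))
```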

Wrapper Serializers

JavaIterableWrapperSerializer

Wraps Java collections for compatibility with Scala collection serializers.

class JavaIterableWrapperSerializer[T](
  elementSerializer: TypeSerializer[T]
) extends TypeSerializer[java.lang.Iterable[T]] {
  
  def serialize(value: java.lang.Iterable[T], target: DataOutputView): Unit
  def deserialize(source: DataInputView): java.lang.Iterable[T]
}

ClassTagSerializer

Handles Scala ClassTag serialization for generic type information preservation.

class ClassTagSerializer[T](classTag: ClassTag[T]) extends TypeSerializer[ClassTag[T]] {
  def serialize(value: ClassTag[T], target: DataOutputView): Unit
  def deserialize(source: DataInputView): ClassTag[T]
}

Performance Characteristics

Serialization Performance

  • Case Classes: Field-level serialization with minimal overhead
  • Collections: Bulk serialization with size prefixing
  • Option/Either/Try: Single-byte discriminators with payload
  • Enumerations: 4-byte id serialization
  • Tuples: Packed field layouts with no metadata overhead

Memory Efficiency

  • Null-bit vectors for optional fields
  • Shared string interning for symbols
  • Reference deduplication for immutable objects
  • Compact encoding for small collections

Deserialization Speed

  • Direct object creation without reflection
  • Reusable buffer allocation
  • Lazy evaluation for large collections
  • Parallel deserialization support

Configuration and Usage

Custom Kryo Registration

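import com.esotericsoftware.kryo.Kryo
import org.apache.flink.runtime.types.FlinkScalaKryoInstantiator

// MyCustomClass and MyCustomSerializer are placeholders for your own type and Kryo serializer
val kryoInstantiator = new FlinkScalaKryoInstantiator() {
  override def newKryo: Kryo = {
    val kryo = super.newKryo  // starts with all Scala registrations
    // Add custom serializers
    kryo.register(classOf[MyCustomClass], new MyCustomSerializer())
    kryo
  }
}
// Flink jobs do not pick up this subclass automatically; for job-level
// registration use env.getConfig.registerTypeWithKryoSerializer(...)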

Serializer Configuration

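import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// No explicit instantiator setting is needed: when the Scala API is on the classpath,
// Flink's KryoSerializer loads FlinkScalaKryoInstantiator by class name.
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Allow types without a dedicated serializer to fall back to Kryo (enabled by default)
env.getConfig.enableGenericTypes()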

Type Information Integration

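import org.apache.flink.api.scala._  // implicit TypeInformation derivation
import org.apache.flink.api.scala.typeutils.Types

// Serializers are automatically selected based on type information
case class MyData(values: List[Option[String]])

val dataTypeInfo = Types.CASE_CLASS[MyData]
val serializer = dataTypeInfo.createSerializer(env.getConfig.getSerializerConfig)
// Returns a CaseClassSerializer whose field serializer is a TraversableSerializer
// (for the List) wrapping an OptionSerializer (for each element)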

Schema Evolution

Serializer Snapshots

All serializers provide snapshot support for schema evolution:

trait TypeSerializerSnapshot[T] {
  def getCurrentVersion: Int
  def writeSnapshot(out: DataOutputView): Unit
  def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit
  def restoreSerializer(): TypeSerializer[T]
  def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T]
}

Compatibility Resolution

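  • Compatible as is: the new serializer can read the old data directly
  • Compatible after migration: the old data is read with the restored serializer and rewritten with the new one
  • Compatible with reconfigured serializer: a reconfigured version of the new serializer can read the old data
  • Incompatible: the old data can be neither read nor migrated, so restoring fails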

Migration Strategies

// Graceful handling of schema changes
val oldData: DataStream[OldCaseClass] = ...
val migratedData: DataStream[NewCaseClass] = oldData.map { old =>
  NewCaseClass(
    old.existingField,
    "defaultValue",  // New field with default
    old.modifiedField.map(transform)  // Transform existing field
  )
}

Best Practices

Performance Optimization

// Prefer specialized serializers over generic Kryo
case class OptimizedData(id: Int, values: Array[Double])  // Fast array serialization
// Instead of: case class GenericData(id: Int, values: List[Any])  // Slow generic serialization

Memory Management

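// Reuse a serializer instance within a single thread
val serializer = typeInfo.createSerializer(config)
// Serializers may be stateful and are not guaranteed to be thread-safe:
// call serializer.duplicate() to obtain a separate instance for each thread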

Error Handling

import java.io.IOException

// Handle serialization failures gracefully (`logger` is assumed to be a logger in scope)
try {
  serializer.serialize(value, output)
} catch {
  case e: IOException =>
    logger.error(s"Serialization failed for value: $value", e)
    // Handle the error or fall back to another serialization path
}

Custom Serializer Integration

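import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

case class Money(amount: Long, currency: String)

// Implement custom serializers for domain-specific types.
// Declared abstract because TypeSerializer has further abstract members
// (duplicate, createInstance, copy, getLength, snapshotConfiguration, ...)
// that a complete implementation must also provide.
abstract class MoneySerializer extends TypeSerializer[Money] {
  override def serialize(money: Money, target: DataOutputView): Unit = {
    target.writeLong(money.amount)
    target.writeUTF(money.currency)
  }

  // Fields are read back in the same order they were written
  override def deserialize(source: DataInputView): Money =
    Money(source.readLong(), source.readUTF())
}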
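The Money wire format can be checked in isolation with plain `java.io` streams. This is a hypothetical sketch, not a Flink serializer. `writeMoney`, `readMoney`, `toBytes`, and `fromBytes` are illustrative names, and `Money` is redefined here so the block is self-contained. The main constraint is that the read order must mirror the write order exactly. A value such as `Money(1999, "EUR")` occupies 8 bytes for the amount plus 2 + 3 bytes for the currency.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

// Hypothetical Money type with the fields used above.
case class Money(amount: Long, currency: String)

// Same field order as the serializer above: amount, then currency.
def writeMoney(money: Money, out: DataOutput): Unit = {
  out.writeLong(money.amount)
  out.writeUTF(money.currency)
}

def readMoney(in: DataInput): Money = Money(in.readLong(), in.readUTF())

// Helpers introduced for this sketch: encode to bytes and decode from bytes.
def toBytes(write: DataOutput => Unit): Array[Byte] = {
  val buf = new ByteArrayOutputStream()
  write(new DataOutputStream(buf))
  buf.toByteArray
}

def fromBytes[A](bytes: Array[Byte])(read: DataInput => A): A =
  read(new DataInputStream(new ByteArrayInputStream(bytes)))
```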