Kryo-based serialization configuration and specialized serializers for efficient handling of all Scala types including collections, tuples, and special types in distributed Flink execution.
Main Kryo configuration class providing pre-configured Kryo instances optimized for Scala types.
class FlinkScalaKryoInstantiator extends KryoInstantiator {
def newKryo: Kryo
}

The newKryo method creates a Kryo instance pre-registered with serializers for:
Usage example:
val kryoInstantiator = new FlinkScalaKryoInstantiator()
val kryo = kryoInstantiator.newKryo
// Kryo instance ready for efficient Scala type serialization

Generic serializer for Scala case classes providing efficient field-by-field serialization.
class CaseClassSerializer[T <: Product](
clazz: Class[T],
scalaFieldSerializers: Array[TypeSerializer[_]],
scalaFieldTypes: Array[TypeInformation[_]]
) extends TypeSerializer[T] {
def duplicate(): TypeSerializer[T]
def createInstance(): T
def copy(from: T): T
def copy(from: T, reuse: T): T
def getLength: Int
def serialize(value: T, target: DataOutputView): Unit
def deserialize(source: DataInputView): T
def deserialize(reuse: T, source: DataInputView): T
def copy(source: DataInputView, target: DataOutputView): Unit
def equals(obj: Any): Boolean
def hashCode(): Int
def snapshotConfiguration(): TypeSerializerSnapshot[T]
}

Features:
Specialized optimized serializer for Tuple2 types.
class Tuple2CaseClassSerializer[T1, T2](
tupleClass: Class[(T1, T2)],
fieldSerializers: Array[TypeSerializer[_]]
) extends CaseClassSerializer[(T1, T2)] {
// Optimized serialization for two-element tuples
override def serialize(value: (T1, T2), target: DataOutputView): Unit
override def deserialize(source: DataInputView): (T1, T2)
override def copy(from: (T1, T2)): (T1, T2)
}

Efficient serializer for Scala Option types with null-bit optimization.
class OptionSerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]] {
def duplicate(): TypeSerializer[Option[A]]
def createInstance(): Option[A] = None
def copy(from: Option[A]): Option[A]
def copy(from: Option[A], reuse: Option[A]): Option[A]
def getLength: Int = -1 // Variable length
def serialize(value: Option[A], target: DataOutputView): Unit
def deserialize(source: DataInputView): Option[A]
def deserialize(reuse: Option[A], source: DataInputView): Option[A]
def copy(source: DataInputView, target: DataOutputView): Unit
def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]]
}

Serialization format:
Serializer for Scala Either types supporting both Left and Right values.
class EitherSerializer[A, B](
leftSerializer: TypeSerializer[A],
rightSerializer: TypeSerializer[B]
) extends TypeSerializer[Either[A, B]] {
def duplicate(): TypeSerializer[Either[A, B]]
def createInstance(): Either[A, B] = Left(leftSerializer.createInstance())
def copy(from: Either[A, B]): Either[A, B]
def getLength: Int = -1 // Variable length
def serialize(value: Either[A, B], target: DataOutputView): Unit
def deserialize(source: DataInputView): Either[A, B]
def snapshotConfiguration(): TypeSerializerSnapshot[Either[A, B]]
}

Serialization format:
Serializer for Scala Try types handling Success and Failure cases.
class TrySerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]] {
def duplicate(): TypeSerializer[Try[A]]
def createInstance(): Try[A] = Success(elementSerializer.createInstance())
def copy(from: Try[A]): Try[A]
def serialize(value: Try[A], target: DataOutputView): Unit
def deserialize(source: DataInputView): Try[A]
def snapshotConfiguration(): TypeSerializerSnapshot[Try[A]]
}

Serialization format:
Base serializer for Scala collections implementing TraversableOnce.
class TraversableSerializer[T <: TraversableOnce[E], E](
clazz: Class[T],
elementSerializer: TypeSerializer[E]
) extends TypeSerializer[T] {
def duplicate(): TypeSerializer[T]
def copy(from: T): T
def serialize(value: T, target: DataOutputView): Unit
def deserialize(source: DataInputView): T
def snapshotConfiguration(): TypeSerializerSnapshot[T]
}

Serialization format:
Supported collection types:
- List[T], Vector[T], Array[T]
- Set[T], HashSet[T], TreeSet[T]
- Map[K,V], HashMap[K,V], TreeMap[K,V]
- Seq[T], IndexedSeq[T], LinearSeq[T]

Kryo-based fallback serializer for complex collections.
class KryoTraversableSerializer[T <: TraversableOnce[_]](
clazz: Class[T],
kryo: Kryo
) extends TypeSerializer[T] {
def serialize(value: T, target: DataOutputView): Unit
def deserialize(source: DataInputView): T
}

Used for collections that don't have specialized serializers.
Efficient serializer for Scala Enumeration values using ordinal-based serialization.
class EnumValueSerializer[E <: Enumeration](
enum: E,
valueClass: Class[E#Value]
) extends TypeSerializer[E#Value] {
def serialize(value: E#Value, target: DataOutputView): Unit
def deserialize(source: DataInputView): E#Value
def copy(from: E#Value): E#Value = from // Enumerations are immutable
def getLength: Int = 4 // Fixed length (ordinal as int)
}

Serialization format:
Serializer for Scala Unit type (no actual data serialized).
class UnitSerializer extends TypeSerializer[Unit] {
def serialize(value: Unit, target: DataOutputView): Unit = {} // Nothing to serialize
def deserialize(source: DataInputView): Unit = ()
def copy(from: Unit): Unit = ()
def getLength: Int = 0 // No bytes needed
def createInstance(): Unit = ()
}

Serializer for Scala Nothing type (never actually used since Nothing cannot be instantiated).
class NothingSerializer extends TypeSerializer[Nothing] {
def serialize(value: Nothing, target: DataOutputView): Unit =
throw new RuntimeException("Cannot serialize Nothing")
def deserialize(source: DataInputView): Nothing =
throw new RuntimeException("Cannot deserialize Nothing")
def createInstance(): Nothing =
throw new RuntimeException("Cannot create Nothing instance")
}

Serializer for Scala Symbol objects with string interning.
class SymbolSerializer extends TypeSerializer[Symbol] {
def serialize(value: Symbol, target: DataOutputView): Unit
def deserialize(source: DataInputView): Symbol
def copy(from: Symbol): Symbol = from // Symbols are interned/immutable
}

Generic serializer for Scala singleton objects.
class SingletonSerializer[T](instance: T) extends TypeSerializer[T] {
def serialize(value: T, target: DataOutputView): Unit = {} // Nothing to serialize
def deserialize(source: DataInputView): T = instance
def copy(from: T): T = instance
def createInstance(): T = instance
}

Specialized serializer for Some instances (part of Option serialization).
class SomeSerializer[T](elementSerializer: TypeSerializer[T]) extends TypeSerializer[Some[T]] {
def serialize(value: Some[T], target: DataOutputView): Unit
def deserialize(source: DataInputView): Some[T]
def copy(from: Some[T]): Some[T]
}

Specialized serializers for all Scala tuple types:
class Tuple1Serializer[T1](s1: TypeSerializer[T1]) extends TypeSerializer[Tuple1[T1]]
class Tuple2Serializer[T1, T2](s1: TypeSerializer[T1], s2: TypeSerializer[T2]) extends TypeSerializer[(T1, T2)]
class Tuple3Serializer[T1, T2, T3](...) extends TypeSerializer[(T1, T2, T3)]
// ... up to Tuple22Serializer

Each tuple serializer:
Wraps Java collections for compatibility with Scala collection serializers.
class JavaIterableWrapperSerializer[T](
elementSerializer: TypeSerializer[T]
) extends TypeSerializer[java.lang.Iterable[T]] {
def serialize(value: java.lang.Iterable[T], target: DataOutputView): Unit
def deserialize(source: DataInputView): java.lang.Iterable[T]
}

Handles Scala ClassTag serialization for generic type information preservation.
class ClassTagSerializer[T](classTag: ClassTag[T]) extends TypeSerializer[ClassTag[T]] {
def serialize(value: ClassTag[T], target: DataOutputView): Unit
def deserialize(source: DataInputView): ClassTag[T]
}

val kryoInstantiator = new FlinkScalaKryoInstantiator() {
override def newKryo: Kryo = {
val kryo = super.newKryo
// Add custom serializers
kryo.register(classOf[MyCustomClass], new MyCustomSerializer())
kryo
}
}

// Configure execution environment with Scala serializers
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setDefaultKryoInstantiator(classOf[FlinkScalaKryoInstantiator])
// Enable Kryo for generic types
env.getConfig.enableGenericTypes()

// Serializers are automatically selected based on type information
case class MyData(values: List[Option[String]])
val dataTypeInfo = Types.CASE_CLASS[MyData]
val serializer = dataTypeInfo.createSerializer(env.getConfig.getSerializerConfig)
// Returns CaseClassSerializer with nested OptionSerializer and TraversableSerializer

All serializers provide snapshot support for schema evolution:
trait TypeSerializerSnapshot[T] {
def getCurrentVersion: Int
def writeSnapshot(out: DataOutputView): Unit
def readSnapshot(in: DataInputView, userCodeClassLoader: ClassLoader): Unit
def restoreSerializer(): TypeSerializer[T]
def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T]
}

// Graceful handling of schema changes
val oldData: DataStream[OldCaseClass] = ...
val migratedData: DataStream[NewCaseClass] = oldData.map { old =>
NewCaseClass(
old.existingField,
"defaultValue", // New field with default
old.modifiedField.map(transform) // Transform existing field
)
}

// Prefer specialized serializers over generic Kryo
case class OptimizedData(id: Int, values: Array[Double]) // Fast array serialization
// Instead of: case class GenericData(id: Int, values: List[Any]) // Slow generic serialization

// Reuse serializer instances when possible
val serializer = typeInfo.createSerializer(config)
// Use same serializer instance for multiple operations

// Handle serialization failures gracefully
try {
serializer.serialize(value, output)
} catch {
case e: IOException =>
logger.error(s"Serialization failed for value: $value", e)
// Handle error or use fallback serialization
}

// Implement custom serializers for domain-specific types
/**
 * Example custom serializer for the domain type `Money`.
 *
 * Wire layout, as produced by `serialize` and consumed by `deserialize`:
 * the amount via `writeLong`, followed by the currency via `writeUTF`.
 *
 * NOTE(review): only `serialize` and `deserialize` are shown here; any other
 * members that `TypeSerializer` requires are not part of this example.
 */
class MoneySerializer extends TypeSerializer[Money] {

  /** Writes `money.amount` and then `money.currency` to `target`. */
  def serialize(money: Money, target: DataOutputView): Unit = {
    target.writeLong(money.amount)
    target.writeUTF(money.currency)
  }

  /** Reads the two fields back in the order they were written and rebuilds a `Money`. */
  def deserialize(source: DataInputView): Money = {
    val amount = source.readLong()
    val currency = source.readUTF()
    Money(amount, currency)
  }
}