# Runtime Serialization Support

Kryo-based serialization configuration and specialized serializers for efficient handling of Scala types, including collections, tuples, and special types such as `Option`, `Either`, and `Try`, in distributed Flink execution.
## FlinkScalaKryoInstantiator

Main Kryo configuration class, providing Kryo instances pre-configured for Scala types. It extends `KryoInstantiator` from Twitter's Chill library.

```scala { .api }
class FlinkScalaKryoInstantiator extends KryoInstantiator {
  def newKryo: Kryo
}
```
The `newKryo` method creates a Kryo instance pre-registered with serializers for:

- All Scala collection types (List, Vector, Set, Map, etc.)
- Scala tuples (Tuple1 through Tuple22)
- Scala-specific types (Option, Either, Try, Unit, Symbol)
- Case classes and products
- Enumeration values
Usage example:

```scala
import org.apache.flink.runtime.types.FlinkScalaKryoInstantiator

val kryoInstantiator = new FlinkScalaKryoInstantiator()
val kryo = kryoInstantiator.newKryo
// Kryo instance ready for efficient Scala type serialization
```
## Core Serializers

### CaseClassSerializer

Generic serializer for Scala case classes that serializes each field with its own field serializer.

```scala { .api }
class CaseClassSerializer[T <: Product](
  clazz: Class[T],
  scalaFieldSerializers: Array[TypeSerializer[_]],
  scalaFieldTypes: Array[TypeInformation[_]]
) extends TypeSerializer[T] {

  def duplicate(): TypeSerializer[T]
  def createInstance(): T
  def copy(from: T): T
  def copy(from: T, reuse: T): T
  def getLength: Int
  def serialize(value: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
  def deserialize(reuse: T, source: DataInputView): T
  def copy(source: DataInputView, target: DataOutputView): Unit
  def equals(obj: Any): Boolean
  def hashCode(): Int
  def snapshotConfiguration(): TypeSerializerSnapshot[T]
}
```

Features:

- Field-by-field serialization with no per-record type metadata
- Null handling for optional fields
- Serializer snapshots for compatibility checks when restoring state (Flink's state schema evolution currently covers POJO and Avro types, not case classes)
- Packed layout: fields are written back to back in declaration order
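
The field-by-field approach can be illustrated without any Flink dependencies. This is a minimal sketch, not Flink's `CaseClassSerializer`. The `FieldCodec` trait and the `Person` record are hypothetical, and plain `java.io.DataOutput`/`DataInput` stand in for Flink's `DataOutputView`/`DataInputView`, which extend those interfaces.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object CaseClassSketch {
  // Hypothetical per-field codec, standing in for a Flink field TypeSerializer
  trait FieldCodec[A] {
    def write(value: A, out: DataOutput): Unit
    def read(in: DataInput): A
  }

  val intCodec: FieldCodec[Int] = new FieldCodec[Int] {
    def write(value: Int, out: DataOutput): Unit = out.writeInt(value)
    def read(in: DataInput): Int = in.readInt()
  }

  val stringCodec: FieldCodec[String] = new FieldCodec[String] {
    def write(value: String, out: DataOutput): Unit = out.writeUTF(value)
    def read(in: DataInput): String = in.readUTF()
  }

  // Hypothetical example record
  case class Person(id: Int, name: String)

  // Fields are written in declaration order, each with its own codec;
  // no class name or field names are written per record
  def writePerson(p: Person, out: DataOutput): Unit = {
    intCodec.write(p.id, out)
    stringCodec.write(p.name, out)
  }

  // Scala evaluates constructor arguments left to right, matching the write order
  def readPerson(in: DataInput): Person = Person(intCodec.read(in), stringCodec.read(in))

  // Serializes a Person into a byte array
  def encode(p: Person): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writePerson(p, out)
    out.flush()
    buffer.toByteArray
  }

  def decode(bytes: Array[Byte]): Person =
    readPerson(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```

For `Person(7, "Ada")`, the sketch writes 9 bytes: 4 for the `Int` and 5 for the string (a 2-byte length followed by 3 bytes of modified UTF-8). No class or field names appear in the output, because the reader already knows the field order.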
### Tuple2CaseClassSerializer

Specialized optimized serializer for Tuple2 types.

```scala { .api }
class Tuple2CaseClassSerializer[T1, T2](
  tupleClass: Class[(T1, T2)],
  fieldSerializers: Array[TypeSerializer[_]]
) extends CaseClassSerializer[(T1, T2)] {

  // Optimized serialization for two-element tuples
  override def serialize(value: (T1, T2), target: DataOutputView): Unit
  override def deserialize(source: DataInputView): (T1, T2)
  override def copy(from: (T1, T2)): (T1, T2)
}
```
### OptionSerializer

Efficient serializer for Scala Option types that encodes presence with a single flag byte.

```scala { .api }
class OptionSerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]] {

  def duplicate(): TypeSerializer[Option[A]]
  def createInstance(): Option[A] = None
  def copy(from: Option[A]): Option[A]
  def copy(from: Option[A], reuse: Option[A]): Option[A]
  def getLength: Int = -1 // Variable length
  def serialize(value: Option[A], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Option[A]
  def deserialize(reuse: Option[A], source: DataInputView): Option[A]
  def copy(source: DataInputView, target: DataOutputView): Unit
  def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]]
}
```

Serialization format:

- 1 byte: presence flag (0 = None, 1 = Some)
- If Some: serialized element value
### EitherSerializer

Serializer for Scala Either types supporting both Left and Right values.

```scala { .api }
class EitherSerializer[A, B](
  leftSerializer: TypeSerializer[A],
  rightSerializer: TypeSerializer[B]
) extends TypeSerializer[Either[A, B]] {

  def duplicate(): TypeSerializer[Either[A, B]]
  def createInstance(): Either[A, B] = Left(leftSerializer.createInstance())
  def copy(from: Either[A, B]): Either[A, B]
  def getLength: Int = -1 // Variable length
  def serialize(value: Either[A, B], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Either[A, B]
  def snapshotConfiguration(): TypeSerializerSnapshot[Either[A, B]]
}
```

Serialization format:

- 1 byte: discriminator (0 = Left, 1 = Right)
- Serialized value using appropriate serializer
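
Below is a sketch of the discriminator format listed above, for an `Either[String, Int]`. The helper names are hypothetical, and the flag values follow the list above. Treat it as an illustration rather than Flink's `EitherSerializer`, whose exact byte values may differ. Reporting an unknown discriminator as an `IOException` is an assumption about reasonable error handling.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream, IOException}

object EitherFormatSketch {
  // Discriminator byte, then the payload written by the matching side's writer.
  // The 0/1 values follow the format listed above; Flink's own encoding may differ.
  def writeEither[A, B](value: Either[A, B], out: DataOutput)(
      writeLeft: (A, DataOutput) => Unit,
      writeRight: (B, DataOutput) => Unit): Unit =
    value match {
      case Left(a) =>
        out.writeByte(0)
        writeLeft(a, out)
      case Right(b) =>
        out.writeByte(1)
        writeRight(b, out)
    }

  def readEither[A, B](in: DataInput)(readLeft: DataInput => A, readRight: DataInput => B): Either[A, B] =
    in.readByte().toInt match {
      case 0 => Left(readLeft(in))
      case 1 => Right(readRight(in))
      case other => throw new IOException(s"Unknown Either discriminator: $other")
    }

  // Hypothetical helpers for Either[String, Int]
  def encode(value: Either[String, Int]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writeEither(value, out)((s: String, o: DataOutput) => o.writeUTF(s), (i: Int, o: DataOutput) => o.writeInt(i))
    out.flush()
    buffer.toByteArray
  }

  def decode(bytes: Array[Byte]): Either[String, Int] =
    readEither[String, Int](new DataInputStream(new ByteArrayInputStream(bytes)))(_.readUTF(), _.readInt())
}
```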
### TrySerializer

Serializer for Scala Try types handling Success and Failure cases.

```scala { .api }
class TrySerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]] {

  def duplicate(): TypeSerializer[Try[A]]
  def createInstance(): Try[A] = Success(elementSerializer.createInstance())
  def copy(from: Try[A]): Try[A]
  def serialize(value: Try[A], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Try[A]
  def snapshotConfiguration(): TypeSerializerSnapshot[Try[A]]
}
```

Serialization format:

- 1 byte: success flag (0 = Failure, 1 = Success)
- If Success: serialized value
- If Failure: serialized exception information
## Collection Serializers

### TraversableSerializer

Base serializer for Scala collections implementing TraversableOnce.

```scala { .api }
class TraversableSerializer[T <: TraversableOnce[E], E](
  clazz: Class[T],
  elementSerializer: TypeSerializer[E]
) extends TypeSerializer[T] {

  def duplicate(): TypeSerializer[T]
  def copy(from: T): T
  def serialize(value: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
  def snapshotConfiguration(): TypeSerializerSnapshot[T]
}
```

Serialization format:

- 4 bytes: collection size
- Element values serialized sequentially

Supported collection types:

- `List[T]`, `Vector[T]`
- `Set[T]`, `HashSet[T]`, `TreeSet[T]`
- `Map[K, V]`, `HashMap[K, V]`, `TreeMap[K, V]` (serialized as `(K, V)` elements)
- `Seq[T]`, `IndexedSeq[T]`, `LinearSeq[T]`

`Array[T]` is not a `TraversableOnce`, so it is not covered by this serializer; Flink handles Scala arrays through its array type information instead.
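
The size-prefixed format can be sketched as follows. This minimal illustration uses hypothetical helper names and always rebuilds a `List`, whereas `TraversableSerializer` is parameterized by the concrete collection class. Plain `java.io` streams stand in for Flink's data views.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object TraversableFormatSketch {
  // 4-byte size prefix, then each element written by the element writer
  def writeSeq[A](values: Seq[A], out: DataOutput)(writeElem: (A, DataOutput) => Unit): Unit = {
    out.writeInt(values.size)
    values.foreach(v => writeElem(v, out))
  }

  // Reads the size, then exactly that many elements, in order
  def readList[A](in: DataInput)(readElem: DataInput => A): List[A] = {
    val size = in.readInt()
    List.fill(size)(readElem(in))
  }

  // Hypothetical helpers for List[Long]
  def encodeLongs(values: List[Long]): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writeSeq(values, out)((l: Long, o: DataOutput) => o.writeLong(l))
    out.flush()
    buffer.toByteArray
  }

  def decodeLongs(bytes: Array[Byte]): List[Long] =
    readList[Long](new DataInputStream(new ByteArrayInputStream(bytes)))(_.readLong())
}
```

Three `Long` values take 4 + 3 * 8 = 28 bytes: the size prefix plus 8 bytes per element.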
### KryoTraversableSerializer

Kryo-based fallback serializer for complex collections.

```scala { .api }
class KryoTraversableSerializer[T <: TraversableOnce[_]](
  clazz: Class[T],
  kryo: Kryo
) extends TypeSerializer[T] {

  def serialize(value: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
}
```

Used for collections that don't have specialized serializers.
## Special Type Serializers

### EnumValueSerializer

Efficient serializer for Scala Enumeration values that writes each value's integer `id`.

```scala { .api }
class EnumValueSerializer[E <: Enumeration](
  enum: E,
  valueClass: Class[E#Value]
) extends TypeSerializer[E#Value] {

  def serialize(value: E#Value, target: DataOutputView): Unit
  def deserialize(source: DataInputView): E#Value
  def copy(from: E#Value): E#Value = from // Enumeration values are immutable
  def getLength: Int = 4 // Fixed length (id as Int)
}
```

Serialization format:

- 4 bytes: the value's `id` within its enumeration
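
Writing only the `id` works because the enumeration object can look a value up again with `apply(id)`. Below is a minimal sketch with a hypothetical `Color` enumeration, using plain `java.io` streams in place of Flink's data views:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object EnumFormatSketch {
  // Hypothetical example enumeration; ids are assigned 0, 1, 2 in declaration order
  object Color extends Enumeration {
    val Red, Green, Blue = Value
  }

  // Only the value's 4-byte id is written
  def writeColor(value: Color.Value, out: DataOutput): Unit = out.writeInt(value.id)

  // Color(id) looks up the value with that id in the enumeration
  def readColor(in: DataInput): Color.Value = Color(in.readInt())

  def encode(value: Color.Value): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    writeColor(value, out)
    out.flush()
    buffer.toByteArray
  }

  def decode(bytes: Array[Byte]): Color.Value =
    readColor(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```

`Color.Blue` has id 2 and encodes to the 4 bytes `0, 0, 0, 2`.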
### UnitSerializer

Serializer for Scala Unit type (no actual data serialized).

```scala { .api }
class UnitSerializer extends TypeSerializer[Unit] {
  def serialize(value: Unit, target: DataOutputView): Unit = {} // Nothing to serialize
  def deserialize(source: DataInputView): Unit = ()
  def copy(from: Unit): Unit = ()
  def getLength: Int = 0 // No bytes needed
  def createInstance(): Unit = ()
}
```
### NothingSerializer

Serializer for the Scala `Nothing` type. It is never invoked on real data, because no value of type `Nothing` can exist.

```scala { .api }
class NothingSerializer extends TypeSerializer[Nothing] {
  def serialize(value: Nothing, target: DataOutputView): Unit =
    throw new RuntimeException("Cannot serialize Nothing")
  def deserialize(source: DataInputView): Nothing =
    throw new RuntimeException("Cannot deserialize Nothing")
  def createInstance(): Nothing =
    throw new RuntimeException("Cannot create Nothing instance")
}
```
## Runtime Type Serializers

### SymbolSerializer

Serializer for Scala Symbol objects with string interning.

```scala { .api }
class SymbolSerializer extends TypeSerializer[Symbol] {
  def serialize(value: Symbol, target: DataOutputView): Unit
  def deserialize(source: DataInputView): Symbol
  def copy(from: Symbol): Symbol = from // Symbols are interned/immutable
}
```
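
The Scala runtime interns symbols by name, so a serializer only needs to write the name; `Symbol(name)` then returns the shared instance. Below is a sketch that assumes the name is written with `writeUTF` (the encoding Flink actually uses is not specified here):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object SymbolFormatSketch {
  // Only the symbol's name is written, as modified UTF-8 with a 2-byte length
  def encode(value: Symbol): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeUTF(value.name)
    out.flush()
    buffer.toByteArray
  }

  // Symbol(name) returns the interned instance for that name
  def decode(bytes: Array[Byte]): Symbol =
    Symbol(new DataInputStream(new ByteArrayInputStream(bytes)).readUTF())
}
```

The round trip returns the very same instance (`eq`), which is also why `copy` can simply return its argument.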
### SingletonSerializer

Generic serializer for Scala singleton objects.

```scala { .api }
class SingletonSerializer[T](instance: T) extends TypeSerializer[T] {
  def serialize(value: T, target: DataOutputView): Unit = {} // Nothing to serialize
  def deserialize(source: DataInputView): T = instance
  def copy(from: T): T = instance
  def createInstance(): T = instance
}
```
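
Singleton serialization writes no bytes at all, because the type already determines the value. Below is a minimal sketch with a hypothetical `Marker` object, using plain `java.io` streams:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object SingletonFormatSketch {
  // Hypothetical singleton value
  case object Marker

  // Nothing is written: the type alone identifies the only possible value
  def write(value: Marker.type, out: DataOutput): Unit = ()

  // Nothing is read: the shared instance is returned
  def read(in: DataInput): Marker.type = Marker

  def encode(value: Marker.type): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    write(value, out)
    out.flush()
    buffer.toByteArray
  }

  def decode(bytes: Array[Byte]): Marker.type =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```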
### SomeSerializer

Specialized serializer for Some instances (part of Option serialization).

```scala { .api }
class SomeSerializer[T](elementSerializer: TypeSerializer[T]) extends TypeSerializer[Some[T]] {
  def serialize(value: Some[T], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Some[T]
  def copy(from: Some[T]): Some[T]
}
```
## Tuple Serializers

Specialized serializers for all Scala tuple types:

```scala { .api }
class Tuple1Serializer[T1](s1: TypeSerializer[T1]) extends TypeSerializer[Tuple1[T1]]
class Tuple2Serializer[T1, T2](s1: TypeSerializer[T1], s2: TypeSerializer[T2]) extends TypeSerializer[(T1, T2)]
class Tuple3Serializer[T1, T2, T3](...) extends TypeSerializer[(T1, T2, T3)]
// ... up to Tuple22Serializer
```

Each tuple serializer:

- Serializes elements in order
- Uses a fixed-length layout when possible
- Supports efficient copy operations
- Maintains type safety for each element
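
One reading of "fixed-length layout when possible" is that a tuple has a fixed length only when every element serializer reports one. The sketch below encodes that assumption using the `getLength` convention from the signatures in this document, where `-1` means variable length. The helper is hypothetical.

```scala
object TupleLengthSketch {
  // getLength convention used in this document: -1 means variable length
  val VariableLength: Int = -1

  // Assumed rule: a tuple is fixed-length only when every element is fixed-length,
  // and its length is then the sum of the element lengths
  def tupleLength(elementLengths: Seq[Int]): Int =
    if (elementLengths.forall(_ >= 0)) elementLengths.sum else VariableLength
}
```

For example, an `(Int, Long)` pair whose element serializers report 4 and 8 bytes would have a fixed length of 12. Any variable-length element, such as an `Option`, makes the whole tuple variable-length.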
## Wrapper Serializers

### JavaIterableWrapperSerializer

Wraps Java collections for compatibility with Scala collection serializers.

```scala { .api }
class JavaIterableWrapperSerializer[T](
  elementSerializer: TypeSerializer[T]
) extends TypeSerializer[java.lang.Iterable[T]] {

  def serialize(value: java.lang.Iterable[T], target: DataOutputView): Unit
  def deserialize(source: DataInputView): java.lang.Iterable[T]
}
```
### ClassTagSerializer

Handles Scala ClassTag serialization for generic type information preservation.

```scala { .api }
class ClassTagSerializer[T](classTag: ClassTag[T]) extends TypeSerializer[ClassTag[T]] {
  def serialize(value: ClassTag[T], target: DataOutputView): Unit
  def deserialize(source: DataInputView): ClassTag[T]
}
```
## Performance Characteristics

### Serialization Performance

- **Case Classes**: Field-level serialization with minimal overhead
- **Collections**: Bulk serialization with size prefixing
- **Option/Either/Try**: Single-byte discriminators with payload
- **Enumerations**: 4-byte ordinal serialization
- **Tuples**: Packed field layouts with no metadata overhead
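### Memory Efficiency

- Single flag bytes for Option, Either, and Try instead of per-value type tags
- Symbols are re-interned on deserialization, so equal symbols share one instance
- Singleton objects deserialize to their shared instance instead of new copies
- Collections carry only a 4-byte size prefix and no per-element type tags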
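### Deserialization Speed

- Direct object creation without reflection
- Object reuse through the `deserialize(reuse, source)` and `copy(from, reuse)` variants, which can avoid allocations for mutable types
- Collections are rebuilt element by element after reading their size prefix
- Parallelism comes from independent serializer instances per parallel task (see `duplicate()`)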
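## Configuration and Usage

### Custom Kryo Registration

```scala
import org.apache.flink.runtime.types.FlinkScalaKryoInstantiator

// Note: Flink's KryoSerializer instantiates FlinkScalaKryoInstantiator by class name,
// so a subclass like this applies only where you create Kryo instances yourself
val kryoInstantiator = new FlinkScalaKryoInstantiator() {
  // Return type left to inference so the override matches the parent's newKryo type
  override def newKryo = {
    val kryo = super.newKryo
    // Add custom serializers (MyCustomClass and MyCustomSerializer are placeholders)
    kryo.register(classOf[MyCustomClass], new MyCustomSerializer())
    kryo
  }
}
```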
### Serializer Configuration

```scala
import org.apache.flink.streaming.api.scala._

// Configure the execution environment for Scala types
val env = StreamExecutionEnvironment.getExecutionEnvironment

// No instantiator setting is needed: Flink's KryoSerializer loads
// FlinkScalaKryoInstantiator itself when the Scala API is on the classpath

// Enable Kryo for generic types
env.getConfig.enableGenericTypes()
```
### Type Information Integration

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types

// Serializers are automatically selected based on type information
case class MyData(values: List[Option[String]])

val dataTypeInfo = Types.CASE_CLASS[MyData]
val serializer = dataTypeInfo.createSerializer(env.getConfig.getSerializerConfig)
// Returns a CaseClassSerializer whose `values` field uses a TraversableSerializer
// with an OptionSerializer for its elements
```
## Schema Evolution

### Serializer Snapshots

All serializers provide snapshots, which Flink uses to check serializer compatibility (and, for supported types, to evolve schemas) when restoring state:

```scala { .api }
trait TypeSerializerSnapshot[T] {
  def getCurrentVersion: Int
  def writeSnapshot(out: DataOutputView): Unit
  def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit
  def restoreSerializer(): TypeSerializer[T]
  def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T]
}
```
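### Compatibility Resolution

`resolveSchemaCompatibility` returns a `TypeSerializerSchemaCompatibility` with one of these outcomes:

- **Compatible as is**: the new serializer can read the old data directly
- **Compatible after migration**: state is read with the restored old serializer and rewritten with the new one
- **Compatible with reconfigured serializer**: a reconfigured instance of the new serializer must be used
- **Incompatible**: the new serializer cannot read the data, and the restore fails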
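
Flink's `TypeSerializerSchemaCompatibility` distinguishes four outcomes, created with `compatibleAsIs()`, `compatibleAfterMigration()`, `compatibleWithReconfiguredSerializer(...)`, and `incompatible()`. The sketch below models them as plain Scala values and picks one with a hypothetical, field-name-based policy. The policy is invented for illustration and is not how any particular Flink serializer decides.

```scala
object CompatibilitySketch {
  // Simplified stand-ins for TypeSerializerSchemaCompatibility's outcomes
  sealed trait Compatibility
  case object CompatibleAsIs extends Compatibility
  case object CompatibleAfterMigration extends Compatibility
  case object CompatibleWithReconfiguredSerializer extends Compatibility
  case object Incompatible extends Compatibility

  // Hypothetical policy for a record format described by its ordered field names:
  // oldFields would come from the restored snapshot, newFields from the new serializer
  def resolve(oldFields: Seq[String], newFields: Seq[String], addedFieldsHaveDefaults: Boolean): Compatibility =
    if (oldFields == newFields) CompatibleAsIs
    // Same fields in a new order: a serializer reconfigured to the old order can keep reading
    else if (oldFields.sorted == newFields.sorted) CompatibleWithReconfiguredSerializer
    // Fields only added (with defaults): read with the old serializer, rewrite with the new one
    else if (oldFields.forall(newFields.contains) && addedFieldsHaveDefaults) CompatibleAfterMigration
    else Incompatible
}
```

Real snapshots base this decision on whatever configuration they wrote in `writeSnapshot`, such as nested serializer snapshots.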
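### Migration Strategies

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical old and new record types
case class OldCaseClass(existingField: String, modifiedField: Option[Int])
case class NewCaseClass(existingField: String, newField: String, modifiedField: Option[String])

// Graceful handling of schema changes: convert old records to the new type explicitly
def migrate(oldData: DataStream[OldCaseClass]): DataStream[NewCaseClass] =
  oldData.map { old =>
    NewCaseClass(
      old.existingField,
      "defaultValue",                   // New field with default
      old.modifiedField.map(_.toString) // Transform existing field (example transformation)
    )
  }
```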
## Best Practices

### Performance Optimization

```scala
// Prefer specialized serializers over generic Kryo
case class OptimizedData(id: Int, values: Array[Double]) // Fast array serialization
// Instead of: case class GenericData(id: Int, values: List[Any]) // Slow generic serialization
```
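### Memory Management

```scala
// Reuse serializer instances where possible (typeInfo and config are placeholders)
val serializer = typeInfo.createSerializer(config)
// Use the same instance for repeated operations within one thread;
// serializers may be stateful, so give each additional thread its own duplicate()
val otherThreadSerializer = serializer.duplicate()
```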
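### Error Handling

```scala
import java.io.IOException
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.core.memory.DataOutputView
import org.slf4j.LoggerFactory

val logger = LoggerFactory.getLogger("serialization")

// Handle serialization failures gracefully; returns false if the value could not be written
def trySerialize[T](serializer: TypeSerializer[T], value: T, output: DataOutputView): Boolean =
  try {
    serializer.serialize(value, output)
    true
  } catch {
    case e: IOException =>
      logger.error(s"Serialization failed for value: $value", e)
      false // Handle the error or use fallback serialization; output may hold a partial record
  }
```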
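### Custom Serializer Integration

```scala
import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

// Implement custom serializers for domain-specific types (Money is a hypothetical example type)
case class Money(amount: Long, currency: String)

class MoneySerializer extends TypeSerializerSingleton[Money] {
  override def isImmutableType: Boolean = true
  override def createInstance(): Money = Money(0L, "")
  override def copy(from: Money): Money = from // Immutable, so sharing is safe
  override def copy(from: Money, reuse: Money): Money = from
  override def getLength: Int = -1 // Variable length because of the currency string
  override def serialize(money: Money, target: DataOutputView): Unit = { target.writeLong(money.amount); target.writeUTF(money.currency) }
  override def deserialize(source: DataInputView): Money = Money(source.readLong(), source.readUTF())
  override def deserialize(reuse: Money, source: DataInputView): Money = deserialize(source)
  override def copy(source: DataInputView, target: DataOutputView): Unit = serialize(deserialize(source), target)
  override def snapshotConfiguration(): TypeSerializerSnapshot[Money] = new MoneySerializerSnapshot
}

class MoneySerializerSnapshot extends SimpleTypeSerializerSnapshot[Money](
  new java.util.function.Supplier[TypeSerializer[Money]] { override def get(): TypeSerializer[Money] = new MoneySerializer })
```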