
# Runtime Serialization Support

Kryo configuration and specialized serializers for handling Scala types efficiently in distributed Flink execution, including collections, tuples, and special types.

## FlinkScalaKryoInstantiator

Kryo instantiator that returns Kryo instances pre-configured for Scala types.

```scala { .api }
class FlinkScalaKryoInstantiator extends KryoInstantiator {
  def newKryo: Kryo
}
```

The `newKryo` method creates a Kryo instance pre-registered with serializers for:

- All Scala collection types (List, Vector, Set, Map, etc.)
- Scala tuples (Tuple1 through Tuple22)
- Scala-specific types (Option, Either, Try, Unit, Symbol)
- Case classes and products
- Enumeration values

Usage example:

```scala
import org.apache.flink.runtime.types.FlinkScalaKryoInstantiator

val kryoInstantiator = new FlinkScalaKryoInstantiator()
val kryo = kryoInstantiator.newKryo
// kryo is now ready to serialize Scala types efficiently
```

## Core Serializers

### CaseClassSerializer

Generic serializer for Scala case classes that serializes each instance field by field.

```scala { .api }
class CaseClassSerializer[T <: Product](
    clazz: Class[T],
    scalaFieldSerializers: Array[TypeSerializer[_]],
    scalaFieldTypes: Array[TypeInformation[_]]
) extends TypeSerializer[T] {

  def duplicate(): TypeSerializer[T]
  def createInstance(): T
  def copy(from: T): T
  def copy(from: T, reuse: T): T
  def getLength: Int
  def serialize(value: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
  def deserialize(reuse: T, source: DataInputView): T
  def copy(source: DataInputView, target: DataOutputView): Unit
  def equals(obj: Any): Boolean
  def hashCode(): Int
  def snapshotConfiguration(): TypeSerializerSnapshot[T]
}
```

Features:

- Fields are written in declaration order, each with its own field serializer
- Optional fields should be declared as `Option` and are then handled by `OptionSerializer`; a `null` field value is not supported and makes serialization fail
- Serializer snapshots (`snapshotConfiguration`) let Flink check compatibility when state is restored
- Compact layout: no field names or type tags are written
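
The field-by-field layout can be illustrated in plain Scala without Flink on the classpath. In this sketch, `FieldCodec` is a hypothetical stand-in for `TypeSerializer`, `java.io.DataOutput`/`DataInput` replace Flink's `DataOutputView`/`DataInputView` (which extend them), and `Person` is an example type. It mirrors the general idea (one serializer per field plus a function that rebuilds the instance), not Flink's actual implementation.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object CaseClassFormatSketch {
  // Hypothetical stand-in for Flink's TypeSerializer
  trait FieldCodec[A] {
    def write(value: A, out: DataOutput): Unit
    def read(in: DataInput): A
  }

  val intCodec: FieldCodec[Int] = new FieldCodec[Int] {
    def write(value: Int, out: DataOutput): Unit = out.writeInt(value)
    def read(in: DataInput): Int = in.readInt()
  }

  val stringCodec: FieldCodec[String] = new FieldCodec[String] {
    def write(value: String, out: DataOutput): Unit = out.writeUTF(value)
    def read(in: DataInput): String = in.readUTF()
  }

  // One codec per field plus a function that builds the instance from the field values
  final class ProductCodec[T <: Product](fields: Array[FieldCodec[Any]], create: Array[Any] => T) {
    def write(value: T, out: DataOutput): Unit = {
      var i = 0
      while (i < fields.length) {
        fields(i).write(value.productElement(i), out) // declaration order, no tags
        i += 1
      }
    }

    // Fields are read back in the same order they were written
    def read(in: DataInput): T = create(fields.map(_.read(in)))
  }

  final case class Person(id: Int, name: String)

  val personCodec = new ProductCodec[Person](
    Array[FieldCodec[Any]](intCodec.asInstanceOf[FieldCodec[Any]], stringCodec.asInstanceOf[FieldCodec[Any]]),
    values => Person(values(0).asInstanceOf[Int], values(1).asInstanceOf[String])
  )

  // Serializes to bytes and back; also returns the encoded size
  def roundTrip(p: Person): (Person, Int) = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    personCodec.write(p, out)
    out.flush()
    val encoded = bytes.toByteArray
    (personCodec.read(new DataInputStream(new ByteArrayInputStream(encoded))), encoded.length)
  }
}
```

Under these assumptions, `Person(7, "Ann")` takes 9 bytes: 4 for the `Int`, then 2 + 3 for the `writeUTF` length prefix and characters.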

### Tuple2CaseClassSerializer

Specialized serializer optimized for `Tuple2` values.

```scala { .api }
class Tuple2CaseClassSerializer[T1, T2](
    tupleClass: Class[(T1, T2)],
    fieldSerializers: Array[TypeSerializer[_]]
) extends CaseClassSerializer[(T1, T2)] {

  // Optimized serialization for two-element tuples
  override def serialize(value: (T1, T2), target: DataOutputView): Unit
  override def deserialize(source: DataInputView): (T1, T2)
  override def copy(from: (T1, T2)): (T1, T2)
}
```

### OptionSerializer

Serializer for Scala `Option` values that prefixes each value with a one-byte presence flag.

```scala { .api }
class OptionSerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]] {

  def duplicate(): TypeSerializer[Option[A]]
  def createInstance(): Option[A] = None
  def copy(from: Option[A]): Option[A]
  def copy(from: Option[A], reuse: Option[A]): Option[A]
  def getLength: Int = -1 // Variable length
  def serialize(value: Option[A], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Option[A]
  def deserialize(reuse: Option[A], source: DataInputView): Option[A]
  def copy(source: DataInputView, target: DataOutputView): Unit
  def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]]
}
```

Serialization format:

- 1 byte: presence flag (0 = None, 1 = Some)
- If Some: serialized element value
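
Here is a minimal sketch of this layout in plain Scala. It assumes `Int` payloads and uses `java.io` streams in place of Flink's views. It follows the format listed above and is not `OptionSerializer`'s source.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object OptionFormatSketch {
  // 1 presence byte (0 = None, 1 = Some), then the payload only for Some
  def write(value: Option[Int], out: DataOutput): Unit = value match {
    case Some(v) =>
      out.writeBoolean(true) // writes the byte 1
      out.writeInt(v)
    case None =>
      out.writeBoolean(false) // writes the byte 0
  }

  def read(in: DataInput): Option[Int] =
    if (in.readBoolean()) Some(in.readInt()) else None

  def encode(value: Option[Int]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(value, out)
    out.flush()
    bytes.toByteArray
  }

  def decode(bytes: Array[Byte]): Option[Int] =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```

With this layout, `None` costs 1 byte and `Some(1)` costs 5.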

### EitherSerializer

Serializer for Scala Either types supporting both Left and Right values.

```scala { .api }
class EitherSerializer[A, B](
    leftSerializer: TypeSerializer[A],
    rightSerializer: TypeSerializer[B]
) extends TypeSerializer[Either[A, B]] {

  def duplicate(): TypeSerializer[Either[A, B]]
  def createInstance(): Either[A, B] = Left(leftSerializer.createInstance())
  def copy(from: Either[A, B]): Either[A, B]
  def getLength: Int = -1 // Variable length
  def serialize(value: Either[A, B], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Either[A, B]
  def snapshotConfiguration(): TypeSerializerSnapshot[Either[A, B]]
}
```

Serialization format:

- 1 byte: discriminator (0 = Left, 1 = Right)
- The value, serialized with the serializer for its side
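
The same idea applies to `Either`. This sketch uses `String` on the left and `Int` on the right as example types, with `java.io` streams standing in for Flink's views. The discriminator values follow the format listed above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream, IOException}

object EitherFormatSketch {
  // 1 discriminator byte (0 = Left, 1 = Right), then the value of that side
  def write(value: Either[String, Int], out: DataOutput): Unit = value match {
    case Left(s) =>
      out.writeByte(0)
      out.writeUTF(s)
    case Right(i) =>
      out.writeByte(1)
      out.writeInt(i)
  }

  def read(in: DataInput): Either[String, Int] = in.readByte().toInt match {
    case 0     => Left(in.readUTF())
    case 1     => Right(in.readInt())
    case other => throw new IOException(s"Unknown Either discriminator: $other")
  }

  def encode(value: Either[String, Int]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(value, out)
    out.flush()
    bytes.toByteArray
  }

  def decode(bytes: Array[Byte]): Either[String, Int] =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```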

### TrySerializer

Serializer for Scala Try types handling Success and Failure cases.

```scala { .api }
class TrySerializer[A](elementSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]] {

  def duplicate(): TypeSerializer[Try[A]]
  def createInstance(): Try[A] = Success(elementSerializer.createInstance())
  def copy(from: Try[A]): Try[A]
  def serialize(value: Try[A], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Try[A]
  def snapshotConfiguration(): TypeSerializerSnapshot[Try[A]]
}
```

Serialization format:

- 1 byte: success flag (0 = Failure, 1 = Success)
- If Success: serialized value
- If Failure: serialized exception information
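
Here is a sketch of the `Try` layout under the same assumptions (`Int` values, `java.io` streams). As a deliberate simplification, it keeps only the exception message, so a `Failure` comes back wrapping a `RuntimeException`. A real serializer has to preserve the exception itself.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}
import scala.util.{Failure, Success, Try}

object TryFormatSketch {
  // 1 success byte (0 = Failure, 1 = Success), then the value or the exception data
  def write(value: Try[Int], out: DataOutput): Unit = value match {
    case Success(v) =>
      out.writeBoolean(true)
      out.writeInt(v)
    case Failure(e) =>
      out.writeBoolean(false)
      // Simplification: only the exception message is kept
      out.writeUTF(Option(e.getMessage).getOrElse(""))
  }

  def read(in: DataInput): Try[Int] =
    if (in.readBoolean()) Success(in.readInt())
    else Failure(new RuntimeException(in.readUTF()))

  def encode(value: Try[Int]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(value, out)
    out.flush()
    bytes.toByteArray
  }

  def decode(bytes: Array[Byte]): Try[Int] =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```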

## Collection Serializers

### TraversableSerializer

Base serializer for Scala collections implementing TraversableOnce.

```scala { .api }
class TraversableSerializer[T <: TraversableOnce[E], E](
    clazz: Class[T],
    elementSerializer: TypeSerializer[E]
) extends TypeSerializer[T] {

  def duplicate(): TypeSerializer[T]
  def copy(from: T): T
  def serialize(value: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
  def snapshotConfiguration(): TypeSerializerSnapshot[T]
}
```

Serialization format:

- 4 bytes: collection size
- Element values serialized sequentially

Supported collection types:

- `List[T]`, `Vector[T]`
- `Set[T]`, `HashSet[T]`, `TreeSet[T]`
- `Map[K,V]`, `HashMap[K,V]`, `TreeMap[K,V]`
- `Seq[T]`, `IndexedSeq[T]`, `LinearSeq[T]`

`Array[T]` does not fit this serializer's `T <: TraversableOnce[E]` bound, because Scala arrays are not subtypes of `TraversableOnce`.
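
The size-prefixed format can be sketched as follows. The `Int` element type, the `java.io` streams, and rebuilding the result as a `List` are simplifying assumptions. A serializer for a specific collection type would rebuild that type instead.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object TraversableFormatSketch {
  // 4-byte element count, then each element in iteration order
  def write(values: Seq[Int], out: DataOutput): Unit = {
    out.writeInt(values.size)
    values.foreach(v => out.writeInt(v))
  }

  // Rebuilds the collection through a builder, one element at a time
  def read(in: DataInput): List[Int] = {
    val size = in.readInt()
    val builder = List.newBuilder[Int]
    var i = 0
    while (i < size) {
      builder += in.readInt()
      i += 1
    }
    builder.result()
  }

  def encode(values: Seq[Int]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(values, out)
    out.flush()
    bytes.toByteArray
  }

  def decode(bytes: Array[Byte]): List[Int] =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```

Three `Int` elements therefore take 4 + 3 * 4 = 16 bytes.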

### KryoTraversableSerializer

Kryo-based fallback serializer for complex collections.

```scala { .api }
class KryoTraversableSerializer[T <: TraversableOnce[_]](
    clazz: Class[T],
    kryo: Kryo
) extends TypeSerializer[T] {

  def serialize(value: T, target: DataOutputView): Unit
  def deserialize(source: DataInputView): T
}
```

Used for collections that don't have specialized serializers.

## Special Type Serializers

### EnumValueSerializer

Efficient serializer for Scala `Enumeration` values that writes each value's integer `id`.

```scala { .api }
class EnumValueSerializer[E <: Enumeration](
    enum: E,
    valueClass: Class[E#Value]
) extends TypeSerializer[E#Value] {

  def serialize(value: E#Value, target: DataOutputView): Unit
  def deserialize(source: DataInputView): E#Value
  def copy(from: E#Value): E#Value = from // Enumeration values are immutable
  def getLength: Int = 4 // Fixed length (the value's id as an Int)
}
```

Serialization format:

- 4 bytes: the enumeration value's `id`
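
Here is a sketch of id-based enumeration serialization with plain `java.io` streams. `Color` is a hypothetical example. Giving `Blue` the explicit id 10 shows why the value's `id` is written rather than its position: `Enumeration.apply` looks values up by id.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object EnumFormatSketch {
  // Example enumeration; Blue has an explicit, non-contiguous id
  object Color extends Enumeration {
    val Red = Value(0)
    val Green = Value(1)
    val Blue = Value(10)
  }

  // 4 bytes: the value's id
  def write(value: Color.Value, out: DataOutput): Unit = out.writeInt(value.id)

  // Enumeration.apply looks the value up by its id
  def read(in: DataInput): Color.Value = Color(in.readInt())

  def encode(value: Color.Value): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(value, out)
    out.flush()
    bytes.toByteArray
  }

  def decode(bytes: Array[Byte]): Color.Value =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```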

### UnitSerializer

Serializer for the Scala `Unit` type (no actual data is serialized).

```scala { .api }
class UnitSerializer extends TypeSerializer[Unit] {
  def serialize(value: Unit, target: DataOutputView): Unit = {} // Nothing to serialize
  def deserialize(source: DataInputView): Unit = ()
  def copy(from: Unit): Unit = ()
  def getLength: Int = 0 // No bytes needed
  def createInstance(): Unit = ()
}
```

### NothingSerializer

Serializer for the Scala `Nothing` type. It is never actually invoked, since no value of type `Nothing` can exist.

```scala { .api }
class NothingSerializer extends TypeSerializer[Nothing] {
  def serialize(value: Nothing, target: DataOutputView): Unit =
    throw new RuntimeException("Cannot serialize Nothing")
  def deserialize(source: DataInputView): Nothing =
    throw new RuntimeException("Cannot deserialize Nothing")
  def createInstance(): Nothing =
    throw new RuntimeException("Cannot create Nothing instance")
}
```

245

246

## Runtime Type Serializers

247

248

### SymbolSerializer

249

250

Serializer for Scala Symbol objects with string interning.

251

252

```scala { .api }

253

class SymbolSerializer extends TypeSerializer[Symbol] {

254

def serialize(value: Symbol, target: DataOutputView): Unit

255

def deserialize(source: DataInputView): Symbol

256

def copy(from: Symbol): Symbol = from // Symbols are interned/immutable

257

}

258

```
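
Here is a minimal sketch of symbol serialization. It assumes that only the symbol's name needs to be stored. The name is written with `writeUTF`, and `Symbol.apply` on the reading side returns the interned instance, so equal names deserialize to the same object.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object SymbolFormatSketch {
  // Only the name is written (writeUTF: 2-byte length + modified UTF-8 bytes)
  def write(value: Symbol, out: DataOutput): Unit = out.writeUTF(value.name)

  // Symbol.apply interns: equal names yield the same Symbol instance
  def read(in: DataInput): Symbol = Symbol(in.readUTF())

  def encode(value: Symbol): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(value, out)
    out.flush()
    bytes.toByteArray
  }

  def decode(bytes: Array[Byte]): Symbol =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```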

### SingletonSerializer

Generic serializer for Scala singleton objects.

```scala { .api }
class SingletonSerializer[T](instance: T) extends TypeSerializer[T] {
  def serialize(value: T, target: DataOutputView): Unit = {} // Nothing to serialize
  def deserialize(source: DataInputView): T = instance
  def copy(from: T): T = instance
  def createInstance(): T = instance
}
```

### SomeSerializer

Specialized serializer for Some instances (part of Option serialization).

```scala { .api }
class SomeSerializer[T](elementSerializer: TypeSerializer[T]) extends TypeSerializer[Some[T]] {
  def serialize(value: Some[T], target: DataOutputView): Unit
  def deserialize(source: DataInputView): Some[T]
  def copy(from: Some[T]): Some[T]
}
```

## Tuple Serializers

Specialized serializers for all Scala tuple types:

```scala { .api }
class Tuple1Serializer[T1](s1: TypeSerializer[T1]) extends TypeSerializer[Tuple1[T1]]
class Tuple2Serializer[T1, T2](s1: TypeSerializer[T1], s2: TypeSerializer[T2]) extends TypeSerializer[(T1, T2)]
class Tuple3Serializer[T1, T2, T3](...) extends TypeSerializer[(T1, T2, T3)]
// ... up to Tuple22Serializer
```

Each tuple serializer:

- Serializes elements in order
- Uses fixed-length layout when possible
- Supports efficient copy operations
- Maintains type safety for each element
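
The in-order layout and the fixed-length rule can be sketched for a `(Int, String, Boolean)` tuple. `tupleLength` is a hypothetical helper that applies the `getLength` convention from the listings above (`-1` means variable length). Under that convention, a tuple has a fixed length only if every element has one.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object TupleFormatSketch {
  // Elements are written in order, with no separators or type tags
  def write(t: (Int, String, Boolean), out: DataOutput): Unit = {
    out.writeInt(t._1)
    out.writeUTF(t._2)
    out.writeBoolean(t._3)
  }

  def read(in: DataInput): (Int, String, Boolean) = {
    val first = in.readInt()
    val second = in.readUTF()
    val third = in.readBoolean()
    (first, second, third)
  }

  // Fixed length only if every element has one; -1 means variable length
  def tupleLength(fieldLengths: Seq[Int]): Int =
    if (fieldLengths.exists(_ < 0)) -1 else fieldLengths.sum

  def encode(t: (Int, String, Boolean)): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(t, out)
    out.flush()
    bytes.toByteArray
  }

  def decode(bytes: Array[Byte]): (Int, String, Boolean) =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```

`(1, "ab", true)` encodes to 9 bytes (4 + 2 + 2 + 1). A tuple of an `Int` (4 bytes) and a `Long` (8 bytes) would have a fixed length of 12.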

## Wrapper Serializers

### JavaIterableWrapperSerializer

Wraps Java collections for compatibility with Scala collection serializers.

```scala { .api }
class JavaIterableWrapperSerializer[T](
    elementSerializer: TypeSerializer[T]
) extends TypeSerializer[java.lang.Iterable[T]] {

  def serialize(value: java.lang.Iterable[T], target: DataOutputView): Unit
  def deserialize(source: DataInputView): java.lang.Iterable[T]
}
```

### ClassTagSerializer

Handles Scala ClassTag serialization for generic type information preservation.

```scala { .api }
class ClassTagSerializer[T](classTag: ClassTag[T]) extends TypeSerializer[ClassTag[T]] {
  def serialize(value: ClassTag[T], target: DataOutputView): Unit
  def deserialize(source: DataInputView): ClassTag[T]
}
```

## Performance Characteristics

### Serialization Performance

- **Case Classes**: Field-level serialization with minimal overhead
- **Collections**: Bulk serialization with size prefixing
- **Option/Either/Try**: Single-byte discriminators with payload
- **Enumerations**: 4-byte ordinal serialization
- **Tuples**: Packed field layouts with no metadata overhead

### Memory Efficiency

- Null-bit vectors for optional fields
- Shared string interning for symbols
- Reference deduplication for immutable objects
- Compact encoding for small collections

### Deserialization Speed

- Direct object creation without reflection
- Reusable buffer allocation
- Lazy evaluation for large collections
- Parallel deserialization support

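## Configuration and Usage

### Custom Kryo Registration

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.flink.runtime.types.FlinkScalaKryoInstantiator

// MyCustomClass and MyCustomSerializer are placeholders for your own class
// and its Kryo serializer
val kryoInstantiator = new FlinkScalaKryoInstantiator() {
  override def newKryo: Kryo = {
    val kryo = super.newKryo
    // Add custom serializers on top of the Scala defaults
    kryo.register(classOf[MyCustomClass], new MyCustomSerializer())
    kryo
  }
}
```

Flink's `KryoSerializer` loads `FlinkScalaKryoInstantiator` by class name when the Scala API is on the classpath. A subclass like this one therefore only affects code that calls its `newKryo` directly. To make a custom Kryo serializer available to a Flink job, register it through the execution config instead, for example with `registerTypeWithKryoSerializer`.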

### Serializer Configuration

```scala
import org.apache.flink.streaming.api.scala._

// Configure the execution environment for Scala types
val env = StreamExecutionEnvironment.getExecutionEnvironment

// FlinkScalaKryoInstantiator needs no explicit setting: Flink's KryoSerializer
// picks it up automatically when the Scala API (flink-scala) is on the classpath

// Allow types without dedicated type information to fall back to Kryo
env.getConfig.enableGenericTypes()
```

### Type Information Integration

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types

// Serializers are selected automatically from the type information
case class MyData(values: List[Option[String]])

val dataTypeInfo = Types.CASE_CLASS[MyData]
val serializer = dataTypeInfo.createSerializer(env.getConfig.getSerializerConfig)
// Returns a CaseClassSerializer whose field serializer is a TraversableSerializer
// with an OptionSerializer for its elements
```

## Schema Evolution

### Serializer Snapshots

Every serializer provides a snapshot. The snapshot is stored with checkpoints and savepoints and is used on restore to check whether the new serializer can still read the old data:

```scala { .api }
trait TypeSerializerSnapshot[T] {
  def getCurrentVersion: Int
  def writeSnapshot(out: DataOutputView): Unit
  def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit
  def restoreSerializer(): TypeSerializer[T]
  def resolveSchemaCompatibility(newSerializer: TypeSerializer[T]): TypeSerializerSchemaCompatibility[T]
}
```

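### Compatibility Resolution

- **Compatible as is**: the new serializer can read the old data directly
- **Compatible after migration**: the old data must be read with the restored (old) serializer and rewritten with the new one
- **Compatible with reconfigured serializer**: a reconfigured instance of the new serializer is used to read the old data
- **Incompatible**: the state cannot be restored with the new serializer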
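
To make these outcomes concrete, here is a toy model in plain Scala. `Record`, its two layouts, `resolve`, and `migrate` are all hypothetical. In Flink, the written version is recorded in the serializer snapshot and migration happens during restore, not in user code like this. In the model, data written with the current layout is read as is, and data written with the older layout is read with the old reader and rewritten with the new writer. Anything else is rejected.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object MigrationSketch {
  final case class Record(id: Int, label: String)

  // Version 1 layout (hypothetical): only the id
  def writeV1(r: Record, out: DataOutput): Unit = out.writeInt(r.id)
  def readV1(in: DataInput): Record = Record(in.readInt(), "unknown") // default for the new field

  // Version 2 layout (hypothetical): id followed by label
  def writeV2(r: Record, out: DataOutput): Unit = {
    out.writeInt(r.id)
    out.writeUTF(r.label)
  }
  def readV2(in: DataInput): Record = Record(in.readInt(), in.readUTF())

  sealed trait Compatibility
  case object CompatibleAsIs extends Compatibility
  case object CompatibleAfterMigration extends Compatibility
  case object Incompatible extends Compatibility

  // Decides from the version recorded when the data was written
  def resolve(writtenVersion: Int): Compatibility = writtenVersion match {
    case 2 => CompatibleAsIs
    case 1 => CompatibleAfterMigration
    case _ => Incompatible
  }

  // Migration: read every record with the old layout, rewrite it with the new one
  def migrate(oldBytes: Array[Byte], count: Int): Array[Byte] = {
    val in = new DataInputStream(new ByteArrayInputStream(oldBytes))
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    for (_ <- 0 until count) writeV2(readV1(in), out)
    out.flush()
    bytes.toByteArray
  }

  def encodeV1(records: Seq[Record]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    records.foreach(r => writeV1(r, out))
    out.flush()
    bytes.toByteArray
  }

  def decodeV2(bytes: Array[Byte], count: Int): Seq[Record] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    Seq.fill(count)(readV2(in))
  }
}
```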

### Migration Strategies

```scala
import org.apache.flink.streaming.api.scala._

// Graceful handling of schema changes by converting records explicitly.
// OldCaseClass, NewCaseClass and transform are placeholders for your own types and logic.
val oldData: DataStream[OldCaseClass] = ...
val migratedData: DataStream[NewCaseClass] = oldData.map { old =>
  NewCaseClass(
    old.existingField,
    "defaultValue",                  // New field with a default value
    old.modifiedField.map(transform) // Transformed existing field
  )
}
```

## Best Practices

### Performance Optimization

```scala
// Prefer types with specialized serializers over generic Kryo serialization
case class OptimizedData(id: Int, values: Array[Double]) // Fast array serialization
// Instead of: case class GenericData(id: Int, values: List[Any])
// Any has no dedicated type information, so its elements fall back to Kryo
```

### Memory Management

```scala
// Reuse serializer instances when possible
val serializer = typeInfo.createSerializer(config)
// Use the same instance for repeated operations within one thread; serializers
// may be stateful, so call serializer.duplicate() before using one from another thread
```

### Error Handling

```scala
import java.io.IOException
import org.slf4j.LoggerFactory

val logger = LoggerFactory.getLogger(getClass)

// Handle serialization failures gracefully
try {
  serializer.serialize(value, output)
} catch {
  case e: IOException =>
    logger.error(s"Serialization failed for value: $value", e)
    // Handle the error or use fallback serialization
}
```

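### Custom Serializer Integration

```scala
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

case class Money(amount: Long, currency: String)

// Implement custom serializers for domain-specific types.
// Sketch only: TypeSerializer declares further abstract members (duplicate, copy,
// getLength, snapshotConfiguration, ...) that a concrete serializer must also implement.
abstract class MoneySerializer extends TypeSerializer[Money] {
  override def serialize(money: Money, target: DataOutputView): Unit = {
    target.writeLong(money.amount)
    target.writeUTF(money.currency)
  }

  override def deserialize(source: DataInputView): Money =
    Money(source.readLong(), source.readUTF())
}
```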
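
Flink's `DataOutputView` and `DataInputView` extend `java.io.DataOutput` and `DataInput`, so the `MoneySerializer` wire format can be checked with plain `java.io` streams. This sketch repeats the two methods as standalone functions, an assumption made so it runs without Flink.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInput, DataInputStream, DataOutput, DataOutputStream}

object MoneyFormatSketch {
  final case class Money(amount: Long, currency: String)

  // Same calls as MoneySerializer.serialize: 8-byte amount, then the currency via writeUTF
  def write(money: Money, out: DataOutput): Unit = {
    out.writeLong(money.amount)
    out.writeUTF(money.currency)
  }

  // Fields must be read back in the order they were written
  def read(in: DataInput): Money = Money(in.readLong(), in.readUTF())

  def encode(money: Money): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(money, out)
    out.flush()
    bytes.toByteArray
  }

  def decode(bytes: Array[Byte]): Money =
    read(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```

`Money(1999, "EUR")` encodes to 13 bytes: 8 for the `Long`, then 2 + 3 for the currency string.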