0
# Serialization
1
2
Spark's serialization framework controls how objects are encoded for transfer between cluster nodes and for storage. It ships with two implementations: Java serialization (the default) and Kryo, which is faster and produces more compact output.
3
4
## Capabilities
5
6
### Serializer Base Class
7
8
Abstract base class for all serialization implementations in Spark.
9
10
```scala { .api }
11
/**
 * Base class for serializers used by Spark.
 *
 * A `Serializer` is a factory: callers obtain a [[SerializerInstance]] through
 * `newInstance()` and perform the actual (de)serialization on that instance.
 */
abstract class Serializer {
  /** Default buffer size for serialization (65536). */
  def defaultBufferSize: Int = 65536

  /** Create a new serializer instance for thread-local use. */
  def newInstance(): SerializerInstance

  /**
   * Whether serialized objects can be relocated, i.e. whether reordering the
   * serialized bytes of individual objects in an output stream is equivalent to
   * reordering the objects before serializing them.
   *
   * The base implementation is conservative and returns `false`; a serializer
   * has to opt in explicitly by overriding this method.
   */
  def supportsRelocationOfSerializedObjects: Boolean = false
}
24
25
/**
 * A serializer instance meant for thread-local use, obtained from
 * `Serializer.newInstance()`.
 */
abstract class SerializerInstance {
  /** Serialize `t` and return the result as a ByteBuffer. */
  def serialize[T: ClassTag](t: T): ByteBuffer

  /** Overload of `serialize` that additionally takes a caller-supplied `buffer`. */
  def serialize[T: ClassTag](t: T, buffer: ByteBuffer): ByteBuffer

  /** Deserialize one object of type `T` from `bytes`. */
  def deserialize[T: ClassTag](bytes: ByteBuffer): T

  /** Deserialize one object of type `T` from `bytes`, given an explicit class `loader`. */
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T

  /** Open a stream that serializes multiple objects into `s`. */
  def serializeStream(s: OutputStream): SerializationStream

  /** Open a stream that deserializes multiple objects from `s`. */
  def deserializeStream(s: InputStream): DeserializationStream
}
43
```
44
45
### Serialization Streams
46
47
Streaming interfaces for serializing multiple objects efficiently.
48
49
```scala { .api }
50
/**
 * Output side of a serializer: a closeable stream that serialized objects are
 * written to.
 */
abstract class SerializationStream extends Closeable {
  /** Write a single object to the stream. */
  def writeObject[T: ClassTag](t: T): SerializationStream

  /** Write the key part of a key-value pair. */
  def writeKey[T: ClassTag](key: T): SerializationStream

  /** Write the value part of a key-value pair. */
  def writeValue[T: ClassTag](value: T): SerializationStream

  /** Write every object produced by `iter`. */
  def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream

  /** Flush the stream. */
  def flush(): Unit

  /** Close the stream. */
  def close(): Unit
}
70
71
/**
 * Input side of a serializer: a closeable stream that serialized objects are
 * read back from.
 */
abstract class DeserializationStream extends Closeable {
  /** Read the next object from the stream. */
  def readObject[T: ClassTag](): T

  /** Read a key from the stream. */
  def readKey[T: ClassTag](): T

  /** Read a value from the stream. */
  def readValue[T: ClassTag](): T

  /** Close the stream. */
  def close(): Unit

  /** View the objects in the stream as an iterator. */
  def asIterator: Iterator[Any]

  /** View the objects in the stream as an iterator of (key, value) pairs. */
  def asKeyValueIterator: Iterator[(Any, Any)]
}
91
```
92
93
### Java Serializer
94
95
Default serializer using Java's built-in serialization mechanism.
96
97
```scala { .api }
98
/**
 * Serializer using Java's built-in serialization.
 *
 * @param conf Spark configuration
 */
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
  /** Create a Java serializer instance bound to the default (or the thread's context) class loader. */
  override def newInstance(): SerializerInstance = {
    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
    new JavaSerializerInstance(counterReset, classLoader, this)
  }

  override def supportsRelocationOfSerializedObjects: Boolean = {
    // A Java object stream is stateful: later records may refer back to class
    // descriptors and objects written earlier in the same stream, so the bytes
    // of individual records cannot be reordered safely.
    false
  }

  // Externalizable declares these methods as public; an implementation must not
  // narrow their visibility (e.g. to private[spark]).
  override def writeExternal(out: ObjectOutput): Unit = { }
  override def readExternal(in: ObjectInput): Unit = { }
}
116
117
/**
 * Java serializer instance.
 *
 * @param counterReset passed through to every JavaSerializationStream this instance creates
 * @param defaultClassLoader class loader handed to every JavaDeserializationStream
 * @param javaSerializer the serializer that created this instance
 */
private[spark] class JavaSerializerInstance(
    counterReset: Int,
    defaultClassLoader: ClassLoader,
    javaSerializer: JavaSerializer
) extends SerializerInstance {

  /** Serialize `t` into a freshly allocated, array-backed ByteBuffer. */
  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = serializeStream(bos)
    // Close the stream even when writeObject throws.
    try out.writeObject(t) finally out.close()
    ByteBuffer.wrap(bos.toByteArray)
  }

  /** Deserialize one object from the remaining bytes of the array-backed `bytes`. */
  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    // position() is relative to arrayOffset(); without the offset a sliced
    // buffer would be read from the wrong place in its backing array.
    val bis = new ByteArrayInputStream(
      bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    val in = deserializeStream(bis)
    try in.readObject[T]() finally in.close()
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    new JavaSerializationStream(s, counterReset)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    new JavaDeserializationStream(s, defaultClassLoader)
  }
}
148
```
149
150
### Kryo Serializer
151
152
High-performance serializer using the Kryo serialization library.
153
154
```scala { .api }
155
/**
 * Serializer using the Kryo serialization library.
 *
 * @param conf Spark configuration that the Kryo settings below are read from
 */
class KryoSerializer(conf: SparkConf) extends Serializer with Logging with Serializable {

  /** Create a new Kryo-backed serializer instance. */
  override def newInstance(): SerializerInstance = {
    this.synchronized {
      new KryoSerializerInstance(this, useUnsafe, usePool)
    }
  }

  override def supportsRelocationOfSerializedObjects: Boolean = {
    // With Kryo's auto-reset enabled (Kryo's default; newKryo() does not turn it
    // off) every top-level object is written without references to earlier
    // objects in the stream, so records can be reordered safely.
    // NOTE(review): a registrator that calls kryo.setAutoReset(false) breaks
    // this guarantee - confirm none does before relying on relocation.
    true
  }

  /**
   * Initial Kryo buffer size in bytes. The config entry is declared in KiB and
   * yields a Long, so it is converted; values that do not fit in an Int fail
   * with ArithmeticException instead of silently wrapping around.
   */
  def bufferSize: Int =
    Math.toIntExact(ByteUnit.KiB.toBytes(conf.get(KRYO_SERIALIZER_BUFFER_SIZE)))

  /**
   * Maximum Kryo buffer size in bytes. The config entry is declared in MiB and
   * yields a Long; converted the same way as `bufferSize`.
   */
  def maxBufferSize: Int =
    Math.toIntExact(ByteUnit.MiB.toBytes(conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE)))

  /** Whether to use unsafe I/O. */
  def useUnsafe: Boolean = conf.get(KRYO_USE_UNSAFE)

  /** Whether to use object pools. */
  def usePool: Boolean = conf.get(KRYO_USE_POOL)

  /** Custom Kryo registrator class named by `spark.kryo.registrator`, if configured. */
  def registratorClass: Option[Class[_]] = {
    conf.getOption("spark.kryo.registrator").map { className =>
      Class.forName(className, true, Thread.currentThread.getContextClassLoader)
    }
  }

  /** Whether Kryo is told to require registration of every class it handles. */
  def registrationRequired: Boolean = conf.get(KRYO_REGISTRATION_REQUIRED)

  /** Create and configure a Kryo instance. */
  def newKryo(): Kryo = {
    val kryo = new Kryo()

    // Configure Kryo settings
    kryo.setRegistrationRequired(registrationRequired)
    kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))

    // Register common array classes
    kryo.register(classOf[Array[Any]])
    kryo.register(classOf[Array[Object]])
    kryo.register(classOf[Array[String]])
    kryo.register(classOf[Array[Byte]])
    kryo.register(classOf[Array[Int]])
    kryo.register(classOf[Array[Long]])
    kryo.register(classOf[Array[Double]])

    // Apply custom registrator if specified
    registratorClass.foreach { clazz =>
      // Class.newInstance() is deprecated since Java 9; go through the no-arg constructor.
      val registrator = clazz.getDeclaredConstructor().newInstance().asInstanceOf[KryoRegistrator]
      registrator.registerClasses(kryo)
    }

    kryo
  }
}
220
221
/**
 * Hook for custom Kryo registration: implementations register the classes they
 * need on the Kryo instance they are handed.
 */
trait KryoRegistrator {
  /** Register classes on the supplied `kryo` instance. */
  def registerClasses(kryo: Kryo): Unit
}
227
228
/**
 * Kryo serializer instance.
 *
 * @param ks the serializer that created this instance; supplies the Kryo, Output and Input objects
 * @param useUnsafe forwarded to the serialization/deserialization streams
 * @param usePool object-pool flag received from the serializer
 */
private[spark] class KryoSerializerInstance(
    ks: KryoSerializer,
    useUnsafe: Boolean,
    usePool: Boolean
) extends SerializerInstance {

  // Created once and shared by every serialize/deserialize call on this
  // instance; they are never reassigned, hence `val`.
  private[this] val kryo: Kryo = ks.newKryo()
  private[this] val output: Output = ks.newKryoOutput()
  private[this] val input: Input = ks.newKryoInput()

  /** Serialize `t` (class information plus object) into a new ByteBuffer. */
  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    output.reset()
    kryo.writeClassAndObject(output, t)
    ByteBuffer.wrap(output.toBytes)
  }

  /** Deserialize one object from the remaining bytes of `bytes`. */
  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    if (bytes.hasArray) {
      input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    } else {
      // Direct and read-only buffers expose no backing array (array() would
      // throw); copy the remaining bytes without moving the caller's position.
      val copy = new Array[Byte](bytes.remaining())
      bytes.duplicate().get(copy)
      input.setBuffer(copy)
    }
    kryo.readClassAndObject(input).asInstanceOf[T]
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    new KryoSerializationStream(ks, s, useUnsafe)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    new KryoDeserializationStream(ks, s, useUnsafe)
  }
}
260
```
261
262
### Serializer Configuration
263
264
Configuration options for different serialization strategies.
265
266
```scala { .api }
267
/**
 * Spark configuration keys for serialization.
 *
 * Each entry records the configuration key, the Spark version that introduced
 * it, its value type and its default.
 */
object SerializationConfig {
  // Serializer class
  val SERIALIZER = ConfigBuilder("spark.serializer")
    .version("0.5.0")
    .stringConf
    .createWithDefault("org.apache.spark.serializer.JavaSerializer")

  // Kryo configurations

  // Initial buffer size, expressed in KiB. This key (without the legacy ".mb"
  // suffix) was introduced in Spark 1.4.0.
  val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer")
    .version("1.4.0")
    .bytesConf(ByteUnit.KiB)
    .createWithDefaultString("64k")

  // Maximum buffer size, expressed in MiB. Also introduced in Spark 1.4.0.
  val KRYO_SERIALIZER_MAX_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer.max")
    .version("1.4.0")
    .bytesConf(ByteUnit.MiB)
    .createWithDefaultString("64m")

  val KRYO_USE_UNSAFE = ConfigBuilder("spark.kryo.unsafe")
    .version("2.1.0")
    .booleanConf
    .createWithDefault(false)

  val KRYO_USE_POOL = ConfigBuilder("spark.kryo.pool")
    .version("3.0.0")
    .booleanConf
    .createWithDefault(true)

  val KRYO_REGISTRATION_REQUIRED = ConfigBuilder("spark.kryo.registrationRequired")
    .version("1.1.0")
    .booleanConf
    .createWithDefault(false)
}
303
```
304
305
### Custom Serializers
306
307
Creating custom serializers for specific data types or performance requirements.
308
309
```scala { .api }
310
/**
 * Example: custom serializer for specific domain objects.
 *
 * @param conf Spark configuration (not used by this example)
 */
class CustomDomainSerializer(conf: SparkConf) extends Serializer {
  override def newInstance(): SerializerInstance = new CustomDomainSerializerInstance()
}

/**
 * Serializer instance that writes [[CustomDomainObject]] in a compact layout
 * (4-byte id followed by the UTF-8 bytes of the name) and falls back to Java
 * serialization for every other value.
 *
 * NOTE(review): the payload carries no type marker, so a CustomDomainObject is
 * only read back correctly when `deserialize` is called with
 * `T = CustomDomainObject`; calling it with a wider type (e.g. `Any`) sends the
 * compact bytes down the Java path, which cannot decode them.
 */
class CustomDomainSerializerInstance extends SerializerInstance {

  private val Utf8 = java.nio.charset.StandardCharsets.UTF_8

  override def serialize[T: ClassTag](t: T): ByteBuffer = t match {
    case customObj: CustomDomainObject =>
      // Size the buffer from the encoded name: a fixed-size buffer overflows
      // (BufferOverflowException) as soon as the name is longer than it allows.
      val nameBytes = customObj.name.getBytes(Utf8)
      val buffer = ByteBuffer.allocate(Integer.BYTES + nameBytes.length)
      buffer.putInt(customObj.id)
      buffer.put(nameBytes)
      buffer.flip()
      buffer
    case _ =>
      // Fall back to Java serialization
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      try oos.writeObject(t) finally oos.close()
      ByteBuffer.wrap(bos.toByteArray)
  }

  /** Deserialize using ObjectInputStream's default class resolution for the fallback path. */
  override def deserialize[T: ClassTag](bytes: ByteBuffer): T =
    readValue[T](bytes, in => new ObjectInputStream(in))

  /** Deserialize, resolving classes on the fallback path through `loader`. */
  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T =
    readValue[T](bytes, in => new ObjectInputStream(in) {
      override def resolveClass(desc: java.io.ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch {
          // Primitive type names (e.g. "int") cannot be looked up by name;
          // let the default resolution handle them.
          case _: ClassNotFoundException => super.resolveClass(desc)
        }
    })

  /** Shared decoding logic; `open` builds the ObjectInputStream used for the Java fallback. */
  private def readValue[T: ClassTag](
      bytes: ByteBuffer,
      open: ByteArrayInputStream => ObjectInputStream): T = {
    if (implicitly[ClassTag[T]].runtimeClass == classOf[CustomDomainObject]) {
      // Custom deserialization logic: id first, the rest is the UTF-8 name.
      val id = bytes.getInt()
      val nameBytes = new Array[Byte](bytes.remaining())
      bytes.get(nameBytes)
      CustomDomainObject(id, new String(nameBytes, Utf8)).asInstanceOf[T]
    } else {
      // Copy the remaining bytes through a duplicate: this works for sliced
      // buffers (non-zero arrayOffset) and for direct/read-only buffers, and it
      // leaves the caller's position untouched.
      val payload = new Array[Byte](bytes.remaining())
      bytes.duplicate().get(payload)
      val ois = open(new ByteArrayInputStream(payload))
      try ois.readObject().asInstanceOf[T] finally ois.close()
    }
  }

  // Streaming is not implemented in this example.
  override def serializeStream(s: OutputStream): SerializationStream = ???
  override def deserializeStream(s: InputStream): DeserializationStream = ???
}

/** Domain object handled by the compact path of the example serializer. */
case class CustomDomainObject(id: Int, name: String)
357
```
358
359
### Kryo Registration Examples
360
361
```scala { .api }
362
/**
 * Example Kryo registrator covering application-specific classes.
 */
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Custom classes that use Kryo's default serializers
    val domainClasses: Seq[Class[_]] = Seq(
      classOf[CustomDomainObject],
      classOf[Array[CustomDomainObject]]
    )
    domainClasses.foreach(cls => kryo.register(cls))

    // Class registered together with its dedicated serializer
    kryo.register(classOf[ComplexObject], new ComplexObjectSerializer())

    // Collection types first, then common third-party classes
    val libraryClasses: Seq[Class[_]] = Seq(
      classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
      classOf[scala.collection.immutable.Map[_, _]],
      classOf[org.joda.time.DateTime],
      classOf[java.util.UUID]
    )
    libraryClasses.foreach(cls => kryo.register(cls))
  }
}
383
384
/**
 * Custom Kryo serializer for [[ComplexObject]].
 *
 * Layout: `id` as an int, `data` as a string, then `metadata` together with its
 * runtime class.
 */
class ComplexObjectSerializer extends com.esotericsoftware.kryo.Serializer[ComplexObject] {
  override def write(kryo: Kryo, output: Output, obj: ComplexObject): Unit = {
    output.writeInt(obj.id)
    output.writeString(obj.data)
    // `metadata` is declared as the Map trait, but its concrete class varies at
    // runtime. Record the class alongside the value so that read() picks the
    // serializer matching what was actually written.
    kryo.writeClassAndObject(output, obj.metadata)
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[ComplexObject]): ComplexObject = {
    val id = input.readInt()
    val data = input.readString()
    // Counterpart of writeClassAndObject above: the class is taken from the
    // stream instead of assuming the (non-instantiable) Map trait.
    val metadata = kryo.readClassAndObject(input).asInstanceOf[Map[String, String]]
    ComplexObject(id, data, metadata)
  }
}

/** Example object with a scalar id, a string payload and a string-to-string metadata map. */
case class ComplexObject(id: Int, data: String, metadata: Map[String, String])
403
```
404
405
**Usage Examples:**
406
407
```scala
408
import org.apache.spark.{SparkContext, SparkConf}
409
import org.apache.spark.serializer.{KryoSerializer, JavaSerializer}
410
411
// Java Serializer (default)
val javaConf = new SparkConf()
  .setAppName("Java Serialization Example")
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

val sc1 = new SparkContext(javaConf)

// Only one SparkContext may be active per JVM, so stop the first context
// before the second one is created.
sc1.stop()

// Kryo Serializer (recommended for performance)
val kryoConf = new SparkConf()
  .setAppName("Kryo Serialization Example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  .set("spark.kryoserializer.buffer", "128k")
  .set("spark.kryoserializer.buffer.max", "128m")
  // Registration is not enforced here: Person (defined below) is not registered
  // anywhere in this snippet, so enforcing it would make the shuffle fail unless
  // the configured registrator covers every shuffled class.
  .set("spark.kryo.registrationRequired", "false")

val sc2 = new SparkContext(kryoConf)

// Example with custom objects
case class Person(id: Int, name: String, age: Int)

val people = sc2.parallelize(Array(
  Person(1, "Alice", 25),
  Person(2, "Bob", 30),
  Person(3, "Charlie", 35)
))

// Serialization happens automatically during shuffles
val grouped = people.groupBy(_.age / 10) // Triggers serialization
val result = grouped.collect()

// Broadcasting also uses serialization
val lookup = Map(1 -> "Manager", 2 -> "Developer", 3 -> "Analyst")
val broadcastLookup = sc2.broadcast(lookup)

val enriched = people.map { person =>
  val roles = broadcastLookup.value
  (person, roles.getOrElse(person.id, "Unknown"))
}

sc2.stop()
453
```
454
455
**Java Examples:**
456
457
```java
458
import org.apache.spark.SparkConf;
459
import org.apache.spark.api.java.JavaSparkContext;
460
import org.apache.spark.serializer.KryoRegistrator;
461
import com.esotericsoftware.kryo.Kryo;
462
463
// Kryo configuration in Java
464
SparkConf conf = new SparkConf()
465
.setAppName("Java Kryo Example")
466
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
467
.set("spark.kryo.registrator", MyJavaKryoRegistrator.class.getName());
468
469
JavaSparkContext sc = new JavaSparkContext(conf);
470
471
// Custom Kryo registrator in Java
472
public class MyJavaKryoRegistrator implements KryoRegistrator {
473
@Override
474
public void registerClasses(Kryo kryo) {
475
kryo.register(Person.class);
476
kryo.register(Person[].class);
477
}
478
}
479
480
public class Person implements Serializable {
481
private int id;
482
private String name;
483
484
// Constructors, getters, setters...
485
}
486
```
487
488
## Performance Considerations
489
490
### Java vs Kryo Serialization
491
492
**Java Serialization:**
493
- Pros: Built-in, handles complex object graphs, version compatibility
494
- Cons: Slower, larger serialized size, reflection overhead
495
496
**Kryo Serialization:**
497
- Pros: 2-10x faster, more compact, less CPU overhead
498
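- Cons: Classes should be registered up front for best performance (unregistered classes still work, but the full class name is stored with each object), not every `Serializable` type is supported, less compatible with schema evolution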
499
500
### Configuration Best Practices
501
502
```scala
503
// Optimal Kryo configuration, applied entry by entry in the order listed
val conf = Seq(
  "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
  "spark.kryo.registrator" -> "com.mycompany.MyKryoRegistrator",
  "spark.kryoserializer.buffer" -> "256k",      // Increase if needed
  "spark.kryoserializer.buffer.max" -> "256m",  // Increase for large objects
  "spark.kryo.unsafe" -> "true",                // Use unsafe I/O for performance
  "spark.kryo.registrationRequired" -> "false"  // Set true only after thorough testing
).foldLeft(new SparkConf()) { case (acc, (key, value)) => acc.set(key, value) }
511
```
512
513
### Common Serialization Issues
514
515
```scala
516
// Avoid: Non-serializable objects in closures
val nonSerializable = new Database() // Not serializable
rdd.map(x => nonSerializable.lookup(x)) // Will fail!

// Solution: broadcast the (serializable) settings and build the client on the
// executor, once per partition rather than once per record
val config = Map("url" -> "jdbc:...")
val broadcastConfig = sc.broadcast(config)
rdd.mapPartitions { records =>
  val db = new Database(broadcastConfig.value("url"))
  records.map(x => db.lookup(x))
}

// Avoid: Large objects in closures
val largeLookup = loadLargeMap() // Will be serialized with every task
rdd.map(x => largeLookup.get(x))

// Solution: Use broadcast variables
val broadcastLookup = sc.broadcast(largeLookup)
rdd.map(x => broadcastLookup.value.get(x))
535
```
536
537
Proper serialization configuration is crucial for Spark performance, especially in shuffle-heavy workloads and when using broadcast variables or accumulators.