# Serialization

Apache Spark provides a pluggable serialization framework with two built-in serializers, Java serialization and Kryo, which it uses whenever objects are sent over the network or stored in serialized form. The framework serializes objects for RDD storage, shuffle operations, and broadcast variables.

## Core Serialization Interfaces

### Serializer
```scala { .api }
abstract class Serializer {
  def newInstance(): SerializerInstance
  def supportsRelocationOfSerializedObjects: Boolean
}
```

### SerializerInstance
```scala { .api }
abstract class SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}
```
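
As a minimal sketch of how these methods fit together, the following round-trips a value through a `SerializerInstance`. It assumes `spark-core` is on the classpath and uses `JavaSerializer` with a default `SparkConf`; the sample map is purely illustrative.

```scala
import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, SerializerInstance}

// Assumption: spark-core is on the classpath. No SparkContext is created here.
val instance: SerializerInstance = new JavaSerializer(new SparkConf()).newInstance()

// Illustrative sample value; JavaSerializer accepts any java.io.Serializable value.
val original: Map[String, Int] = Map("alice" -> 25, "bob" -> 30)

// serialize returns the encoded bytes; deserialize rebuilds an equal value from them.
val bytes: ByteBuffer = instance.serialize(original)
val restored: Map[String, Int] = instance.deserialize[Map[String, Int]](bytes)
```

A `SerializerInstance` is intended for use by one thread at a time, so code that serializes from several threads would typically call `newInstance()` once per thread.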

### Serialization Streams
```scala { .api }
abstract class SerializationStream {
  def writeObject[T: ClassTag](t: T): SerializationStream
  def flush(): Unit
  def close(): Unit
  def writeKey[T: ClassTag](key: T): SerializationStream
  def writeValue[T: ClassTag](value: T): SerializationStream
  def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream
}

abstract class DeserializationStream {
  def readObject[T: ClassTag](): T
  def readKey[T: ClassTag](): T
  def readValue[T: ClassTag](): T
  def close(): Unit
  def asIterator: Iterator[Any]
  def asKeyValueIterator: Iterator[(Any, Any)]
}
```
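
The stream classes above handle many objects written back to back. The sketch below, which assumes `spark-core` is on the classpath and uses in-memory byte streams as a stand-in for a real file or socket, writes three values and reads them back in order.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Assumption: spark-core is on the classpath. No SparkContext is created here.
val instance = new JavaSerializer(new SparkConf()).newInstance()

// Write several objects to one stream; writeObject returns the stream, so calls chain.
val sink = new ByteArrayOutputStream()
val out = instance.serializeStream(sink)
out.writeObject("first").writeObject("second").writeObject("third")
out.close()

// Read everything back; asIterator yields objects until the end of the stream.
val in = instance.deserializeStream(new ByteArrayInputStream(sink.toByteArray))
val values: List[Any] = in.asIterator.toList
```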

## Built-in Serializer Implementations

### JavaSerializer
```scala { .api }
class JavaSerializer(conf: SparkConf) extends Serializer {
  def newInstance(): SerializerInstance
  // Java-serialized objects cannot be reordered within a stream, so relocation is unsupported
  def supportsRelocationOfSerializedObjects: Boolean = false

  private[spark] override def toString: String = "JavaSerializer"
}

class JavaSerializerInstance(counterReset: Int, extraDebugInfo: Boolean) extends SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}
```

### KryoSerializer
```scala { .api }
class KryoSerializer(conf: SparkConf) extends Serializer with Logging {
  def newInstance(): SerializerInstance
  // Holds while Kryo's auto-reset is enabled, which is the default
  def supportsRelocationOfSerializedObjects: Boolean = true

  private[spark] override def toString: String = "KryoSerializer"
}

class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean) extends SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}
```
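
`supportsRelocationOfSerializedObjects` reports whether the bytes of individual serialized objects can be reordered within a stream and still deserialize correctly. The sketch below uses only the JDK, not Spark, to illustrate why plain Java serialization lacks this property: later objects in an `ObjectOutputStream` refer back to class descriptors written earlier. The variable names are illustrative.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.Try

val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.flush()
val headerEnd = buffer.size() // end of the stream header

out.writeObject(Integer.valueOf(1))
out.flush()
val firstEnd = buffer.size() // end of the first object's bytes

out.writeObject(Integer.valueOf(2))
out.close()

// Split the stream into its header and the byte ranges of the two objects.
val bytes = buffer.toByteArray
val header = bytes.slice(0, headerEnd)
val first = bytes.slice(headerEnd, firstEnd)
val second = bytes.slice(firstEnd, bytes.length)

// Try to read two objects from a byte array, capturing any failure.
def readTwo(data: Array[Byte]): Try[(AnyRef, AnyRef)] = Try {
  val in = new ObjectInputStream(new ByteArrayInputStream(data))
  try (in.readObject(), in.readObject()) finally in.close()
}

val inOrder = readTwo(header ++ first ++ second)
// The second object refers to a class descriptor defined inside the first,
// so reading it before the first object fails.
val swapped = readTwo(header ++ second ++ first)
```

Here `inOrder` succeeds while `swapped` fails, which is the behavior a serializer must avoid to report that relocation is supported.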

## Kryo Registration

### KryoRegistrator
```scala { .api }
trait KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}
```

### Built-in Registrators
```scala { .api }
class SparkKryoRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}
```
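
## Serialization Configuration

Key configuration properties for controlling serialization behavior:

### General Serialization
- `spark.serializer` - Fully qualified class name of the serializer to use (`org.apache.spark.serializer.JavaSerializer` by default, or `org.apache.spark.serializer.KryoSerializer`)
- `spark.closure.serializer` - Serializer for closures (Java only; closures are always Java-serialized, and Spark 2.0 and later no longer read this setting)
- `spark.serializer.objectStreamReset` - Number of objects written between stream resets for Java serialization (default 100; -1 disables the reset)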
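
To illustrate what the Java object stream reset controls, here is a JDK-only sketch (it does not use Spark). An `ObjectOutputStream` remembers every object it has written and emits a back-reference when the same instance is written again; `reset()` clears that memory, which is what Spark does periodically so that written objects can be garbage collected. The helper name `writeTwice` is hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Writes the same array twice, mutating it in between, then reads both copies back.
def writeTwice(resetBetween: Boolean): (Int, Int) = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  val data = Array(1)

  out.writeObject(data)
  data(0) = 2
  if (resetBetween) out.reset() // forget previously written objects
  out.writeObject(data)
  out.close()

  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  try {
    val first = in.readObject().asInstanceOf[Array[Int]]
    val second = in.readObject().asInstanceOf[Array[Int]]
    (first(0), second(0))
  } finally in.close()
}

val withoutReset = writeTwice(resetBetween = false) // (1, 1): second write is a back-reference
val withReset = writeTwice(resetBetween = true)     // (1, 2): second write is a fresh copy
```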
108
109
### Kryo Configuration
110
- `spark.kryo.classesToRegister` - Comma-separated list of classes to register
111
- `spark.kryo.registrator` - Custom KryoRegistrator class
112
- `spark.kryo.registrationRequired` - Require class registration
113
- `spark.kryo.referenceTracking` - Enable reference tracking
114
- `spark.kryo.unsafe` - Use unsafe-based Kryo serialization
115
116
### Buffer Configuration
117
- `spark.kryoserializer.buffer` - Initial buffer size for Kryo
118
- `spark.kryoserializer.buffer.max` - Maximum buffer size for Kryo
119
- `spark.serializer.buffer.size` - Buffer size for other serializers

## Usage Examples

### Basic Kryo Configuration
```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Kryo Example")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "false")
  .set("spark.kryoserializer.buffer", "64k")
  .set("spark.kryoserializer.buffer.max", "64m")

val sc = new SparkContext(conf)
```

### Custom Kryo Registrator
```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

// Define custom classes
case class Person(name: String, age: Int, addresses: List[String])
case class Company(name: String, employees: List[Person])

// Custom registrator
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Person])
    kryo.register(classOf[Company])
    kryo.register(classOf[scala.collection.immutable.List[_]])
    kryo.register(classOf[scala.collection.immutable.::[_]])
    // Nil is a singleton object, so its runtime class comes from getClass, not classOf
    kryo.register(Nil.getClass)
  }
}

// Configure Spark to use custom registrator
val conf = new SparkConf()
  .setAppName("Custom Kryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  // Fail fast on any unregistered class, including array and collection
  // classes that Spark serializes internally
  .set("spark.kryo.registrationRequired", "true")

val sc = new SparkContext(conf)

// Use custom objects
val people = sc.parallelize(Seq(
  Person("Alice", 25, List("123 Main St", "456 Oak Ave")),
  Person("Bob", 30, List("789 Pine St"))
))

val companies = sc.parallelize(Seq(
  Company("TechCorp", List(
    Person("Alice", 25, List("123 Main St")),
    Person("Bob", 30, List("789 Pine St"))
  ))
))

val result = companies.flatMap(_.employees).collect()
```

### Class Registration via Configuration
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Register classes via configuration instead of custom registrator
val conf = new SparkConf()
  .setAppName("Class Registration")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.classesToRegister",
    "com.example.Person," +
    "com.example.Company," +
    "scala.collection.immutable.List," +
    "scala.collection.immutable.$colon$colon," +
    "scala.collection.immutable.Nil$"
  )

val sc = new SparkContext(conf)
```

### Performance Comparison
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.util.Random

case class LargeObject(
  id: Long,
  name: String,
  data: Array[Double],
  metadata: Map[String, String]
)

// Generate test data
val random = new Random(42)
val testData = (1 to 10000).map { i =>
  LargeObject(
    id = i,
    name = s"object_$i",
    data = Array.fill(100)(random.nextDouble()),
    metadata = Map(
      "type" -> "test",
      "version" -> "1.0",
      "created" -> System.currentTimeMillis().toString
    )
  )
}

// Test with Java serialization
def testJavaSerialization(): Long = {
  val javaConf = new SparkConf()
    .setAppName("Java Serialization Test")
    .setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

  val javaSc = new SparkContext(javaConf)

  val start = System.currentTimeMillis()
  val rdd = javaSc.parallelize(testData)
  // cache() keeps deserialized objects; a serialized storage level exercises the serializer
  rdd.persist(StorageLevel.MEMORY_ONLY_SER)
  rdd.count()
  val end = System.currentTimeMillis()

  javaSc.stop()
  end - start
}

// Test with Kryo serialization
def testKryoSerialization(): Long = {
  val kryoConf = new SparkConf()
    .setAppName("Kryo Serialization Test")
    .setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "false")

  val kryoSc = new SparkContext(kryoConf)

  val start = System.currentTimeMillis()
  val rdd = kryoSc.parallelize(testData)
  rdd.persist(StorageLevel.MEMORY_ONLY_SER)
  rdd.count()
  val end = System.currentTimeMillis()

  kryoSc.stop()
  end - start
}

// Compare performance. A single run is only a rough indication:
// JVM warm-up favors whichever test runs second.
val javaTime = testJavaSerialization()
val kryoTime = testKryoSerialization()

println(s"Java serialization time: ${javaTime}ms")
println(s"Kryo serialization time: ${kryoTime}ms")
println(s"Kryo speedup: ${javaTime.toDouble / kryoTime}x")
```

### Custom Serialization for Complex Types
```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Custom class with complex serialization needs
case class TimestampedEvent(
  timestamp: LocalDateTime,
  eventType: String,
  data: Map[String, Any],
  metadata: Option[String]
)

// Custom Kryo serializer for LocalDateTime
class LocalDateTimeSerializer extends Serializer[LocalDateTime] {
  private val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME

  override def write(kryo: Kryo, output: Output, dateTime: LocalDateTime): Unit = {
    output.writeString(dateTime.format(formatter))
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[LocalDateTime]): LocalDateTime = {
    LocalDateTime.parse(input.readString(), formatter)
  }
}

// Registrator with custom serializers
class AdvancedKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[TimestampedEvent])
    kryo.register(classOf[LocalDateTime], new LocalDateTimeSerializer)
    kryo.register(classOf[scala.collection.immutable.Map[_, _]])
    kryo.register(classOf[scala.Some[_]])
    // None is a singleton object, so its runtime class comes from getClass, not classOf
    kryo.register(None.getClass)

    // Configure Kryo settings (these override the spark.kryo.* values for this Kryo instance)
    kryo.setReferences(true) // Enable object reference tracking
    kryo.setRegistrationRequired(false) // Allow unregistered classes
  }
}

val conf = new SparkConf()
  .setAppName("Advanced Kryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.AdvancedKryoRegistrator")

val sc = new SparkContext(conf)

// Use complex objects
val events = sc.parallelize(Seq(
  TimestampedEvent(
    LocalDateTime.now(),
    "user_login",
    Map("userId" -> 123, "sessionId" -> "abc123"),
    Some("Mobile app login")
  ),
  TimestampedEvent(
    LocalDateTime.now().minusHours(1),
    "page_view",
    Map("page" -> "/home", "duration" -> 30.5),
    None
  )
))

val result = events.filter(_.eventType == "user_login").collect()
```

### Broadcast Variable Serialization
```scala
import org.apache.spark.{SparkConf, SparkContext}

// Large lookup table that benefits from efficient serialization
val largeLookupTable = (1 to 100000).map { i =>
  i -> s"value_$i"
}.toMap

// Configure for optimal broadcast performance
val conf = new SparkConf()
  .setAppName("Broadcast Serialization")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.broadcast.compress", "true")
  .set("spark.io.compression.codec", "lz4")

val sc = new SparkContext(conf)

// Broadcast with efficient serialization
val broadcastLookup = sc.broadcast(largeLookupTable)

val data = sc.parallelize(1 to 10000)
val enrichedData = data.map { id =>
  val lookup = broadcastLookup.value
  (id, lookup.getOrElse(id, "unknown"))
}

val result = enrichedData.collect()
broadcastLookup.unpersist()
```

## Best Practices

### Serializer Selection
1. **Use Kryo for production**: Spark's tuning guide describes Kryo as significantly faster and more compact than Java serialization, often by as much as 10x
2. **Register frequently used classes**: Registered classes are written as a compact ID instead of the full class name, which improves performance and reduces storage
3. **Benchmark your workload**: Test both serializers with your specific data types
4. **Consider compression**: Enable compression for large objects
5. **Monitor serialization overhead**: Use Spark UI to identify bottlenecks

### Kryo Configuration
1. **Increase buffer sizes**: Raise `spark.kryoserializer.buffer.max` so it can hold the largest object you serialize
2. **Enable registration requirement**: Use `spark.kryo.registrationRequired=true` to catch unregistered classes
3. **Use custom serializers**: Implement efficient serialization for complex types
4. **Consider reference tracking**: Keep it enabled for object graphs with shared or cyclic references; disabling it can improve performance when there are none
5. **Test thoroughly**: Verify serialization/deserialization of all data types
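
One way to follow the last two registration and testing points is a small round-trip check run against a strict Kryo configuration. This is a sketch under assumptions: `spark-core` is on the classpath, and `Unregistered` and `roundTrip` are hypothetical names introduced only for this illustration.

```scala
import scala.reflect.ClassTag
import scala.util.Try
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Hypothetical class that is deliberately left unregistered.
case class Unregistered(id: Int)

// Strict mode: serializing a class that was never registered raises an error.
val strictConf = new SparkConf()
  .set("spark.kryo.registrationRequired", "true")
val instance = new KryoSerializer(strictConf).newInstance()

// Hypothetical helper: serialize a value and immediately deserialize it again.
def roundTrip[T: ClassTag](value: T): Try[T] =
  Try(instance.deserialize[T](instance.serialize(value)))

val registered = roundTrip("plain string")     // String is registered by Kryo itself
val unregistered = roundTrip(Unregistered(1))  // expected to fail in strict mode
```

Running such a check over each application data type in a unit test surfaces missing registrations before a job reaches the cluster.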

### Performance Optimization
1. **Avoid nested collections**: Flatten complex nested structures when possible
2. **Use primitive collections**: Prefer specialized collections for primitives
3. **Minimize object creation**: Reuse objects in serialization-heavy operations
4. **Profile serialization costs**: Identify expensive serialization operations
5. **Consider data formats**: Use efficient formats like Avro or Parquet for storage

### Common Pitfalls
1. **Serialization failures**: Always test serialization of custom classes
2. **Class loading issues**: Ensure all required classes are available on workers
3. **Version compatibility**: Be careful with serialized data across Spark versions
4. **Lambda serialization**: Avoid capturing large objects in closures
5. **Singleton objects**: Be careful with serializing singleton instances
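
The lambda pitfall is easiest to see in isolation. Spark serializes closures with Java serialization, so the JDK-only sketch below (no Spark required) checks whether two function values can be Java-serialized. The class `Scaler` and the helper `javaSerializable` are hypothetical names used for illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical class that is not Serializable, standing in for a large enclosing object.
class Scaler(val factor: Int) {
  // References the field directly, so the function captures `this`.
  def capturingThis: Int => Int = x => x * factor

  // Copies the field to a local val first, so only the Int is captured.
  def capturingLocal: Int => Int = {
    val localFactor = factor
    x => x * localFactor
  }
}

// Returns true when the value can be written with Java serialization.
def javaSerializable(value: AnyRef): Boolean =
  try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(value) finally out.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

val scaler = new Scaler(3)
val thisCaptureOk = javaSerializable(scaler.capturingThis)   // drags in the whole Scaler
val localCaptureOk = javaSerializable(scaler.capturingLocal) // carries only an Int
```

Copying the needed fields into local values before building the closure keeps the serialized task small and avoids failures from non-serializable enclosing objects.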