Apache Spark provides a pluggable serialization framework with two built-in implementations, Java serialization and Kryo, used to optimize network communication and storage. The framework handles serialization of objects for RDD storage, shuffle operations, and broadcast variables.
abstract class Serializer {
  def newInstance(): SerializerInstance
  def supportsRelocationOfSerializedObjects: Boolean
}

abstract class SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}

abstract class SerializationStream {
  def writeObject[T: ClassTag](t: T): SerializationStream
  def flush(): Unit
  def close(): Unit
  def writeKey[T: ClassTag](key: T): SerializationStream
  def writeValue[T: ClassTag](value: T): SerializationStream
  def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream
}

abstract class DeserializationStream {
  def readObject[T: ClassTag](): T
  def readKey[T: ClassTag](): T
  def readValue[T: ClassTag](): T
  def close(): Unit
  def asIterator: Iterator[Any]
  def asKeyValueIterator: Iterator[(Any, Any)]
}

class JavaSerializer(conf: SparkConf) extends Serializer {
  def newInstance(): SerializerInstance
  def supportsRelocationOfSerializedObjects: Boolean = true
  private[spark] override def toString: String = "JavaSerializer"
}

class JavaSerializerInstance(counterReset: Int, extraDebugInfo: Boolean) extends SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}

class KryoSerializer(conf: SparkConf) extends Serializer with Logging {
  def newInstance(): SerializerInstance
  def supportsRelocationOfSerializedObjects: Boolean // true only while Kryo auto-reset remains enabled
  private[spark] override def toString: String = "KryoSerializer"
}

class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean) extends SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}

trait KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}

class SparkKryoRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}
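
These serializers can also be exercised directly, outside of a running job, which is useful for unit-testing registrators. The following is a minimal sketch, not code from the Spark sources: it assumes only the methods listed above (newInstance, serialize, deserialize, serializeStream, deserializeStream, asIterator) plus standard JDK byte-array streams, and relies on Kryo's default behavior of not requiring registration.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Minimal round trip through the SerializerInstance API (no SparkContext needed)
val instance = new KryoSerializer(new SparkConf()).newInstance()

val bytes: ByteBuffer = instance.serialize(Seq(1, 2, 3))
val restored = instance.deserialize[Seq[Int]](bytes)
assert(restored == Seq(1, 2, 3))

// Stream variant: write several objects, then read them back as an iterator
val baos = new ByteArrayOutputStream()
val out = instance.serializeStream(baos)
out.writeObject("a").writeObject("b").close()

val in = instance.deserializeStream(new ByteArrayInputStream(baos.toByteArray))
val items = in.asIterator.toList // List("a", "b")
in.close()
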
Key configuration properties for controlling serialization behavior:

spark.serializer - Serializer class to use (Java or Kryo)
spark.closure.serializer - Serializer for closures (Java only)
spark.serializer.objectStreamReset - Reset frequency for Java serialization
spark.kryo.classesToRegister - Comma-separated list of classes to register
spark.kryo.registrator - Custom KryoRegistrator class
spark.kryo.registrationRequired - Require class registration
spark.kryo.referenceTracking - Enable reference tracking
spark.kryo.unsafe - Use unsafe-based Kryo serialization
spark.kryoserializer.buffer - Initial buffer size for Kryo
spark.kryoserializer.buffer.max - Maximum buffer size for Kryo
spark.serializer.buffer.size - Buffer size for other serializers

import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
  .setAppName("Kryo Example")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "false")
  .set("spark.kryoserializer.buffer", "64k")
  .set("spark.kryoserializer.buffer.max", "64m")
val sc = new SparkContext(conf)

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
// Define custom classes
case class Person(name: String, age: Int, addresses: List[String])
case class Company(name: String, employees: List[Person])
// Custom registrator
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Person])
    kryo.register(classOf[Company])
    kryo.register(classOf[scala.collection.immutable.List[_]])
    // :: and Nil have symbolic/object names, so register them via their JVM class names
    kryo.register(Class.forName("scala.collection.immutable.$colon$colon"))
    kryo.register(Class.forName("scala.collection.immutable.Nil$"))
  }
}
// Configure Spark to use custom registrator
val conf = new SparkConf()
  .setAppName("Custom Kryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  .set("spark.kryo.registrationRequired", "true") // Require registration
val sc = new SparkContext(conf)
// Use custom objects
val people = sc.parallelize(Seq(
  Person("Alice", 25, List("123 Main St", "456 Oak Ave")),
  Person("Bob", 30, List("789 Pine St"))
))
val companies = sc.parallelize(Seq(
  Company("TechCorp", List(
    Person("Alice", 25, List("123 Main St")),
    Person("Bob", 30, List("789 Pine St"))
  ))
))
val result = companies.flatMap(_.employees).collect()

// Register classes via configuration instead of custom registrator
val conf = new SparkConf()
  .setAppName("Class Registration")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.classesToRegister",
    "com.example.Person," +
    "com.example.Company," +
    "scala.collection.immutable.List," +
    "scala.collection.immutable.$colon$colon," +
    "scala.collection.immutable.Nil$"
  )
val sc = new SparkContext(conf)

import scala.util.Random
case class LargeObject(
  id: Long,
  name: String,
  data: Array[Double],
  metadata: Map[String, String]
)

// Generate test data
val random = new Random(42)
val testData = (1 to 10000).map { i =>
  LargeObject(
    id = i,
    name = s"object_$i",
    data = Array.fill(100)(random.nextDouble()),
    metadata = Map(
      "type" -> "test",
      "version" -> "1.0",
      "created" -> System.currentTimeMillis().toString
    )
  )
}
import org.apache.spark.storage.StorageLevel

// Test with Java serialization
def testJavaSerialization(): Long = {
  val javaConf = new SparkConf()
    .setAppName("Java Serialization Test")
    .setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  val javaSc = new SparkContext(javaConf)
  val start = System.currentTimeMillis()
  val rdd = javaSc.parallelize(testData)
  // Store cached partitions in serialized form so the configured serializer is actually exercised
  rdd.persist(StorageLevel.MEMORY_ONLY_SER)
  val count = rdd.count()
  val end = System.currentTimeMillis()
  javaSc.stop()
  end - start
}
// Test with Kryo serialization
def testKryoSerialization(): Long = {
  val kryoConf = new SparkConf()
    .setAppName("Kryo Serialization Test")
    .setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "false")
  val kryoSc = new SparkContext(kryoConf)
  val start = System.currentTimeMillis()
  val rdd = kryoSc.parallelize(testData)
  // Store serialized, mirroring the Java test above
  rdd.persist(StorageLevel.MEMORY_ONLY_SER)
  val count = rdd.count()
  val end = System.currentTimeMillis()
  kryoSc.stop()
  end - start
}
// Compare performance
val javaTime = testJavaSerialization()
val kryoTime = testKryoSerialization()
println(s"Java serialization time: ${javaTime}ms")
println(s"Kryo serialization time: ${kryoTime}ms")
println(s"Kryo speedup: ${javaTime.toDouble / kryoTime}x")

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.serializer.KryoRegistrator
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Custom class with complex serialization needs
case class TimestampedEvent(
  timestamp: LocalDateTime,
  eventType: String,
  data: Map[String, Any],
  metadata: Option[String]
)

// Custom Kryo serializer for LocalDateTime
class LocalDateTimeSerializer extends Serializer[LocalDateTime] {
  private val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME

  override def write(kryo: Kryo, output: Output, dateTime: LocalDateTime): Unit = {
    output.writeString(dateTime.format(formatter))
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[LocalDateTime]): LocalDateTime = {
    LocalDateTime.parse(input.readString(), formatter)
  }
}
// Registrator with custom serializers
class AdvancedKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[TimestampedEvent])
    kryo.register(classOf[LocalDateTime], new LocalDateTimeSerializer)
    kryo.register(classOf[scala.collection.immutable.Map[_, _]])
    kryo.register(classOf[scala.Some[_]])
    // None is a singleton object, so register its class via the instance
    kryo.register(None.getClass)
    // Configure Kryo settings
    kryo.setReferences(true) // Enable object reference tracking
    kryo.setRegistrationRequired(false) // Allow unregistered classes
  }
}
val conf = new SparkConf()
  .setAppName("Advanced Kryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.AdvancedKryoRegistrator")
val sc = new SparkContext(conf)

// Use complex objects
val events = sc.parallelize(Seq(
  TimestampedEvent(
    LocalDateTime.now(),
    "user_login",
    Map("userId" -> 123, "sessionId" -> "abc123"),
    Some("Mobile app login")
  ),
  TimestampedEvent(
    LocalDateTime.now().minusHours(1),
    "page_view",
    Map("page" -> "/home", "duration" -> 30.5),
    None
  )
))
val result = events.filter(_.eventType == "user_login").collect()

// Large lookup table that benefits from efficient serialization
val largeLookupTable = (1 to 100000).map { i =>
  i -> s"value_$i"
}.toMap

// Configure for optimal broadcast performance
val conf = new SparkConf()
  .setAppName("Broadcast Serialization")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.broadcast.compress", "true")
  .set("spark.io.compression.codec", "lz4")
val sc = new SparkContext(conf)

// Broadcast with efficient serialization
val broadcastLookup = sc.broadcast(largeLookupTable)
val data = sc.parallelize(1 to 10000)
val enrichedData = data.map { id =>
  val lookup = broadcastLookup.value
  (id, lookup.getOrElse(id, "unknown"))
}
val result = enrichedData.collect()
broadcastLookup.unpersist()

Set spark.kryo.registrationRequired=true during development to catch unregistered classes early.
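
To see what that setting catches, the hedged sketch below uses the KryoSerializer directly, without a SparkContext. The Unregistered case class is a hypothetical stand-in for any class that was never listed in spark.kryo.classesToRegister or registered by a registrator; with registration required, Kryo is expected to reject it with an IllegalArgumentException rather than silently writing fully qualified class names into the stream.

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

case class Unregistered(x: Int) // hypothetical class, deliberately not registered

val strictConf = new SparkConf().set("spark.kryo.registrationRequired", "true")
val strict = new KryoSerializer(strictConf).newInstance()

// Serializing an unregistered class should fail fast, surfacing the missing registration
try {
  strict.serialize(Unregistered(1))
} catch {
  case e: IllegalArgumentException => println(s"Registration gap caught: ${e.getMessage}")
}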