Spark's serialization framework handles data transfer and storage across cluster nodes, supporting Java serialization (the default) and Kryo for higher performance.
Abstract base class for all serialization implementations in Spark.
/**
* Base class for serializers used by Spark
*/
abstract class Serializer {
/** Default buffer size for serialization */
def defaultBufferSize: Int = 65536
/** Create new serializer instance for thread-local use */
def newInstance(): SerializerInstance
/** Default ClassLoader to use when deserializing, if one has been set */
@volatile protected var defaultClassLoader: Option[ClassLoader] = None
/** Whether this serializer supports relocation of its serialized objects within a stream */
def supportsRelocationOfSerializedObjects: Boolean = false
}
/**
* Thread-local serializer instance
*/
abstract class SerializerInstance {
/** Serialize object to ByteBuffer */
def serialize[T: ClassTag](t: T): ByteBuffer
def serialize[T: ClassTag](t: T, buffer: ByteBuffer): ByteBuffer
/** Deserialize object from ByteBuffer */
def deserialize[T: ClassTag](bytes: ByteBuffer): T
def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
/** Create serialization stream for multiple objects */
def serializeStream(s: OutputStream): SerializationStream
/** Create deserialization stream for multiple objects */
def deserializeStream(s: InputStream): DeserializationStream
}
Streaming interfaces for serializing multiple objects efficiently.
/**
* Stream for writing serialized objects
*/
abstract class SerializationStream extends Closeable {
/** Write object to stream */
def writeObject[T: ClassTag](t: T): SerializationStream
/** Write key-value pair */
def writeKey[T: ClassTag](key: T): SerializationStream
def writeValue[T: ClassTag](value: T): SerializationStream
/** Write all objects from iterator */
def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream
/** Flush stream */
def flush(): Unit
/** Close stream */
def close(): Unit
}
/**
* Stream for reading serialized objects
*/
abstract class DeserializationStream extends Closeable {
/** Read object from stream */
def readObject[T: ClassTag](): T
/** Read key from stream */
def readKey[T: ClassTag](): T
/** Read value from stream */
def readValue[T: ClassTag](): T
/** Close stream */
def close(): Unit
/** Iterator over all objects in stream */
def asIterator: Iterator[Any]
def asKeyValueIterator: Iterator[(Any, Any)]
}
Default serializer using Java's built-in serialization mechanism.
/**
* Serializer using Java's built-in serialization
* @param conf Spark configuration
*/
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
/** Reset the underlying ObjectOutputStream every N objects to bound back-reference tracking */
private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)
override def newInstance(): SerializerInstance = {
val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
new JavaSerializerInstance(counterReset, classLoader, this)
}
override def supportsRelocationOfSerializedObjects: Boolean = {
// Java serialization streams contain headers and back-references to previously
// written objects, so serialized objects cannot be safely relocated
false
}
override def writeExternal(out: ObjectOutput): Unit = out.writeInt(counterReset)
override def readExternal(in: ObjectInput): Unit = { counterReset = in.readInt() }
}
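As a quick sanity check, a minimal sketch (assuming the classes above compile as shown) exercising both the one-shot ByteBuffer path and the streaming path:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer
import org.apache.spark.SparkConf
// One-shot path: object <-> ByteBuffer
val ser = new JavaSerializer(new SparkConf()).newInstance()
val buf: ByteBuffer = ser.serialize(Map("a" -> 1, "b" -> 2))
val restored = ser.deserialize[Map[String, Int]](buf)
// Streaming path: many objects through one stream
val bos = new ByteArrayOutputStream()
val out = ser.serializeStream(bos)
out.writeAll(Iterator("a", "b", "c"))
out.close()
val in = ser.deserializeStream(new ByteArrayInputStream(bos.toByteArray))
in.asIterator.foreach(println) // prints a, b, c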
/**
* Java serializer instance
*/
private[spark] class JavaSerializerInstance(
counterReset: Int,
defaultClassLoader: ClassLoader,
javaSerializer: JavaSerializer
) extends SerializerInstance {
override def serialize[T: ClassTag](t: T): ByteBuffer = {
val bos = new ByteArrayOutputStream()
val out = serializeStream(bos)
out.writeObject(t)
out.close()
ByteBuffer.wrap(bos.toByteArray)
}
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
val bis = new ByteArrayInputStream(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
val in = deserializeStream(bis)
in.readObject[T]()
}
override def serializeStream(s: OutputStream): SerializationStream = {
new JavaSerializationStream(s, counterReset)
}
override def deserializeStream(s: InputStream): DeserializationStream = {
new JavaDeserializationStream(s, defaultClassLoader)
}
}
High-performance serializer using the Kryo serialization library.
/**
* Serializer using Kryo serialization library
* @param conf Spark configuration
*/
class KryoSerializer(conf: SparkConf) extends Serializer with Logging with Serializable {
override def newInstance(): SerializerInstance = {
this.synchronized {
new KryoSerializerInstance(this, useUnsafe, usePool)
}
}
override def supportsRelocationOfSerializedObjects: Boolean = {
// Safe while Kryo's auto-reset is enabled (the default), since no state such as
// class-to-ID mappings is then shared between objects in the same stream
true
}
/** Buffer size for Kryo, in bytes (the config value is in KiB) */
def bufferSize: Int = (conf.get(KRYO_SERIALIZER_BUFFER_SIZE) * 1024).toInt
/** Maximum buffer size for Kryo, in bytes (the config value is in MiB) */
def maxBufferSize: Int = (conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE) * 1024 * 1024).toInt
/** Whether to use unsafe I/O */
def useUnsafe: Boolean = conf.get(KRYO_USE_UNSAFE)
/** Whether to use object pools */
def usePool: Boolean = conf.get(KRYO_USE_POOL)
/** Custom Kryo registrator class */
def registratorClass: Option[Class[_]] = {
conf.getOption("spark.kryo.registrator").map { className =>
Class.forName(className, true, Thread.currentThread.getContextClassLoader)
}
}
/** Whether all classes serialized with Kryo must be explicitly registered */
def registrationRequired: Boolean = conf.get(KRYO_REGISTRATION_REQUIRED)
/** Create and configure Kryo instance */
def newKryo(): Kryo = {
val kryo = new Kryo()
// Configure Kryo settings
kryo.setRegistrationRequired(registrationRequired)
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))
// Register common Spark classes
kryo.register(classOf[Array[Any]])
kryo.register(classOf[Array[Object]])
kryo.register(classOf[Array[String]])
kryo.register(classOf[Array[Byte]])
kryo.register(classOf[Array[Int]])
kryo.register(classOf[Array[Long]])
kryo.register(classOf[Array[Double]])
// Apply custom registrator if specified
registratorClass.foreach { clazz =>
val registrator = clazz.getDeclaredConstructor().newInstance().asInstanceOf[KryoRegistrator]
registrator.registerClasses(kryo)
}
kryo
}
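// Assumed helper factories referenced by KryoSerializerInstance below; a minimal
// sketch sized from the buffer configs (real Spark also has unsafe I/O variants)
private[spark] def newKryoOutput(): Output =
new Output(bufferSize, math.max(bufferSize, maxBufferSize))
private[spark] def newKryoInput(): Input = new Input(bufferSize)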
}
/**
* Interface for custom Kryo registration
*/
trait KryoRegistrator {
def registerClasses(kryo: Kryo): Unit
}
/**
* Kryo serializer instance
*/
private[spark] class KryoSerializerInstance(
ks: KryoSerializer,
useUnsafe: Boolean,
usePool: Boolean
) extends SerializerInstance {
// One Kryo instance plus reusable input/output buffers per serializer instance
private[this] val kryo: Kryo = ks.newKryo()
private[this] val output: Output = ks.newKryoOutput()
private[this] val input: Input = ks.newKryoInput()
override def serialize[T: ClassTag](t: T): ByteBuffer = {
output.clear()
kryo.writeClassAndObject(output, t)
ByteBuffer.wrap(output.toBytes)
}
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
kryo.readClassAndObject(input).asInstanceOf[T]
}
override def serializeStream(s: OutputStream): SerializationStream = {
new KryoSerializationStream(ks, s, useUnsafe)
}
override def deserializeStream(s: InputStream): DeserializationStream = {
new KryoDeserializationStream(ks, s, useUnsafe)
}
}
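A minimal round-trip sketch with the Kryo classes above (assuming Kryo is on the classpath and default configuration values):
val kryoSer = new KryoSerializer(new SparkConf()).newInstance()
val bytes = kryoSer.serialize(Array(1, 2, 3))
val back = kryoSer.deserialize[Array[Int]](bytes)
assert(back.sameElements(Array(1, 2, 3)))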
Configuration options for different serialization strategies.
/**
* Spark configuration keys for serialization
*/
object SerializationConfig {
// Serializer class
val SERIALIZER = ConfigBuilder("spark.serializer")
.version("0.5.0")
.stringConf
.createWithDefault("org.apache.spark.serializer.JavaSerializer")
// Kryo configurations
val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer")
.version("0.5.0")
.bytesConf(ByteUnit.KiB)
.createWithDefaultString("64k")
val KRYO_SERIALIZER_MAX_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer.max")
.version("0.5.0")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("64m")
val KRYO_USE_UNSAFE = ConfigBuilder("spark.kryo.unsafe")
.version("2.1.0")
.booleanConf
.createWithDefault(false)
val KRYO_USE_POOL = ConfigBuilder("spark.kryo.pool")
.version("3.0.0")
.booleanConf
.createWithDefault(true)
val KRYO_REGISTRATION_REQUIRED = ConfigBuilder("spark.kryo.registrationRequired")
.version("1.1.0")
.booleanConf
.createWithDefault(false)
}
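For illustration, a sketch of instantiating the serializer named by spark.serializer reflectively; this mirrors, but is not, Spark's internal instantiation logic:
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val serializer = Class.forName(conf.get("spark.serializer"))
.getConstructor(classOf[SparkConf])
.newInstance(conf)
.asInstanceOf[Serializer]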
Creating custom serializers for specific data types or performance requirements.
/**
* Example: Custom serializer for specific domain objects
*/
class CustomDomainSerializer(conf: SparkConf) extends Serializer {
override def newInstance(): SerializerInstance = new CustomDomainSerializerInstance()
}
class CustomDomainSerializerInstance extends SerializerInstance {
override def serialize[T: ClassTag](t: T): ByteBuffer = t match {
case customObj: CustomDomainObject =>
// Custom serialization logic (size the buffer from the payload rather than a fixed 1024 bytes)
val nameBytes = customObj.name.getBytes("UTF-8")
val buffer = ByteBuffer.allocate(4 + nameBytes.length)
buffer.putInt(customObj.id)
buffer.put(nameBytes)
buffer.flip()
buffer
case _ =>
// Fall back to Java serialization
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(t)
oos.close()
ByteBuffer.wrap(bos.toByteArray)
}
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
val classTag = implicitly[ClassTag[T]]
if (classTag.runtimeClass == classOf[CustomDomainObject]) {
// Custom deserialization logic
val id = bytes.getInt()
val nameBytes = new Array[Byte](bytes.remaining())
bytes.get(nameBytes)
val name = new String(nameBytes, "UTF-8")
CustomDomainObject(id, name).asInstanceOf[T]
} else {
// Fall back to Java deserialization
val bis = new ByteArrayInputStream(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
val ois = new ObjectInputStream(bis)
ois.readObject().asInstanceOf[T]
}
}
// Streaming is intentionally left unimplemented in this sketch
override def serializeStream(s: OutputStream): SerializationStream = ???
override def deserializeStream(s: InputStream): DeserializationStream = ???
}
case class CustomDomainObject(id: Int, name: String)
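A brief usage sketch of the custom serializer above:
val custom = new CustomDomainSerializer(new SparkConf()).newInstance()
val bb = custom.serialize(CustomDomainObject(7, "alpha"))
val roundTripped = custom.deserialize[CustomDomainObject](bb) // CustomDomainObject(7, "alpha")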
/**
* Custom Kryo registrator for application-specific classes
*/
class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
// Register custom classes
kryo.register(classOf[CustomDomainObject])
kryo.register(classOf[Array[CustomDomainObject]])
// Register with custom serializer
kryo.register(classOf[ComplexObject], new ComplexObjectSerializer())
// Register collection types
kryo.register(classOf[scala.collection.mutable.WrappedArray.ofRef[_]])
kryo.register(classOf[scala.collection.immutable.Map[_, _]])
// Register common third-party classes
kryo.register(classOf[org.joda.time.DateTime])
kryo.register(classOf[java.util.UUID])
}
}
/**
* Custom Kryo serializer for complex objects
*/
class ComplexObjectSerializer extends com.esotericsoftware.kryo.Serializer[ComplexObject] {
override def write(kryo: Kryo, output: Output, obj: ComplexObject): Unit = {
output.writeInt(obj.id)
output.writeString(obj.data)
// writeClassAndObject records the concrete Map implementation so it can be read back
kryo.writeClassAndObject(output, obj.metadata)
}
override def read(kryo: Kryo, input: Input, clazz: Class[ComplexObject]): ComplexObject = {
val id = input.readInt()
val data = input.readString()
val metadata = kryo.readClassAndObject(input).asInstanceOf[Map[String, String]]
ComplexObject(id, data, metadata)
}
}
case class ComplexObject(id: Int, data: String, metadata: Map[String, String])
Usage Examples:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.serializer.{KryoSerializer, JavaSerializer}
// Java Serializer (default)
val javaConf = new SparkConf()
.setAppName("Java Serialization Example")
.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
val sc1 = new SparkContext(javaConf)
// Kryo Serializer (recommended for performance)
val kryoConf = new SparkConf()
.setAppName("Kryo Serialization Example")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
.set("spark.kryoserializer.buffer", "128k")
.set("spark.kryoserializer.buffer.max", "128m")
.set("spark.kryo.registrationRequired", "true")
val sc2 = new SparkContext(kryoConf)
// Example with custom objects
case class Person(id: Int, name: String, age: Int) // must be registered in MyKryoRegistrator since registrationRequired is true
val people = sc2.parallelize(Array(
Person(1, "Alice", 25),
Person(2, "Bob", 30),
Person(3, "Charlie", 35)
))
// Serialization happens automatically during shuffles
val grouped = people.groupBy(_.age / 10) // Triggers serialization
val result = grouped.collect()
// Broadcasting also uses serialization
val lookup = Map(1 -> "Manager", 2 -> "Developer", 3 -> "Analyst")
val broadcastLookup = sc2.broadcast(lookup)
val enriched = people.map { person =>
val roles = broadcastLookup.value
(person, roles.getOrElse(person.id, "Unknown"))
}
sc1.stop()
sc2.stop()
Java Examples:
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoRegistrator;
import com.esotericsoftware.kryo.Kryo;
// Kryo configuration in Java
SparkConf conf = new SparkConf()
.setAppName("Java Kryo Example")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", MyJavaKryoRegistrator.class.getName());
JavaSparkContext sc = new JavaSparkContext(conf);
// Custom Kryo registrator in Java
public class MyJavaKryoRegistrator implements KryoRegistrator {
@Override
public void registerClasses(Kryo kryo) {
kryo.register(Person.class);
kryo.register(Person[].class);
}
}
public class Person implements Serializable {
private int id;
private String name;
// Constructors, getters, setters...
}
Java Serialization: works with any Serializable class and needs no registration, but is slower and produces larger output.
Kryo Serialization: faster and more compact; recommended for shuffle-heavy workloads.
// Optimal Kryo configuration
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "com.mycompany.MyKryoRegistrator")
.set("spark.kryoserializer.buffer", "256k") // Increase if needed
.set("spark.kryoserializer.buffer.max", "256m") // Increase for large objects
.set("spark.kryo.unsafe", "true") // Use unsafe I/O for performance
.set("spark.kryo.registrationRequired", "false") // Set true only after thorough testing// Avoid: Non-serializable objects in closures
val nonSerializable = new Database() // Not serializable
rdd.map(x => nonSerializable.lookup(x)) // Will fail!
// Solution: Use broadcast variables or create inside map
val config = Map("url" -> "jdbc:...")
val broadcastConfig = sc.broadcast(config)
rdd.map { x =>
val db = new Database(broadcastConfig.value("url"))
db.lookup(x)
}
// Avoid: Large objects in closures
val largeLookup = loadLargeMap() // Will be serialized with every task
rdd.map(x => largeLookup.get(x))
// Solution: Use broadcast variables
val broadcastLookup = sc.broadcast(largeLookup)
rdd.map(x => broadcastLookup.value.get(x))
Proper serialization configuration is crucial for Spark performance, especially in shuffle-heavy workloads and when using broadcast variables or accumulators.