Serialization

Serialization frameworks for efficient data transfer and storage in Spark. Java's built-in serialization is the default; Kryo is available for higher performance across cluster nodes.

Capabilities

Serializer Base Class

Abstract base class for all serialization implementations in Spark.

/**
 * Base class for serializers used by Spark
 */
abstract class Serializer {
  /** Default class loader for deserialization (set by Spark at runtime) */
  @volatile protected var defaultClassLoader: Option[ClassLoader] = None

  /** Create new serializer instance for thread-local use */
  def newInstance(): SerializerInstance
  
  /** Whether serialized objects can be rearranged within a byte stream (false is the safe default) */
  def supportsRelocationOfSerializedObjects: Boolean = false
}

/**
 * Thread-local serializer instance
 */
abstract class SerializerInstance {
  /** Serialize object to ByteBuffer */
  def serialize[T: ClassTag](t: T): ByteBuffer
  
  /** Deserialize object from ByteBuffer */
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  
  /** Create serialization stream for multiple objects */
  def serializeStream(s: OutputStream): SerializationStream
  
  /** Create deserialization stream for multiple objects */
  def deserializeStream(s: InputStream): DeserializationStream
}
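
A serializer instance can round-trip a single value directly. A minimal sketch, using JavaSerializer as the concrete implementation:

import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

val instance = new JavaSerializer(new SparkConf()).newInstance()

// Serialize to a ByteBuffer and read the value back
val buffer: ByteBuffer = instance.serialize(Seq(1, 2, 3))
val restored = instance.deserialize[Seq[Int]](buffer)
assert(restored == Seq(1, 2, 3))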

Serialization Streams

Streaming interfaces for serializing multiple objects efficiently.

/**
 * Stream for writing serialized objects
 */
abstract class SerializationStream extends Closeable {
  /** Write object to stream */
  def writeObject[T: ClassTag](t: T): SerializationStream
  
  /** Write the key half of a key-value pair */
  def writeKey[T: ClassTag](key: T): SerializationStream
  /** Write the value half of a key-value pair */
  def writeValue[T: ClassTag](value: T): SerializationStream
  
  /** Write all objects from iterator */
  def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream
  
  /** Flush stream */
  def flush(): Unit
  
  /** Close stream */
  def close(): Unit
}

/**
 * Stream for reading serialized objects
 */
abstract class DeserializationStream extends Closeable {
  /** Read object from stream */
  def readObject[T: ClassTag](): T
  
  /** Read key from stream */
  def readKey[T: ClassTag](): T
  
  /** Read value from stream */
  def readValue[T: ClassTag](): T
  
  /** Close stream */
  def close(): Unit
  
  /** Iterator over all objects in stream */
  def asIterator: Iterator[Any]
  def asKeyValueIterator: Iterator[(Any, Any)]
}
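
The stream interfaces avoid allocating a buffer per object when writing many values. A minimal sketch, again using JavaSerializer:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

val instance = new JavaSerializer(new SparkConf()).newInstance()

// Write several objects to one stream
val bos = new ByteArrayOutputStream()
val out = instance.serializeStream(bos)
out.writeAll(Iterator("a", "b", "c"))
out.close()

// Read them back as an iterator
val in = instance.deserializeStream(new ByteArrayInputStream(bos.toByteArray))
val values = in.asIterator.toList  // List("a", "b", "c")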

Java Serializer

Default serializer using Java's built-in serialization mechanism.

/**
 * Serializer using Java's built-in serialization
 * @param conf Spark configuration
 */
class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
  /** How often to reset the stream's reference cache (spark.serializer.objectStreamReset) */
  private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100)

  override def newInstance(): SerializerInstance = {
    val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)
    new JavaSerializerInstance(counterReset, classLoader, this)
  }
  
  override def supportsRelocationOfSerializedObjects: Boolean = {
    // Java serialization streams carry headers and back-references,
    // so serialized objects cannot be safely rearranged
    false
  }
  
  override def writeExternal(out: ObjectOutput): Unit = out.writeInt(counterReset)
  override def readExternal(in: ObjectInput): Unit = counterReset = in.readInt()
}

/**
 * Java serializer instance
 */
private[spark] class JavaSerializerInstance(
    counterReset: Int,
    defaultClassLoader: ClassLoader,
    javaSerializer: JavaSerializer
) extends SerializerInstance {
  
  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = serializeStream(bos)
    out.writeObject(t)
    out.close()
    ByteBuffer.wrap(bos.toByteArray)
  }
  
  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    // Assumes a heap-backed buffer; honor any array offset
    val bis = new ByteArrayInputStream(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    val in = deserializeStream(bis)
    val obj = in.readObject[T]()
    in.close()
    obj
  }
  
  override def serializeStream(s: OutputStream): SerializationStream = {
    new JavaSerializationStream(s, counterReset)
  }
  
  override def deserializeStream(s: InputStream): DeserializationStream = {
    new JavaDeserializationStream(s, defaultClassLoader)
  }
}

Kryo Serializer

High-performance serializer using the Kryo serialization library.

/**
 * Serializer using Kryo serialization library
 * @param conf Spark configuration
 */
class KryoSerializer(conf: SparkConf) extends Serializer with Logging with Serializable {
  
  override def newInstance(): SerializerInstance = {
    this.synchronized {
      new KryoSerializerInstance(this, useUnsafe, usePool)
    }
  }
  
  override def supportsRelocationOfSerializedObjects: Boolean = {
    // Safe with Kryo's default auto-reset: no references span object
    // boundaries, so serialized objects can be rearranged within a stream
    true
  }
  
  /** Buffer size for Kryo */
  def bufferSize: Int = conf.get(KRYO_SERIALIZER_BUFFER_SIZE)
  
  /** Maximum buffer size for Kryo */
  def maxBufferSize: Int = conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE)
  
  /** Whether to use unsafe I/O */
  def useUnsafe: Boolean = conf.get(KRYO_USE_UNSAFE)
  
  /** Whether to use object pools */
  def usePool: Boolean = conf.get(KRYO_USE_POOL)
  
  /** Custom Kryo registrator class */
  def registratorClass: Option[Class[_]] = {
    conf.getOption("spark.kryo.registrator").map { className =>
      Class.forName(className, true, Thread.currentThread.getContextClassLoader)
    }
  }
  
  /** Whether all serialized classes must be registered with Kryo */
  def registrationRequired: Boolean = conf.get(KRYO_REGISTRATION_REQUIRED)
  
  /** Create and configure Kryo instance */
  def newKryo(): Kryo = {
    val kryo = new Kryo()
    
    // Configure Kryo settings
    kryo.setRegistrationRequired(registrationRequired)
    kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))
    
    // Register common Spark classes
    kryo.register(classOf[Array[Any]])
    kryo.register(classOf[Array[Object]])
    kryo.register(classOf[Array[String]])
    kryo.register(classOf[Array[Byte]])
    kryo.register(classOf[Array[Int]])
    kryo.register(classOf[Array[Long]])
    kryo.register(classOf[Array[Double]])
    
    // Apply custom registrator if specified
    registratorClass.foreach { clazz =>
      val registrator = clazz.getDeclaredConstructor().newInstance().asInstanceOf[KryoRegistrator]
      registrator.registerClasses(kryo)
    }
    
    kryo
  }
}
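
When registrationRequired is enabled, serializing an unregistered class fails fast instead of silently writing full class names. A minimal sketch (SomeRecord is a hypothetical application class):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

case class SomeRecord(id: Int)  // hypothetical application class

val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
val kryo = new KryoSerializer(conf).newKryo()

// Serializing SomeRecord at this point would throw: the class is unregistered
kryo.register(classOf[SomeRecord])
// After registration, writeClassAndObject(...) succeeds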

/**
 * Interface for custom Kryo registration
 */
trait KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}

/**
 * Kryo serializer instance
 */
private[spark] class KryoSerializerInstance(
    ks: KryoSerializer,
    useUnsafe: Boolean,
    usePool: Boolean
) extends SerializerInstance {
  
  // Reusable Kryo resources, created once per instance
  private[this] val kryo: Kryo = ks.newKryo()
  private[this] val output: Output = ks.newKryoOutput()
  private[this] val input: Input = ks.newKryoInput()
  
  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    output.reset()
    kryo.writeClassAndObject(output, t)
    ByteBuffer.wrap(output.toBytes)
  }
  
  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    // Assumes a heap-backed buffer; direct buffers would need a copy
    input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    kryo.readClassAndObject(input).asInstanceOf[T]
  }
  
  override def serializeStream(s: OutputStream): SerializationStream = {
    new KryoSerializationStream(ks, s, useUnsafe)
  }
  
  override def deserializeStream(s: InputStream): DeserializationStream = {
    new KryoDeserializationStream(ks, s, useUnsafe)  
  }
  
}
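
Round-tripping through a Kryo instance mirrors the Java case; a minimal sketch using one of the array types registered above:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

val instance = new KryoSerializer(new SparkConf()).newInstance()

val buf = instance.serialize(Array(1.0, 2.0, 3.0))
val back = instance.deserialize[Array[Double]](buf)
assert(back.sameElements(Array(1.0, 2.0, 3.0)))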

Serializer Configuration

Configuration options for different serialization strategies.

/**
 * Spark configuration keys for serialization
 */
object SerializationConfig {
  // Serializer class
  val SERIALIZER = ConfigBuilder("spark.serializer")
    .version("0.5.0")
    .stringConf
    .createWithDefault("org.apache.spark.serializer.JavaSerializer")
  
  // Kryo configurations  
  val KRYO_SERIALIZER_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer")
    .version("0.5.0")
    .bytesConf(ByteUnit.KiB)
    .createWithDefaultString("64k")
    
  val KRYO_SERIALIZER_MAX_BUFFER_SIZE = ConfigBuilder("spark.kryoserializer.buffer.max")
    .version("0.5.0") 
    .bytesConf(ByteUnit.MiB)
    .createWithDefaultString("64m")
    
  val KRYO_USE_UNSAFE = ConfigBuilder("spark.kryo.unsafe")
    .version("2.1.0")
    .booleanConf
    .createWithDefault(false)
    
  val KRYO_USE_POOL = ConfigBuilder("spark.kryo.pool")
    .version("3.0.0")
    .booleanConf
    .createWithDefault(true)
    
  val KRYO_REGISTRATION_REQUIRED = ConfigBuilder("spark.kryo.registrationRequired")
    .version("1.1.0")
    .booleanConf
    .createWithDefault(false)
}
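
These entries back the plain string keys that applications set on SparkConf; a minimal sketch:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "128k")

// Keys that were never set fall back to the defaults defined above
assert(conf.get("spark.kryoserializer.buffer") == "128k")
assert(!conf.contains("spark.kryo.unsafe"))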

Custom Serializers

Creating custom serializers for specific data types or performance requirements.

/**
 * Example: Custom serializer for specific domain objects
 */
class CustomDomainSerializer(conf: SparkConf) extends Serializer {
  override def newInstance(): SerializerInstance = new CustomDomainSerializerInstance()
}

class CustomDomainSerializerInstance extends SerializerInstance {
  override def serialize[T: ClassTag](t: T): ByteBuffer = t match {
    case customObj: CustomDomainObject =>
      // Custom binary layout: 4-byte id, then UTF-8 name bytes.
      // The name goes last so its length need not be written;
      // 1024 bytes is enough for this sketch's short names.
      val buffer = ByteBuffer.allocate(1024)
      buffer.putInt(customObj.id)
      buffer.put(customObj.name.getBytes("UTF-8"))
      buffer.flip()
      buffer
    case _ =>
      // Fall back to Java serialization
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(t)
      oos.close()
      ByteBuffer.wrap(bos.toByteArray)
  }
  
  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    val classTag = implicitly[ClassTag[T]]
    if (classTag.runtimeClass == classOf[CustomDomainObject]) {
      // Custom deserialization logic
      val id = bytes.getInt()
      val nameBytes = new Array[Byte](bytes.remaining())
      bytes.get(nameBytes)
      val name = new String(nameBytes, "UTF-8")
      CustomDomainObject(id, name).asInstanceOf[T]
    } else {
      // Fall back to Java deserialization
      val bis = new ByteArrayInputStream(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
      val ois = new ObjectInputStream(bis)
      ois.readObject().asInstanceOf[T]
    }
  }
  
  // Streaming support omitted for brevity; a complete serializer must implement these
  override def serializeStream(s: OutputStream): SerializationStream = ???
  override def deserializeStream(s: InputStream): DeserializationStream = ???
}

case class CustomDomainObject(id: Int, name: String)
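
A quick check that the custom binary path and the Java fallback both round-trip, sketched against the definitions above:

val instance = new CustomDomainSerializerInstance()

// Custom binary path
val obj = CustomDomainObject(42, "example")
assert(instance.deserialize[CustomDomainObject](instance.serialize(obj)) == obj)

// Java-serialization fallback for any other type
assert(instance.deserialize[String](instance.serialize("plain string")) == "plain string")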

Kryo Registration Examples

/**
 * Custom Kryo registrator for application-specific classes
 */
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Register custom classes
    kryo.register(classOf[CustomDomainObject])
    kryo.register(classOf[Array[CustomDomainObject]])
    
    // Register with custom serializer
    kryo.register(classOf[ComplexObject], new ComplexObjectSerializer())
    
    // Register collection types (Kryo registration is per concrete class,
    // so small immutable maps like Map.Map1 may also need registering)
    kryo.register(classOf[scala.collection.mutable.WrappedArray.ofRef[_]])
    kryo.register(classOf[scala.collection.immutable.Map[_, _]])
    
    // Register common third-party classes
    kryo.register(classOf[org.joda.time.DateTime])
    kryo.register(classOf[java.util.UUID])
  }
}

/**
 * Custom Kryo serializer for complex objects
 */
class ComplexObjectSerializer extends com.esotericsoftware.kryo.Serializer[ComplexObject] {
  override def write(kryo: Kryo, output: Output, obj: ComplexObject): Unit = {
    output.writeInt(obj.id)
    output.writeString(obj.data)
    // Write the map manually: Kryo cannot instantiate the abstract Map trait on read
    output.writeInt(obj.metadata.size)
    obj.metadata.foreach { case (k, v) =>
      output.writeString(k)
      output.writeString(v)
    }
  }
  
  override def read(kryo: Kryo, input: Input, clazz: Class[ComplexObject]): ComplexObject = {
    val id = input.readInt()
    val data = input.readString()
    val entries = input.readInt()
    val metadata = (0 until entries).map(_ => input.readString() -> input.readString()).toMap
    ComplexObject(id, data, metadata)
  }
}

case class ComplexObject(id: Int, data: String, metadata: Map[String, String])
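
The registered serializer can be exercised against a raw Kryo instance before wiring it into Spark; a minimal sketch:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

val kryo = new Kryo()
kryo.register(classOf[ComplexObject], new ComplexObjectSerializer())

val obj = ComplexObject(7, "payload", Map("k" -> "v"))

// Serialize to bytes, then read the object back
val output = new Output(4096)
kryo.writeObject(output, obj)
output.close()

val back = kryo.readObject(new Input(output.toBytes), classOf[ComplexObject])
assert(back == obj)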

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.serializer.{KryoSerializer, JavaSerializer}

// Java Serializer (default)
val javaConf = new SparkConf()
  .setAppName("Java Serialization Example")
  .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

val sc1 = new SparkContext(javaConf)

// Kryo Serializer (recommended for performance)
val kryoConf = new SparkConf()
  .setAppName("Kryo Serialization Example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  .set("spark.kryoserializer.buffer", "128k")
  .set("spark.kryoserializer.buffer.max", "128m")
  .set("spark.kryo.registrationRequired", "true")

val sc2 = new SparkContext(kryoConf)

// Example with custom objects
// (with registrationRequired=true, Person and Array[Person] must be
// registered in MyKryoRegistrator or this will fail at runtime)
case class Person(id: Int, name: String, age: Int)

val people = sc2.parallelize(Array(
  Person(1, "Alice", 25),
  Person(2, "Bob", 30),
  Person(3, "Charlie", 35)
))

// Serialization happens automatically during shuffles
val grouped = people.groupBy(_.age / 10) // Triggers serialization
val result = grouped.collect()

// Broadcasting also uses serialization
val lookup = Map(1 -> "Manager", 2 -> "Developer", 3 -> "Analyst")
val broadcastLookup = sc2.broadcast(lookup)

val enriched = people.map { person =>
  val roles = broadcastLookup.value
  (person, roles.getOrElse(person.id, "Unknown"))
}

sc1.stop()
sc2.stop()

Java Examples:

import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoRegistrator;
import com.esotericsoftware.kryo.Kryo;

// Kryo configuration in Java
SparkConf conf = new SparkConf()
    .setAppName("Java Kryo Example")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", MyJavaKryoRegistrator.class.getName());

JavaSparkContext sc = new JavaSparkContext(conf);

// Custom Kryo registrator in Java
public class MyJavaKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(Person.class);
        kryo.register(Person[].class);
    }
}

public class Person implements Serializable {
    private int id;
    private String name;
    
    // Constructors, getters, setters...
}

Performance Considerations

Java vs Kryo Serialization

Java Serialization:

  • Pros: Built-in, handles complex object graphs, version compatibility
  • Cons: Slower, larger serialized size, reflection overhead

Kryo Serialization:

  • Pros: 2-10x faster, more compact, less CPU overhead
  • Cons: Requires class registration, less compatible with schema evolution
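
These trade-offs are easy to measure; a minimal sketch comparing serialized sizes of the same value under both serializers:

import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}

val conf = new SparkConf()
val sample = (1 to 1000).map(i => (i, s"value-$i")).toMap

val javaSize = new JavaSerializer(conf).newInstance().serialize(sample).remaining()
val kryoSize = new KryoSerializer(conf).newInstance().serialize(sample).remaining()

// Kryo output is typically several times smaller for collection-heavy data
println(s"Java: $javaSize bytes, Kryo: $kryoSize bytes")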

Configuration Best Practices

// Optimal Kryo configuration
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.mycompany.MyKryoRegistrator")
  .set("spark.kryoserializer.buffer", "256k")        // Increase if needed
  .set("spark.kryoserializer.buffer.max", "256m")    // Increase for large objects
  .set("spark.kryo.unsafe", "true")                  // Use unsafe I/O for performance
  .set("spark.kryo.registrationRequired", "false")   // Set true only after thorough testing

Common Serialization Issues

// Avoid: Non-serializable objects in closures
val nonSerializable = new Database() // Not serializable
rdd.map(x => nonSerializable.lookup(x)) // Will fail!

// Solution: Use broadcast variables or create inside map
val config = Map("url" -> "jdbc:...")
val broadcastConfig = sc.broadcast(config)
rdd.map { x =>
  val db = new Database(broadcastConfig.value("url"))
  db.lookup(x)
}
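
// Better still: amortize client construction with mapPartitions
// (one hypothetical Database per partition instead of one per record)
rdd.mapPartitions { iter =>
  val db = new Database(broadcastConfig.value("url"))
  iter.map(x => db.lookup(x))
}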

// Avoid: Large objects in closures
val largeLookup = loadLargeMap() // Will be serialized with every task
rdd.map(x => largeLookup.get(x))

// Solution: Use broadcast variables
val broadcastLookup = sc.broadcast(largeLookup)
rdd.map(x => broadcastLookup.value.get(x))

Proper serialization configuration is crucial for Spark performance, especially in shuffle-heavy workloads and when using broadcast variables or accumulators.