Serialization

Apache Spark provides a pluggable serialization framework with two built-in implementations, Java serialization and Kryo, used to move data efficiently over the network and store it compactly. The configured serializer handles RDD storage, shuffle data, and broadcast variables.

Core Serialization Interfaces

Serializer

abstract class Serializer {
  def newInstance(): SerializerInstance
  def supportsRelocationOfSerializedObjects: Boolean
}

SerializerInstance

abstract class SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}
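
A SerializerInstance can also be used directly to round-trip a single value. A minimal sketch, assuming the built-in JavaSerializer described below and a default SparkConf (inside a job, Spark creates and manages these instances itself):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

val instance = new JavaSerializer(new SparkConf()).newInstance()

// Serialize a value to a ByteBuffer and read it back
val bytes = instance.serialize(Seq(1, 2, 3))
val restored = instance.deserialize[Seq[Int]](bytes)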

Serialization Streams

abstract class SerializationStream {
  def writeObject[T: ClassTag](t: T): SerializationStream
  def flush(): Unit
  def close(): Unit
  def writeKey[T: ClassTag](key: T): SerializationStream
  def writeValue[T: ClassTag](value: T): SerializationStream
  def writeAll[T: ClassTag](iter: Iterator[T]): SerializationStream
}

abstract class DeserializationStream {
  def readObject[T: ClassTag](): T
  def readKey[T: ClassTag](): T
  def readValue[T: ClassTag](): T
  def close(): Unit
  def asIterator: Iterator[Any]
  def asKeyValueIterator: Iterator[(Any, Any)]
}
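
The stream variants wrap ordinary java.io streams; this is the path Spark uses when writing cached blocks and shuffle output. A minimal, self-contained sketch using the JavaSerializer again:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

val instance = new JavaSerializer(new SparkConf()).newInstance()

// Write an iterator of records to an in-memory stream
val out = new ByteArrayOutputStream()
val writer = instance.serializeStream(out)
writer.writeAll(Iterator("a", "b", "c"))
writer.close()

// Read the records back lazily as an iterator
val reader = instance.deserializeStream(new ByteArrayInputStream(out.toByteArray))
val values = reader.asIterator.toList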

Built-in Serializer Implementations

JavaSerializer

class JavaSerializer(conf: SparkConf) extends Serializer {
  def newInstance(): SerializerInstance
  def supportsRelocationOfSerializedObjects: Boolean = false
  
  private[spark] override def toString: String = "JavaSerializer"
}

class JavaSerializerInstance(counterReset: Int, extraDebugInfo: Boolean) extends SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}

KryoSerializer

class KryoSerializer(conf: SparkConf) extends Serializer with Logging {
  def newInstance(): SerializerInstance
  def supportsRelocationOfSerializedObjects: Boolean = true // only while Kryo auto-reset is enabled (the default)
  
  private[spark] override def toString: String = "KryoSerializer"
}

class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean) extends SerializerInstance {
  def serialize[T: ClassTag](t: T): ByteBuffer
  def deserialize[T: ClassTag](bytes: ByteBuffer): T
  def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T
  def serializeStream(s: OutputStream): SerializationStream
  def deserializeStream(s: InputStream): DeserializationStream
}

Kryo Registration

KryoRegistrator

trait KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}

Built-in Registrators

class SparkKryoRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}

Serialization Configuration

Key configuration properties for controlling serialization behavior:

General Serialization

  • spark.serializer - Serializer class to use (Java or Kryo)
  • spark.closure.serializer - Serializer for closures (closures are always serialized with Java serialization)
  • spark.serializer.objectStreamReset - How often the Java serializer resets its object cache (default: 100 objects)

Kryo Configuration

  • spark.kryo.classesToRegister - Comma-separated list of classes to register
  • spark.kryo.registrator - Custom KryoRegistrator class
  • spark.kryo.registrationRequired - Require class registration
  • spark.kryo.referenceTracking - Enable reference tracking
  • spark.kryo.unsafe - Use unsafe-based Kryo serialization

Buffer Configuration

  • spark.kryoserializer.buffer - Initial buffer size for Kryo
  • spark.kryoserializer.buffer.max - Maximum buffer size for Kryo
  • spark.serializer.buffer.size - Buffer size for other serializers

Usage Examples

Basic Kryo Configuration

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Kryo Example")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "false")
  .set("spark.kryoserializer.buffer", "64k")
  .set("spark.kryoserializer.buffer.max", "64m")

val sc = new SparkContext(conf)

Custom Kryo Registrator

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator

// Define custom classes
case class Person(name: String, age: Int, addresses: List[String])
case class Company(name: String, employees: List[Person])

// Custom registrator
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Person])
    kryo.register(classOf[Company])
    kryo.register(classOf[List[_]])
    kryo.register(classOf[::[_]])   // concrete cons-cell class behind non-empty lists
    kryo.register(Nil.getClass)     // runtime class of the Nil singleton
  }
}

// Configure Spark to use custom registrator
val conf = new SparkConf()
  .setAppName("Custom Kryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  .set("spark.kryo.registrationRequired", "true") // Require registration

val sc = new SparkContext(conf)

// Use custom objects
val people = sc.parallelize(Seq(
  Person("Alice", 25, List("123 Main St", "456 Oak Ave")),
  Person("Bob", 30, List("789 Pine St"))
))

val companies = sc.parallelize(Seq(
  Company("TechCorp", List(
    Person("Alice", 25, List("123 Main St")),
    Person("Bob", 30, List("789 Pine St"))
  ))
))

val result = companies.flatMap(_.employees).collect()

Class Registration via Configuration

// Register classes via configuration instead of custom registrator
val conf = new SparkConf()
  .setAppName("Class Registration")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.classesToRegister", 
    "com.example.Person," +
    "com.example.Company," +
    "scala.collection.immutable.List," +
    "scala.collection.immutable.$colon$colon," +
    "scala.collection.immutable.Nil$"
  )

val sc = new SparkContext(conf)

Performance Comparison

import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

case class LargeObject(
  id: Long,
  name: String,
  data: Array[Double],
  metadata: Map[String, String]
)

// Generate test data
val random = new Random(42)
val testData = (1 to 10000).map { i =>
  LargeObject(
    id = i,
    name = s"object_$i",
    data = Array.fill(100)(random.nextDouble()),
    metadata = Map(
      "type" -> "test",
      "version" -> "1.0",
      "created" -> System.currentTimeMillis().toString
    )
  )
}

// Test with Java serialization
def testJavaSerialization(): Long = {
  val javaConf = new SparkConf()
    .setAppName("Java Serialization Test")
    .setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  
  val javaSc = new SparkContext(javaConf)
  
  val start = System.currentTimeMillis()
  val rdd = javaSc.parallelize(testData)
  rdd.persist(StorageLevel.MEMORY_ONLY_SER) // store serialized so the timing actually exercises the serializer
  val count = rdd.count()
  val end = System.currentTimeMillis()
  
  javaSc.stop()
  end - start
}

// Test with Kryo serialization
def testKryoSerialization(): Long = {
  val kryoConf = new SparkConf()
    .setAppName("Kryo Serialization Test")
    .setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "false")
  
  val kryoSc = new SparkContext(kryoConf)
  
  val start = System.currentTimeMillis()
  val rdd = kryoSc.parallelize(testData)
  rdd.persist(StorageLevel.MEMORY_ONLY_SER) // store serialized so the timing actually exercises the serializer
  val count = rdd.count()
  val end = System.currentTimeMillis()
  
  kryoSc.stop()
  end - start
}

// Compare performance
val javaTime = testJavaSerialization()
val kryoTime = testKryoSerialization()

println(s"Java serialization time: ${javaTime}ms")
println(s"Kryo serialization time: ${kryoTime}ms")
println(s"Kryo speedup: ${javaTime.toDouble / kryoTime}x")

Custom Serialization for Complex Types

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoRegistrator
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Custom class with complex serialization needs
case class TimestampedEvent(
  timestamp: LocalDateTime,
  eventType: String,
  data: Map[String, Any],
  metadata: Option[String]
)

// Custom Kryo serializer for LocalDateTime
class LocalDateTimeSerializer extends Serializer[LocalDateTime] {
  private val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME
  
  override def write(kryo: Kryo, output: Output, dateTime: LocalDateTime): Unit = {
    output.writeString(dateTime.format(formatter))
  }
  
  override def read(kryo: Kryo, input: Input, `type`: Class[LocalDateTime]): LocalDateTime = {
    LocalDateTime.parse(input.readString(), formatter)
  }
}

// Registrator with custom serializers
class AdvancedKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[TimestampedEvent])
    kryo.register(classOf[LocalDateTime], new LocalDateTimeSerializer)
    kryo.register(classOf[scala.collection.immutable.Map[_, _]])
    kryo.register(classOf[scala.Some[_]])
    kryo.register(None.getClass) // None is a singleton object; register its runtime class
    
    // Configure Kryo settings
    kryo.setReferences(true) // Enable object reference tracking
    kryo.setRegistrationRequired(false) // Allow unregistered classes
  }
}

val conf = new SparkConf()
  .setAppName("Advanced Kryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.AdvancedKryoRegistrator")

val sc = new SparkContext(conf)

// Use complex objects
val events = sc.parallelize(Seq(
  TimestampedEvent(
    LocalDateTime.now(),
    "user_login",
    Map("userId" -> 123, "sessionId" -> "abc123"),
    Some("Mobile app login")
  ),
  TimestampedEvent(
    LocalDateTime.now().minusHours(1),
    "page_view",
    Map("page" -> "/home", "duration" -> 30.5),
    None
  )
))

val result = events.filter(_.eventType == "user_login").collect()

Broadcast Variable Serialization

// Large lookup table that benefits from efficient serialization
val largeLookupTable = (1 to 100000).map { i =>
  i -> s"value_$i"
}.toMap

// Configure for optimal broadcast performance
val conf = new SparkConf()
  .setAppName("Broadcast Serialization")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.broadcast.compress", "true")
  .set("spark.io.compression.codec", "lz4")

val sc = new SparkContext(conf)

// Broadcast with efficient serialization
val broadcastLookup = sc.broadcast(largeLookupTable)

val data = sc.parallelize(1 to 10000)
val enrichedData = data.map { id =>
  val lookup = broadcastLookup.value
  (id, lookup.getOrElse(id, "unknown"))
}

val result = enrichedData.collect()
broadcastLookup.unpersist()

Best Practices

Serializer Selection

  1. Use Kryo for production: Generally 2-10x faster than Java serialization
  2. Register frequently used classes: Improves performance and reduces storage
  3. Benchmark your workload: Test both serializers with your specific data types
  4. Consider compression: Enable compression for large objects
  5. Monitor serialization overhead: Use Spark UI to identify bottlenecks

Kryo Configuration

  1. Increase buffer sizes: Set appropriate buffer sizes for large objects
  2. Enable registration requirement: Use registrationRequired=true to catch unregistered classes
  3. Use custom serializers: Implement efficient serialization for complex types
  4. Consider reference tracking: Enable for object graphs with shared references
  5. Test thoroughly: Verify serialization/deserialization of all data types

Performance Optimization

  1. Avoid nested collections: Flatten complex nested structures when possible
  2. Use primitive collections: Prefer specialized collections for primitives (see the sketch after this list)
  3. Minimize object creation: Reuse objects in serialization-heavy operations
  4. Profile serialization costs: Identify expensive serialization operations
  5. Consider data formats: Use efficient formats like Avro or Parquet for storage
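
As an illustration of item 2 above, the same numeric payload can be modeled with boxed or primitive storage; the primitive array avoids a per-element wrapper object and serializes far more compactly with either serializer (a sketch with hypothetical record types):

// Boxed: every element becomes a java.lang.Double when serialized
case class ReadingsBoxed(values: List[Double])

// Primitive: serialized as a contiguous double[] with no per-element object overhead
case class ReadingsPrimitive(values: Array[Double])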

Common Pitfalls

  1. Serialization failures: Always test serialization of custom classes
  2. Class loading issues: Ensure all required classes are available on workers
  3. Version compatibility: Be careful with serialized data across Spark versions
  4. Lambda serialization: Avoid capturing large objects in closures (see the sketch after this list)
  5. Singleton objects: Be careful with serializing singleton instances
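
To make pitfall 4 concrete, the sketch below contrasts capturing a large driver-side map in a closure with broadcasting it first. It assumes an existing SparkContext sc and a lookup table like the one in the broadcast example above:

// Anti-pattern: the whole map is captured by the closure and
// shipped with every task that runs the lambda
val lookup = (1 to 100000).map(i => i -> s"value_$i").toMap
val slow = sc.parallelize(1 to 10000).map(id => lookup.getOrElse(id, "unknown"))

// Better: broadcast once; tasks only capture the lightweight broadcast handle
val lookupBc = sc.broadcast(lookup)
val fast = sc.parallelize(1 to 10000).map(id => lookupBc.value.getOrElse(id, "unknown"))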