CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-spark-common-utils

Core utility classes and functions for Apache Spark including exception handling, logging, storage configuration, and Java API integration

Pending
Overview
Eval results
Files

docs/storage-configuration.md

Storage Configuration

Storage level definitions for controlling RDD and Dataset persistence, including memory, disk, serialization, and replication strategies to optimize Spark application performance.

Capabilities

StorageLevel Class

Configuration class that defines how RDD and Dataset data should be stored, combining memory, disk, serialization, and replication options.

/**
 * Configuration for RDD/Dataset storage persistence
 * Note: Constructor is private - use StorageLevel object factory methods or predefined constants
 */
class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int = 1
) extends Externalizable {
  
  /** Validates the storage level configuration */
  def isValid: Boolean
  
  /** Creates a copy of the storage level */
  override def clone(): StorageLevel
  
  /** Human-readable description of the storage strategy */
  def description: String
  
  /** String representation showing all configuration flags */
  override def toString: String
  
  // Properties for accessing configuration
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  
  /** Returns the memory mode (ON_HEAP or OFF_HEAP) */
  private[spark] def memoryMode: MemoryMode
}

Usage Examples:

import org.apache.spark.storage.StorageLevel

// Create custom storage level via the companion-object factory method
// (the class constructor is private — `new StorageLevel(...)` will not compile)
val customLevel = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 2
)

// Check storage properties  
println(s"Uses disk: ${customLevel.useDisk}")
println(s"Uses memory: ${customLevel.useMemory}")
println(s"Replication factor: ${customLevel.replication}")
println(s"Description: ${customLevel.description}")

// Validate configuration
if (customLevel.isValid) {
  println("Storage level is valid")
}

// Use with RDD persistence
val rdd = spark.sparkContext.parallelize(1 to 1000)
rdd.persist(customLevel)

Predefined Storage Levels

Companion object providing common storage level configurations for typical use cases.

object StorageLevel {
  /** No storage - data will be recomputed each time */
  val NONE: StorageLevel
  
  /** Store data on disk only */
  val DISK_ONLY: StorageLevel
  
  /** Store data on disk only with 2x replication */
  val DISK_ONLY_2: StorageLevel
  
  /** Store data on disk only with 3x replication */  
  val DISK_ONLY_3: StorageLevel
  
  /** Store data in memory only, deserialized */
  val MEMORY_ONLY: StorageLevel
  
  /** Store data in memory only, deserialized, with 2x replication */
  val MEMORY_ONLY_2: StorageLevel
  
  /** Store data in memory only, serialized */
  val MEMORY_ONLY_SER: StorageLevel
  
  /** Store data in memory only, serialized, with 2x replication */
  val MEMORY_ONLY_SER_2: StorageLevel
  
  /** Store data in memory first, spill to disk if needed */
  val MEMORY_AND_DISK: StorageLevel
  
  /** Store data in memory first, spill to disk if needed, with 2x replication */
  val MEMORY_AND_DISK_2: StorageLevel
  
  /** Store data in memory first (serialized), spill to disk if needed */
  val MEMORY_AND_DISK_SER: StorageLevel
  
  /** Store data in memory first (serialized), spill to disk if needed, with 2x replication */
  val MEMORY_AND_DISK_SER_2: StorageLevel
  
  /** Store data in off-heap memory */
  val OFF_HEAP: StorageLevel
}

Usage Examples:

import org.apache.spark.storage.StorageLevel

// Using predefined storage levels
val rdd = spark.sparkContext.parallelize(1 to 10000)

// Memory-only storage for fast access
rdd.persist(StorageLevel.MEMORY_ONLY)

// Memory and disk with serialization for space efficiency  
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

// High availability with replication
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

// Disk-only for very large datasets
rdd.persist(StorageLevel.DISK_ONLY)

// StorageLevel.NONE means no caching (data is recomputed on each action).
// Note: the no-argument persist() defaults to MEMORY_ONLY for RDDs, not NONE;
// to drop an existing persistence level, call rdd.unpersist() instead.
rdd.persist(StorageLevel.NONE)

// Check storage level properties
val level = StorageLevel.MEMORY_AND_DISK_SER_2
println(s"Uses memory: ${level.useMemory}")       // true
println(s"Uses disk: ${level.useDisk}")           // true  
println(s"Serialized: ${!level.deserialized}")   // true
println(s"Replication: ${level.replication}")     // 2

Factory Methods

Static factory methods for creating storage levels with different configurations.

object StorageLevel {
  /**
   * Creates storage level from string representation
   * @param s - String representation of storage level
   * @return Corresponding StorageLevel instance
   */
  def fromString(s: String): StorageLevel
  
  /**
   * Creates storage level with full configuration
   * @param useDisk - Whether to use disk storage
   * @param useMemory - Whether to use memory storage
   * @param useOffHeap - Whether to use off-heap memory
   * @param deserialized - Whether to store data deserialized
   * @param replication - Number of replicas
   * @return StorageLevel instance
   */
  def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int
  ): StorageLevel
  
  /**
   * Creates storage level with simplified configuration  
   * @param useDisk - Whether to use disk storage
   * @param useMemory - Whether to use memory storage
   * @param deserialized - Whether to store data deserialized
   * @param replication - Number of replicas (default 1)
   * @return StorageLevel instance
   */
  def apply(
    useDisk: Boolean,
    useMemory: Boolean,
    deserialized: Boolean,
    replication: Int = 1
  ): StorageLevel
  
  /**
   * Creates storage level from bit flags
   * @param flags - Bit flags representing storage options
   * @param replication - Number of replicas
   * @return StorageLevel instance
   */
  def apply(flags: Int, replication: Int): StorageLevel
  
  /**
   * Creates storage level from ObjectInput (for deserialization)
   * @param in - ObjectInput stream
   * @return StorageLevel instance
   */
  def apply(in: ObjectInput): StorageLevel
}

Usage Examples:

import org.apache.spark.storage.StorageLevel

// Create from string
val level1 = StorageLevel.fromString("MEMORY_AND_DISK_SER_2")

// Create with full parameters
val level2 = StorageLevel(
  useDisk = true,
  useMemory = true,
  useOffHeap = false,
  deserialized = false,
  replication = 2
)

// Create with simplified parameters
val level3 = StorageLevel(
  useDisk = false,
  useMemory = true,
  deserialized = true,
  replication = 3
)

// Create from bit flags (advanced usage).
// Bit layout: 8 = useDisk, 4 = useMemory, 2 = useOffHeap, 1 = deserialized.
// Note: combining the off-heap flag (2) with deserialized (1) is invalid —
// off-heap storage must be serialized.
val level4 = StorageLevel(0xD, 1)  // disk + memory + deserialized = MEMORY_AND_DISK

Storage Strategy Guidelines

Memory-Only Strategies

Best for small to medium datasets that fit in cluster memory:

// Fast access, but data lost if executors fail
StorageLevel.MEMORY_ONLY

// Fast access with fault tolerance  
StorageLevel.MEMORY_ONLY_2

// Space-efficient for large objects
StorageLevel.MEMORY_ONLY_SER

Memory and Disk Strategies

Best for medium to large datasets with balanced performance:

// Good balance of speed and reliability
StorageLevel.MEMORY_AND_DISK

// Space-efficient with reliability
StorageLevel.MEMORY_AND_DISK_SER

// High availability for critical data
StorageLevel.MEMORY_AND_DISK_2

Disk-Only Strategies

Best for very large datasets or when memory is constrained:

// Cheapest storage option
StorageLevel.DISK_ONLY

// Disk storage with fault tolerance
StorageLevel.DISK_ONLY_2

Off-Heap Storage

Best for large datasets when heap pressure is a concern:

// Reduces GC pressure
StorageLevel.OFF_HEAP

Performance Considerations

Serialization Trade-offs

// Faster access but more memory usage
StorageLevel.MEMORY_ONLY           // Deserialized

// Slower access but less memory usage  
StorageLevel.MEMORY_ONLY_SER       // Serialized

Replication Trade-offs

// Faster computation restart on failure
StorageLevel.MEMORY_AND_DISK_2     // 2x replication

// More storage overhead but better availability
StorageLevel.DISK_ONLY_3           // 3x replication

Usage with DataFrames and Datasets

import org.apache.spark.storage.StorageLevel

// DataFrame persistence
val df = spark.read.parquet("path/to/data")
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

// Dataset persistence  
case class Person(name: String, age: Int)
val ds = spark.read.json("path/to/people").as[Person]
ds.persist(StorageLevel.MEMORY_ONLY_2)

// Check current storage level
println(s"DataFrame storage level: ${df.storageLevel}")

MemoryMode Enum

Enumeration defining memory allocation modes for storage operations.

/**
 * Memory allocation modes for Spark storage
 */
public enum MemoryMode {
  /** Store data in JVM heap memory */
  ON_HEAP,
  
  /** Store data in off-heap memory */
  OFF_HEAP
}

Usage Examples:

import org.apache.spark.memory.MemoryMode
import org.apache.spark.storage.StorageLevel

// Check memory mode of storage level
val level = StorageLevel.MEMORY_ONLY
val mode = level.memoryMode  // Returns MemoryMode.ON_HEAP

val offHeapLevel = StorageLevel.OFF_HEAP
val offHeapMode = offHeapLevel.memoryMode  // Returns MemoryMode.OFF_HEAP

// Memory mode affects performance characteristics
mode match {
  case MemoryMode.ON_HEAP => 
    println("Using JVM heap - subject to garbage collection")
  case MemoryMode.OFF_HEAP => 
    println("Using off-heap memory - reduced GC pressure")
}

Type Definitions

// Storage configuration class
class StorageLevel private(
  private var _useDisk: Boolean,
  private var _useMemory: Boolean,
  private var _useOffHeap: Boolean,
  private var _deserialized: Boolean,
  private var _replication: Int
) extends Externalizable {
  def isValid: Boolean
  override def clone(): StorageLevel
  def description: String
  def useDisk: Boolean
  def useMemory: Boolean
  def useOffHeap: Boolean
  def deserialized: Boolean
  def replication: Int
  private[spark] def memoryMode: MemoryMode
}

// Predefined storage level constants
object StorageLevel {
  val NONE: StorageLevel
  val DISK_ONLY: StorageLevel
  val DISK_ONLY_2: StorageLevel
  val DISK_ONLY_3: StorageLevel
  val MEMORY_ONLY: StorageLevel
  val MEMORY_ONLY_2: StorageLevel
  val MEMORY_ONLY_SER: StorageLevel
  val MEMORY_ONLY_SER_2: StorageLevel
  val MEMORY_AND_DISK: StorageLevel
  val MEMORY_AND_DISK_2: StorageLevel
  val MEMORY_AND_DISK_SER: StorageLevel
  val MEMORY_AND_DISK_SER_2: StorageLevel
  val OFF_HEAP: StorageLevel
}
// Memory allocation modes
public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}

Install with Tessl CLI

npx tessl i tessl/maven-spark-common-utils

docs

exception-handling.md

index.md

java-api-functions.md

logging.md

network-utilities.md

storage-configuration.md

tile.json