or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

auto-completion.md · code-interpreter.md · distributed-class-loading.md · index.md · interactive-shell.md
tile.json

docs/distributed-class-loading.md

Distributed Class Loading

The Distributed Class Loading system enables loading REPL-compiled classes on Spark executors across the cluster. It supports fetching classes from various sources including HTTP servers, Hadoop file systems, and Spark RPC endpoints.

Capabilities

ExecutorClassLoader Class

The main class loader for distributing REPL-defined classes to Spark executors.

/**
 * ClassLoader for loading REPL-defined classes on executors from various sources.
 *
 * NOTE(docs): this listing documents the public API surface only; method
 * bodies are intentionally omitted.
 * @param conf SparkConf configuration
 * @param env SparkEnv environment  
 * @param classUri URI where classes can be fetched (HTTP, HDFS, or Spark RPC)
 * @param parent Parent ClassLoader to delegate to
 * @param userClassPathFirst Whether to prioritize user classpath over system classpath
 */
class ExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv, 
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ClassLoader(parent) {

  /**
   * Find and load a class by name, fetching it from the configured
   * `classUri` source when necessary.
   * @param name Fully qualified class name
   * @return Loaded Class instance
   * @throws ClassNotFoundException if class cannot be found
   */
  override def findClass(name: String): Class[_]
  
  /**
   * Get a resource by name from the class URI.
   * @param name Resource name
   * @return URL to the resource or null if not found
   */
  override def getResource(name: String): URL
  
  /**
   * Get all resources with a given name.
   * @param name Resource name
   * @return Enumeration of URLs for matching resources
   */
  override def getResources(name: String): java.util.Enumeration[URL]
}

Usage Examples:

import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Create configuration
val conf = new SparkConf()
val env = SparkEnv.get  // Get current Spark environment

// Create class loader for HTTP-based class serving
val httpClassLoader = new ExecutorClassLoader(
  conf = conf,
  env = env,
  classUri = "http://driver-host:12345/classes",
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)

// Load a REPL-defined class on executor
val className = "$line1.$read$$iw$$iw$MyFunction"  // a typical REPL-generated class name
val loadedClass = httpClassLoader.findClass(className)
println(s"Loaded class: ${loadedClass.getName}")

Local Class Finding

Methods for finding classes locally before attempting remote fetch.

/**
 * Find a class locally without remote fetching
 * @param name Fully qualified class name
 * @return Optional Class instance if found locally
 */
def findClassLocally(name: String): Option[Class[_]]

Usage Example:

val classLoader = new ExecutorClassLoader(conf, env, classUri, parent, true)

// Try to find class locally first
val localClass = classLoader.findClassLocally("com.example.LocalClass")
localClass match {
  case Some(clazz) => println(s"Found locally: ${clazz.getName}")
  case None => println("Class not available locally, will fetch remotely")
}

Class Transformation

Methods for reading and transforming class bytecode during loading.

/**
 * Read and transform class bytes from input stream
 * @param name Class name for transformation context
 * @param in InputStream containing class bytecode
 * @return Transformed class bytecode
 */
def readAndTransformClass(name: String, in: InputStream): Array[Byte]

Usage Example:

import java.io.{FileInputStream, InputStream}

val classLoader = new ExecutorClassLoader(conf, env, classUri, parent, true)

// Read and transform class file
val classFile = new FileInputStream("/path/to/MyClass.class")
val transformedBytes = classLoader.readAndTransformClass(
  "com.example.MyClass", 
  classFile
)

println(s"Transformed class size: ${transformedBytes.length} bytes")
classFile.close()

URL Encoding Utilities

Utility methods for handling URL encoding in distributed environments.

/**
 * URL-encode a string while preserving forward slashes
 * @param str String to encode
 * @return URL-encoded string with slashes preserved
 */
def urlEncode(str: String): String

Usage Example:

val classLoader = new ExecutorClassLoader(conf, env, classUri, parent, true)

// Encode class name for URL usage
val className = "com.example.MyClass$InnerClass"
val encodedName = classLoader.urlEncode(className)
println(s"Encoded class name: $encodedName")

// Can be used in HTTP requests
val classUrl = s"$classUri/${encodedName.replace('.', '/')}.class"

Signal Handling

Utilities for handling interrupts and signals in REPL environment.

/**
 * Signal handling utilities for the REPL.
 */
object Signaling {
  /**
   * Register a SIGINT (CTRL+C) handler that cancels running Spark jobs
   * instead of terminating the entire REPL session.
   */
  def cancelOnInterrupt(): Unit
}

Usage Example:

import org.apache.spark.repl.Signaling

// Set up interrupt handling for REPL
Signaling.cancelOnInterrupt()

// Now CTRL+C will properly cancel running Spark jobs
// instead of terminating the entire REPL session

Class Loading Strategies

HTTP-Based Class Serving

The most common approach for distributing REPL classes is through HTTP serving from the driver.

import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Configuration for HTTP class serving
val conf = new SparkConf()
  .set("spark.repl.class.uri", "http://driver-host:12345/classes")

val env = SparkEnv.get
val httpUri = conf.get("spark.repl.class.uri")

val classLoader = new ExecutorClassLoader(
  conf = conf,
  env = env, 
  classUri = httpUri,
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)

// Classes will be fetched via HTTP when needed
val replClass = classLoader.findClass("$line1.$read$$iw$$iw$MyFunction")

HDFS-Based Class Distribution

For clusters with shared storage, classes can be distributed via HDFS.

import org.apache.spark.repl.ExecutorClassLoader

val conf = new SparkConf()
  .set("spark.repl.class.uri", "hdfs://namenode:9000/spark-repl/classes")

val hdfsClassLoader = new ExecutorClassLoader(
  conf = conf,
  env = SparkEnv.get,
  classUri = "hdfs://namenode:9000/spark-repl/classes",
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)

// Classes will be fetched from HDFS
val distributedClass = hdfsClassLoader.findClass("$line5.$read$$iw$$iw$ProcessData")

Spark RPC-Based Class Loading

Advanced setups can use Spark's internal RPC system for class distribution.

import org.apache.spark.repl.ExecutorClassLoader

val rpcClassLoader = new ExecutorClassLoader(
  conf = new SparkConf(),
  env = SparkEnv.get,
  classUri = "spark-rpc://driver:7077/class-server", 
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = false  // System classpath first for RPC
)

// Classes fetched via Spark RPC
val rpcClass = rpcClassLoader.findClass("$line3.$read$$iw$$iw$DataTransformer")

Integration Patterns

Setting Up Distributed Class Loading

import org.apache.spark.repl.{SparkIMain, ExecutorClassLoader}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

// Create Spark configuration with class server
val conf = new SparkConf()
  .setAppName("REPL with Distributed Classes")
  .setMaster("spark://cluster:7077")
  .set("spark.repl.class.outputDir", "/tmp/spark-repl-classes")
  .set("spark.repl.class.uri", "http://driver:12345/classes")

// Initialize Spark context
val sc = new SparkContext(conf)

// Create interpreter that will compile classes to shared location
val interpreter = new SparkIMain()
interpreter.initializeSynchronous()

// Define a function in the REPL; note it operates on a whole Array[Int]
interpreter.interpret("""
  def processData(data: Array[Int]): Array[Int] = {
    data.map(_ * 2).filter(_ > 10)
  }
""")

// Create an RDD that uses the REPL-defined function.
// glom() turns RDD[Int] into RDD[Array[Int]] so the Array-based function
// type-checks; flatMap flattens the per-partition results back to RDD[Int].
// (processData is visible here because this snippet runs inside the REPL
// session that defined it.)
val rdd = sc.parallelize(1 to 100)
val processed = rdd.glom().flatMap(processData)

val results = processed.collect()
println(s"Processed ${results.length} elements")

Custom Class Transformation

import org.apache.spark.repl.ExecutorClassLoader
import java.io.{ByteArrayInputStream, InputStream}

// Example class loader that post-processes bytecode for REPL-generated classes.
class CustomExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst) {

  /**
   * Read class bytes via the parent implementation, then rewrite classes
   * produced by the REPL's wrapper objects (names containing "$$iw$$iw").
   * @param name Class name for transformation context
   * @param in InputStream containing class bytecode
   * @return Transformed class bytecode
   */
  override def readAndTransformClass(name: String, in: InputStream): Array[Byte] = {
    val raw = super.readAndTransformClass(name, in)
    // Non-REPL classes pass through untouched.
    if (!name.contains("$$iw$$iw")) raw
    else rewriteReplBytecode(raw)
  }

  /**
   * Placeholder for custom bytecode rewriting — e.g. adding debugging
   * information or applying optimizations.
   */
  private def rewriteReplBytecode(bytes: Array[Byte]): Array[Byte] =
    bytes // currently an identity transform
}

Error Handling and Fallbacks

import org.apache.spark.repl.ExecutorClassLoader
import scala.util.{Try, Success, Failure}

/**
 * Example class loader that resolves classes locally first and applies
 * fallback strategies when the remote fetch fails.
 */
class RobustExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst) {

  /**
   * Resolve a class, preferring a locally available definition, then the
   * remote source, then fallback strategies.
   * @param name Fully qualified class name
   * @return Loaded Class instance
   * @throws ClassNotFoundException if no strategy can load the class
   */
  override def findClass(name: String): Class[_] = {
    // Try local first
    findClassLocally(name) match {
      case Some(clazz) => clazz
      case None =>
        // Try remote with error handling
        Try(super.findClass(name)) match {
          case Success(clazz) => clazz
          case Failure(exception) =>
            logClassLoadFailure(name, exception)
            // Pass the original failure along so it is preserved as the
            // cause of any final ClassNotFoundException (the original
            // version silently discarded it).
            tryFallbackStrategies(name, exception)
        }
    }
  }

  /** Record a failed remote load attempt. */
  private def logClassLoadFailure(name: String, error: Throwable): Unit = {
    println(s"Failed to load class $name: ${error.getMessage}")
  }

  /**
   * Attempt alternative load strategies; rethrows with the original failure
   * attached as the cause so diagnostics are not lost.
   */
  private def tryFallbackStrategies(name: String, cause: Throwable): Class[_] = {
    // Implement fallback strategies
    // 1. Try alternative class URIs
    // 2. Check if class is available in system classpath
    // 3. Generate stub class if necessary
    throw new ClassNotFoundException(s"Could not load class $name with any strategy", cause)
  }
}