tessl/maven-org-apache-spark--spark-repl-2-11

Interactive Scala shell for Apache Spark with distributed computing capabilities


docs/class-loading.md

Distributed Class Loading

Custom class loader system for loading REPL-compiled classes on remote Spark executors, fetching class bytes either over Spark RPC or from a Hadoop-compatible filesystem.

Capabilities

ExecutorClassLoader

A ClassLoader that reads classes from a Hadoop FileSystem or Spark RPC endpoint, used to load classes defined by the interpreter when the REPL is used.

/**
 * A ClassLoader for reading REPL-compiled classes from remote sources
 * Supports both Spark RPC and Hadoop FileSystem for class distribution
 * @param conf Spark configuration
 * @param env Spark environment for RPC access
 * @param classUri URI pointing to class file location (spark:// or hdfs://)
 * @param parent Parent class loader
 * @param userClassPathFirst Whether to prioritize user classpath over parent
 */
class ExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ClassLoader(null) with Logging {
  
  /** URI parsing and directory path extraction */
  val uri: URI
  val directory: String
  
  /** Parent class loader wrapper */
  val parentLoader: ParentClassLoader
  
  /**
   * Find and load a class by name
   * Respects userClassPathFirst setting for load order
   * @param name Fully qualified class name
   * @return Loaded Class object
   */
  override def findClass(name: String): Class[_]
  
  /**
   * Find a class locally from the REPL class server
   * @param name Fully qualified class name  
   * @return Some(Class) if found, None otherwise
   */
  def findClassLocally(name: String): Option[Class[_]]
  
  /**
   * Read and transform class bytecode
   * Special handling for REPL wrapper classes
   * @param name Class name
   * @param in Input stream with class bytes
   * @return Transformed class bytecode
   */
  def readAndTransformClass(name: String, in: InputStream): Array[Byte]
  
  /**
   * URL-encode a string, preserving only slashes
   * @param str String to encode
   * @return URL-encoded string
   */
  def urlEncode(str: String): String
  
  /** Get resource by name (delegates to parent) */
  override def getResource(name: String): URL
  
  /** Get all resources by name (delegates to parent) */
  override def getResources(name: String): java.util.Enumeration[URL]
  
  /** Get resource as input stream with REPL class support */
  override def getResourceAsStream(name: String): InputStream
}

Usage Examples:

import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Create class loader for RPC-based class loading
val conf = new SparkConf()
val env = SparkEnv.get
val classUri = "spark://driver-host:port/classes"
val parentClassLoader = Thread.currentThread().getContextClassLoader
val userClassPathFirst = false

val classLoader = new ExecutorClassLoader(
  conf, env, classUri, parentClassLoader, userClassPathFirst
)

// Load a REPL-compiled class
val clazz = classLoader.findClass("line123$iw$MyClass")

// Check if class exists locally
val maybeClass = classLoader.findClassLocally("com.example.MyClass")

ConstructorCleaner

ASM visitor for cleaning REPL wrapper class constructors to prevent initialization issues.

/**
 * ASM ClassVisitor that cleans constructor initialization code
 * from REPL-generated wrapper classes to prevent execution issues
 * @param className Name of the class being cleaned
 * @param cv Target ClassVisitor for output
 */
class ConstructorCleaner(className: String, cv: ClassVisitor) extends ClassVisitor(ASM6, cv) {
  /**
   * Visit and potentially modify method definitions
   * Replaces constructor bodies for wrapper classes
   * @param access Method access flags
   * @param name Method name
   * @param desc Method descriptor
   * @param sig Method signature
   * @param exceptions Exception types
   * @return MethodVisitor for method processing
   */
  override def visitMethod(
    access: Int, 
    name: String, 
    desc: String,
    sig: String, 
    exceptions: Array[String]
  ): MethodVisitor
}

Class Loading Strategies

Load Order Control

The userClassPathFirst parameter controls class loading precedence:

User-First Loading (userClassPathFirst = true):

  1. Check REPL class server first
  2. Fall back to parent class loader

Parent-First Loading (userClassPathFirst = false):

  1. Try parent class loader first
  2. Check REPL class server on ClassNotFoundException
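The two orderings can be sketched as a standalone `findClass`. This is a minimal illustration, not the real implementation: `findLocally` is a placeholder for `findClassLocally`, and the "REPL class server" side simply returns `None` here.

```scala
// Minimal sketch of the two load orders controlled by userClassPathFirst.
class OrderSketch(parent: ClassLoader, userClassPathFirst: Boolean)
    extends ClassLoader(null) {

  // Placeholder: a real loader would fetch class bytes from the class server.
  private def findLocally(name: String): Option[Class[_]] = None

  override def findClass(name: String): Class[_] =
    if (userClassPathFirst) {
      // User-first: the REPL class server wins over the parent loader.
      findLocally(name).getOrElse(parent.loadClass(name))
    } else {
      // Parent-first: consult the class server only on ClassNotFoundException.
      try parent.loadClass(name)
      catch {
        case _: ClassNotFoundException =>
          findLocally(name).getOrElse(throw new ClassNotFoundException(name))
      }
    }
}
```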

Remote Class Access

Class files can be fetched over one of two transports, selected by the classUri scheme:

Spark RPC Protocol (spark://):

private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
  val channel = env.rpcEnv.openChannel(s"$classUri/$path")
  new FilterInputStream(Channels.newInputStream(channel)) {
    // Error handling converts exceptions to ClassNotFoundException
  }
}

Hadoop FileSystem (hdfs://, file://, etc.):

private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(pathInDirectory: String): InputStream = {
  val path = new Path(directory, pathInDirectory)
  try {
    fileSystem.open(path)
  } catch {
    case _: FileNotFoundException =>
      throw new ClassNotFoundException(s"Class file not found at path $path")
  }
}
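Which transport is used follows from the classUri scheme. The dispatch can be sketched as below; the strategy names are illustrative only (the real loader binds a private fetch function in its constructor rather than returning a label):

```scala
import java.net.URI

// Illustrative only: map a classUri scheme to the fetch strategy described above.
def fetchStrategy(classUri: String): String =
  new URI(classUri).getScheme match {
    case "spark" => "spark-rpc" // stream bytes from the driver over Spark RPC
    case _       => "hadoop-fs" // hdfs://, file://, and other Hadoop-supported schemes
  }
```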

Bytecode Transformation

REPL Wrapper Class Handling

Special transformation for REPL-generated wrapper classes:

def readAndTransformClass(name: String, in: InputStream): Array[Byte] = {
  if (name.startsWith("line") && name.endsWith("$iw$")) {
    // Transform wrapper class constructor
    val cr = new ClassReader(in)
    val cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES + ClassWriter.COMPUTE_MAXS)
    val cleaner = new ConstructorCleaner(name, cw)
    cr.accept(cleaner, 0)
    cw.toByteArray
  } else {
    // Pass through unchanged
    // ... standard byte copying logic
  }
}

Constructor Cleaning

The ConstructorCleaner replaces constructor bodies with minimal initialization:

  1. Load this reference
  2. Call Object.<init>()
  3. Return immediately
  4. Skip original initialization code

This prevents execution of REPL initialization code that should run later through reflection.
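Assuming the ASM library (which the cleaner is built on), the substituted constructor body can be sketched as a sequence of visitor calls corresponding to the four steps above:

```scala
import org.objectweb.asm.{MethodVisitor, Opcodes}

// Sketch: emit the minimal <init> body that replaces the original
// wrapper-class constructor.
def emitMinimalInit(mv: MethodVisitor): Unit = {
  mv.visitCode()
  mv.visitVarInsn(Opcodes.ALOAD, 0)                // 1. load `this`
  mv.visitMethodInsn(Opcodes.INVOKESPECIAL,
    "java/lang/Object", "<init>", "()V", false)    // 2. call Object.<init>()
  mv.visitInsn(Opcodes.RETURN)                     // 3. return immediately
  mv.visitMaxs(-1, -1) // sizes recomputed via COMPUTE_MAXS/COMPUTE_FRAMES
  mv.visitEnd()        // 4. original initialization code is never emitted
}
```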

Resource Handling

Class File Resources

Special handling for .class files as resources:

private def getClassResourceAsStreamLocally(name: String): InputStream = {
  try {
    if (name.endsWith(".class")) fetchFn(name) else null
  } catch {
    case _: ClassNotFoundException => null
  }
}

Resource Delegation

Non-class resources delegate to parent class loader:

  • getResource: Returns parent's resource URL
  • getResources: Returns parent's resource enumeration
  • getResourceAsStream: Tries REPL classes first (if user-first), then parent
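The getResourceAsStream delegation order can be sketched as a standalone function. This is hypothetical: `local` and `parent` stand in for the REPL-class fetch and the parent loader, each returning null when it has no such resource.

```scala
import java.io.InputStream

// Hypothetical standalone sketch of getResourceAsStream's delegation order.
def resourceStream(
    name: String,
    userClassPathFirst: Boolean,
    local: String => InputStream,
    parent: String => InputStream): InputStream =
  if (userClassPathFirst) {
    // Try the REPL classes first, then fall back to the parent loader.
    val fromRepl = local(name)
    if (fromRepl != null) fromRepl else parent(name)
  } else {
    // Parent-first: only consult the REPL classes if the parent has nothing.
    val fromParent = parent(name)
    if (fromParent != null) fromParent else local(name)
  }
```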

Error Handling

Class Loading Errors

// Class not found locally
case e: ClassNotFoundException =>
  logDebug(s"Did not load class $name from REPL class server at $uri", e)
  None

// General loading errors  
case e: Exception =>
  logError(s"Failed to check existence of class $name on REPL class server at $uri", e)
  None

RPC Channel Errors

RPC read errors are converted to ClassNotFoundException by a helper nested inside the RPC stream wrapper, where path (the requested class file path) is in scope:

private def toClassNotFound(fn: => Int): Int = {
  try {
    fn
  } catch {
    case e: Exception =>
      throw new ClassNotFoundException(path, e)
  }
}
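The wrapping pattern can be shown as a self-contained sketch (names hypothetical): every read on the underlying stream is funneled through the converter, so callers of the class loader only ever see ClassNotFoundException.

```scala
import java.io.{FilterInputStream, InputStream}

// Sketch: wrap a stream so any read failure surfaces as ClassNotFoundException.
// `path` identifies the class file being fetched.
def failAsClassNotFound(path: String, in: InputStream): InputStream =
  new FilterInputStream(in) {
    private def toClassNotFound(fn: => Int): Int =
      try fn
      catch { case e: Exception => throw new ClassNotFoundException(path, e) }

    override def read(): Int = toClassNotFound(super.read())
    override def read(b: Array[Byte], off: Int, len: Int): Int =
      toClassNotFound(super.read(b, off, len))
  }
```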

FileSystem Errors

FileSystem errors map to class loading failures:

try {
  fileSystem.open(path)
} catch {
  case _: FileNotFoundException =>
    throw new ClassNotFoundException(s"Class file not found at path $path")
}

Configuration

HTTP Timeout Control

A connection timeout hook exposed only for testing and debugging:

/** HTTP connection timeout in milliseconds (testing/debugging) */
private[repl] var httpUrlConnectionTimeoutMillis: Int = -1

URL Encoding

Path encoding for safe transmission:

def urlEncode(str: String): String = {
  str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")
}
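For example (a self-contained copy of the helper above; note that URLEncoder applies form encoding, so spaces in a segment become `+` rather than `%20`):

```scala
import java.net.URLEncoder

def urlEncode(str: String): String =
  str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")

// Slashes separate path segments and are preserved; characters such as
// `$` inside each segment are percent-encoded:
// urlEncode("line1/$iw/My Class.class") == "line1/%24iw/My+Class.class"
```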

Integration Points

Spark Environment Integration

  • Uses SparkEnv.rpcEnv for RPC-based class loading
  • Integrates with Spark's configuration system
  • Leverages Hadoop utilities for FileSystem access

Scala Compiler Integration

  • Works with Scala's class loading architecture
  • Handles REPL-specific class naming conventions
  • Supports dynamic class compilation and loading

Executor Integration

  • Designed for use on Spark executors
  • Handles distributed class distribution
  • Manages parent class loader relationships correctly

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-repl-2-11
