Interactive Scala shell for Apache Spark with distributed computing capabilities
Custom class loader system for loading REPL-compiled classes on remote Spark executors with support for RPC and Hadoop filesystem access.
A ClassLoader that reads classes from a Hadoop FileSystem or Spark RPC endpoint, used to load classes defined by the interpreter when the REPL is used.
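As orientation before the API outline, here is a minimal, self-contained sketch of the core idea (a hypothetical `RemoteClassLoader`, not Spark's actual class): fetch raw `.class` bytes from some source via a caller-supplied function, then define the class in the local JVM.

```scala
import java.io.{ByteArrayOutputStream, InputStream}

// Hypothetical sketch (names are illustrative, not Spark's API): a loader
// that fetches raw class bytes via a caller-supplied function and defines
// the class in this JVM.
class RemoteClassLoader(fetch: String => InputStream, parent: ClassLoader)
    extends ClassLoader(parent) {

  override def findClass(name: String): Class[_] = {
    // Map the class name to a class-file path (standard JVM convention)
    val path = name.replace('.', '/') + ".class"
    val in = Option(fetch(path)).getOrElse(throw new ClassNotFoundException(name))
    try {
      // Drain the stream into a byte array
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](4096)
      var n = in.read(buf)
      while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
      val bytes = out.toByteArray
      // Turn the bytes into a live Class object owned by this loader
      defineClass(name, bytes, 0, bytes.length)
    } finally in.close()
  }
}
```

Note that calling `findClass` directly defines the class in this loader even when the parent could also supply it; the real ExecutorClassLoader layers URI parsing, RPC/FileSystem fetching, and bytecode transformation on top of this basic pattern.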
/**
* A ClassLoader for reading REPL-compiled classes from remote sources
* Supports both Spark RPC and Hadoop FileSystem for class distribution
* @param conf Spark configuration
* @param env Spark environment for RPC access
* @param classUri URI pointing to class file location (spark:// or hdfs://)
* @param parent Parent class loader
* @param userClassPathFirst Whether to prioritize user classpath over parent
*/
class ExecutorClassLoader(
conf: SparkConf,
env: SparkEnv,
classUri: String,
parent: ClassLoader,
userClassPathFirst: Boolean
) extends ClassLoader(null) with Logging {
/** URI parsing and directory path extraction */
val uri: URI
val directory: String
/** Parent class loader wrapper */
val parentLoader: ParentClassLoader
/**
* Find and load a class by name
* Respects userClassPathFirst setting for load order
* @param name Fully qualified class name
* @return Loaded Class object
*/
override def findClass(name: String): Class[_]
/**
* Find a class locally from the REPL class server
* @param name Fully qualified class name
* @return Some(Class) if found, None otherwise
*/
def findClassLocally(name: String): Option[Class[_]]
/**
* Read and transform class bytecode
* Special handling for REPL wrapper classes
* @param name Class name
* @param in Input stream with class bytes
* @return Transformed class bytecode
*/
def readAndTransformClass(name: String, in: InputStream): Array[Byte]
/**
* URL-encode a string, preserving only slashes
* @param str String to encode
* @return URL-encoded string
*/
def urlEncode(str: String): String
/** Get resource by name (delegates to parent) */
override def getResource(name: String): URL
/** Get all resources by name (delegates to parent) */
override def getResources(name: String): java.util.Enumeration[URL]
/** Get resource as input stream with REPL class support */
override def getResourceAsStream(name: String): InputStream
}

Usage Examples:
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}
// Create class loader for RPC-based class loading
val conf = new SparkConf()
val env = SparkEnv.get
val classUri = "spark://driver-host:port/classes"
val parentClassLoader = Thread.currentThread().getContextClassLoader
val userClassPathFirst = false
val classLoader = new ExecutorClassLoader(
conf, env, classUri, parentClassLoader, userClassPathFirst
)
// Load a REPL-compiled class
val clazz = classLoader.findClass("line123$iw$MyClass")
// Check if class exists locally
val maybeClass = classLoader.findClassLocally("com.example.MyClass")

ASM visitor for cleaning REPL wrapper class constructors to prevent initialization issues.
/**
* ASM ClassVisitor that cleans constructor initialization code
* from REPL-generated wrapper classes to prevent execution issues
* @param className Name of the class being cleaned
* @param cv Target ClassVisitor for output
*/
class ConstructorCleaner(className: String, cv: ClassVisitor) extends ClassVisitor(ASM6, cv) {
/**
* Visit and potentially modify method definitions
* Replaces constructor bodies for wrapper classes
* @param access Method access flags
* @param name Method name
* @param desc Method descriptor
* @param sig Method signature
* @param exceptions Exception types
* @return MethodVisitor for method processing
*/
override def visitMethod(
access: Int,
name: String,
desc: String,
sig: String,
exceptions: Array[String]
): MethodVisitor
}

The userClassPathFirst parameter controls class loading precedence:
User-First Loading (userClassPathFirst = true): the REPL class server is consulted first; the parent class loader is used only when the class is not found there.
Parent-First Loading (userClassPathFirst = false): the parent class loader is tried first; REPL-compiled classes are loaded only when the parent cannot resolve the name.
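The precedence logic can be sketched as follows; the String-based lookup functions stand in for the real ClassLoader calls and are purely illustrative.

```scala
// Simplified model of the findClass load-order decision controlled by
// userClassPathFirst. The lookup functions are illustrative stand-ins for
// findClassLocally and the parent loader.
def loadClass(
    name: String,
    userClassPathFirst: Boolean,
    findLocally: String => Option[String],
    parentLoad: String => Option[String]): Option[String] = {
  if (userClassPathFirst) {
    // REPL/user classes win; fall back to the parent loader on a miss
    findLocally(name).orElse(parentLoad(name))
  } else {
    // Parent loader wins; REPL classes are only a fallback
    parentLoad(name).orElse(findLocally(name))
  }
}
```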
Two methods for accessing remote classes:
Spark RPC Protocol (spark://):
private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
val channel = env.rpcEnv.openChannel(s"$classUri/$path")
new FilterInputStream(Channels.newInputStream(channel)) {
// Error handling converts exceptions to ClassNotFoundException
}
}

Hadoop FileSystem (hdfs://, file://, etc.):
private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(pathInDirectory: String): InputStream = {
val path = new Path(directory, pathInDirectory)
try {
fileSystem.open(path)
} catch {
case _: FileNotFoundException =>
throw new ClassNotFoundException(s"Class file not found at path $path")
}
}

Special transformation for REPL-generated wrapper classes:
def readAndTransformClass(name: String, in: InputStream): Array[Byte] = {
if (name.startsWith("line") && name.endsWith("$iw$")) {
// Transform wrapper class constructor
val cr = new ClassReader(in)
val cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES + ClassWriter.COMPUTE_MAXS)
val cleaner = new ConstructorCleaner(name, cw)
cr.accept(cleaner, 0)
cw.toByteArray
} else {
// Pass through unchanged
// ... standard byte copying logic
}
}

The ConstructorCleaner replaces constructor bodies with minimal initialization:
The replacement body simply loads the `this` reference, invokes Object.<init>(), and returns. This prevents execution of REPL initialization code that should run later through reflection.
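A sketch of a visitMethod implementation consistent with that behavior (this assumes the ASM library on the classpath and is illustrative rather than a verbatim copy of Spark's code):

```scala
import org.objectweb.asm.{ClassVisitor, MethodVisitor}
import org.objectweb.asm.Opcodes._

// Illustrative sketch: constructors get a minimal body (super() + return);
// all other methods pass through to the target visitor unchanged.
class ConstructorCleaner(className: String, cv: ClassVisitor)
    extends ClassVisitor(ASM6, cv) {
  override def visitMethod(access: Int, name: String, desc: String,
      sig: String, exceptions: Array[String]): MethodVisitor = {
    val mv = cv.visitMethod(access, name, desc, sig, exceptions)
    if (name == "<init>") {
      mv.visitCode()
      mv.visitVarInsn(ALOAD, 0)   // load `this`
      mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V", false)
      mv.visitInsn(RETURN)
      mv.visitMaxs(-1, -1)        // recomputed because of COMPUTE_MAXS
      mv.visitEnd()
      null                        // discard the original constructor body
    } else {
      mv                          // leave other methods as-is
    }
  }
}
```

Returning null from visitMethod tells the ClassReader not to replay the original instructions, which is how the REPL's initialization code gets dropped from the emitted constructor.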
Special handling for .class files as resources (fetchFn is the RPC- or FileSystem-based fetch function selected when the loader is constructed):
private def getClassResourceAsStreamLocally(name: String): InputStream = {
try {
if (name.endsWith(".class")) fetchFn(name) else null
} catch {
case _: ClassNotFoundException => null
}
}

Non-class resources delegate to the parent class loader.

Error handling in findClassLocally distinguishes missing classes from genuine failures:
// Class not found locally
case e: ClassNotFoundException =>
logDebug(s"Did not load class $name from REPL class server at $uri", e)
None
// General loading errors
case e: Exception =>
logError(s"Failed to check existence of class $name on REPL class server at $uri", e)
None

RPC errors are converted to ClassNotFoundException:
private def toClassNotFound(fn: => Int): Int = {
try {
fn
} catch {
case e: Exception =>
// `path` is the requested class file path, captured from the enclosing method
throw new ClassNotFoundException(path, e)
}
}

FileSystem errors map to class loading failures:
try {
fileSystem.open(path)
} catch {
case _: FileNotFoundException =>
throw new ClassNotFoundException(s"Class file not found at path $path")
}

Testing/debugging timeout configuration:
/** HTTP connection timeout in milliseconds (testing/debugging) */
private[repl] var httpUrlConnectionTimeoutMillis: Int = -1

Path encoding for safe transmission:
def urlEncode(str: String): String = {
str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")
}

Dependencies: SparkEnv.rpcEnv for RPC-based class loading.

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-spark--spark-repl-2-11