# Distributed Class Loading

A custom class loader system for loading REPL-compiled classes on remote Spark executors, with support for Spark RPC and Hadoop FileSystem access.

## Capabilities

### ExecutorClassLoader

A ClassLoader that reads classes from a Hadoop FileSystem or a Spark RPC endpoint. It loads classes defined by the interpreter when the REPL is in use.

```scala { .api }
/**
 * A ClassLoader for reading REPL-compiled classes from remote sources.
 * Supports both Spark RPC and Hadoop FileSystem for class distribution.
 * @param conf Spark configuration
 * @param env Spark environment for RPC access
 * @param classUri URI pointing to the class file location (spark:// or hdfs://)
 * @param parent Parent class loader
 * @param userClassPathFirst Whether to prioritize the user classpath over the parent
 */
class ExecutorClassLoader(
    conf: SparkConf,
    env: SparkEnv,
    classUri: String,
    parent: ClassLoader,
    userClassPathFirst: Boolean
) extends ClassLoader(null) with Logging {

  /** URI parsing and directory path extraction */
  val uri: URI
  val directory: String

  /** Parent class loader wrapper */
  val parentLoader: ParentClassLoader

  /**
   * Find and load a class by name.
   * Respects the userClassPathFirst setting for load order.
   * @param name Fully qualified class name
   * @return Loaded Class object
   */
  override def findClass(name: String): Class[_]

  /**
   * Find a class locally from the REPL class server.
   * @param name Fully qualified class name
   * @return Some(Class) if found, None otherwise
   */
  def findClassLocally(name: String): Option[Class[_]]

  /**
   * Read and transform class bytecode.
   * Applies special handling to REPL wrapper classes.
   * @param name Class name
   * @param in Input stream with class bytes
   * @return Transformed class bytecode
   */
  def readAndTransformClass(name: String, in: InputStream): Array[Byte]

  /**
   * URL-encode a string, preserving only slashes.
   * @param str String to encode
   * @return URL-encoded string
   */
  def urlEncode(str: String): String

  /** Get a resource by name (delegates to parent) */
  override def getResource(name: String): URL

  /** Get all resources by name (delegates to parent) */
  override def getResources(name: String): java.util.Enumeration[URL]

  /** Get a resource as an input stream, with REPL class support */
  override def getResourceAsStream(name: String): InputStream
}
```

**Usage Examples:**

```scala
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Create a class loader for RPC-based class loading
val conf = new SparkConf()
val env = SparkEnv.get
val classUri = "spark://driver-host:port/classes"
val parentClassLoader = Thread.currentThread().getContextClassLoader
val userClassPathFirst = false

val classLoader = new ExecutorClassLoader(
  conf, env, classUri, parentClassLoader, userClassPathFirst
)

// Load a REPL-compiled class
val clazz = classLoader.findClass("line123$iw$MyClass")

// Check whether a class exists locally
val maybeClass = classLoader.findClassLocally("com.example.MyClass")
```

### ConstructorCleaner

An ASM visitor that cleans the constructors of REPL wrapper classes to prevent initialization issues.

```scala { .api }
/**
 * ASM ClassVisitor that removes constructor initialization code
 * from REPL-generated wrapper classes to prevent execution issues.
 * @param className Name of the class being cleaned
 * @param cv Target ClassVisitor for output
 */
class ConstructorCleaner(className: String, cv: ClassVisitor) extends ClassVisitor(ASM6, cv) {

  /**
   * Visit and potentially modify method definitions.
   * Replaces constructor bodies for wrapper classes.
   * @param access Method access flags
   * @param name Method name
   * @param desc Method descriptor
   * @param sig Method signature
   * @param exceptions Exception types
   * @return MethodVisitor for method processing
   */
  override def visitMethod(
      access: Int,
      name: String,
      desc: String,
      sig: String,
      exceptions: Array[String]): MethodVisitor
}
```

## Class Loading Strategies

### Load Order Control

The `userClassPathFirst` parameter controls class loading precedence:

**User-First Loading (`userClassPathFirst = true`)**:

1. Check the REPL class server first
2. Fall back to the parent class loader

**Parent-First Loading (`userClassPathFirst = false`)**:

1. Try the parent class loader first
2. Check the REPL class server on `ClassNotFoundException`

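The two orders can be sketched as a small standalone function. Here `loadFromRepl` and `loadFromParent` are hypothetical stand-ins for the real lookup paths, not the actual private API:

```scala
// Sketch of the load-order dispatch; the two function parameters are
// illustrative stand-ins for the REPL-server and parent-loader lookups.
def resolve(
    name: String,
    userClassPathFirst: Boolean,
    loadFromRepl: String => Option[String],
    loadFromParent: String => Option[String]): String = {
  val found =
    if (userClassPathFirst) loadFromRepl(name).orElse(loadFromParent(name))
    else loadFromParent(name).orElse(loadFromRepl(name))
  // Neither source has the class: surface the standard failure
  found.getOrElse(throw new ClassNotFoundException(name))
}
```

Either way, exactly one source wins; the other is consulted only on a miss.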
### Remote Class Access

Classes can be fetched over two transports:

**Spark RPC Protocol (`spark://`)**:

```scala
private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
  val channel = env.rpcEnv.openChannel(s"$classUri/$path")
  new FilterInputStream(Channels.newInputStream(channel)) {
    // Error handling converts exceptions to ClassNotFoundException
  }
}
```

**Hadoop FileSystem (`hdfs://`, `file://`, etc.)**:

```scala
private def getClassFileInputStreamFromFileSystem(
    fileSystem: FileSystem)(pathInDirectory: String): InputStream = {
  val path = new Path(directory, pathInDirectory)
  try {
    fileSystem.open(path)
  } catch {
    case _: FileNotFoundException =>
      throw new ClassNotFoundException(s"Class file not found at path $path")
  }
}
```

## Bytecode Transformation

### REPL Wrapper Class Handling

REPL-generated wrapper classes receive a special transformation:

```scala
def readAndTransformClass(name: String, in: InputStream): Array[Byte] = {
  if (name.startsWith("line") && name.endsWith("$iw$")) {
    // Rewrite the wrapper class constructor
    val cr = new ClassReader(in)
    val cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES + ClassWriter.COMPUTE_MAXS)
    val cleaner = new ConstructorCleaner(name, cw)
    cr.accept(cleaner, 0)
    cw.toByteArray
  } else {
    // Pass through unchanged
    // ... standard byte copying logic
  }
}
```

### Constructor Cleaning

The `ConstructorCleaner` replaces constructor bodies with minimal initialization:

1. **Load the `this` reference**
2. **Call `Object.<init>()`**
3. **Return immediately**
4. **Skip the original initialization code**

This prevents REPL initialization code from running at class load time; it runs later through reflection instead.

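The four steps can be modeled with plain data. This is an illustrative toy, not the real ASM-based implementation: the `Method` case class and mnemonic strings are invented for the example.

```scala
// Toy model of constructor cleaning: a method is its name plus a list of
// instruction mnemonics (strings, standing in for real bytecode).
case class Method(name: String, body: List[String])

def cleanConstructor(m: Method): Method =
  if (m.name == "<init>")
    // Replacing the body drops the original initialization code (step 4)
    m.copy(body = List(
      "ALOAD 0",                                // step 1: load `this`
      "INVOKESPECIAL java/lang/Object.<init>",  // step 2: call Object.<init>()
      "RETURN"))                                // step 3: return immediately
  else m                                        // non-constructors pass through
```

In the real visitor the same effect is achieved by emitting these instructions from `visitMethod` and never forwarding the original constructor body to the delegate.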
## Resource Handling

### Class File Resources

`.class` files receive special handling when requested as resources:

```scala
private def getClassResourceAsStreamLocally(name: String): InputStream = {
  try {
    if (name.endsWith(".class")) fetchFn(name) else null
  } catch {
    case _: ClassNotFoundException => null
  }
}
```

### Resource Delegation

Non-class resources delegate to the parent class loader:

- **getResource**: Returns the parent's resource URL
- **getResources**: Returns the parent's resource enumeration
- **getResourceAsStream**: Tries REPL classes first (if user-first), then the parent

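The delegation rules above can be captured in a toy model, with maps standing in for the parent and REPL lookup paths (illustrative only; the real methods return URLs and streams, not strings):

```scala
// Toy model: `parent` and `repl` map resource names to contents.
class DelegationModel(
    parent: Map[String, String],
    repl: Map[String, String],
    userClassPathFirst: Boolean) {

  // getResource / getResources always reflect the parent's view
  def getResource(name: String): Option[String] = parent.get(name)

  // getResourceAsStream consults REPL classes first only when user-first is set
  def getResourceAsStream(name: String): Option[String] =
    if (userClassPathFirst) repl.get(name).orElse(parent.get(name))
    else parent.get(name).orElse(repl.get(name))
}
```

The asymmetry is deliberate: URL-returning lookups stay with the parent, while stream-returning lookups can serve REPL-compiled class bytes.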
## Error Handling

### Class Loading Errors

```scala
// Class not found locally
case e: ClassNotFoundException =>
  logDebug(s"Did not load class $name from REPL class server at $uri", e)
  None

// General loading errors
case e: Exception =>
  logError(s"Failed to check existence of class $name on REPL class server at $uri", e)
  None
```

### RPC Channel Errors

RPC errors are converted to `ClassNotFoundException`:

```scala
// Defined inside getClassFileInputStreamFromSparkRPC, where `path` is in scope
private def toClassNotFound(fn: => Int): Int = {
  try {
    fn
  } catch {
    case e: Exception =>
      throw new ClassNotFoundException(path, e)
  }
}
```

### FileSystem Errors

FileSystem errors map to class loading failures:

```scala
try {
  fileSystem.open(path)
} catch {
  case _: FileNotFoundException =>
    throw new ClassNotFoundException(s"Class file not found at path $path")
}
```

## Configuration

### HTTP Timeout Control

A timeout hook used for testing and debugging:

```scala
/** HTTP connection timeout in milliseconds (testing/debugging) */
private[repl] var httpUrlConnectionTimeoutMillis: Int = -1
```

### URL Encoding

Paths are encoded per segment for safe transmission:

```scala
def urlEncode(str: String): String = {
  str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")
}
```

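Because encoding is applied per segment, unsafe characters are escaped while the `/` separators survive. For example (the function is reproduced so the snippet stands alone):

```scala
import java.net.URLEncoder

// Same per-segment logic as above, repeated here to be self-contained.
def urlEncode(str: String): String =
  str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")

// Spaces become '+', '$' becomes %24, and the slash separators are preserved:
// urlEncode("my classes/line1$read.class") == "my+classes/line1%24read.class"
```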
## Integration Points

### Spark Environment Integration

- Uses `SparkEnv.rpcEnv` for RPC-based class loading
- Integrates with Spark's configuration system
- Leverages Hadoop utilities for FileSystem access

### Scala Compiler Integration

- Works with Scala's class loading architecture
- Handles REPL-specific class naming conventions
- Supports dynamic class compilation and loading

### Executor Integration

- Designed for use on Spark executors
- Handles class distribution across the cluster
- Manages parent class loader relationships correctly