# Class Loading

Dynamic class loading infrastructure for distributing REPL-generated classes to cluster executors.

## Capabilities

### ExecutorClassLoader Class

The `ExecutorClassLoader` enables distribution of classes generated in the REPL to remote Spark executors, allowing interactive code to execute across the cluster.

```scala { .api }
/**
 * ClassLoader for loading REPL-generated classes in executors
 * @param conf Spark configuration
 * @param env Spark environment
 * @param classUri URI where classes can be fetched (typically HTTP server)
 * @param parent Parent class loader for delegation
 * @param userClassPathFirst Whether to prioritize user classpath over system
 */
class ExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ClassLoader
```

**Usage Examples:**

```scala
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Create executor class loader for cluster distribution
val conf = new SparkConf()
val env = SparkEnv.get
val classUri = "spark://driver-node:4040/classes"
val parentLoader = Thread.currentThread().getContextClassLoader

val execLoader = new ExecutorClassLoader(
  conf = conf,
  env = env,
  classUri = classUri,
  parent = parentLoader,
  userClassPathFirst = false
)

// Use as context class loader for executor threads
Thread.currentThread().setContextClassLoader(execLoader)
```

### Class Discovery and Loading

Core class loading functionality with HTTP-based class fetching and local caching.

```scala { .api }
/**
 * Find and load a class by name
 * First attempts local lookup, then fetches from class URI
 * @param name Fully qualified class name
 * @return Loaded Class object
 * @throws ClassNotFoundException if class cannot be found
 */
def findClass(name: String): Class[_]

/**
 * Attempt to find class locally without network fetch
 * @param name Fully qualified class name
 * @return Some(Class) if found locally, None otherwise
 */
def findClassLocally(name: String): Option[Class[_]]

/**
 * Read class bytecode from input stream and apply transformations
 * Handles REPL wrapper class cleanup for proper execution
 * @param name Class name being loaded
 * @param in InputStream containing class bytecode
 * @return Transformed class bytecode
 */
def readAndTransformClass(name: String, in: InputStream): Array[Byte]
```

**Usage Examples:**

```scala
import java.io.ByteArrayInputStream
import java.nio.file.{Files, Paths}

// Load a REPL-generated class
val clazz = execLoader.findClass("$line3.$read$$iw$$iw$MyClass")
// Class.newInstance() is deprecated since Java 9; go through the constructor instead
val instance = clazz.getDeclaredConstructor().newInstance()

// Check if class exists locally first
execLoader.findClassLocally("com.example.UserClass") match {
  case Some(clazz) => println(s"Found locally: ${clazz.getName}")
  case None => println("Class not available locally")
}

// Manual class transformation (rarely needed)
val classBytes = Files.readAllBytes(Paths.get("MyClass.class"))
val transformedBytes = execLoader.readAndTransformClass(
  "MyClass",
  new ByteArrayInputStream(classBytes)
)
```
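
The lookup described above (map the class name to a class file, read its bytes, define the class, or fail with `ClassNotFoundException`) can be sketched with the JDK alone. This is a minimal illustration, not Spark's implementation: `FetchingClassLoader`, `classFilePath`, `readFully`, and the `open` function are hypothetical names, and `open` stands in for whatever transport actually serves the class files.

```scala
import java.io.{ByteArrayOutputStream, InputStream}

// Hypothetical helper: map a binary class name to the relative path of its class file.
def classFilePath(name: String): String = name.replace('.', '/') + ".class"

// Hypothetical helper: drain a stream into a byte array (also works on Java 8).
def readFully(in: InputStream): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val buf = new Array[Byte](8192)
  var n = in.read(buf)
  while (n != -1) {
    out.write(buf, 0, n)
    n = in.read(buf)
  }
  out.toByteArray
}

// Illustrative loader: `open` returns None when nothing is stored at the given path.
class FetchingClassLoader(parent: ClassLoader, open: String => Option[InputStream])
    extends ClassLoader(parent) {

  // Called by loadClass only after the parent loader has failed to find the class.
  override def findClass(name: String): Class[_] =
    open(classFilePath(name)) match {
      case Some(in) =>
        val bytes = try readFully(in) finally in.close()
        defineClass(name, bytes, 0, bytes.length)
      case None =>
        throw new ClassNotFoundException(name)
    }
}
```

Passing the transport in as a function keeps the sketch independent of HTTP, RPC, or a file system. A lookup that finds nothing surfaces as `ClassNotFoundException`, which matches the contract documented for `findClass`.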

### Resource Loading

Resource loading with HTTP fallback for files not available locally.

```scala { .api }
/**
 * Get resource URL, checking local classpath first then remote URI
 * @param name Resource name/path
 * @return URL to resource or null if not found
 */
def getResource(name: String): URL

/**
 * Get all resource URLs matching the name
 * Combines local and remote resources
 * @param name Resource name/path
 * @return Enumeration of matching URLs
 */
def getResources(name: String): java.util.Enumeration[URL]

/**
 * Get resource as input stream
 * @param name Resource name/path
 * @return InputStream or null if not found
 */
def getResourceAsStream(name: String): InputStream
```

**Usage Examples:**

```scala
import com.typesafe.config.ConfigFactory
import scala.io.Source

// Load configuration files
val configUrl = execLoader.getResource("application.conf")
if (configUrl != null) {
  val config = ConfigFactory.parseURL(configUrl)
}

// Load resource as stream, closing it even if reading fails
val stream = execLoader.getResourceAsStream("data/sample.json")
if (stream != null) {
  try {
    val content = Source.fromInputStream(stream).mkString
  } finally {
    stream.close()
  }
}

// Find all matching resources
import scala.collection.JavaConverters._
val urls = execLoader.getResources("META-INF/services/").asScala
urls.foreach(url => println(s"Service: $url"))
```

### Configuration and State

Configuration properties and internal state management.

```scala { .api }
/**
 * URI for fetching classes and resources from driver
 * Typically HTTP server running on driver node
 */
val uri: URI

/**
 * Directory path component of the class URI
 * Used for constructing resource paths
 */
val directory: String

/**
 * Parent class loader wrapper with delegation control
 * Handles user-classpath-first vs parent-first loading
 */
val parentLoader: ParentClassLoader

/**
 * HTTP connection timeout in milliseconds (visible for testing)
 * Default: -1 (no timeout set)
 * Package-private mutable variable for testing configuration
 */
private[repl] var httpUrlConnectionTimeoutMillis: Int
```

**Configuration Examples:**

```scala
// Check class loader configuration
println(s"Class URI: ${execLoader.uri}")
println(s"Directory: ${execLoader.directory}")
// httpUrlConnectionTimeoutMillis is private[repl], so this line only compiles
// from code inside the org.apache.spark.repl package (for example, tests)
println(s"Timeout: ${execLoader.httpUrlConnectionTimeoutMillis}ms")

// Examine parent loader behavior
// (check that ParentClassLoader exposes this member in your Spark version)
val parentLoader = execLoader.parentLoader
println(s"User classpath first: ${parentLoader.userClassPathFirst}")
```

### Internal Class Loading Methods

Internal methods for fetching class files from different sources (Spark RPC, Hadoop FileSystem, and the local REPL class server). They are private and listed here for reference only.

```scala { .api }
/**
 * Fetch class file via Spark RPC from driver node
 * @param path Relative path to class file
 * @return InputStream containing class bytecode
 * @throws ClassNotFoundException if class cannot be fetched
 */
private def getClassFileInputStreamFromSparkRPC(path: String): InputStream

/**
 * Fetch class file from Hadoop FileSystem
 * @param fileSystem Configured Hadoop FileSystem instance
 * @param pathInDirectory Relative path to class file within directory
 * @return InputStream containing class bytecode
 * @throws ClassNotFoundException if class file not found
 */
private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(
    pathInDirectory: String): InputStream

/**
 * Get class resource as stream from local REPL class server
 * Handles .class files generated by REPL for resource loading
 * @param name Resource name/path
 * @return InputStream or null if not found locally
 */
private def getClassResourceAsStreamLocally(name: String): InputStream
```

### Utility Methods

Utility methods for URL encoding and string manipulation.

```scala { .api }
/**
 * URL-encode string while preserving forward slashes
 * Used for encoding class names in HTTP requests
 * @param str String to encode
 * @return URL-encoded string with slashes preserved
 */
def urlEncode(str: String): String
```

**Usage Examples:**

```scala
// Encode class names for HTTP requests
val className = "com.example.MyClass$InnerClass"
val encoded = execLoader.urlEncode(className)
println(s"Encoded: $encoded") // "com.example.MyClass%24InnerClass"

// Slashes are preserved for path components
val path = "package/MyClass.class"
val encodedPath = execLoader.urlEncode(path)
println(s"Encoded path: $encodedPath") // "package/MyClass.class"
```
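
One way to get the slash-preserving behaviour described for `urlEncode` is to encode each path segment on its own and rejoin the segments with `/`. The sketch below assumes that approach and uses only `java.net.URLEncoder`; `encodePreservingSlashes` is a hypothetical name, not part of the documented API.

```scala
import java.net.URLEncoder

// Hypothetical stand-in for urlEncode: encode each path segment separately,
// then rejoin with "/" so the separators are never escaped.
def encodePreservingSlashes(str: String): String =
  str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")

println(encodePreservingSlashes("com.example.MyClass$InnerClass")) // com.example.MyClass%24InnerClass
println(encodePreservingSlashes("package/MyClass.class"))          // package/MyClass.class
```

Two properties of this approach are worth knowing: `URLEncoder` uses form encoding, so a space becomes `+` rather than `%20`, and `split` drops trailing empty segments, so a trailing slash in the input is not preserved.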

## REPL Class Transformation

### ConstructorCleaner Class

ASM-based bytecode transformer that cleans up REPL wrapper class constructors for proper execution in distributed environments.

```scala { .api }
/**
 * ASM ClassVisitor for cleaning REPL wrapper class constructors
 * Removes problematic constructor code that prevents serialization
 * @param className Name of class being transformed
 * @param cv Delegate ClassVisitor for transformation chain
 */
class ConstructorCleaner(className: String, cv: ClassVisitor) extends ClassVisitor(ASM6, cv)

/**
 * Visit and potentially transform method bytecode
 * Specifically handles constructor methods to replace initialization code
 * @param access Method access flags (ACC_PUBLIC, ACC_PRIVATE etc.)
 * @param name Method name ("<init>" for constructors)
 * @param desc Method descriptor (e.g., "()V" for no-arg void method)
 * @param sig Generic signature (may be null)
 * @param exceptions Exception types that method can throw (may be null)
 * @return MethodVisitor for method transformation, or null to skip method
 */
override def visitMethod(
  access: Int,
  name: String,
  desc: String,
  sig: String,
  exceptions: Array[String]
): MethodVisitor
```

**Implementation Details:**

The `ConstructorCleaner` addresses issues with REPL-generated wrapper classes:

1. **Constructor Cleaning**: Removes problematic initialization code from constructors
2. **Serialization Support**: Ensures classes can be serialized for cluster distribution
3. **Bytecode Transformation**: Uses ASM library for safe bytecode manipulation
4. **REPL Integration**: Automatically applied when loading REPL-generated classes

## Class Loading Strategy

The `ExecutorClassLoader` resolves a class by trying a fixed chain of sources in order. The `userClassPathFirst` flag decides whether user-provided classes are consulted before or after the parent class loader.

### Loading Order

1. **Bootstrap Classes**: System classes loaded by bootstrap class loader
2. **User Classes** (if userClassPathFirst=true): User-provided classes
3. **Parent Delegation**: Classes from parent class loader
4. **Local Cache**: Previously loaded classes from local cache
5. **Remote Fetch**: HTTP fetch from driver's class server
6. **User Classes** (if userClassPathFirst=false): User-provided classes as fallback
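
The effect of `userClassPathFirst` on the order above can be illustrated with a deliberately simplified model that has only two sources, a parent lookup and a user lookup. This is a sketch under that assumption, not the loader's actual code: `orderedSources` and `resolve` are hypothetical names, and each source is modelled as a function that returns `None` when it does not know the class.

```scala
// Pick the order in which the two sources are consulted.
def orderedSources[A](
    userFirst: Boolean,
    parent: String => Option[A],
    user: String => Option[A]): Seq[String => Option[A]] =
  if (userFirst) Seq(user, parent) else Seq(parent, user)

// Try each source in turn and stop at the first one that knows the name.
def resolve[A](name: String, sources: Seq[String => Option[A]]): Option[A] =
  sources.iterator.map(source => source(name)).collectFirst { case Some(found) => found }

// Both sources know "shared"; the flag decides which one wins.
val parent: String => Option[String] = n => if (n == "shared") Some("parent") else None
val user: String => Option[String] = n => if (n == "shared") Some("user") else None
println(resolve("shared", orderedSources(userFirst = true, parent, user)))  // Some(user)
println(resolve("shared", orderedSources(userFirst = false, parent, user))) // Some(parent)
```

Because the sources are walked through an iterator, later sources are never consulted once an earlier one succeeds, which is the property that makes the ordering matter.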

### Caching Behavior

- **Local Caching**: Classes are cached after first load
- **Resource Caching**: Resources are cached to avoid repeated HTTP requests
- **Memory Management**: Cache uses weak references to allow garbage collection
- **Cache Invalidation**: No explicit invalidation - relies on class loader lifecycle

### Error Handling

The class loader handles various error conditions:

- **Network Failures**: Timeout and connection errors during HTTP fetch
- **Class Format Errors**: Invalid bytecode or transformation failures
- **Security Restrictions**: Access denied or security manager restrictions
- **Resource Not Found**: Missing classes or resources

**Error Examples:**

```scala
import java.io.IOException

try {
  val clazz = execLoader.findClass("NonExistentClass")
} catch {
  case _: ClassNotFoundException =>
    println("Class not found locally or remotely")
  case e: IOException =>
    println(s"Network error loading class: ${e.getMessage}")
}
```

## Thread Safety and Performance

### Thread Safety

- ExecutorClassLoader instances are thread-safe for concurrent class loading
- Internal synchronization prevents race conditions during class definition
- Resource loading is thread-safe with proper synchronization
- Cache access is synchronized to prevent corruption
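
The guarantee that matters most here is that a class is fetched and defined at most once even when several threads request it simultaneously. A minimal sketch of that guarantee, assuming a `ConcurrentHashMap` as the cache, is shown below. `OnceCache` and `load` are hypothetical names, and the map holds strong references, so it does not model the weak-reference behaviour described under Caching Behavior.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Illustrative cache: `load` stands in for an expensive fetch-and-define step.
class OnceCache[A](load: String => A) {
  private val cache = new ConcurrentHashMap[String, A]()

  // computeIfAbsent applies the function at most once per absent key,
  // even when several threads ask for the same name at the same time.
  def get(name: String): A =
    cache.computeIfAbsent(name, new JFunction[String, A] {
      override def apply(key: String): A = load(key)
    })
}

// Eight threads request the same name; the counter records how often `load` ran.
val loads = new AtomicInteger(0)
val cache = new OnceCache[String](name => { loads.incrementAndGet(); name.toUpperCase })
val workers = (1 to 8).map(_ => new Thread(new Runnable {
  override def run(): Unit = { cache.get("myclass"); () }
}))
workers.foreach(_.start())
workers.foreach(_.join())
println(s"load ran ${loads.get} time(s)")
```

Keeping the expensive step inside `computeIfAbsent` means other threads asking for the same key block until the first computation finishes, so the function passed as `load` should not call back into the same cache.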

### Performance Considerations

- **HTTP Connection Pooling**: Reuses connections when possible
- **Compression**: Supports gzip compression for class transfer
- **Timeout Configuration**: Configurable timeouts prevent hanging
- **Local Caching**: Avoids repeated network requests for same classes

**Performance Tuning:**

```scala
import org.apache.spark.SparkConf

// Confirm both keys below against the configuration reference for your
// Spark version before relying on them
// Configure HTTP timeout for faster failure detection
val conf = new SparkConf()
conf.set("spark.repl.class.httpTimeout", "30000") // 30 seconds, in milliseconds

// Use connection pooling for better performance
conf.set("spark.repl.class.maxConnections", "10")
```
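
At the JDK level, a connection timeout such as `httpUrlConnectionTimeoutMillis` is typically applied through `URLConnection.setConnectTimeout` and `setReadTimeout`. The sketch below shows that step in isolation, following the convention stated earlier that `-1` means no timeout is set. `applyTimeout` is a hypothetical helper, the URL is a placeholder, and nothing here is taken from the loader's source.

```scala
import java.net.{URI, URLConnection}

// Hypothetical helper: apply one timeout to both the connect and the read phase.
// A value of -1 leaves the connection's existing timeouts untouched.
def applyTimeout(conn: URLConnection, timeoutMillis: Int): URLConnection = {
  if (timeoutMillis != -1) {
    conn.setConnectTimeout(timeoutMillis)
    conn.setReadTimeout(timeoutMillis)
  }
  conn
}

// openConnection() only creates the connection object; no network I/O happens here.
val conn = applyTimeout(
  URI.create("http://driver-node:4040/classes/MyClass.class").toURL.openConnection(),
  30000
)
println(s"connect=${conn.getConnectTimeout}ms read=${conn.getReadTimeout}ms")
```

Setting the read timeout as well as the connect timeout matters because a server that accepts the connection but never sends the class bytes would otherwise block the loading thread indefinitely.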