# Distributed Class Loading

ClassLoader implementation for loading REPL-defined classes from a Hadoop FileSystem or an HTTP URI. It enables distributed execution of user-defined code across Spark clusters, supporting both local and remote class-loading strategies.
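
The effect of the two strategies can be sketched with a plain `ClassLoader`. This is an illustrative toy, not Spark's actual implementation; all names below are hypothetical, and the remote lookup is stubbed out:

```scala
// Hypothetical sketch: shows the delegation order that a
// userClassPathFirst-style flag controls.
class ToyExecutorLoader(parent: ClassLoader, userFirst: Boolean)
    extends ClassLoader(parent) {

  // Stand-in for fetching bytecode from HDFS/HTTP; always empty here,
  // so every lookup falls through to the other source.
  private def findRemotely(name: String): Option[Class[_]] = None

  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    val cls =
      if (userFirst) {
        // Check the remote/user class source before delegating to the parent.
        findRemotely(name).getOrElse(super.loadClass(name, resolve))
      } else {
        // Standard parent-first delegation, remote source only as fallback.
        try super.loadClass(name, resolve)
        catch { case _: ClassNotFoundException => findRemotely(name).get }
      }
    if (resolve) resolveClass(cls)
    cls
  }
}

val loader = new ToyExecutorLoader(getClass.getClassLoader, userFirst = true)
println(loader.loadClass("java.lang.String").getName) // java.lang.String
```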

## Capabilities

### ExecutorClassLoader Class

Custom ClassLoader that reads classes from distributed storage systems, allowing REPL-defined classes to be loaded on Spark executor nodes.

```scala { .api }
/**
 * ClassLoader for reading classes from Hadoop FileSystem or HTTP URI
 * @param conf SparkConf configuration
 * @param classUri URI pointing to class storage location
 * @param parent Parent ClassLoader
 * @param userClassPathFirst Whether to check user classpath before parent
 */
class ExecutorClassLoader(
  conf: SparkConf,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ClassLoader with Logging

/**
 * Class URI as parsed URI object
 */
val uri: URI

/**
 * Directory path from URI
 */
val directory: String

/**
 * Parent class loader wrapper
 */
val parentLoader: ParentClassLoader

/**
 * Hadoop FileSystem instance for accessing remote classes
 */
var fileSystem: FileSystem
```

**Usage Examples:**

```scala
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.SparkConf

// Create for HTTP class loading
val conf = new SparkConf()
val httpLoader = new ExecutorClassLoader(
  conf,
  "http://driver-host:8080/classes/",
  getClass.getClassLoader,
  userClassPathFirst = true
)

// Create for HDFS class loading
val hdfsLoader = new ExecutorClassLoader(
  conf,
  "hdfs://cluster/spark-classes/",
  getClass.getClassLoader,
  userClassPathFirst = false
)
```

### Class Loading Methods

Methods for finding and loading classes from distributed storage.

```scala { .api }
/**
 * Find class by name, checking both local and remote sources
 * @param name Fully qualified class name
 * @return Class instance
 * @throws ClassNotFoundException if class not found
 */
override def findClass(name: String): Class[_]

/**
 * Find class locally without remote lookup
 * @param name Fully qualified class name
 * @return Optional class instance
 */
def findClassLocally(name: String): Option[Class[_]]
```

**Usage Examples:**

```scala
// Load class from distributed storage
try {
  val clazz = classLoader.findClass("com.example.ReplDefinedClass")
  val instance = clazz.newInstance()
} catch {
  case _: ClassNotFoundException => println("Class not found")
}

// Try local loading first
classLoader.findClassLocally("com.example.LocalClass") match {
  case Some(clazz) => println(s"Found locally: ${clazz.getName}")
  case None => println("Not available locally")
}
```
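
Before any bytes can be fetched, the loader has to turn a fully qualified class name into a path relative to `classUri`. A minimal sketch of that mapping (the helper name is illustrative, not part of the public API):

```scala
// Hypothetical helper: maps a fully qualified class name to the
// relative path requested from the class server.
def classNameToPath(name: String): String =
  name.replace('.', '/') + ".class"

println(classNameToPath("com.example.ReplDefinedClass"))
// com/example/ReplDefinedClass.class
```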

### Resource Loading Methods

Methods for loading resources from distributed storage systems.

```scala { .api }
/**
 * Get resource URL from distributed storage
 * @param name Resource name
 * @return Resource URL or null if not found
 */
override def getResource(name: String): URL

/**
 * Get enumeration of resource URLs
 * @param name Resource name
 * @return Enumeration of matching resource URLs
 */
override def getResources(name: String): java.util.Enumeration[URL]
```

**Usage Examples:**

```scala
// Load resource from distributed storage
val resourceUrl = classLoader.getResource("config.properties")
if (resourceUrl != null) {
  val inputStream = resourceUrl.openStream()
  // Process resource
}

// Load multiple resources
import scala.collection.JavaConverters._
val resources = classLoader.getResources("META-INF/services/provider").asScala
resources.foreach { url =>
  println(s"Found service provider: $url")
}
```

### Bytecode Transformation

Methods for reading and transforming class bytecode during loading.

```scala { .api }
/**
 * Read class bytecode and apply transformations
 * @param name Class name
 * @param in InputStream containing class bytes
 * @return Transformed class bytecode
 */
def readAndTransformClass(name: String, in: InputStream): Array[Byte]

/**
 * URL encode string for HTTP requests
 * @param str String to encode
 * @return URL-encoded string
 */
def urlEncode(str: String): String
```

**Usage Examples:**

```scala
import java.io.FileInputStream

// Transform class during loading
val inputStream = new FileInputStream("MyClass.class")
val transformedBytes = classLoader.readAndTransformClass(
  "com.example.MyClass",
  inputStream
)

// URL encoding for HTTP paths: each path segment is encoded so the
// name is safe to use in an HTTP request
val encodedPath = classLoader.urlEncode("com/example/My Class.class")
```

### ConstructorCleaner Class

ASM-based bytecode transformer that cleans REPL wrapper constructors, replacing their bodies with a bare superclass constructor call so that loading a class on an executor does not re-execute REPL initialization code.

```scala { .api }
/**
 * ASM ClassVisitor for cleaning constructor bytecode
 * @param className Name of class being transformed
 * @param cv Underlying ClassVisitor
 */
class ConstructorCleaner(className: String, cv: ClassVisitor) extends ClassVisitor(ASM5, cv)

/**
 * Visit and potentially transform method bytecode
 * @param access Method access flags
 * @param name Method name
 * @param desc Method descriptor
 * @param signature Method signature
 * @param exceptions Exception types
 * @return MethodVisitor for transformation
 */
override def visitMethod(
  access: Int,
  name: String,
  desc: String,
  signature: String,
  exceptions: Array[String]
): MethodVisitor
```

**Usage Examples:**

```scala
import org.apache.xbean.asm5.{ClassReader, ClassWriter}
import org.apache.spark.repl.ConstructorCleaner

// Transform class bytecode
val classReader = new ClassReader(originalBytes)
val classWriter = new ClassWriter(classReader, ClassWriter.COMPUTE_FRAMES)
val cleaner = new ConstructorCleaner("com.example.MyClass", classWriter)

classReader.accept(cleaner, 0)
val cleanedBytes = classWriter.toByteArray
```

### Configuration and Setup

Configuration field for the HTTP connection timeout used when fetching classes over HTTP.

```scala { .api }
/**
 * HTTP connection timeout in milliseconds (package-private for testing)
 */
private[repl] var httpUrlConnectionTimeoutMillis: Int
```

**Usage Examples:**

```scala
// Configure HTTP timeout for testing
classLoader.httpUrlConnectionTimeoutMillis = 10000 // 10 seconds

// Access URI information
println(s"Loading classes from: ${classLoader.uri}")
println(s"Class directory: ${classLoader.directory}")

// Check file system
if (classLoader.fileSystem != null) {
  println("Using Hadoop FileSystem for class loading")
} else {
  println("Using HTTP for class loading")
}
```
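
Which of the two transports is active follows from the scheme of `classUri`. A rough sketch of that dispatch (a simplification for illustration only; the real constructor wires up a Hadoop `FileSystem` or an HTTP connection rather than returning a label):

```scala
import java.net.URI

// Simplified sketch: derive the fetch strategy from the URI scheme.
def strategyFor(classUri: String): String =
  new URI(classUri).getScheme match {
    case "http" | "https" => "http"   // fetched over HTTP from the driver
    case _                => "hadoop" // hdfs://, file://, etc. via FileSystem
  }

println(strategyFor("http://driver-host:8080/classes/")) // http
println(strategyFor("hdfs://cluster/spark-classes/"))    // hadoop
```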
258
259
## Integration with Spark REPL
260
261
The ExecutorClassLoader integrates with the Spark REPL system to enable distributed execution:
262
263
**REPL-defined class distribution:**
264
```scala
// Classes defined in REPL are automatically made available to executors
val interpreter = new SparkIMain()
interpreter.interpret("class MyUDF extends Function1[String, Int] { ... }")

// The ExecutorClassLoader ensures MyUDF is available on all executor nodes
val rdd = sparkContext.textFile("data.txt")
val result = rdd.map(new MyUDF()) // Works on all executors
```

**Custom class loading configuration:**

```scala
// Configure class URI for executor class loading
val conf = new SparkConf()
conf.set("spark.repl.class.uri", "http://driver:8080/classes/")

// ExecutorClassLoader will use this URI on executor nodes
val classLoader = new ExecutorClassLoader(
  conf,
  conf.get("spark.repl.class.uri"),
  Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)
```