# Distributed Class Loading

The Distributed Class Loading system enables loading REPL-compiled classes on Spark executors across the cluster. It supports fetching classes from various sources including HTTP servers, Hadoop file systems, and Spark RPC endpoints.

## Capabilities

### ExecutorClassLoader Class

The main class loader for distributing REPL-defined classes to Spark executors.

```scala { .api }
/**
 * ClassLoader for loading REPL-defined classes on executors from various sources
 * @param conf SparkConf configuration
 * @param env SparkEnv environment
 * @param classUri URI where classes can be fetched (HTTP, HDFS, or Spark RPC)
 * @param parent Parent ClassLoader
 * @param userClassPathFirst Whether to prioritize user classpath over system classpath
 */
class ExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ClassLoader(parent) {

  /**
   * Find and load a class by name
   * @param name Fully qualified class name
   * @return Loaded Class instance
   * @throws ClassNotFoundException if class cannot be found
   */
  override def findClass(name: String): Class[_]

  /**
   * Get a resource by name from the class URI
   * @param name Resource name
   * @return URL to the resource or null if not found
   */
  override def getResource(name: String): URL

  /**
   * Get all resources with a given name
   * @param name Resource name
   * @return Enumeration of URLs for matching resources
   */
  override def getResources(name: String): java.util.Enumeration[URL]
}
```

**Usage Examples:**

```scala
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Create configuration
val conf = new SparkConf()
val env = SparkEnv.get // Get current Spark environment

// Create class loader for HTTP-based class serving
val httpClassLoader = new ExecutorClassLoader(
  conf = conf,
  env = env,
  classUri = "http://driver-host:12345/classes",
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)

// Load a REPL-defined class on executor
val className = "org.apache.spark.repl.ExecutorClassLoader$$anonfun$1"
val loadedClass = httpClassLoader.findClass(className)
println(s"Loaded class: ${loadedClass.getName}")
```

### Local Class Finding

Methods for finding classes locally before attempting remote fetch.

```scala { .api }
/**
 * Find a class locally without remote fetching
 * @param name Fully qualified class name
 * @return Optional Class instance if found locally
 */
def findClassLocally(name: String): Option[Class[_]]
```

**Usage Example:**

```scala
val classLoader = new ExecutorClassLoader(conf, env, classUri, parent, true)

// Try to find class locally first
val localClass = classLoader.findClassLocally("com.example.LocalClass")
localClass match {
  case Some(clazz) => println(s"Found locally: ${clazz.getName}")
  case None => println("Class not available locally, will fetch remotely")
}
```

### Class Transformation

Methods for reading and transforming class bytecode during loading.

```scala { .api }
/**
 * Read and transform class bytes from input stream
 * @param name Class name for transformation context
 * @param in InputStream containing class bytecode
 * @return Transformed class bytecode
 */
def readAndTransformClass(name: String, in: InputStream): Array[Byte]
```

**Usage Example:**

```scala
import java.io.{FileInputStream, InputStream}

val classLoader = new ExecutorClassLoader(conf, env, classUri, parent, true)

// Read and transform class file
val classFile = new FileInputStream("/path/to/MyClass.class")
val transformedBytes = classLoader.readAndTransformClass(
  "com.example.MyClass",
  classFile
)

println(s"Transformed class size: ${transformedBytes.length} bytes")
classFile.close()
```

### URL Encoding Utilities

Utility methods for handling URL encoding in distributed environments.

```scala { .api }
/**
 * URL-encode a string while preserving forward slashes
 * @param str String to encode
 * @return URL-encoded string with slashes preserved
 */
def urlEncode(str: String): String
```

**Usage Example:**

```scala
val classLoader = new ExecutorClassLoader(conf, env, classUri, parent, true)

// Encode class name for URL usage
val className = "com.example.MyClass$InnerClass"
val encodedName = classLoader.urlEncode(className)
println(s"Encoded class name: $encodedName")

// Can be used in HTTP requests
val classUrl = s"$classUri/${encodedName.replace('.', '/')}.class"
```

### Signal Handling

Utilities for handling interrupts and signals in the REPL environment.

```scala { .api }
/**
 * Signal handling utilities for the REPL
 */
object Signaling {
  /**
   * Register SIGINT handler for job cancellation
   * Sets up interrupt handling to cancel running Spark jobs
   */
  def cancelOnInterrupt(): Unit
}
```

**Usage Example:**

```scala
import org.apache.spark.repl.Signaling

// Set up interrupt handling for REPL
Signaling.cancelOnInterrupt()

// Now CTRL+C will properly cancel running Spark jobs
// instead of terminating the entire REPL session
```

## Class Loading Strategies

### HTTP-Based Class Serving

The most common approach for distributing REPL classes is through HTTP serving from the driver.

```scala
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Configuration for HTTP class serving
val conf = new SparkConf()
  .set("spark.repl.class.uri", "http://driver-host:12345/classes")

val env = SparkEnv.get
val httpUri = conf.get("spark.repl.class.uri")

val classLoader = new ExecutorClassLoader(
  conf = conf,
  env = env,
  classUri = httpUri,
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)

// Classes will be fetched via HTTP when needed
val replClass = classLoader.findClass("$line1.$read$$iw$$iw$MyFunction")
```

### HDFS-Based Class Distribution

For clusters with shared storage, classes can be distributed via HDFS.

```scala
import org.apache.spark.repl.ExecutorClassLoader

val conf = new SparkConf()
  .set("spark.repl.class.uri", "hdfs://namenode:9000/spark-repl/classes")

val hdfsClassLoader = new ExecutorClassLoader(
  conf = conf,
  env = SparkEnv.get,
  classUri = "hdfs://namenode:9000/spark-repl/classes",
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)

// Classes will be fetched from HDFS
val distributedClass = hdfsClassLoader.findClass("$line5.$read$$iw$$iw$ProcessData")
```

### Spark RPC-Based Class Loading

Advanced setups can use Spark's internal RPC system for class distribution.

```scala
import org.apache.spark.repl.ExecutorClassLoader

val rpcClassLoader = new ExecutorClassLoader(
  conf = new SparkConf(),
  env = SparkEnv.get,
  classUri = "spark-rpc://driver:7077/class-server",
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = false // System classpath first for RPC
)

// Classes fetched via Spark RPC
val rpcClass = rpcClassLoader.findClass("$line3.$read$$iw$$iw$DataTransformer")
```

## Integration Patterns

### Setting Up Distributed Class Loading

```scala
import org.apache.spark.repl.{SparkIMain, ExecutorClassLoader}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

// Create Spark configuration with class server
val conf = new SparkConf()
  .setAppName("REPL with Distributed Classes")
  .setMaster("spark://cluster:7077")
  .set("spark.repl.class.outputDir", "/tmp/spark-repl-classes")
  .set("spark.repl.class.uri", "http://driver:12345/classes")

// Initialize Spark context
val sc = new SparkContext(conf)

// Create interpreter that will compile classes to shared location
val interpreter = new SparkIMain()
interpreter.initializeSynchronous()

// Define function in REPL
interpreter.interpret("""
  def processData(data: Array[Int]): Array[Int] = {
    data.map(_ * 2).filter(_ > 10)
  }
""")

// Create RDD that uses REPL-defined function
val rdd = sc.parallelize(1 to 100)
val processed = rdd.map(processData) // Function distributed automatically

val results = processed.collect()
println(s"Processed ${results.length} elements")
```

### Custom Class Transformation

```scala
import org.apache.spark.repl.ExecutorClassLoader
import java.io.{ByteArrayInputStream, InputStream}

// Custom class loader with transformation logic
class CustomExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst) {

  override def readAndTransformClass(name: String, in: InputStream): Array[Byte] = {
    val originalBytes = super.readAndTransformClass(name, in)

    // Apply custom transformations
    if (name.contains("$$iw$$iw")) {
      // Transform REPL-generated classes
      transformReplClass(originalBytes)
    } else {
      originalBytes
    }
  }

  private def transformReplClass(bytes: Array[Byte]): Array[Byte] = {
    // Custom bytecode transformation logic
    // For example: add debugging information, optimize code, etc.
    bytes // Return transformed bytes
  }
}
```

### Error Handling and Fallbacks

```scala
import org.apache.spark.repl.ExecutorClassLoader
import scala.util.{Try, Success, Failure}

class RobustExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst) {

  override def findClass(name: String): Class[_] = {
    // Try local first
    findClassLocally(name) match {
      case Some(clazz) => clazz
      case None =>
        // Try remote with error handling
        Try(super.findClass(name)) match {
          case Success(clazz) => clazz
          case Failure(exception) =>
            // Log error and try fallback strategies
            logClassLoadFailure(name, exception)
            tryFallbackStrategies(name)
        }
    }
  }

  private def logClassLoadFailure(name: String, error: Throwable): Unit = {
    println(s"Failed to load class $name: ${error.getMessage}")
  }

  private def tryFallbackStrategies(name: String): Class[_] = {
    // Implement fallback strategies
    // 1. Try alternative class URIs
    // 2. Check if class is available in system classpath
    // 3. Generate stub class if necessary
    throw new ClassNotFoundException(s"Could not load class $name with any strategy")
  }
}
```