# Distributed Class Loading

Custom class loader system for loading REPL-compiled classes on remote Spark executors, with support for RPC and Hadoop filesystem access.

## Capabilities

### ExecutorClassLoader

A ClassLoader that reads classes from a Hadoop FileSystem or a Spark RPC endpoint, used to load classes defined by the interpreter when the REPL is used.

```scala { .api }
/**
 * A ClassLoader for reading REPL-compiled classes from remote sources.
 * Supports both Spark RPC and Hadoop FileSystem for class distribution.
 * @param conf Spark configuration
 * @param env Spark environment for RPC access
 * @param classUri URI pointing to the class file location (spark:// or hdfs://)
 * @param parent Parent class loader
 * @param userClassPathFirst Whether to prioritize the user classpath over the parent
 */
class ExecutorClassLoader(
    conf: SparkConf,
    env: SparkEnv,
    classUri: String,
    parent: ClassLoader,
    userClassPathFirst: Boolean
) extends ClassLoader(null) with Logging {

  /** URI parsing and directory path extraction */
  val uri: URI
  val directory: String

  /** Parent class loader wrapper */
  val parentLoader: ParentClassLoader

  /**
   * Find and load a class by name.
   * Respects the userClassPathFirst setting for load order.
   * @param name Fully qualified class name
   * @return Loaded Class object
   */
  override def findClass(name: String): Class[_]

  /**
   * Find a class locally from the REPL class server.
   * @param name Fully qualified class name
   * @return Some(Class) if found, None otherwise
   */
  def findClassLocally(name: String): Option[Class[_]]

  /**
   * Read and transform class bytecode.
   * Special handling for REPL wrapper classes.
   * @param name Class name
   * @param in Input stream with class bytes
   * @return Transformed class bytecode
   */
  def readAndTransformClass(name: String, in: InputStream): Array[Byte]

  /**
   * URL-encode a string, preserving only slashes.
   * @param str String to encode
   * @return URL-encoded string
   */
  def urlEncode(str: String): String

  /** Get a resource by name (delegates to parent) */
  override def getResource(name: String): URL

  /** Get all resources by name (delegates to parent) */
  override def getResources(name: String): java.util.Enumeration[URL]

  /** Get a resource as an input stream, with REPL class support */
  override def getResourceAsStream(name: String): InputStream
}
```

**Usage Examples:**

```scala
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Create a class loader for RPC-based class loading
val conf = new SparkConf()
val env = SparkEnv.get
val classUri = "spark://driver-host:port/classes"
val parentClassLoader = Thread.currentThread().getContextClassLoader
val userClassPathFirst = false

val classLoader = new ExecutorClassLoader(
  conf, env, classUri, parentClassLoader, userClassPathFirst
)

// Load a REPL-compiled class
val clazz = classLoader.findClass("line123$iw$MyClass")

// Check whether a class exists locally
val maybeClass = classLoader.findClassLocally("com.example.MyClass")
```

### ConstructorCleaner

ASM visitor for cleaning REPL wrapper class constructors to prevent initialization issues.

```scala { .api }
/**
 * ASM ClassVisitor that cleans constructor initialization code
 * from REPL-generated wrapper classes to prevent execution issues.
 * @param className Name of the class being cleaned
 * @param cv Target ClassVisitor for output
 */
class ConstructorCleaner(className: String, cv: ClassVisitor) extends ClassVisitor(ASM6, cv) {

  /**
   * Visit and potentially modify method definitions.
   * Replaces constructor bodies for wrapper classes.
   * @param access Method access flags
   * @param name Method name
   * @param desc Method descriptor
   * @param sig Method signature
   * @param exceptions Exception types
   * @return MethodVisitor for method processing
   */
  override def visitMethod(
      access: Int,
      name: String,
      desc: String,
      sig: String,
      exceptions: Array[String]
  ): MethodVisitor
}
```

## Class Loading Strategies

### Load Order Control

The `userClassPathFirst` parameter controls class loading precedence:

**User-First Loading (`userClassPathFirst = true`)**:

1. Check the REPL class server first
2. Fall back to the parent class loader

**Parent-First Loading (`userClassPathFirst = false`)**:

1. Try the parent class loader first
2. Check the REPL class server on `ClassNotFoundException`
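
The two orders can be sketched with a simplified loader. This is an illustration only, not Spark's actual implementation; `findReplClass` stands in for the remote REPL class lookup:

```scala
// Illustrative sketch of the two load orders; `findReplClass` stands in
// for the remote REPL class lookup (assumed helper, not Spark's API)
class OrderedLoader(
    parent: ClassLoader,
    userClassPathFirst: Boolean,
    findReplClass: String => Option[Class[_]]
) extends ClassLoader(null) {

  override def findClass(name: String): Class[_] =
    if (userClassPathFirst) {
      // User-first: consult the REPL class server, then fall back to the parent
      findReplClass(name).getOrElse(parent.loadClass(name))
    } else {
      // Parent-first: only consult the REPL class server when the parent fails
      try parent.loadClass(name)
      catch {
        case _: ClassNotFoundException =>
          findReplClass(name).getOrElse(throw new ClassNotFoundException(name))
      }
    }
}
```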

### Remote Class Access

Two methods for accessing remote classes:

**Spark RPC Protocol (`spark://`)**:

```scala
private def getClassFileInputStreamFromSparkRPC(path: String): InputStream = {
  val channel = env.rpcEnv.openChannel(s"$classUri/$path")
  new FilterInputStream(Channels.newInputStream(channel)) {
    // Error handling converts exceptions to ClassNotFoundException
  }
}
```

**Hadoop FileSystem (`hdfs://`, `file://`, etc.)**:

```scala
private def getClassFileInputStreamFromFileSystem(
    fileSystem: FileSystem)(pathInDirectory: String): InputStream = {
  val path = new Path(directory, pathInDirectory)
  try {
    fileSystem.open(path)
  } catch {
    case _: FileNotFoundException =>
      throw new ClassNotFoundException(s"Class file not found at path $path")
  }
}
```
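
The choice between the two fetch paths depends only on the scheme of `classUri`. A minimal sketch of that dispatch (the helper name is assumed for illustration):

```scala
import java.net.URI

// The RPC fetch path is used for spark:// URIs; everything else
// (hdfs://, file://, ...) goes through the Hadoop FileSystem path.
// Illustrative helper, not Spark's actual code.
def usesSparkRpc(classUri: String): Boolean =
  new URI(classUri).getScheme == "spark"
```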

## Bytecode Transformation

### REPL Wrapper Class Handling

Special transformation for REPL-generated wrapper classes:

```scala
def readAndTransformClass(name: String, in: InputStream): Array[Byte] = {
  if (name.startsWith("line") && name.endsWith("$iw$")) {
    // Transform the wrapper class constructor
    val cr = new ClassReader(in)
    val cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES + ClassWriter.COMPUTE_MAXS)
    val cleaner = new ConstructorCleaner(name, cw)
    cr.accept(cleaner, 0)
    cw.toByteArray
  } else {
    // Pass through unchanged
    // ... standard byte copying logic
  }
}
```

### Constructor Cleaning

The `ConstructorCleaner` replaces constructor bodies with minimal initialization:

1. **Load the `this` reference**
2. **Call `Object.<init>()`**
3. **Return immediately**
4. **Skip the original initialization code**

This prevents the REPL initialization code from running at class-load time; it is meant to run later through reflection instead.
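
In javap-style terms, the four steps above amount to emitting this minimal constructor body in place of the original one:

```text
aload_0                                      // load the `this` reference
invokespecial java/lang/Object."<init>":()V  // call Object.<init>()
return                                       // return immediately, skipping original init
```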

## Resource Handling

### Class File Resources

Special handling for `.class` files requested as resources:

```scala
private def getClassResourceAsStreamLocally(name: String): InputStream = {
  try {
    if (name.endsWith(".class")) fetchFn(name) else null
  } catch {
    case _: ClassNotFoundException => null
  }
}
```

### Resource Delegation

Non-class resources are delegated to the parent class loader:

- **getResource**: Returns the parent's resource URL
- **getResources**: Returns the parent's resource enumeration
- **getResourceAsStream**: Tries REPL classes first (if user-first), then the parent
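
The delegation pattern for the first two methods can be sketched with a self-contained loader (illustrative only; here `parentLoader` is a plain `ClassLoader` rather than Spark's `ParentClassLoader` wrapper):

```scala
import java.net.URL

// Sketch of delegating resource lookups to a parent loader
class DelegatingLoader(parentLoader: ClassLoader) extends ClassLoader(null) {
  override def getResource(name: String): URL =
    parentLoader.getResource(name)
  override def getResources(name: String): java.util.Enumeration[URL] =
    parentLoader.getResources(name)
}
```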

## Error Handling

### Class Loading Errors

```scala
// Class not found locally
case e: ClassNotFoundException =>
  logDebug(s"Did not load class $name from REPL class server at $uri", e)
  None

// General loading errors
case e: Exception =>
  logError(s"Failed to check existence of class $name on REPL class server at $uri", e)
  None
```

### RPC Channel Errors

RPC errors are converted to `ClassNotFoundException`:

```scala
private def toClassNotFound(fn: => Int): Int = {
  try {
    fn
  } catch {
    case e: Exception =>
      throw new ClassNotFoundException(path, e)
  }
}
```
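
In the excerpt above, `path` is captured from the enclosing scope. A self-contained version of the same wrapping pattern, with `path` passed explicitly, might look like:

```scala
// Any failure while reading class bytes is surfaced as a
// ClassNotFoundException naming the requested path
def toClassNotFound(path: String)(fn: => Int): Int =
  try fn
  catch {
    case e: Exception => throw new ClassNotFoundException(path, e)
  }
```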

### FileSystem Errors

FileSystem errors map to class loading failures:

```scala
try {
  fileSystem.open(path)
} catch {
  case _: FileNotFoundException =>
    throw new ClassNotFoundException(s"Class file not found at path $path")
}
```

## Configuration

### HTTP Timeout Control

Timeout configuration for testing and debugging:

```scala
/** HTTP connection timeout in milliseconds (testing/debugging) */
private[repl] var httpUrlConnectionTimeoutMillis: Int = -1
```

### URL Encoding

Path encoding for safe transmission:

```scala
def urlEncode(str: String): String = {
  str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")
}
```
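
For example (reproducing the method so the snippet stands alone), each path segment is percent-encoded while the `/` separators survive, which matters for REPL class names containing `$`:

```scala
import java.net.URLEncoder

// Same implementation as above, reproduced so this snippet is self-contained
def urlEncode(str: String): String =
  str.split('/').map(part => URLEncoder.encode(part, "UTF-8")).mkString("/")

// '$' in REPL class names is percent-encoded, but '/' separators are kept
urlEncode("line123$iw/MyClass.class")  // → "line123%24iw/MyClass.class"
```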

## Integration Points

### Spark Environment Integration

- Uses `SparkEnv.rpcEnv` for RPC-based class loading
- Integrates with Spark's configuration system
- Leverages Hadoop utilities for FileSystem access

### Scala Compiler Integration

- Works with Scala's class loading architecture
- Handles REPL-specific class naming conventions
- Supports dynamic class compilation and loading

### Executor Integration

- Designed for use on Spark executors
- Handles distributed class distribution
- Manages parent class loader relationships correctly