or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

auto-completion.md · class-loading.md · code-interpretation.md · command-line.md · index.md · interactive-shell.md

docs/class-loading.md

# Distributed Class Loading

ClassLoader implementation for loading REPL-defined classes from Hadoop FileSystem or HTTP URIs, enabling distributed execution of user-defined code across Spark clusters with support for both local and remote class loading strategies.

## Capabilities

### ExecutorClassLoader Class

Custom ClassLoader that reads classes from distributed storage systems, allowing REPL-defined classes to be loaded on Spark executor nodes.

```scala { .api }
/**
 * ClassLoader for reading classes from Hadoop FileSystem or HTTP URI
 * @param conf SparkConf configuration
 * @param classUri URI pointing to class storage location
 * @param parent Parent ClassLoader
 * @param userClassPathFirst Whether to check user classpath before parent
 */
class ExecutorClassLoader(
  conf: SparkConf,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ClassLoader with Logging

/**
 * Class URI as parsed URI object
 */
val uri: URI

/**
 * Directory path from URI
 */
val directory: String

/**
 * Parent class loader wrapper
 */
val parentLoader: ParentClassLoader

/**
 * Hadoop FileSystem instance for accessing remote classes
 */
var fileSystem: FileSystem
```

**Usage Examples:**

```scala
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.SparkConf

// Create for HTTP class loading
val conf = new SparkConf()
val httpLoader = new ExecutorClassLoader(
  conf,
  "http://driver-host:8080/classes/",
  getClass.getClassLoader,
  userClassPathFirst = true
)

// Create for HDFS class loading
val hdfsLoader = new ExecutorClassLoader(
  conf,
  "hdfs://cluster/spark-classes/",
  getClass.getClassLoader,
  userClassPathFirst = false
)
```

### Class Loading Methods

Methods for finding and loading classes from distributed storage.

```scala { .api }
/**
 * Find class by name, checking both local and remote sources
 * @param name Fully qualified class name
 * @return Class instance
 * @throws ClassNotFoundException if class not found
 */
override def findClass(name: String): Class[_]

/**
 * Find class locally without remote lookup
 * @param name Fully qualified class name
 * @return Optional class instance
 */
def findClassLocally(name: String): Option[Class[_]]
```

**Usage Examples:**

```scala
// Load class from distributed storage
try {
  val clazz = classLoader.findClass("com.example.ReplDefinedClass")
  val instance = clazz.newInstance()
} catch {
  case _: ClassNotFoundException => println("Class not found")
}

// Try local loading first
classLoader.findClassLocally("com.example.LocalClass") match {
  case Some(clazz) => println(s"Found locally: ${clazz.getName}")
  case None => println("Not available locally")
}
```

### Resource Loading Methods

Methods for loading resources from distributed storage systems.

```scala { .api }
/**
 * Get resource URL from distributed storage
 * @param name Resource name
 * @return Resource URL or null if not found
 */
override def getResource(name: String): URL

/**
 * Get enumeration of resource URLs
 * @param name Resource name
 * @return Enumeration of matching resource URLs
 */
override def getResources(name: String): java.util.Enumeration[URL]
```

**Usage Examples:**

```scala
// Load resource from distributed storage
val resourceUrl = classLoader.getResource("config.properties")
if (resourceUrl != null) {
  val inputStream = resourceUrl.openStream()
  // Process resource
}

// Load multiple resources
import scala.collection.JavaConverters._
val resources = classLoader.getResources("META-INF/services/provider").asScala
resources.foreach { url =>
  println(s"Found service provider: $url")
}
```

### Bytecode Transformation

Methods for reading and transforming class bytecode during loading.

```scala { .api }
/**
 * Read class bytecode and apply transformations
 * @param name Class name
 * @param in InputStream containing class bytes
 * @return Transformed class bytecode
 */
def readAndTransformClass(name: String, in: InputStream): Array[Byte]

/**
 * URL encode string for HTTP requests
 * @param str String to encode
 * @return URL-encoded string
 */
def urlEncode(str: String): String
```

**Usage Examples:**

```scala
import java.io.FileInputStream

// Transform class during loading
val inputStream = new FileInputStream("MyClass.class")
val transformedBytes = classLoader.readAndTransformClass(
  "com.example.MyClass",
  inputStream
)

// URL encoding for HTTP paths
val encodedPath = classLoader.urlEncode("com/example/My Class.class")
// Result: "com/example/My%20Class.class"
```

### ConstructorCleaner Class

ASM-based bytecode transformer that cleans REPL wrapper constructors to enable proper class loading in distributed environments.

```scala { .api }
/**
 * ASM ClassVisitor for cleaning constructor bytecode
 * @param className Name of class being transformed
 * @param cv Underlying ClassVisitor
 */
class ConstructorCleaner(className: String, cv: ClassVisitor) extends ClassVisitor(ASM5, cv)

/**
 * Visit and potentially transform method bytecode
 * @param access Method access flags
 * @param name Method name
 * @param desc Method descriptor
 * @param signature Method signature
 * @param exceptions Exception types
 * @return MethodVisitor for transformation
 */
override def visitMethod(
  access: Int,
  name: String,
  desc: String,
  signature: String,
  exceptions: Array[String]
): MethodVisitor
```

**Usage Examples:**

```scala
import org.apache.xbean.asm5.{ClassReader, ClassWriter}
import org.apache.spark.repl.ConstructorCleaner

// Transform class bytecode
val classReader = new ClassReader(originalBytes)
val classWriter = new ClassWriter(classReader, ClassWriter.COMPUTE_FRAMES)
val cleaner = new ConstructorCleaner("com.example.MyClass", classWriter)

classReader.accept(cleaner, 0)
val cleanedBytes = classWriter.toByteArray
```

### Configuration and Setup

Methods for configuring HTTP timeouts and connection parameters.

```scala { .api }
/**
 * HTTP connection timeout in milliseconds (package-private for testing)
 */
private[repl] var httpUrlConnectionTimeoutMillis: Int
```

**Usage Examples:**

```scala
// Configure HTTP timeout for testing
classLoader.httpUrlConnectionTimeoutMillis = 10000 // 10 seconds

// Access URI information
println(s"Loading classes from: ${classLoader.uri}")
println(s"Class directory: ${classLoader.directory}")

// Check file system
if (classLoader.fileSystem != null) {
  println("Using Hadoop FileSystem for class loading")
} else {
  println("Using HTTP for class loading")
}
```

## Integration with Spark REPL

The ExecutorClassLoader integrates with the Spark REPL system to enable distributed execution:

**REPL-defined class distribution:**

```scala
// Classes defined in REPL are automatically made available to executors
val interpreter = new SparkIMain()
interpreter.interpret("class MyUDF extends Function1[String, Int] { ... }")

// The ExecutorClassLoader ensures MyUDF is available on all executor nodes
val rdd = sparkContext.textFile("data.txt")
val result = rdd.map(new MyUDF()) // Works on all executors
```

**Custom class loading configuration:**

```scala
// Configure class URI for executor class loading
val conf = new SparkConf()
conf.set("spark.repl.class.uri", "http://driver:8080/classes/")

// ExecutorClassLoader will use this URI on executor nodes
val classLoader = new ExecutorClassLoader(
  conf,
  conf.get("spark.repl.class.uri"),
  Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)
```