or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

auto-completion.md · code-interpreter.md · distributed-class-loading.md · index.md · interactive-shell.md

docs/distributed-class-loading.md

# Distributed Class Loading

The Distributed Class Loading system enables loading REPL-compiled classes on Spark executors across the cluster. It supports fetching classes from various sources including HTTP servers, Hadoop file systems, and Spark RPC endpoints.

## Capabilities

### ExecutorClassLoader Class

The main class loader for distributing REPL-defined classes to Spark executors.

```scala { .api }
/**
 * ClassLoader for loading REPL-defined classes on executors from various sources
 * @param conf SparkConf configuration
 * @param env SparkEnv environment
 * @param classUri URI where classes can be fetched (HTTP, HDFS, or Spark RPC)
 * @param parent Parent ClassLoader
 * @param userClassPathFirst Whether to prioritize user classpath over system classpath
 */
class ExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ClassLoader(parent) {

  /**
   * Find and load a class by name
   * @param name Fully qualified class name
   * @return Loaded Class instance
   * @throws ClassNotFoundException if class cannot be found
   */
  override def findClass(name: String): Class[_]

  /**
   * Get a resource by name from the class URI
   * @param name Resource name
   * @return URL to the resource or null if not found
   */
  override def getResource(name: String): URL

  /**
   * Get all resources with a given name
   * @param name Resource name
   * @return Enumeration of URLs for matching resources
   */
  override def getResources(name: String): java.util.Enumeration[URL]
}
```

**Usage Examples:**

```scala
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Create configuration
val conf = new SparkConf()
val env = SparkEnv.get // Get current Spark environment

// Create class loader for HTTP-based class serving
val httpClassLoader = new ExecutorClassLoader(
  conf = conf,
  env = env,
  classUri = "http://driver-host:12345/classes",
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)

// Load a REPL-defined class on executor
val className = "org.apache.spark.repl.ExecutorClassLoader$$anonfun$1"
val loadedClass = httpClassLoader.findClass(className)
println(s"Loaded class: ${loadedClass.getName}")
```

### Local Class Finding

Methods for finding classes locally before attempting remote fetch.

```scala { .api }
/**
 * Find a class locally without remote fetching
 * @param name Fully qualified class name
 * @return Optional Class instance if found locally
 */
def findClassLocally(name: String): Option[Class[_]]
```

**Usage Example:**

```scala
val classLoader = new ExecutorClassLoader(conf, env, classUri, parent, true)

// Try to find class locally first
val localClass = classLoader.findClassLocally("com.example.LocalClass")
localClass match {
  case Some(clazz) => println(s"Found locally: ${clazz.getName}")
  case None => println("Class not available locally, will fetch remotely")
}
```

### Class Transformation

Methods for reading and transforming class bytecode during loading.

```scala { .api }
/**
 * Read and transform class bytes from input stream
 * @param name Class name for transformation context
 * @param in InputStream containing class bytecode
 * @return Transformed class bytecode
 */
def readAndTransformClass(name: String, in: InputStream): Array[Byte]
```

**Usage Example:**

```scala
import java.io.{FileInputStream, InputStream}

val classLoader = new ExecutorClassLoader(conf, env, classUri, parent, true)

// Read and transform class file
val classFile = new FileInputStream("/path/to/MyClass.class")
val transformedBytes = classLoader.readAndTransformClass(
  "com.example.MyClass",
  classFile
)

println(s"Transformed class size: ${transformedBytes.length} bytes")
classFile.close()
```

### URL Encoding Utilities

Utility methods for handling URL encoding in distributed environments.

```scala { .api }
/**
 * URL-encode a string while preserving forward slashes
 * @param str String to encode
 * @return URL-encoded string with slashes preserved
 */
def urlEncode(str: String): String
```

**Usage Example:**

```scala
val classLoader = new ExecutorClassLoader(conf, env, classUri, parent, true)

// Encode class name for URL usage
val className = "com.example.MyClass$InnerClass"
val encodedName = classLoader.urlEncode(className)
println(s"Encoded class name: $encodedName")

// Can be used in HTTP requests
val classUrl = s"$classUri/${encodedName.replace('.', '/')}.class"
```

### Signal Handling

Utilities for handling interrupts and signals in REPL environment.

```scala { .api }
/**
 * Signal handling utilities for the REPL
 */
object Signaling {
  /**
   * Register SIGINT handler for job cancellation
   * Sets up interrupt handling to cancel running Spark jobs
   */
  def cancelOnInterrupt(): Unit
}
```

**Usage Example:**

```scala
import org.apache.spark.repl.Signaling

// Set up interrupt handling for REPL
Signaling.cancelOnInterrupt()

// Now CTRL+C will properly cancel running Spark jobs
// instead of terminating the entire REPL session
```

## Class Loading Strategies

### HTTP-Based Class Serving

The most common approach for distributing REPL classes is through HTTP serving from the driver.

```scala
import org.apache.spark.repl.ExecutorClassLoader
import org.apache.spark.{SparkConf, SparkEnv}

// Configuration for HTTP class serving
val conf = new SparkConf()
  .set("spark.repl.class.uri", "http://driver-host:12345/classes")

val env = SparkEnv.get
val httpUri = conf.get("spark.repl.class.uri")

val classLoader = new ExecutorClassLoader(
  conf = conf,
  env = env,
  classUri = httpUri,
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)

// Classes will be fetched via HTTP when needed
val replClass = classLoader.findClass("$line1.$read$$iw$$iw$MyFunction")
```

### HDFS-Based Class Distribution

For clusters with shared storage, classes can be distributed via HDFS.

```scala
import org.apache.spark.repl.ExecutorClassLoader

val conf = new SparkConf()
  .set("spark.repl.class.uri", "hdfs://namenode:9000/spark-repl/classes")

val hdfsClassLoader = new ExecutorClassLoader(
  conf = conf,
  env = SparkEnv.get,
  classUri = "hdfs://namenode:9000/spark-repl/classes",
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = true
)

// Classes will be fetched from HDFS
val distributedClass = hdfsClassLoader.findClass("$line5.$read$$iw$$iw$ProcessData")
```

### Spark RPC-Based Class Loading

Advanced setups can use Spark's internal RPC system for class distribution.

```scala
import org.apache.spark.repl.ExecutorClassLoader

val rpcClassLoader = new ExecutorClassLoader(
  conf = new SparkConf(),
  env = SparkEnv.get,
  classUri = "spark-rpc://driver:7077/class-server",
  parent = Thread.currentThread().getContextClassLoader,
  userClassPathFirst = false // System classpath first for RPC
)

// Classes fetched via Spark RPC
val rpcClass = rpcClassLoader.findClass("$line3.$read$$iw$$iw$DataTransformer")
```

259

260

## Integration Patterns

261

262

### Setting Up Distributed Class Loading

263

264

```scala

265

import org.apache.spark.repl.{SparkIMain, ExecutorClassLoader}

266

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

267

268

// Create Spark configuration with class server

269

val conf = new SparkConf()

270

.setAppName("REPL with Distributed Classes")

271

.setMaster("spark://cluster:7077")

272

.set("spark.repl.class.outputDir", "/tmp/spark-repl-classes")

273

.set("spark.repl.class.uri", "http://driver:12345/classes")

274

275

// Initialize Spark context

276

val sc = new SparkContext(conf)

277

278

// Create interpreter that will compile classes to shared location

279

val interpreter = new SparkIMain()

280

interpreter.initializeSynchronous()

281

282

// Define function in REPL

283

interpreter.interpret("""

284

def processData(data: Array[Int]): Array[Int] = {

285

data.map(_ * 2).filter(_ > 10)

286

}

287

""")

288

289

// Create RDD that uses REPL-defined function

290

val rdd = sc.parallelize(1 to 100)

291

val processed = rdd.map(processData) // Function distributed automatically

292

293

val results = processed.collect()

294

println(s"Processed ${results.length} elements")

295

```

### Custom Class Transformation

```scala
import org.apache.spark.repl.ExecutorClassLoader
import java.io.{ByteArrayInputStream, InputStream}

// Custom class loader with transformation logic
class CustomExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst) {

  override def readAndTransformClass(name: String, in: InputStream): Array[Byte] = {
    val originalBytes = super.readAndTransformClass(name, in)

    // Apply custom transformations
    if (name.contains("$$iw$$iw")) {
      // Transform REPL-generated classes
      transformReplClass(originalBytes)
    } else {
      originalBytes
    }
  }

  private def transformReplClass(bytes: Array[Byte]): Array[Byte] = {
    // Custom bytecode transformation logic
    // For example: add debugging information, optimize code, etc.
    bytes // Return transformed bytes
  }
}
```

### Error Handling and Fallbacks

```scala
import org.apache.spark.repl.ExecutorClassLoader
import scala.util.{Try, Success, Failure}

class RobustExecutorClassLoader(
  conf: SparkConf,
  env: SparkEnv,
  classUri: String,
  parent: ClassLoader,
  userClassPathFirst: Boolean
) extends ExecutorClassLoader(conf, env, classUri, parent, userClassPathFirst) {

  override def findClass(name: String): Class[_] = {
    // Try local first
    findClassLocally(name) match {
      case Some(clazz) => clazz
      case None =>
        // Try remote with error handling
        Try(super.findClass(name)) match {
          case Success(clazz) => clazz
          case Failure(exception) =>
            // Log error and try fallback strategies
            logClassLoadFailure(name, exception)
            tryFallbackStrategies(name)
        }
    }
  }

  private def logClassLoadFailure(name: String, error: Throwable): Unit = {
    println(s"Failed to load class $name: ${error.getMessage}")
  }

  private def tryFallbackStrategies(name: String): Class[_] = {
    // Implement fallback strategies
    // 1. Try alternative class URIs
    // 2. Check if class is available in system classpath
    // 3. Generate stub class if necessary
    throw new ClassNotFoundException(s"Could not load class $name with any strategy")
  }
}
```