
# Artifact Management

The artifact management system handles JAR files, Python packages, and other resources uploaded by clients. It provides session-isolated storage, dynamic class loading, and secure artifact handling.

## Core Artifact Components

### SparkConnectArtifactManager

Main class for managing artifacts within a session.

```scala { .api }
class SparkConnectArtifactManager(sessionHolder: SessionHolder) {
  def getSparkConnectAddedJars: Seq[URL]
  def getSparkConnectPythonIncludes: Seq[String]
  def classloader: ClassLoader
  def addArtifact(remoteRelativePath: Path, serverLocalStagingPath: Path, fragment: Option[String]): Unit
  def artifactPath: Path
}
```

**Key Methods:**

- `getSparkConnectAddedJars`: Get all JARs added to this session
- `getSparkConnectPythonIncludes`: Get Python packages and modules added to this session
- `classloader`: Get the session-specific class loader
- `addArtifact`: Add an artifact from a server-local staging path to the session
- `artifactPath`: Get the base path for session artifacts

### ArtifactUtils

Utility functions for artifact processing and validation.

```scala { .api }
object ArtifactUtils {
  def validateJar(jarBytes: Array[Byte]): Boolean
  def extractJarMetadata(jarBytes: Array[Byte]): JarMetadata
  def computeChecksum(artifactBytes: Array[Byte]): String
  def isValidArtifactName(name: String): Boolean
}
```

**Key Methods:**

- `validateJar`: Validate JAR file format and contents
- `extractJarMetadata`: Extract manifest and dependency information
- `computeChecksum`: Generate artifact checksums for integrity verification
- `isValidArtifactName`: Validate artifact naming conventions

## Artifact Types

### JAR Files

JAR files contain compiled Java/Scala classes and can be uploaded for use in user code.

```scala { .api }
case class JarArtifact(
  name: String,
  checksum: String,
  size: Long,
  uploadTime: Long,
  metadata: JarMetadata
)

case class JarMetadata(
  manifestVersion: String,
  mainClass: Option[String],
  dependencies: Seq[String],
  exportedPackages: Seq[String]
)
```

### Python Packages

Python packages and modules for use in PySpark operations.

```scala { .api }
case class PythonArtifact(
  name: String,
  packageType: PythonPackageType,
  checksum: String,
  size: Long,
  uploadTime: Long
)

sealed trait PythonPackageType
case object WheelPackage extends PythonPackageType
case object EggPackage extends PythonPackageType
case object SourcePackage extends PythonPackageType
```

### Generic Files

Other file types that may be needed by user applications.

```scala { .api }
case class FileArtifact(
  name: String,
  mimeType: String,
  checksum: String,
  size: Long,
  uploadTime: Long
)
```

## Artifact Request Handlers

### SparkConnectAddArtifactsHandler

Handles streaming artifact upload requests.

```scala { .api }
class SparkConnectAddArtifactsHandler(
  responseObserver: StreamObserver[proto.AddArtifactsResponse]
) extends StreamObserver[proto.AddArtifactsRequest] {
  def onNext(request: proto.AddArtifactsRequest): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}
```
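
The streaming contract can be illustrated with a small self-contained sketch. The `StreamObserver` trait below is a local stand-in for the gRPC interface, and `ChunkAccumulator` is a hypothetical analogue of how a handler might assemble streamed chunks into one payload:

```scala
// Local stand-in for io.grpc.stub.StreamObserver, to keep the sketch self-contained.
trait StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

// Hypothetical accumulator: appends each streamed chunk, discards everything on error.
class ChunkAccumulator extends StreamObserver[Array[Byte]] {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[Byte]
  def onNext(chunk: Array[Byte]): Unit = buffer ++= chunk
  def onError(t: Throwable): Unit = buffer.clear()
  def onCompleted(): Unit = ()
  def result: Array[Byte] = buffer.toArray
}

val acc = new ChunkAccumulator
acc.onNext(Array[Byte](1, 2))
acc.onNext(Array[Byte](3))
acc.onCompleted()
println(acc.result.mkString(",")) // 1,2,3
```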

### SparkConnectArtifactStatusesHandler

Handles artifact status and metadata queries.

```scala { .api }
class SparkConnectArtifactStatusesHandler(
  responseObserver: StreamObserver[proto.ArtifactStatusesResponse]
) {
  def handle(request: proto.ArtifactStatusesRequest): Unit
}
```

## Usage Examples

### Uploading JAR Files

```scala
import com.google.protobuf.ByteString
import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager

// Get the artifact manager for the session
val artifactManager = sessionHolder.artifactManager

// Create an artifact chunk for upload
val jarBytes: Array[Byte] = ??? // ... JAR file contents
val artifactChunk = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
  .setName("my-library.jar")
  .setData(ByteString.copyFrom(jarBytes))
  .build()

// Add the artifact to the session
artifactManager.addArtifact(artifactChunk)

// Verify the artifact was added
val addedJars = artifactManager.getSparkConnectAddedJars
println(s"Session has ${addedJars.length} JARs")
```

### Using Custom Classes

```scala
import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager

// Get the session class loader with uploaded JARs
val classLoader = artifactManager.classloader

// Load a custom class from an uploaded JAR
val customClass = classLoader.loadClass("com.mycompany.MyCustomClass")
val instance = customClass.getDeclaredConstructor().newInstance()

// Use in Spark operations
val spark = sessionHolder.session
import spark.implicits._

val df = spark.range(100).map { i =>
  // Use the custom class in the transformation
  val processor = instance.asInstanceOf[DataProcessor]
  processor.process(i)
}
```

### Checking Artifact Status

```scala
import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager

// Check the status of uploaded artifacts
val statusRequest = proto.ArtifactStatusesRequest.newBuilder()
  .addNames("my-library.jar")
  .addNames("my-module.py")
  .build()

// Get artifact statuses
val statuses: Map[String, ArtifactStatus] = ??? // ... from handler response

statuses.foreach { case (name, status) =>
  println(s"Artifact $name: ${status.state} (${status.size} bytes)")
}
```

### Python Package Management

```scala
import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager

// Get Python includes for the session
val pythonIncludes = artifactManager.getSparkConnectPythonIncludes
println(s"Python packages: ${pythonIncludes.mkString(", ")}")

// Python packages are automatically available in PySpark
val spark = sessionHolder.session
val pythonDF = spark.sql("""
  SELECT my_custom_python_function(value) AS result
  FROM VALUES (1), (2), (3) AS t(value)
""")
```

## Security and Validation

### Artifact Validation

All uploaded artifacts go through security validation:

```scala
import org.apache.spark.sql.connect.artifact.util.ArtifactUtils

// Validate the JAR file
val jarBytes: Array[Byte] = ??? // ... uploaded JAR
val isValid = ArtifactUtils.validateJar(jarBytes)

if (!isValid) {
  throw new SecurityException("Invalid JAR file")
}

// Check the artifact name
val artifactName = "user-library.jar"
val isValidName = ArtifactUtils.isValidArtifactName(artifactName)

if (!isValidName) {
  throw new IllegalArgumentException("Invalid artifact name")
}
```

### Security Policies

- **File Type Validation**: Only allowed file types can be uploaded
- **Size Limits**: Configurable maximum file and total session sizes
- **Name Validation**: Artifact names must follow security guidelines
- **Content Scanning**: JAR files are scanned for malicious content
- **Checksum Verification**: Integrity checking for all uploads

## Session Isolation

### Isolated Class Loading

Each session has its own class loader hierarchy:

```scala
// Session A uploads library-v1.jar
val sessionA = SparkConnectService.getOrCreateIsolatedSession("userA", "sessionA")
val classLoaderA = sessionA.artifactManager.classloader

// Session B uploads library-v2.jar
val sessionB = SparkConnectService.getOrCreateIsolatedSession("userB", "sessionB")
val classLoaderB = sessionB.artifactManager.classloader

// Classes are isolated between sessions
val classA = classLoaderA.loadClass("com.example.Library") // loads v1
val classB = classLoaderB.loadClass("com.example.Library") // loads v2
```

### Resource Isolation

- **Storage**: Each session has separate artifact storage
- **Memory**: Artifacts count toward session memory limits
- **Cleanup**: Session artifacts are cleaned up when the session ends
- **Access Control**: Sessions cannot access other sessions' artifacts
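
Storage isolation can be sketched as a per-session subdirectory under the artifact cache directory. The `sessionArtifactPath` helper and the paths below are hypothetical, not the server's actual layout:

```scala
import java.nio.file.{Path, Paths}

// Hypothetical sketch: each session resolves to its own subdirectory,
// so one session can never address another session's artifacts.
def sessionArtifactPath(cacheDir: Path, sessionId: String): Path =
  cacheDir.resolve(sessionId)

val root = Paths.get("/tmp/spark-connect-artifacts") // illustrative cache dir
println(sessionArtifactPath(root, "sessionA")) // e.g. /tmp/spark-connect-artifacts/sessionA
```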

## Performance and Optimization

### Caching and Reuse

```scala
import org.apache.spark.sql.connect.artifact.util.ArtifactUtils

// Artifacts are cached by checksum to avoid duplicate storage
val checksum1 = ArtifactUtils.computeChecksum(jarBytes1)
val checksum2 = ArtifactUtils.computeChecksum(jarBytes2)

// If the checksums match, the artifact is reused (copy-on-write)
if (checksum1 == checksum2) {
  println("Artifact already exists, reusing cached version")
}
```
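
A minimal sketch of checksum-keyed (content-addressed) caching; the in-memory map and the `cacheArtifact` helper are illustrative stand-ins for the server's on-disk cache:

```scala
// Hypothetical content-addressed cache: each distinct checksum is stored once.
val cache = scala.collection.mutable.Map.empty[String, Array[Byte]]

// Returns true if the artifact was newly stored, false if a cached copy was reused.
def cacheArtifact(checksum: String, bytes: Array[Byte]): Boolean =
  if (cache.contains(checksum)) false
  else { cache(checksum) = bytes; true }

println(cacheArtifact("abc", Array[Byte](1))) // true  (stored)
println(cacheArtifact("abc", Array[Byte](1))) // false (reused)
```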

### Streaming Uploads

Large artifacts are uploaded in chunks to avoid memory issues:

```scala
// Client streams a large JAR in 1 MB chunks
val chunks = splitIntoChunks(largeJarBytes, chunkSize = 1024 * 1024)

chunks.zipWithIndex.foreach { case (chunk, index) =>
  val artifactChunk = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
    .setName("large-library.jar")
    .setData(ByteString.copyFrom(chunk))
    .setChunkIndex(index)
    .setIsLastChunk(index == chunks.length - 1)
    .build()

  // Stream the chunk to the server
  artifactManager.addArtifact(artifactChunk)
}
```
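
The `splitIntoChunks` helper used above is not part of the API shown here; a minimal self-contained sketch using `grouped`:

```scala
// Hypothetical helper: split a byte array into fixed-size chunks,
// with the final chunk holding whatever remains.
def splitIntoChunks(bytes: Array[Byte], chunkSize: Int): Seq[Array[Byte]] = {
  require(chunkSize > 0, "chunkSize must be positive")
  bytes.grouped(chunkSize).toSeq
}

val chunks = splitIntoChunks(Array.fill[Byte](2500)(1), chunkSize = 1024)
println(chunks.map(_.length).mkString(",")) // 1024,1024,452
```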

## Configuration Options

### Artifact Limits

Key configuration parameters for artifact management:

- `spark.connect.artifacts.maxSizePerSession`: Maximum total artifact size per session
- `spark.connect.artifacts.maxFileSize`: Maximum individual file size
- `spark.connect.artifacts.allowedTypes`: Allowed artifact file types
- `spark.connect.artifacts.cacheDir`: Directory for artifact storage
- `spark.connect.artifacts.cleanupInterval`: Cleanup frequency

### Security Settings

- `spark.connect.artifacts.validation.enabled`: Enable artifact validation
- `spark.connect.artifacts.scanning.enabled`: Enable content scanning
- `spark.connect.artifacts.checksum.algorithm`: Checksum algorithm
- `spark.connect.artifacts.quarantine.enabled`: Quarantine suspicious files
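
Assuming these keys are set like any other Spark configuration, a `spark-defaults.conf` fragment might look like this (all values are illustrative, not defaults):

```
spark.connect.artifacts.maxSizePerSession   4g
spark.connect.artifacts.maxFileSize         512m
spark.connect.artifacts.allowedTypes        jar,whl,egg,py,zip
spark.connect.artifacts.cacheDir            /var/spark/artifacts
spark.connect.artifacts.cleanupInterval     1h
spark.connect.artifacts.checksum.algorithm  SHA-256
spark.connect.artifacts.validation.enabled  true
```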

## Error Handling

### Upload Errors

Common artifact upload errors and their handling:

- **Size Limit Exceeded**: Artifact exceeds configured size limits
- **Invalid Format**: Malformed or corrupted artifact files
- **Security Violation**: Artifact fails security validation
- **Storage Error**: Insufficient disk space or I/O errors
- **Network Error**: Connection issues during streaming upload

### Recovery and Retry

- **Partial Uploads**: Resume interrupted uploads from the last successful chunk
- **Corruption Detection**: Verify checksums and retry on mismatch
- **Cleanup**: Automatic cleanup of failed or incomplete uploads
- **Error Reporting**: Detailed error messages with resolution guidance
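
The verify-checksum-and-retry policy can be sketched generically; `uploadWithRetry` and the flaky uploader below are hypothetical illustrations, not part of the server API:

```scala
// Hypothetical sketch: retry an upload until the checksum reported back by
// the server matches the locally computed one, up to maxAttempts times.
def uploadWithRetry(localChecksum: String, upload: () => String, maxAttempts: Int = 3): Boolean =
  (1 to maxAttempts).exists(_ => upload() == localChecksum)

// A flaky upload that reports a corrupted checksum on the first attempt only.
var attempts = 0
val flaky = () => { attempts += 1; if (attempts < 2) "corrupted" else "abc123" }

println(uploadWithRetry("abc123", flaky)) // true (succeeds on the second attempt)
```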