# Artifact Management

The artifact management system handles JAR files, Python packages, and other resources uploaded by clients. It provides session-isolated storage, dynamic class loading, and secure artifact handling.

## Core Artifact Components

### SparkConnectArtifactManager

Main class for managing artifacts within a session.

```scala { .api }
class SparkConnectArtifactManager(sessionHolder: SessionHolder) {
  def getSparkConnectAddedJars: Seq[URL]
  def getSparkConnectPythonIncludes: Seq[String]
  def classloader: ClassLoader
  def addArtifact(remoteRelativePath: Path, serverLocalStagingPath: Path, fragment: Option[String]): Unit
  def artifactPath: Path
}
```

**Key Methods:**
- `getSparkConnectAddedJars`: Get all JARs added to this session
- `getSparkConnectPythonIncludes`: Get Python packages and modules
- `classloader`: Get the session-specific class loader
- `addArtifact`: Add an artifact from a server-local staging path to the session
- `artifactPath`: Get the base path for session artifacts

### ArtifactUtils

Utility functions for artifact processing and validation.

```scala { .api }
object ArtifactUtils {
  def validateJar(jarBytes: Array[Byte]): Boolean
  def extractJarMetadata(jarBytes: Array[Byte]): JarMetadata
  def computeChecksum(artifactBytes: Array[Byte]): String
  def isValidArtifactName(name: String): Boolean
}
```

**Key Methods:**
- `validateJar`: Validate JAR file format and contents
- `extractJarMetadata`: Extract manifest and dependency information
- `computeChecksum`: Generate artifact checksums for integrity verification
- `isValidArtifactName`: Validate artifact naming conventions
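
As a concrete illustration of what `computeChecksum` might do, the sketch below hashes the artifact bytes with the JDK's `MessageDigest` and hex-encodes the result. SHA-256 is an assumption here; the actual algorithm is whatever the server is configured to use.

```scala
import java.security.MessageDigest

// Sketch of a content checksum: SHA-256 over the artifact bytes, hex-encoded.
// The algorithm is an assumption, not necessarily what the server uses.
def computeChecksum(artifactBytes: Array[Byte]): String =
  MessageDigest.getInstance("SHA-256")
    .digest(artifactBytes)
    .map(b => f"$b%02x")
    .mkString
```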
## Artifact Types

### JAR Files

JAR files contain compiled Java/Scala classes and can be uploaded for use in user code.

```scala { .api }
case class JarArtifact(
  name: String,
  checksum: String,
  size: Long,
  uploadTime: Long,
  metadata: JarMetadata
)

case class JarMetadata(
  manifestVersion: String,
  mainClass: Option[String],
  dependencies: Seq[String],
  exportedPackages: Seq[String]
)
```
### Python Packages

Python packages and modules for use in PySpark operations.

```scala { .api }
case class PythonArtifact(
  name: String,
  packageType: PythonPackageType,
  checksum: String,
  size: Long,
  uploadTime: Long
)

sealed trait PythonPackageType
case object WheelPackage extends PythonPackageType
case object EggPackage extends PythonPackageType
case object SourcePackage extends PythonPackageType
```
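
One plausible way to assign these package types is by file extension; the mapping below is illustrative, not the server's actual rule:

```scala
// Illustrative classification by file extension (not the server's actual rule)
def classify(name: String): PythonPackageType = name match {
  case n if n.endsWith(".whl") => WheelPackage
  case n if n.endsWith(".egg") => EggPackage
  case _                       => SourcePackage // .py files, source archives, etc.
}
```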
### Generic Files

Other file types that may be needed by user applications.

```scala { .api }
case class FileArtifact(
  name: String,
  mimeType: String,
  checksum: String,
  size: Long,
  uploadTime: Long
)
```
## Artifact Request Handlers

### SparkConnectAddArtifactsHandler

Handles streaming artifact upload requests.

```scala { .api }
class SparkConnectAddArtifactsHandler(
  responseObserver: StreamObserver[proto.AddArtifactsResponse]
) extends StreamObserver[proto.AddArtifactsRequest] {
  def onNext(request: proto.AddArtifactsRequest): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}
```
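
Conceptually, such a handler buffers chunk data as it arrives and finalizes the artifact on `onCompleted`. The sketch below shows only that shape; in-memory buffering is an assumption, and the real handler also stages data to disk and hands the result to the session's artifact manager.

```scala
import java.io.ByteArrayOutputStream

// Minimal sketch of chunk accumulation (in-memory buffering is an
// assumption; the real handler stages chunks to a local file)
class ChunkAccumulator {
  private val buffer = new ByteArrayOutputStream()

  // Called once per streamed chunk
  def append(chunkData: Array[Byte]): Unit = buffer.write(chunkData)

  // Called when the stream completes; yields the assembled artifact
  def assembled: Array[Byte] = buffer.toByteArray
}
```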
### SparkConnectArtifactStatusesHandler

Handles artifact status and metadata queries.

```scala { .api }
class SparkConnectArtifactStatusesHandler(
  responseObserver: StreamObserver[proto.ArtifactStatusesResponse]
) {
  def handle(request: proto.ArtifactStatusesRequest): Unit
}
```
## Usage Examples

### Uploading JAR Files

```scala
import java.nio.file.{Files, Path, Paths}

// Get the artifact manager for the session
val artifactManager = sessionHolder.artifactManager

// Stage the uploaded JAR bytes in a server-local temporary file
val jarBytes: Array[Byte] = ??? // JAR file contents received from the client
val stagingPath: Path = Files.createTempFile("artifact-", ".jar")
Files.write(stagingPath, jarBytes)

// Add the artifact to the session under its remote relative path
artifactManager.addArtifact(Paths.get("jars/my-library.jar"), stagingPath, None)

// Verify the artifact was added
val addedJars = artifactManager.getSparkConnectAddedJars
println(s"Session has ${addedJars.length} JARs")
```
### Using Custom Classes

```scala
// Get the session class loader with uploaded JARs
val classLoader = artifactManager.classloader

// Load a custom class from an uploaded JAR
// (com.mycompany.MyCustomClass and the DataProcessor trait are user-defined examples)
val customClass = classLoader.loadClass("com.mycompany.MyCustomClass")
val instance = customClass.getDeclaredConstructor().newInstance()

// Use it in Spark operations
val spark = sessionHolder.session
import spark.implicits._

val df = spark.range(100).map { i =>
  // Use the custom class in a transformation
  val processor = instance.asInstanceOf[DataProcessor]
  processor.process(i)
}
```
### Checking Artifact Status

```scala
import org.apache.spark.connect.proto

// Check the status of uploaded artifacts
val statusRequest = proto.ArtifactStatusesRequest.newBuilder()
  .addNames("my-library.jar")
  .addNames("my-module.py")
  .build()

// Get artifact statuses
val statuses: Map[String, ArtifactStatus] = ??? // populated from the handler response

statuses.foreach { case (name, status) =>
  println(s"Artifact $name: ${status.state} (${status.size} bytes)")
}
```
### Python Package Management

```scala
// Get the Python includes for the session
val pythonIncludes = artifactManager.getSparkConnectPythonIncludes
println(s"Python packages: ${pythonIncludes.mkString(", ")}")

// Python packages are automatically available in PySpark
val spark = sessionHolder.session
val pythonDF = spark.sql("""
  SELECT my_custom_python_function(value) AS result
  FROM VALUES (1), (2), (3) AS t(value)
""")
```
## Security and Validation

### Artifact Validation

All uploaded artifacts go through security validation:

```scala
import org.apache.spark.sql.connect.artifact.util.ArtifactUtils

// Validate the JAR file
val jarBytes: Array[Byte] = ??? // uploaded JAR contents
val isValid = ArtifactUtils.validateJar(jarBytes)

if (!isValid) {
  throw new SecurityException("Invalid JAR file")
}

// Check the artifact name
val artifactName = "user-library.jar"
val isValidName = ArtifactUtils.isValidArtifactName(artifactName)

if (!isValidName) {
  throw new IllegalArgumentException("Invalid artifact name")
}
```
### Security Policies

- **File Type Validation**: Only allowed file types can be uploaded
- **Size Limits**: Configurable maximum file and total session sizes
- **Name Validation**: Artifact names must follow security guidelines
- **Content Scanning**: JAR files are scanned for malicious content
- **Checksum Verification**: Integrity checking for all uploads
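
Taken together, these policies form an admission gate that every upload must pass. A minimal sketch, assuming a hypothetical extension allow-list and size cap (the real values come from configuration):

```scala
// Illustrative pre-admission checks; the allow-list and size cap are
// hypothetical stand-ins for the configured policy values
val allowedExtensions = Set(".jar", ".py", ".whl", ".egg")
val maxFileSize = 512L * 1024 * 1024 // hypothetical 512 MB cap

def admit(name: String, size: Long): Boolean =
  allowedExtensions.exists(name.endsWith) &&
    size <= maxFileSize &&
    ArtifactUtils.isValidArtifactName(name)
```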

## Session Isolation

### Isolated Class Loading

Each session has its own class loader hierarchy:

```scala
// Session A uploads library-v1.jar
val sessionA = SparkConnectService.getOrCreateIsolatedSession("userA", "sessionA")
val classLoaderA = sessionA.artifactManager.classloader

// Session B uploads library-v2.jar
val sessionB = SparkConnectService.getOrCreateIsolatedSession("userB", "sessionB")
val classLoaderB = sessionB.artifactManager.classloader

// Classes are isolated between sessions
val classA = classLoaderA.loadClass("com.example.Library") // loads v1
val classB = classLoaderB.loadClass("com.example.Library") // loads v2
```
### Resource Isolation

- **Storage**: Each session has separate artifact storage
- **Memory**: Artifacts count toward session memory limits
- **Cleanup**: Session artifacts are cleaned up when the session ends
- **Access Control**: Sessions cannot access other sessions' artifacts
## Performance and Optimization

### Caching and Reuse

```scala
// Artifacts are cached by checksum to avoid duplicate storage
val checksum1 = ArtifactUtils.computeChecksum(jarBytes1)
val checksum2 = ArtifactUtils.computeChecksum(jarBytes2)

// If the checksums match, the artifact is reused (copy-on-write)
if (checksum1 == checksum2) {
  println("Artifact already exists, reusing cached version")
}
```
### Streaming Uploads

Large artifacts are uploaded in chunks to avoid memory issues:

```scala
// Client streams a large JAR in chunks over the AddArtifacts RPC.
// requestObserver is the client-side StreamObserver[proto.AddArtifactsRequest]
// obtained when opening the stream.
val chunks = splitIntoChunks(largeJarBytes, chunkSize = 1024 * 1024) // 1 MB chunks

chunks.zipWithIndex.foreach { case (chunk, index) =>
  val artifactChunk = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
    .setName("large-library.jar")
    .setData(ByteString.copyFrom(chunk))
    .setChunkIndex(index)
    .setIsLastChunk(index == chunks.length - 1)
    .build()

  // Stream the chunk to the server
  requestObserver.onNext(
    proto.AddArtifactsRequest.newBuilder().setChunk(artifactChunk).build())
}
requestObserver.onCompleted()
```
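
The `splitIntoChunks` helper used above is not part of the API; a minimal implementation over byte arrays could look like this:

```scala
// Hypothetical helper, not part of the Spark API: split a byte array
// into fixed-size chunks (the final chunk may be shorter)
def splitIntoChunks(bytes: Array[Byte], chunkSize: Int): Seq[Array[Byte]] =
  bytes.grouped(chunkSize).toSeq
```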
## Configuration Options

### Artifact Limits

Key configuration parameters for artifact management:

- `spark.connect.artifacts.maxSizePerSession`: Maximum total size per session
- `spark.connect.artifacts.maxFileSize`: Maximum individual file size
- `spark.connect.artifacts.allowedTypes`: Allowed artifact file types
- `spark.connect.artifacts.cacheDir`: Directory for artifact storage
- `spark.connect.artifacts.cleanupInterval`: Cleanup frequency
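
These keys would typically be set when the session is created; the values below are illustrative, not defaults:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative configuration values; tune to your deployment
val spark = SparkSession.builder()
  .config("spark.connect.artifacts.maxSizePerSession", "1g")
  .config("spark.connect.artifacts.maxFileSize", "256m")
  .config("spark.connect.artifacts.allowedTypes", "jar,py,whl")
  .getOrCreate()
```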

### Security Settings

- `spark.connect.artifacts.validation.enabled`: Enable artifact validation
- `spark.connect.artifacts.scanning.enabled`: Enable content scanning
- `spark.connect.artifacts.checksum.algorithm`: Checksum algorithm
- `spark.connect.artifacts.quarantine.enabled`: Quarantine suspicious files
## Error Handling

### Upload Errors

Common artifact upload errors and their handling:

- **Size Limit Exceeded**: Artifact exceeds configured size limits
- **Invalid Format**: Malformed or corrupted artifact files
- **Security Violation**: Artifact fails security validation
- **Storage Error**: Insufficient disk space or I/O errors
- **Network Error**: Connection issues during streaming upload
### Recovery and Retry

- **Partial Uploads**: Resume interrupted uploads from the last successful chunk
- **Corruption Detection**: Verify checksums and retry on mismatch
- **Cleanup**: Automatic cleanup of failed or incomplete uploads
- **Error Reporting**: Detailed error messages with resolution guidance
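
A client-side retry loop matching the corruption-detection policy might look like the sketch below; `uploadArtifact` and `fetchServerChecksum` are hypothetical helpers standing in for the streaming upload and the `ArtifactStatuses` query.

```scala
// Illustrative retry-on-checksum-mismatch loop; uploadArtifact and
// fetchServerChecksum are hypothetical helpers, not real API calls
def uploadWithRetry(name: String, bytes: Array[Byte], maxAttempts: Int = 3): Unit = {
  val expected = ArtifactUtils.computeChecksum(bytes)
  val succeeded = (1 to maxAttempts).exists { _ =>
    uploadArtifact(name, bytes)            // stream the artifact to the server
    fetchServerChecksum(name) == expected  // compare against the server's checksum
  }
  if (!succeeded)
    throw new IllegalStateException(s"Upload of $name failed after $maxAttempts attempts")
}
```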