The artifact management system handles JAR files, Python packages, and other resources uploaded by clients. It provides session-isolated storage, dynamic class loading, and secure artifact handling.
Main class for managing artifacts within a session.
class SparkConnectArtifactManager(sessionHolder: SessionHolder) {
def getSparkConnectAddedJars: Seq[URL]
def getSparkConnectPythonIncludes: Seq[String]
def classloader: ClassLoader
def addArtifact(remoteRelativePath: Path, serverLocalStagingPath: Path, fragment: Option[String]): Unit
def artifactPath: Path
}

Key Methods:
- getSparkConnectAddedJars: Get all JARs added to this session
- getSparkConnectPythonIncludes: Get Python packages and modules
- classloader: Get session-specific class loader
- addArtifact: Add artifact from local staging path to session
- artifactPath: Get the base path for session artifacts

Utility functions for artifact processing and validation.
object ArtifactUtils {
def validateJar(jarBytes: Array[Byte]): Boolean
def extractJarMetadata(jarBytes: Array[Byte]): JarMetadata
def computeChecksum(artifactBytes: Array[Byte]): String
def isValidArtifactName(name: String): Boolean
}

Key Methods:
- validateJar: Validate JAR file format and contents
- extractJarMetadata: Extract manifest and dependency information
- computeChecksum: Generate artifact checksums for integrity
- isValidArtifactName: Validate artifact naming conventions

JAR files contain compiled Java/Scala classes and can be uploaded for use in user code.
case class JarArtifact(
name: String,
checksum: String,
size: Long,
uploadTime: Long,
metadata: JarMetadata
)
case class JarMetadata(
manifestVersion: String,
mainClass: Option[String],
dependencies: Seq[String],
exportedPackages: Seq[String]
)

Python packages and modules for use in PySpark operations.
case class PythonArtifact(
name: String,
packageType: PythonPackageType,
checksum: String,
size: Long,
uploadTime: Long
)
sealed trait PythonPackageType
case object WheelPackage extends PythonPackageType
case object EggPackage extends PythonPackageType
case object SourcePackage extends PythonPackageType

Other file types that may be needed by user applications.
case class FileArtifact(
name: String,
mimeType: String,
checksum: String,
size: Long,
uploadTime: Long
)

Handles streaming artifact upload requests.
class SparkConnectAddArtifactsHandler(
responseObserver: StreamObserver[proto.AddArtifactsResponse]
) extends StreamObserver[proto.AddArtifactsRequest] {
def onNext(request: proto.AddArtifactsRequest): Unit
def onError(t: Throwable): Unit
def onCompleted(): Unit
}

Handles artifact status and metadata queries.
class SparkConnectArtifactStatusesHandler(
responseObserver: StreamObserver[proto.ArtifactStatusesResponse]
) {
def handle(request: proto.ArtifactStatusesRequest): Unit
}

import java.nio.file.Paths

import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
// Get artifact manager for session
val artifactManager = sessionHolder.artifactManager
// Path of the artifact as addressed by the client, relative to the session's artifact root
val remoteRelativePath = Paths.get("jars/my-library.jar")
// Server-local staging file written by the upload handler before hand-off to the manager
val serverLocalStagingPath = Paths.get("/tmp/artifact-staging/my-library.jar")
// Add artifact to session using the signature documented above
artifactManager.addArtifact(remoteRelativePath, serverLocalStagingPath, fragment = None)
// Verify artifact was added
val addedJars = artifactManager.getSparkConnectAddedJars
println(s"Session has ${addedJars.length} JARs")

import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
// Get session class loader with uploaded JARs
val classLoader = artifactManager.classloader
// Load custom class from uploaded JAR
val customClass = classLoader.loadClass("com.mycompany.MyCustomClass")
val instance = customClass.getDeclaredConstructor().newInstance()
// Use in Spark operations
val spark = sessionHolder.session
import spark.implicits._
val df = spark.range(100).map { i =>
// Use custom class in transformation
val processor = instance.asInstanceOf[DataProcessor]
processor.process(i)
}

import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
import org.apache.spark.connect.proto
// Check status of uploaded artifacts
val statusRequest = proto.ArtifactStatusesRequest.newBuilder()
.addNames("my-library.jar")
.addNames("my-module.py")
.build()
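// Illustrative sketch (not exact server wiring): hand the request to the
// SparkConnectArtifactStatusesHandler declared above, capturing its response with a
// simple StreamObserver stub assumed here for demonstration. The handler class is
// assumed to be in scope.
import io.grpc.stub.StreamObserver
var statusResponse: Option[proto.ArtifactStatusesResponse] = None
val responseObserver = new StreamObserver[proto.ArtifactStatusesResponse] {
  override def onNext(value: proto.ArtifactStatusesResponse): Unit = statusResponse = Some(value)
  override def onError(t: Throwable): Unit = throw t
  override def onCompleted(): Unit = ()
}
val statusHandler = new SparkConnectArtifactStatusesHandler(responseObserver)
statusHandler.handle(statusRequest)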
// Get artifact statuses
val statuses: Map[String, ArtifactStatus] = // ... from handler response
statuses.foreach { case (name, status) =>
println(s"Artifact $name: ${status.state} (${status.size} bytes)")
}

import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
// Get Python includes for session
val pythonIncludes = artifactManager.getSparkConnectPythonIncludes
println(s"Python packages: ${pythonIncludes.mkString(", ")}")
// Python packages are automatically available in PySpark
val spark = sessionHolder.session
val pythonDF = spark.sql("""
SELECT my_custom_python_function(value) as result
FROM VALUES (1), (2), (3) as t(value)
""")All uploaded artifacts go through security validation:
import org.apache.spark.sql.connect.artifact.util.ArtifactUtils
// Validate JAR file
val jarBytes: Array[Byte] = // ... uploaded JAR
val isValid = ArtifactUtils.validateJar(jarBytes)
if (!isValid) {
throw new SecurityException("Invalid JAR file")
}
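// In addition to format validation, ArtifactUtils (as declared above) can extract
// manifest metadata and compute an integrity checksum. This is a sketch; the fields
// read here are the JarMetadata fields documented earlier.
val metadata = ArtifactUtils.extractJarMetadata(jarBytes)
println(s"Main-Class: ${metadata.mainClass.getOrElse("<none>")}, dependencies: ${metadata.dependencies.size}")
val checksum = ArtifactUtils.computeChecksum(jarBytes)
println(s"Artifact checksum: $checksum")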
// Check artifact name
val artifactName = "user-library.jar"
val isValidName = ArtifactUtils.isValidArtifactName(artifactName)
if (!isValidName) {
throw new IllegalArgumentException("Invalid artifact name")
}

Each session has its own class loader hierarchy:
// Session A uploads library-v1.jar
val sessionA = SparkConnectService.getOrCreateIsolatedSession("userA", "sessionA")
val classLoaderA = sessionA.artifactManager.classloader
// Session B uploads library-v2.jar
val sessionB = SparkConnectService.getOrCreateIsolatedSession("userB", "sessionB")
val classLoaderB = sessionB.artifactManager.classloader
// Classes are isolated between sessions
val classA = classLoaderA.loadClass("com.example.Library") // loads v1
val classB = classLoaderB.loadClass("com.example.Library") // loads v2

// Artifacts are cached by checksum to avoid duplicate storage
val checksum1 = ArtifactUtils.computeChecksum(jarBytes1)
val checksum2 = ArtifactUtils.computeChecksum(jarBytes2)
// If checksums match, artifact is reused (copy-on-write)
if (checksum1 == checksum2) {
println("Artifact already exists, reusing cached version")
}

Large artifacts are uploaded in chunks to avoid memory issues:
// Client streams large JAR in chunks
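// splitIntoChunks is not part of the Spark Connect API; it is a small hypothetical
// helper assumed here so the example below is self-contained.
def splitIntoChunks(bytes: Array[Byte], chunkSize: Int): Seq[Array[Byte]] =
  bytes.grouped(chunkSize).toSeq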
val chunks = splitIntoChunks(largeJarBytes, chunkSize = 1024 * 1024) // 1MB chunks
chunks.zipWithIndex.foreach { case (chunk, index) =>
val artifactChunk = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
.setName("large-library.jar")
.setData(ByteString.copyFrom(chunk))
.setChunkIndex(index)
.setIsLastChunk(index == chunks.length - 1)
.build()
// Stream chunk to server
artifactManager.addArtifact(artifactChunk)
}

Key configuration parameters for artifact management:
- spark.connect.artifacts.maxSizePerSession: Maximum total size per session
- spark.connect.artifacts.maxFileSize: Maximum individual file size
- spark.connect.artifacts.allowedTypes: Allowed artifact file types
- spark.connect.artifacts.cacheDir: Directory for artifact storage
- spark.connect.artifacts.cleanupInterval: Cleanup frequency
- spark.connect.artifacts.validation.enabled: Enable artifact validation
- spark.connect.artifacts.scanning.enabled: Enable content scanning
- spark.connect.artifacts.checksum.algorithm: Checksum algorithm
- spark.connect.artifacts.quarantine.enabled: Quarantine suspicious files

Common artifact upload errors and handling:
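As a minimal sketch of the general pattern (not an exhaustive error taxonomy), upload failures are surfaced through the streaming handler's onError path: the server catches the failure while registering a staged artifact and propagates a gRPC status back to the client. The names artifactManager, remoteRelativePath, serverLocalStagingPath, and responseObserver are assumed to be in scope as in the earlier examples.

import io.grpc.Status
import scala.util.control.NonFatal

try {
  // Register the staged artifact with the session
  artifactManager.addArtifact(remoteRelativePath, serverLocalStagingPath, fragment = None)
} catch {
  case NonFatal(e) =>
    // Surface the failure to the client through the response stream
    responseObserver.onError(
      Status.INTERNAL
        .withDescription(s"Failed to add artifact: ${e.getMessage}")
        .withCause(e)
        .asRuntimeException())
}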