Artifact Management

The artifact management system handles JAR files, Python packages, and other resources uploaded by clients. It provides session-isolated storage, dynamic class loading, and secure artifact handling.

Core Artifact Components

SparkConnectArtifactManager

Main class for managing artifacts within a session.

class SparkConnectArtifactManager(sessionHolder: SessionHolder) {
  def getSparkConnectAddedJars: Seq[URL]
  def getSparkConnectPythonIncludes: Seq[String]
  def classloader: ClassLoader
  def addArtifact(remoteRelativePath: Path, serverLocalStagingPath: Path, fragment: Option[String]): Unit
  def artifactPath: Path
}

Key Methods:

  • getSparkConnectAddedJars: Get all JARs added to this session
  • getSparkConnectPythonIncludes: Get Python packages and modules
  • classloader: Get session-specific class loader
  • addArtifact: Add artifact from local staging path to session
  • artifactPath: Get the base path for session artifacts (see the sketch after this list)
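For orientation, here is a small sketch of reading the session-scoped state these accessors expose; sessionHolder is assumed to be an existing SessionHolder, as in the usage examples below:

// Inspect this session's artifact root and what has been added so far
val artifactManager = sessionHolder.artifactManager
println(s"Artifact root: ${artifactManager.artifactPath}")
println(s"JARs: ${artifactManager.getSparkConnectAddedJars.mkString(", ")}")
println(s"Python includes: ${artifactManager.getSparkConnectPythonIncludes.mkString(", ")}")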

ArtifactUtils

Utility functions for artifact processing and validation.

object ArtifactUtils {
  def validateJar(jarBytes: Array[Byte]): Boolean
  def extractJarMetadata(jarBytes: Array[Byte]): JarMetadata
  def computeChecksum(artifactBytes: Array[Byte]): String
  def isValidArtifactName(name: String): Boolean
}

Key Methods:

  • validateJar: Validate JAR file format and contents
  • extractJarMetadata: Extract manifest and dependency information
  • computeChecksum: Generate artifact checksums for integrity
  • isValidArtifactName: Validate artifact naming conventions

Artifact Types

JAR Files

JAR files contain compiled Java/Scala classes and can be uploaded for use in user code.

case class JarArtifact(
  name: String,
  checksum: String,
  size: Long,
  uploadTime: Long,
  metadata: JarMetadata
)

case class JarMetadata(
  manifestVersion: String,
  mainClass: Option[String],
  dependencies: Seq[String],
  exportedPackages: Seq[String]
)
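As an illustration of how these records might be populated from uploaded bytes, the following sketch wires in the ArtifactUtils helpers described above; the artifact name and the exact wiring are assumptions:

import org.apache.spark.sql.connect.artifact.util.ArtifactUtils

// Build a JarArtifact record from raw uploaded bytes (illustrative wiring)
val jarBytes: Array[Byte] = // ... uploaded JAR contents
val jarArtifact = JarArtifact(
  name = "my-library.jar",
  checksum = ArtifactUtils.computeChecksum(jarBytes),
  size = jarBytes.length.toLong,
  uploadTime = System.currentTimeMillis(),
  metadata = ArtifactUtils.extractJarMetadata(jarBytes)
)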

Python Packages

Python packages and modules for use in PySpark operations.

case class PythonArtifact(
  name: String,
  packageType: PythonPackageType,
  checksum: String,
  size: Long,
  uploadTime: Long
)

sealed trait PythonPackageType
case object WheelPackage extends PythonPackageType
case object EggPackage extends PythonPackageType
case object SourcePackage extends PythonPackageType
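A small sketch of classifying an uploaded file into one of these package types by file extension; the extension mapping is an assumption for illustration:

// Map a file name to a PythonPackageType by extension (illustrative mapping)
def packageTypeFor(fileName: String): PythonPackageType = fileName match {
  case n if n.endsWith(".whl") => WheelPackage
  case n if n.endsWith(".egg") => EggPackage
  case _                       => SourcePackage
}

val pythonArtifact = PythonArtifact(
  name = "my_module.py",
  packageType = packageTypeFor("my_module.py"),
  checksum = "...",
  size = 1024L,
  uploadTime = System.currentTimeMillis()
)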

Generic Files

Other file types that may be needed by user applications.

case class FileArtifact(
  name: String,
  mimeType: String,
  checksum: String,
  size: Long,
  uploadTime: Long
)

Artifact Request Handlers

SparkConnectAddArtifactsHandler

Handles streaming artifact upload requests.

class SparkConnectAddArtifactsHandler(
  responseObserver: StreamObserver[proto.AddArtifactsResponse]
) extends StreamObserver[proto.AddArtifactsRequest] {
  def onNext(request: proto.AddArtifactsRequest): Unit
  def onError(t: Throwable): Unit  
  def onCompleted(): Unit
}
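Because the handler is itself a StreamObserver over AddArtifactsRequest messages, a gRPC service method can return a new handler for each incoming upload stream. The wiring below is a sketch of that pattern, not the exact service code:

import io.grpc.stub.StreamObserver
import org.apache.spark.connect.proto

// Return a new handler per upload stream (illustrative service wiring)
def addArtifacts(
    responseObserver: StreamObserver[proto.AddArtifactsResponse])
    : StreamObserver[proto.AddArtifactsRequest] = {
  new SparkConnectAddArtifactsHandler(responseObserver)
}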

SparkConnectArtifactStatusesHandler

Handles artifact status and metadata queries.

class SparkConnectArtifactStatusesHandler(
  responseObserver: StreamObserver[proto.ArtifactStatusesResponse]
) {
  def handle(request: proto.ArtifactStatusesRequest): Unit
}
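A short sketch of serving a status query through this handler; responseObserver is assumed to be the gRPC response stream for the call:

import org.apache.spark.connect.proto

// Build a status query for previously uploaded artifacts and dispatch it
val statusRequest = proto.ArtifactStatusesRequest.newBuilder()
  .addNames("my-library.jar")
  .build()

// Statuses are written back to responseObserver by the handler
new SparkConnectArtifactStatusesHandler(responseObserver).handle(statusRequest)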

Usage Examples

Uploading JAR Files

import java.nio.file.{Files, Paths}
import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager

// Get the artifact manager for the session
val artifactManager = sessionHolder.artifactManager

// Stage the JAR bytes on the server's local filesystem
val jarBytes: Array[Byte] = // ... JAR file contents
val stagingPath = Paths.get("/tmp/artifact-staging/my-library.jar")
Files.write(stagingPath, jarBytes)

// Add the staged artifact to the session under its session-relative path
artifactManager.addArtifact(
  remoteRelativePath = Paths.get("jars/my-library.jar"),
  serverLocalStagingPath = stagingPath,
  fragment = None)

// Verify the artifact was added
val addedJars = artifactManager.getSparkConnectAddedJars
println(s"Session has ${addedJars.length} JARs")

Using Custom Classes

import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager

// Get session class loader with uploaded JARs
val classLoader = artifactManager.classloader

// Load custom class from uploaded JAR
val customClass = classLoader.loadClass("com.mycompany.MyCustomClass")
val instance = customClass.getDeclaredConstructor().newInstance()

// Use in Spark operations
val spark = sessionHolder.session
import spark.implicits._

val df = spark.range(100).map { i =>
  // Use custom class in transformation
  val processor = instance.asInstanceOf[DataProcessor] // DataProcessor: an interface assumed to ship in the uploaded JAR
  processor.process(i)
}

Checking Artifact Status

import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
import org.apache.spark.connect.proto

// Check status of uploaded artifacts
val statusRequest = proto.ArtifactStatusesRequest.newBuilder()
  .addNames("my-library.jar")
  .addNames("my-module.py")
  .build()

// Get artifact statuses
val statuses: Map[String, ArtifactStatus] = // ... from handler response

statuses.foreach { case (name, status) =>
  println(s"Artifact $name: ${status.state} (${status.size} bytes)")
}

Python Package Management

import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager

// Get Python includes for session
val pythonIncludes = artifactManager.getSparkConnectPythonIncludes
println(s"Python packages: ${pythonIncludes.mkString(", ")}")

// Python packages are automatically available in PySpark
val spark = sessionHolder.session
val pythonDF = spark.sql("""
  SELECT my_custom_python_function(value) as result
  FROM VALUES (1), (2), (3) as t(value)
""")

Security and Validation

Artifact Validation

All uploaded artifacts go through security validation:

import org.apache.spark.sql.connect.artifact.util.ArtifactUtils

// Validate JAR file
val jarBytes: Array[Byte] = // ... uploaded JAR
val isValid = ArtifactUtils.validateJar(jarBytes)

if (!isValid) {
  throw new SecurityException("Invalid JAR file")
}

// Check artifact name
val artifactName = "user-library.jar"
val isValidName = ArtifactUtils.isValidArtifactName(artifactName)

if (!isValidName) {
  throw new IllegalArgumentException("Invalid artifact name")
}

Security Policies

  • File Type Validation: Only allowed file types can be uploaded (a combined check is sketched after this list)
  • Size Limits: Configurable maximum file and total session sizes
  • Name Validation: Artifact names must follow security guidelines
  • Content Scanning: JAR files are scanned for malicious content
  • Checksum Verification: Integrity checking for all uploads
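A minimal sketch of how an upload-time check might combine the name, format, and size policies above, assuming a maxFileSize value resolved from the configuration keys listed under Configuration Options:

import org.apache.spark.sql.connect.artifact.util.ArtifactUtils

// Illustrative pre-upload check combining name, format, and size policies
def checkUpload(name: String, bytes: Array[Byte], maxFileSize: Long): Unit = {
  if (!ArtifactUtils.isValidArtifactName(name)) {
    throw new IllegalArgumentException(s"Invalid artifact name: $name")
  }
  if (name.endsWith(".jar") && !ArtifactUtils.validateJar(bytes)) {
    throw new SecurityException(s"JAR failed validation: $name")
  }
  if (bytes.length > maxFileSize) {
    throw new IllegalArgumentException(s"Artifact exceeds maximum file size: ${bytes.length} bytes")
  }
}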

Session Isolation

Isolated Class Loading

Each session has its own class loader hierarchy:

import org.apache.spark.sql.connect.service.SparkConnectService

// Session A uploads library-v1.jar
val sessionA = SparkConnectService.getOrCreateIsolatedSession("userA", "sessionA")
val classLoaderA = sessionA.artifactManager.classloader

// Session B uploads library-v2.jar  
val sessionB = SparkConnectService.getOrCreateIsolatedSession("userB", "sessionB")
val classLoaderB = sessionB.artifactManager.classloader

// Classes are isolated between sessions
val classA = classLoaderA.loadClass("com.example.Library") // loads v1
val classB = classLoaderB.loadClass("com.example.Library") // loads v2

Resource Isolation

  • Storage: Each session has separate artifact storage
  • Memory: Artifacts count toward session memory limits
  • Cleanup: Session artifacts are cleaned up when the session ends (sketched after this list)
  • Access Control: Sessions cannot access other sessions' artifacts
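A sketch of session-scoped cleanup; cleanUpResources() is assumed here as the cleanup hook, and the exact method name may differ:

import org.apache.spark.sql.connect.service.SparkConnectService

val sessionHolder = SparkConnectService.getOrCreateIsolatedSession("userA", "sessionA")
try {
  // ... run queries and upload artifacts for this session ...
} finally {
  // Releases the session's artifact storage and class loader state (assumed hook)
  sessionHolder.artifactManager.cleanUpResources()
}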

Performance and Optimization

Caching and Reuse

// Artifacts are cached by checksum to avoid duplicate storage
val checksum1 = ArtifactUtils.computeChecksum(jarBytes1)
val checksum2 = ArtifactUtils.computeChecksum(jarBytes2)

// If checksums match, artifact is reused (copy-on-write)
if (checksum1 == checksum2) {
  println("Artifact already exists, reusing cached version")
}

Streaming Uploads

Large artifacts are uploaded in chunks to avoid memory issues:

// Client streams large JAR in chunks
val chunks = splitIntoChunks(largeJarBytes, chunkSize = 1024 * 1024) // 1MB chunks

chunks.zipWithIndex.foreach { case (chunk, index) =>
  val artifactChunk = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
    .setName("large-library.jar")
    .setData(ByteString.copyFrom(chunk))
    .setChunkIndex(index)
    .setIsLastChunk(index == chunks.length - 1)
    .build()
    
  // Stream chunk to server
  artifactManager.addArtifact(artifactChunk)
}

Configuration Options

Artifact Limits

Key configuration parameters for artifact management; a SparkConf sketch of setting them follows the list:

  • spark.connect.artifacts.maxSizePerSession: Maximum total size per session
  • spark.connect.artifacts.maxFileSize: Maximum individual file size
  • spark.connect.artifacts.allowedTypes: Allowed artifact file types
  • spark.connect.artifacts.cacheDir: Directory for artifact storage
  • spark.connect.artifacts.cleanupInterval: Cleanup frequency
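A minimal sketch of supplying these limits through a SparkConf, assuming the keys above are read as ordinary Spark configuration entries; the values shown are examples, not defaults:

import org.apache.spark.SparkConf

// Illustrative artifact limits (example values, not defaults)
val conf = new SparkConf()
  .set("spark.connect.artifacts.maxSizePerSession", "2g")
  .set("spark.connect.artifacts.maxFileSize", "512m")
  .set("spark.connect.artifacts.allowedTypes", "jar,whl,egg,py")
  .set("spark.connect.artifacts.cacheDir", "/var/spark/artifacts")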

Security Settings

  • spark.connect.artifacts.validation.enabled: Enable artifact validation
  • spark.connect.artifacts.scanning.enabled: Enable content scanning
  • spark.connect.artifacts.checksum.algorithm: Checksum algorithm
  • spark.connect.artifacts.quarantine.enabled: Quarantine suspicious files

Error Handling

Upload Errors

Common artifact upload errors and handling; a classification sketch follows the list:

  • Size Limit Exceeded: Artifact exceeds configured size limits
  • Invalid Format: Malformed or corrupted artifact files
  • Security Violation: Artifact fails security validation
  • Storage Error: Insufficient disk space or I/O errors
  • Network Error: Connection issues during streaming upload
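A sketch of mapping failures from an upload attempt onto these categories; the exception types caught below are general JVM types used for illustration, not a defined error contract:

import java.io.IOException
import java.nio.file.Paths

// Illustrative classification of upload failures (paths reuse the earlier upload example)
try {
  artifactManager.addArtifact(
    remoteRelativePath = Paths.get("jars/my-library.jar"),
    serverLocalStagingPath = Paths.get("/tmp/artifact-staging/my-library.jar"),
    fragment = None)
} catch {
  case e: SecurityException =>
    println(s"Security violation: ${e.getMessage}") // failed validation or scanning
  case e: IllegalArgumentException =>
    println(s"Invalid artifact or size limit exceeded: ${e.getMessage}")
  case e: IOException =>
    println(s"Storage or network error: ${e.getMessage}")
}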

Recovery and Retry

  • Partial Uploads: Resume interrupted uploads from last successful chunk
  • Corruption Detection: Verify checksums and retry on mismatch
  • Cleanup: Automatic cleanup of failed or incomplete uploads
  • Error Reporting: Detailed error messages with resolution guidance