or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

accumulators.md, application-context.md, broadcast-variables.md, index.md, java-api.md, partitioning.md, rdd-operations.md, serialization.md, storage-persistence.md
tile.json

docs/application-context.md

Application Context

Application setup and cluster connection management through SparkContext and SparkConf for coordinating distributed Spark applications.

Capabilities

SparkContext

Main entry point for Spark functionality, representing the connection to a Spark cluster. Only one SparkContext should be active per JVM.

/**
 * Main entry point for Spark functionality, representing the connection to a
 * Spark cluster. Only one SparkContext should be active per JVM.
 *
 * @param config Spark configuration object
 */
class SparkContext(config: SparkConf) {
  /**
   * Create an RDD from a Scala collection.
   *
   * @param seq       collection whose elements become the RDD's elements
   * @param numSlices number of partitions; defaults to `defaultParallelism`
   */
  def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

  /**
   * Read text files from HDFS, the local filesystem, or any Hadoop-supported URI.
   *
   * @param path          location of the input
   * @param minPartitions minimum number of partitions; defaults to `defaultMinPartitions`
   */
  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

  /**
   * Create an RDD of key/value pairs from a Hadoop InputFormat (old `mapred` API).
   *
   * @param conf             Hadoop job configuration describing the input
   * @param inputFormatClass InputFormat implementation used to read the data
   * @param keyClass         class of the keys
   * @param valueClass       class of the values
   * @param minPartitions    minimum number of partitions; defaults to `defaultMinPartitions`
   */
  def hadoopRDD[K, V](
    conf: JobConf,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]

  /**
   * Create an RDD of key/value pairs from a new-API Hadoop InputFormat.
   *
   * @param conf   Hadoop configuration describing the input
   * @param fClass InputFormat implementation used to read the data
   * @param kClass class of the keys
   * @param vClass class of the values
   */
  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
    conf: Configuration,
    fClass: Class[F],
    kClass: Class[K],
    vClass: Class[V]
  ): RDD[(K, V)]

  /** Create a broadcast variable holding `value`. */
  def broadcast[T: ClassTag](value: T): Broadcast[T]

  /** Create a named accumulator of `Long` values. */
  def longAccumulator(name: String): LongAccumulator

  /** Create a named accumulator of `Double` values. */
  def doubleAccumulator(name: String): DoubleAccumulator

  /** Create a named accumulator that collects elements of type `T`. */
  def collectionAccumulator[T](name: String): CollectionAccumulator[T]

  /**
   * Add a file to be downloaded with this Spark job.
   *
   * @param path      location of the file
   * @param recursive defaults to `false`; pass `true` when `path` is a directory
   *                  whose contents should be added as well
   */
  def addFile(path: String, recursive: Boolean = false): Unit

  /** Add a JAR file to be distributed to the cluster. */
  def addJar(path: String): Unit

  /** Set the checkpoint directory used for RDD checkpointing. */
  def setCheckpointDir(directory: String): Unit

  /** Stop this SparkContext and release its resources. */
  def stop(): Unit

  /** Get the current status tracker. */
  def statusTracker: SparkStatusTracker

  /** Default parallelism level; used as the default `numSlices` of `parallelize`. */
  def defaultParallelism: Int

  /**
   * Default minimum number of partitions; used as the default `minPartitions`
   * of `textFile` and `hadoopRDD`.
   */
  def defaultMinPartitions: Int

  /** Spark version. */
  def version: String
}

Usage Examples:

import org.apache.spark.{SparkContext, SparkConf}

// Basic setup
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("local[2]")
val sc = new SparkContext(conf)

// Only one SparkContext should be active per JVM, so make sure it is stopped
// even when one of the steps below throws.
try {
  // Create RDD from collection
  val numbers = sc.parallelize(1 to 100)

  // Read text file. The segment right after "hdfs://" is the URI authority
  // (the NameNode host), so "hdfs://data/input.txt" would look for a host
  // called "data"; the empty authority below uses the default filesystem.
  val lines = sc.textFile("hdfs:///data/input.txt")

  // Add dependencies
  sc.addFile("config.properties")
  sc.addJar("lib/custom.jar")
} finally {
  // Cleanup
  sc.stop()
}

SparkConf

Configuration for Spark applications with key-value pairs for runtime settings.

/**
 * Configuration for Spark applications, held as key-value pairs of runtime settings.
 *
 * The setters return a `SparkConf`, so calls can be chained.
 *
 * @param loadDefaults whether to load default configurations
 */
class SparkConf(loadDefaults: Boolean = true) {
  /**
   * Set a configuration property.
   *
   * There is no dedicated setter for settings such as executor memory; they
   * are plain properties, e.g. `set("spark.executor.memory", "2g")`.
   *
   * @param key   property name
   * @param value property value
   */
  def set(key: String, value: String): SparkConf

  /** Set the application name. */
  def setAppName(name: String): SparkConf

  /** Set the master URL. */
  def setMaster(master: String): SparkConf

  /** Set the Spark home directory. */
  def setSparkHome(home: String): SparkConf

  /** Set the JAR files to distribute to the cluster. */
  def setJars(jars: Seq[String]): SparkConf

  /**
   * Get the value of a configuration property.
   *
   * @throws NoSuchElementException if `key` is not set
   */
  def get(key: String): String

  /** Get the value of a configuration property, or `defaultValue` if `key` is not set. */
  def get(key: String, defaultValue: String): String

  /** Get all configuration entries as key-value pairs. */
  def getAll: Array[(String, String)]

  /** Check whether the configuration contains `key`. */
  def contains(key: String): Boolean

  /** Remove a configuration property. */
  def remove(key: String): SparkConf

  /** Clone this configuration. Overrides `Object.clone`, hence the `override` modifier. */
  override def clone(): SparkConf
}

Usage Examples:

// Build the configuration fluently: every setter returns a SparkConf, so the
// calls can be chained. All values, including numbers and booleans, are
// passed as strings.
val conf = new SparkConf()
  .setAppName("Data Processing")
  .setMaster("yarn")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "4")
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Advanced configuration
// `accessKey` and `secretKey` are not defined in this snippet; they must be
// String values supplied by the surrounding application.
conf.set("spark.hadoop.fs.s3a.access.key", accessKey)
    .set("spark.hadoop.fs.s3a.secret.key", secretKey)
    .set("spark.sql.adaptive.coalescePartitions.enabled", "true")

SparkStatusTracker

Low-level status API for monitoring Spark applications and job execution.

/**
 * Low-level status tracking API for monitoring Spark applications and job
 * execution. An instance is obtained from SparkContext.statusTracker.
 */
class SparkStatusTracker {
  /** Get IDs of the known jobs that belong to the given job group */
  def getJobIdsForGroup(jobGroup: String): Array[Int]
  
  /** Get information for specific job; the result is an Option, so callers must handle the empty case */
  def getJobInfo(jobId: Int): Option[SparkJobInfo]
  
  /** Get information for specific stage; the result is an Option, so callers must handle the empty case */
  def getStageInfo(stageId: Int): Option[SparkStageInfo]
  
  /** Get active job IDs */
  def getActiveJobIds(): Array[Int]
  
  /** Get active stage IDs */
  def getActiveStageIds(): Array[Int]
  
  /** Get executor information, one SparkExecutorInfo per entry */
  def getExecutorInfos(): Array[SparkExecutorInfo]
}

/**
 * Information about a single job, as returned by SparkStatusTracker.getJobInfo.
 *
 * NOTE(review): upstream Spark appears to expose this type as an interface with
 * accessor methods rather than a case class — confirm before relying on
 * case-class features such as `copy` or pattern matching.
 *
 * @param jobId    ID of the job
 * @param stageIds IDs of the stages associated with the job
 * @param status   execution status of the job
 */
case class SparkJobInfo(
  jobId: Int,
  stageIds: Array[Int],
  status: JobExecutionStatus
)

/**
 * Information about a single stage, as returned by SparkStatusTracker.getStageInfo.
 *
 * NOTE(review): upstream Spark appears to name the completed-task counter
 * `numCompletedTasks` (not `numCompleteTasks`) and to expose this type as an
 * interface rather than a case class — confirm against the Spark API docs.
 *
 * @param stageId          ID of the stage
 * @param currentAttemptId ID of the current attempt of the stage
 * @param name             name of the stage
 * @param numTasks         number of tasks in the stage
 * @param numActiveTasks   number of active tasks
 * @param numCompleteTasks number of complete tasks
 * @param numFailedTasks   number of failed tasks
 */
case class SparkStageInfo(
  stageId: Int,
  currentAttemptId: Int,
  name: String,
  numTasks: Int,
  numActiveTasks: Int,
  numCompleteTasks: Int,
  numFailedTasks: Int
)

SparkFiles

Utility for resolving paths to files added through SparkContext.addFile().

/**
 * Resolves paths to files added through SparkContext.addFile().
 */
object SparkFiles {
  /**
   * Get local path to file added with SparkContext.addFile()
   *
   * @param filename name of the file that was added
   * @return the local path of that file, as a String
   */
  def get(filename: String): String
  
  /**
   * Get root directory containing files added with SparkContext.addFile()
   *
   * @return the root directory, as a String
   */
  def getRootDirectory(): String
}

Exception Handling

/**
 * Base exception type for the Spark errors declared in this section.
 *
 * @param message description of the failure
 * @param cause   underlying throwable; defaults to `null` when there is none
 */
class SparkException(message: String, cause: Throwable = null) extends Exception(message, cause)

/**
 * Raised when a task cannot be serialized.
 *
 * @param taskName name of the task, embedded in the exception message
 * @param cause    the serialization failure that triggered this exception
 */
class TaskNotSerializableException(
  taskName: String,
  cause: NotSerializableException
) extends SparkException(s"Task not serializable: $taskName", cause)

/**
 * Raised when an output path already exists.
 *
 * @param outputPath the conflicting path, embedded in the exception message
 */
class SparkFileAlreadyExistsException(outputPath: String) extends SparkException(
  s"Output path $outputPath already exists"
)

Common exceptions include serialization errors when tasks contain non-serializable closures, and file system errors when output paths already exist.