The Signaling module provides interrupt and job cancellation functionality for graceful handling of Ctrl+C and job termination in interactive Spark REPL sessions.
Registers a SIGINT (Ctrl+C) handler that provides intelligent job cancellation behavior:

```scala
object Signaling extends Logging {
  def cancelOnInterrupt(): Unit
}
```

The signal handler is automatically registered when the REPL starts:
```scala
// Automatically called in Main object initialization
object Main extends Logging {
  initializeLogIfNecessary(true)
  Signaling.cancelOnInterrupt() // Signal handler setup
  // ... rest of initialization
}
```

The signal handler provides intelligent behavior based on the current state of the Spark application:
```scala
// Behavior when Ctrl+C is pressed:
SignalUtils.register("INT") {
  SparkContext.getActive.map { ctx =>
    if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
      // Active jobs are running - cancel them first
      logWarning("Cancelling all active jobs, this can take a while. " +
        "Press Ctrl+C again to exit now.")
      ctx.cancelAllJobs()
      true // Signal handled - don't exit yet
    } else {
      // No active jobs - allow normal exit
      false // Let default handler exit the process
    }
  }.getOrElse(false) // No active SparkContext - allow normal exit
}
```

When working interactively, users can safely interrupt long-running operations:
```scala
// In a REPL session:
scala> val rdd = sc.parallelize(1 to 10000000)
scala> val result = rdd.map(expensiveOperation).collect()
// User presses Ctrl+C during execution
// Output: "Cancelling all active jobs, this can take a while. Press Ctrl+C again to exit now."
// Jobs are cancelled gracefully
```

The signal handler implements a two-stage exit process:
On the first Ctrl+C while jobs are running, the handler logs the warning, calls `ctx.cancelAllJobs()`, and returns `true` (signal handled; the process stays alive). On a second Ctrl+C, or a first Ctrl+C with no active jobs, it returns `false`, letting the default handler terminate the REPL process.

Uses Spark's SignalUtils for cross-platform signal handling:
```scala
import org.apache.spark.util.SignalUtils

// Register handler for SIGINT (interrupt signal)
SignalUtils.register("INT") { /* handler logic */ }
```

Checks for active jobs using the SparkContext's status tracker:
```scala
SparkContext.getActive.map { ctx =>
  // Check if any jobs are currently running
  val activeJobs = ctx.statusTracker.getActiveJobIds()
  !activeJobs.isEmpty
}
```

Integrates with Spark's logging system for user feedback:
```scala
object Signaling extends Logging {
  // logWarning (from the Logging trait) produces user-visible messages
  logWarning("Cancelling all active jobs, this can take a while. " +
    "Press Ctrl+C again to exit now.")
}
```

Handles cases where no SparkContext is available:
```scala
SparkContext.getActive.map { ctx =>
  // SparkContext exists - check for active jobs
  // ... job cancellation logic
}.getOrElse(false) // No SparkContext - allow normal exit
```

If job cancellation fails or the SparkContext is in an invalid state, the handler gracefully falls back to allowing normal process termination.
The signal handling works across different operating systems through Spark's SignalUtils abstraction, which handles platform-specific signal differences.
The signal handler is designed to be thread-safe: it runs on a signal-handling thread without interfering with the main REPL execution thread.
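As a rough illustration of that cross-thread behavior, `cancelAllJobs()` can be invoked from a different thread than the one running the job, which is essentially what the signal-handling thread does. This is a sketch, not code from the `Signaling` module, and it assumes a local-mode Spark deployment:

```scala
import scala.concurrent.{ExecutionContext, Future}
import ExecutionContext.Implicits.global
import org.apache.spark.{SparkConf, SparkContext}

// Local SparkContext for the demonstration
val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("cancel-demo"))

// Start a deliberately slow job on a background thread
val job = Future {
  sc.parallelize(1 to 1000000, 100).map { i => Thread.sleep(1); i }.count()
}

Thread.sleep(2000)  // give the job time to start running
sc.cancelAllJobs()  // invoked from this thread, not the job's thread
// The Future typically fails with a SparkException reporting the cancellation
```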
The signal handler behavior can be tested programmatically:

```scala
// In test code:
import scala.concurrent.{ExecutionContext, Future}
import ExecutionContext.Implicits.global
import org.apache.spark.SparkContext
import org.apache.spark.repl.Signaling

// Set up a test SparkContext with active jobs
val sc = new SparkContext(...)
val rdd = sc.parallelize(1 to 1000000)
val future = Future { rdd.map(expensiveOperation).collect() }
// The signal handler should cancel the jobs when interrupted;
// the test framework would need to simulate SIGINT
```

For unit testing, the signal handling logic can be isolated and tested with mock SparkContext instances to verify correct behavior under different conditions.
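One way to make that isolation concrete is to extract the handler's decision logic behind a minimal interface. The names below (`JobStatus`, `onInterrupt`) are hypothetical and exist only to illustrate the approach:

```scala
// Hypothetical seam for testing: the handler's decision logic extracted
// so it can run without real signals or a real SparkContext.
trait JobStatus { def hasActiveJobs: Boolean }

def onInterrupt(status: Option[JobStatus], cancelAll: () => Unit): Boolean =
  status.map { s =>
    if (s.hasActiveJobs) { cancelAll(); true } // cancel and swallow the signal
    else false                                 // no jobs: allow exit
  }.getOrElse(false)                           // no context: allow exit

// Exercising all three branches without a SparkContext:
var cancelled = false
assert(onInterrupt(Some(new JobStatus { def hasActiveJobs = true }), () => cancelled = true))
assert(cancelled)
assert(!onInterrupt(Some(new JobStatus { def hasActiveJobs = false }), () => ()))
assert(!onInterrupt(None, () => ()))
```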