Interactive command-line interface for Spark SQL, providing SQL completion, history management, signal handling, and comprehensive command processing.
Main entry point for the Spark SQL CLI, supporting both interactive shells and batch execution.
/**
* Main entry point for Spark SQL CLI with interactive shell capabilities
*/
object SparkSQLCLIDriver {
/**
* Command-line entry point for the CLI
* @param args Command line arguments including SQL files, configuration options
*/
def main(args: Array[String]): Unit
/**
* Install interrupt signal handler for Ctrl+C support
* Enables cancellation of running queries and graceful shutdown
*/
def installSignalHandler(): Unit
/**
* Print command-line usage information
*/
def printUsage(): Unit
/**
* Exit the CLI application with specified code
* @param code Exit code (0 for success, non-zero for error)
*/
def exit(code: Int): Unit
}

Usage Examples:
# Start interactive CLI
spark-sql
# Execute SQL file
spark-sql -f queries.sql
# Set Hive configuration
spark-sql --hiveconf hive.server2.thrift.port=10001
# Execute single statement
spark-sql -e "SELECT COUNT(*) FROM my_table"
# Enable verbose output
spark-sql --verbose
# Set variables
spark-sql --hivevar username=alice --hivevar threshold=100

Interactive CLI driver implementation with command processing and session management.
/**
* Interactive CLI driver for Spark SQL with Hive compatibility
*/
class SparkSQLCLIDriver extends CliDriver {
/**
* Process a SQL command or CLI directive
* @param cmd Command string to process
* @return 0 on success, non-zero on error
*/
def processCmd(cmd: String): Int
/**
* Process a line of input with optional interrupt handling
* @param line Input line containing one or more SQL statements
* @param allowInterrupting Whether to enable Ctrl+C interrupt handling
* @return 0 on success, non-zero on error
*/
def processLine(line: String, allowInterrupting: Boolean): Int
/**
* Print Spark master and application ID information
*/
def printMasterAndAppId(): Unit
/**
* Set Hive variables in the Spark SQL context
* @param hiveVariables Map of variable names to values
*/
def setHiveVariables(hiveVariables: java.util.Map[String, String]): Unit
}

Usage Examples:
import org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
import scala.jdk.CollectionConverters._
// Create CLI driver instance
val cliDriver = new SparkSQLCLIDriver()
// Set variables
val variables = Map("database" -> "sales", "year" -> "2023").asJava
cliDriver.setHiveVariables(variables)
// Process commands
val result1 = cliDriver.processCmd("USE ${database}")
val result2 = cliDriver.processCmd("SELECT COUNT(*) FROM orders WHERE year = ${year}")
// Print connection info
cliDriver.printMasterAndAppId()

Comprehensive command processing supporting SQL statements, CLI directives, and system commands.
// Supported command types
val SQL_COMMANDS = Set("SELECT", "INSERT", "UPDATE", "DELETE", "CREATE", "DROP", "ALTER", "SHOW", "DESCRIBE", "EXPLAIN")
val CLI_COMMANDS = Set("quit", "exit", "source", "set", "reset", "add", "list", "delete")
// System commands: any input starting with "!" is executed as a shell command
// Command processing flow
def processCmd(cmd: String): Int = {
val trimmed = cmd.trim()
val tokens = trimmed.split("\\s+")
// Handle special commands
if (trimmed.toLowerCase.matches("quit|exit")) {
// Graceful shutdown
} else if (tokens(0).toLowerCase.equals("source")) {
// Execute SQL file
} else if (trimmed.startsWith("!")) {
// System command execution
} else {
// SQL statement processing with SparkSQLDriver
}
}

Command Examples:
-- SQL statements
SELECT * FROM employees WHERE department = 'Engineering';
CREATE TABLE temp_results AS SELECT * FROM analysis;
-- CLI commands
!ls /tmp/data/
source /path/to/queries.sql
set spark.sql.adaptive.enabled=true
add jar /path/to/my-udf.jar
list jars
-- Variable substitution
set hivevar:table_name=sales_2023
SELECT COUNT(*) FROM ${table_name};

Rich interactive shell with command completion, history, and advanced editing capabilities.
import scala.jdk.CollectionConverters._
import jline.console.completer.{Completer, StringsCompleter}

/**
 * Command completion support for interactive shell
 * @return Array of Completer objects for keywords, functions, and configurations
 */
def getCommandCompleter(): Array[Completer] = {
// SQL keyword completion
val sqlCompleter = new StringsCompleter(SQLKeywordUtils.keywords.asJava)
// Function name completion
val functionCompleter = new StringsCompleter(
FunctionRegistry.builtin.listFunction().map(_.funcName).asJava
)
// Configuration parameter completion
val confCompleter = new StringsCompleter(
SQLConf.get.getAllDefinedConfs.map(_._1).asJava
)
Array(sqlCompleter, functionCompleter, confCompleter)
}
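// Hypothetical wiring, assuming a jline ConsoleReader named `reader`:
// registering these completers makes <TAB> consult SQL keywords,
// function names, and configuration keys
getCommandCompleter().foreach(reader.addCompleter)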
// History management
val historyFile = System.getProperty("user.home") + "/.hivehistory"
reader.setHistory(new FileHistory(new File(historyFile)))
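// Hypothetical addition: FileHistory (jline.console.history.FileHistory) buffers
// entries in memory, so flushing on shutdown persists them across sessions
Runtime.getRuntime.addShutdownHook(new Thread(() =>
  reader.getHistory.asInstanceOf[FileHistory].flush()))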
// Command prompt with current database
def promptWithCurrentDB: String = s"spark-sql (${currentDatabase})> "

Interactive Features:
# Tab completion for SQL keywords
spark-sql> SEL<TAB>
SELECT
# Tab completion for functions
spark-sql> SELECT coun<TAB>
count( count_if(
# Tab completion for configuration
spark-sql> set spark.sql.adaptive.<TAB>
spark.sql.adaptive.enabled
spark.sql.adaptive.coalescePartitions.enabled
# Command history (up/down arrows)
spark-sql> SELECT COUNT(*) FROM orders;
100
spark-sql> <UP ARROW> # recalls previous command
# Multi-line commands
spark-sql> SELECT customer_id,
> COUNT(*) as order_count
> FROM orders
         > GROUP BY customer_id;

Comprehensive signal handling for graceful interruption and shutdown. When processLine runs with allowInterrupting = true, a handler like the one below is installed for the duration of the statement and the previous handler is restored afterwards, so Ctrl+C cancels the running query rather than killing the shell.
import sun.misc.{Signal, SignalHandler}

/**
 * Signal handler for Ctrl+C interruption
 */
val signalHandler = new SignalHandler() {
private var interruptRequested: Boolean = false
override def handle(signal: Signal): Unit = {
val initialRequest = !interruptRequested
interruptRequested = true
if (!initialRequest) {
// Second Ctrl+C - force exit
println("Exiting the JVM")
SparkSQLCLIDriver.exit(-1)
} else {
// First Ctrl+C - interrupt current operation
println("Interrupting... Be patient, this might take some time.")
println("Press Ctrl+C again to kill JVM")
HiveInterruptUtils.interrupt()
}
}
}
// Install signal handler
Signal.handle(new Signal("INT"), signalHandler)

Signal Handling Behavior:
spark-sql> SELECT COUNT(*) FROM huge_table;
# Long running query...
^C # First Ctrl+C
Interrupting... Be patient, this might take some time.
Press Ctrl+C again to kill JVM
^C # Second Ctrl+C
Exiting the JVM

Advanced SQL statement parsing with support for comments, multi-line statements, and proper semicolon handling.
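To make the approach concrete, here is a minimal, self-contained sketch of the splitting idea. splitStatements is a hypothetical helper, not the driver's API; it deliberately ignores escaped characters and nested bracketed comments, which the real implementation must handle:

// Minimal sketch: split input into statements on semicolons that are not
// inside quotes or comments. No escape or nested-comment handling.
def splitStatements(line: String): Seq[String] = {
  val statements = scala.collection.mutable.ArrayBuffer.empty[String]
  val current = new StringBuilder
  var inSingleQuote = false    // inside '...'
  var inDoubleQuote = false    // inside "..."
  var inLineComment = false    // after -- until end of line
  var inBracketComment = false // inside /* ... */
  var i = 0
  while (i < line.length) {
    val c = line.charAt(i)
    val next = if (i + 1 < line.length) line.charAt(i + 1) else '\u0000'
    if (inLineComment) {
      if (c == '\n') inLineComment = false
      current.append(c)
    } else if (inBracketComment) {
      if (c == '*' && next == '/') { current.append("*/"); inBracketComment = false; i += 1 }
      else current.append(c)
    } else if (inSingleQuote) {
      if (c == '\'') inSingleQuote = false
      current.append(c)
    } else if (inDoubleQuote) {
      if (c == '"') inDoubleQuote = false
      current.append(c)
    } else if (c == ';') {
      // Bare semicolon: statement boundary
      if (current.toString.trim.nonEmpty) statements += current.toString.trim
      current.clear()
    } else {
      if (c == '\'') inSingleQuote = true
      else if (c == '"') inDoubleQuote = true
      else if (c == '-' && next == '-') inLineComment = true
      else if (c == '/' && next == '*') inBracketComment = true
      current.append(c)
    }
    i += 1
  }
  if (current.toString.trim.nonEmpty) statements += current.toString.trim
  statements.toSeq
}

For example, splitStatements("SELECT 1; SELECT 'a;b';") yields Seq("SELECT 1", "SELECT 'a;b'"). The driver's actual splitSemiColon, outlined below, tracks those extra states as well.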
/**
* Split input line into individual SQL statements
* Handles quoted strings, comments, and escaped characters
* @param line Input line containing one or more SQL statements
* @return JList of individual SQL statements
*/
def splitSemiColon(line: String): JList[String] = {
var insideSingleQuote = false
var insideDoubleQuote = false
var insideSimpleComment = false
var bracketedCommentLevel = 0
var escape = false
var beginIndex = 0
var isStatement = false
val ret = new JArrayList[String]
  // Complex parsing logic (elided) for:
  // - Quoted strings with embedded semicolons
  // - Single-line comments (--)
  // - Multi-line bracketed comments (/* */)
  // - Escaped characters
  // - Statement boundaries
  ret
}

Parsing Examples:
-- Multiple statements on one line
SELECT 1; SELECT 'hello; world'; SELECT 2;
-- Comments with semicolons
SELECT * FROM table1; -- This is a comment with ; semicolon
SELECT * FROM table2;
-- Multi-line statements
SELECT customer_id,
SUM(amount) as total
FROM orders
WHERE order_date >= '2023-01-01'
AND status = 'completed'
GROUP BY customer_id;
-- Bracketed comments
SELECT /* this is a
multi-line comment with ; semicolon */
  COUNT(*) FROM sales;

Support for executing SQL files with comprehensive error handling.
/**
* Process SQL file with error handling
* @param fileName Path to SQL file to execute
* @return 0 on success, non-zero on error
*/
import java.io.{File, FileNotFoundException}
import scala.io.Source

def processFile(fileName: String): Int = {
  try {
    val file = new File(fileName)
    if (!file.exists()) {
      throw new FileNotFoundException(s"File not found: $fileName")
    }
    // Read the whole file so multi-line statements and bracketed comments
    // stay intact; processLine performs the statement splitting
    val source = Source.fromFile(file)
    try {
      processLine(source.mkString, allowInterrupting = false)
    } finally {
      source.close()
    }
  } catch {
    case e: FileNotFoundException =>
      logError(s"Could not open input file for reading. (${e.getMessage})")
      1
  }
}

File Processing Examples:
# Execute SQL file
spark-sql -f /path/to/setup.sql
# Execute a file, suppressing informational output
spark-sql -f /path/to/queries.sql --silent

Comprehensive configuration management supporting both Spark and Hive configuration parameters.
// Configuration sources and precedence:
// 1. Command line --hiveconf options
// 2. Hive configuration files
// 3. Spark configuration
// 4. Default values
// Set configuration during startup
val hiveConf = new HiveConf()
sessionState.cmdProperties.entrySet().asScala.foreach { item =>
val key = item.getKey.toString
val value = item.getValue.toString
hiveConf.set(key, value)
}
// Configuration parameter handling
def setConfMap(conf: SQLContext, confMap: java.util.Map[String, String]): Unit = {
confMap.asScala.foreach { case (key, value) =>
try {
conf.setConf(key, value)
} catch {
case e: Exception =>
println(s"Warning: Could not set $key = $value: ${e.getMessage}")
}
}
}

Configuration Examples:
# Set configuration via command line
spark-sql --hiveconf spark.sql.adaptive.enabled=true \
--hiveconf hive.exec.dynamic.partition=true
# Set configuration in interactive mode
spark-sql> set spark.sql.shuffle.partitions=200;
spark-sql> set hive.exec.dynamic.partition.mode=nonstrict;
# Show current configuration
spark-sql> set;
spark-sql> set spark.sql.adaptive.enabled;

Comprehensive error handling with proper recovery and user feedback.
/**
* Handle command execution errors with proper user feedback
*/
def handleCommandError(e: Throwable, command: String): Int = {
e match {
case st: SparkThrowable =>
val format = SparkSQLEnv.sqlContext.conf.errorMessageFormat
val message = SparkThrowableHelper.getMessage(st, format)
System.err.println(message)
// Print stack trace for non-user errors
if (format == ErrorMessageFormat.PRETTY &&
!sessionState.getIsSilent &&
(!e.isInstanceOf[AnalysisException] || e.getCause != null)) {
e.printStackTrace(System.err)
}
case _ =>
System.err.println(s"Error executing command: ${e.getMessage}")
if (!sessionState.getIsSilent) {
e.printStackTrace(System.err)
}
}
1 // Return error code
}
// Ignore errors mode
val ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS)
if (commandResult != 0 && !ignoreErrors) {
return commandResult // Stop on first error
}

Error Handling Examples:
# Error handling in interactive mode
spark-sql> SELECT * FROM non_existent_table;
Error in query: Table or view not found: non_existent_table
# Continue processing on errors
spark-sql --hiveconf hive.cli.errors.ignore=true -f queries_with_errors.sql
# Silent mode (suppress warnings)
spark-sql --silent -e "SELECT COUNT(*) FROM my_table"
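Finally, the entry point can also be driven programmatically. A minimal sketch (the RunSqlFile wrapper is hypothetical; SparkSQLCLIDriver.main is the same entry point the spark-sql launcher script invokes):

import org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver

// Hypothetical wrapper: executes a SQL file, equivalent to `spark-sql -f <file>`
object RunSqlFile {
  def main(args: Array[String]): Unit = {
    // Delegates to the CLI entry point; note that it may terminate the JVM
    // via SparkSQLCLIDriver.exit when processing finishes or fails
    SparkSQLCLIDriver.main(Array("-f", args(0)))
  }
}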