Structured exception handling with detailed error information for query analysis and execution, providing comprehensive error reporting and debugging support for Spark SQL operations.
Main exception type for query analysis failures.
/**
* Thrown when query analysis fails
*/
class AnalysisException extends Exception with SparkThrowable {
/** Add position information to the exception */
def withPosition(origin: Origin): AnalysisException
/** Get message without logical plan details */
def getSimpleMessage: String
}
/**
* AnalysisException constructors
*/
object AnalysisException {
// Primary constructor
def apply(
message: String,
line: Option[Int] = None,
startPosition: Option[Int] = None,
errorClass: Option[String] = None,
messageParameters: Map[String, String] = Map.empty,
context: Array[QueryContext] = Array.empty
): AnalysisException
// Alternative constructors
def apply(
errorClass: String,
messageParameters: Map[String, String],
cause: Option[Throwable]
): AnalysisException
def apply(
errorClass: String,
messageParameters: Map[String, String],
context: Array[QueryContext],
summary: String
): AnalysisException
def apply(
errorClass: String,
messageParameters: Map[String, String]
): AnalysisException
def apply(
errorClass: String,
messageParameters: Map[String, String],
origin: Origin
): AnalysisException
def apply(
errorClass: String,
messageParameters: Map[String, String],
origin: Origin,
cause: Option[Throwable]
): AnalysisException
}
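For illustration, a minimal sketch constructing an exception through the primary factory method above (the error class and message parameters shown are illustrative, not taken from Spark's error-class registry):
import org.apache.spark.sql.AnalysisException

val unresolved = AnalysisException(
  message = "Column 'user_id' cannot be resolved",
  line = Some(3),
  startPosition = Some(12),
  errorClass = Some("UNRESOLVED_COLUMN"),
  messageParameters = Map("objectName" -> "`user_id`")
)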
Base interface for Spark-specific exceptions with structured error information.
/**
* Base interface for Spark exceptions with structured error information
*/
trait SparkThrowable {
/** Error class identifier */
def getErrorClass: String
/** Error message parameters */
def getMessageParameters: java.util.Map[String, String]
/** SQL state for the error */
def getSqlState: String
/** Query context information */
def getQueryContext: Array[QueryContext]
}
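Because SparkThrowable is a mix-in trait rather than an exception class, generic handlers typically match on an intersection type so the Throwable methods stay available; a minimal sketch (the describe helper is hypothetical):
import org.apache.spark.SparkThrowable

// Intersection type keeps both Exception and SparkThrowable members in scope
def describe(e: Exception with SparkThrowable): String =
  s"[${e.getSqlState}] ${e.getErrorClass}: ${e.getMessage}"
The same intersection pattern appears in the catch blocks below.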
Centralized error handling utilities for data type operations.
/**
* Centralized error handling for data type operations
*/
object DataTypeErrors {
/** Decimal precision exceeds maximum allowed precision */
def decimalPrecisionExceedsMaxPrecisionError(
precision: Int,
maxPrecision: Int
): SparkArithmeticException
/** Decimal value is out of range */
def outOfDecimalTypeRangeError(str: UTF8String): SparkArithmeticException
/** Schema parsing failed */
def schemaFailToParseError(schema: String, e: Throwable): Throwable
/** Cannot merge incompatible data types */
def cannotMergeIncompatibleDataTypesError(
left: DataType,
right: DataType
): Throwable
/** Invalid field name access */
def invalidFieldName(
fieldName: Seq[String],
path: Seq[String],
context: Origin
): Throwable
/** Ambiguous column or field reference */
def ambiguousColumnOrFieldError(
name: Seq[String],
numMatches: Int,
context: Origin
): Throwable
/** Type casting causes overflow */
def castingCauseOverflowError(
t: String,
from: DataType,
to: DataType
): ArithmeticException
}
Context information for error reporting.
/**
* Query context information for error reporting
*/
case class QueryContext(
objectType: String,
objectName: String,
startIndex: Int,
stopIndex: Int,
fragment: String
)
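The indexes locate the offending fragment inside the original statement; a minimal sketch of rendering a caret line, assuming startIndex and stopIndex are inclusive offsets into a single-line query text (renderContext is a hypothetical helper):
def renderContext(sqlText: String, ctx: QueryContext): String = {
  // Underline the span [startIndex, stopIndex] beneath the query text
  val caret = " " * ctx.startIndex + "^" * (ctx.stopIndex - ctx.startIndex + 1)
  s"$sqlText\n$caret  ${ctx.objectType} ${ctx.objectName}"
}
A real renderer would also split multi-line statements at line breaks before drawing the carets.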
Basic exception handling:
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types._
try {
// Some operation that might fail analysis
val schema = StructType.fromDDL("invalid ddl syntax here")
} catch {
case ae: AnalysisException =>
println(s"Analysis failed: ${ae.getMessage}")
println(s"Error class: ${ae.getErrorClass}")
println(s"Simple message: ${ae.getSimpleMessage}")
// Check for position information
ae.line.foreach(line => println(s"Error at line: $line"))
ae.startPosition.foreach(pos => println(s"Error at position: $pos"))
// SparkThrowable is a trait, so match on its intersection with Exception
// to keep Throwable methods such as getMessage available
case st: Exception with SparkThrowable =>
  println(s"Spark error: ${st.getMessage}")
  println(s"SQL state: ${st.getSqlState}")
case ex: Exception =>
println(s"General error: ${ex.getMessage}")
}
Working with error context:
def handleAnalysisException(ae: AnalysisException): Unit = {
println(s"Analysis Exception Details:")
println(s"Message: ${ae.getMessage}")
// Error classification
ae.errorClass.foreach { errorClass =>
println(s"Error Class: $errorClass")
}
// Message parameters
if (ae.messageParameters.nonEmpty) {
println("Message Parameters:")
ae.messageParameters.foreach { case (key, value) =>
println(s" $key: $value")
}
}
// Query context
if (ae.context.nonEmpty) {
println("Query Context:")
ae.context.foreach { ctx =>
println(s" Object: ${ctx.objectType}.${ctx.objectName}")
println(s" Position: ${ctx.startIndex}-${ctx.stopIndex}")
println(s" Fragment: ${ctx.fragment}")
}
}
// Position information
(ae.line, ae.startPosition) match {
case (Some(line), Some(pos)) =>
println(s"Error at line $line, position $pos")
case (Some(line), None) =>
println(s"Error at line $line")
case (None, Some(pos)) =>
println(s"Error at position $pos")
case _ =>
println("No position information available")
}
}
Data type error handling:
import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.types._
def handleDataTypeOperations(): Unit = {
try {
// Attempt to create invalid decimal type
val invalidDecimal = DecimalType(50, 10) // Exceeds max precision
} catch {
case ex: Exception =>
println(s"Decimal creation failed: ${ex.getMessage}")
}
try {
// Attempt to merge incompatible types
val stringType = StringType
val intType = IntegerType
// Some operation that tries to merge these incompatible types
} catch {
case ex: Exception =>
println(s"Type merge failed: ${ex.getMessage}")
}
}
// Using data type error utilities
def createCustomDataTypeError(): Unit = {
// Create precision exceeded error
val precisionError = DataTypeErrors.decimalPrecisionExceedsMaxPrecisionError(50, 38)
println(s"Precision error: ${precisionError.getMessage}")
// Create incompatible types error
val typeError = DataTypeErrors.cannotMergeIncompatibleDataTypesError(StringType, IntegerType)
println(s"Type error: ${typeError.getMessage}")
}
Schema validation with error handling:
def validateSchema(ddlString: String): Either[AnalysisException, StructType] = {
try {
val schema = StructType.fromDDL(ddlString)
Right(schema)
} catch {
case ae: AnalysisException =>
Left(ae)
}
}
// Usage
val validDDL = "id BIGINT, name STRING, age INT"
val invalidDDL = "id BIGINT INVALID SYNTAX name STRING"
validateSchema(validDDL) match {
case Right(schema) =>
println(s"Valid schema: ${schema.treeString}")
case Left(error) =>
println(s"Schema validation failed: ${error.getSimpleMessage}")
}
validateSchema(invalidDDL) match {
case Right(schema) =>
println(s"Valid schema: ${schema.treeString}")
case Left(error) =>
println(s"Schema validation failed: ${error.getSimpleMessage}")
handleAnalysisException(error)
}
Row access error handling:
import org.apache.spark.sql.Row
def safeRowAccess(row: Row): Unit = {
try {
// Safe field access with bounds checking
if (row.length > 0) {
val firstField = row.get(0)
println(s"First field: $firstField")
}
// Type-safe access with error handling
try {
val name = row.getAs[String]("name")
println(s"Name: $name")
} catch {
  case _: IllegalArgumentException =>
    println("Field 'name' not found in row")
  case _: UnsupportedOperationException =>
    println("Row has no schema, so name-based access is unsupported")
  case _: ClassCastException =>
    println("Field 'name' is not a String type")
}
} catch {
case _: IndexOutOfBoundsException =>
println("Row index out of bounds")
case ex: Exception =>
println(s"Unexpected error accessing row: ${ex.getMessage}")
}
}
// Robust row processing
def processRowSafely(row: Row): Map[String, Any] = {
val result = scala.collection.mutable.Map[String, Any]()
try {
// Get schema if available
val schema = row.schema
if (schema != null) {
schema.fields.zipWithIndex.foreach { case (field, index) =>
try {
if (!row.isNullAt(index)) {
val value = field.dataType match {
case StringType => row.getString(index)
case IntegerType => row.getInt(index)
case LongType => row.getLong(index)
case DoubleType => row.getDouble(index)
case BooleanType => row.getBoolean(index)
case _ => row.get(index) // Generic access for other types
}
result(field.name) = value
} else {
result(field.name) = null
}
} catch {
case ex: Exception =>
println(s"Error accessing field ${field.name}: ${ex.getMessage}")
result(field.name) = s"ERROR: ${ex.getMessage}"
}
}
} else {
// No schema available, process by index
(0 until row.length).foreach { index =>
try {
result(s"field_$index") = row.get(index)
} catch {
case ex: Exception =>
println(s"Error accessing field at index $index: ${ex.getMessage}")
result(s"field_$index") = s"ERROR: ${ex.getMessage}"
}
}
}
} catch {
case ex: Exception =>
println(s"Error processing row: ${ex.getMessage}")
}
result.toMap
}
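A hedged usage sketch, assuming a local SparkSession is acceptable for the demo (rows returned by a query carry a schema, so the schema branch above is exercised):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("row-demo").getOrCreate()
val rows = spark.sql("SELECT 1 AS id, 'Alice' AS name, CAST(9.5 AS DOUBLE) AS score").collect()
rows.foreach(row => println(processRowSafely(row)))
spark.stop()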
Exception chaining and context:
def chainExceptions(): Unit = {
try {
// Some complex operation that might fail at multiple levels
performComplexAnalysis()
} catch {
case ae: AnalysisException =>
// Add additional context to the exception
val contextualException = AnalysisException(
s"Failed during complex analysis: ${ae.getMessage}",
ae.line,
ae.startPosition,
ae.errorClass,
ae.messageParameters + ("operation" -> "complex_analysis"),
ae.context
)
// Could re-throw with additional context
throw contextualException
case ex: Exception =>
// Wrap in AnalysisException with context
throw AnalysisException(
s"Unexpected error during analysis: ${ex.getMessage}",
errorClass = Some("UNEXPECTED_ANALYSIS_ERROR"),
messageParameters = Map(
"original_error" -> ex.getClass.getSimpleName,
"original_message" -> ex.getMessage
)
)
}
}
def performComplexAnalysis(): Unit = {
// Simulate some operation that could fail
throw new RuntimeException("Simulated failure")
}
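The wrapper surfaces the original failure through its error class and parameters; a quick check of the sketch above:
try chainExceptions()
catch {
  case ae: AnalysisException =>
    // Prints Some(UNEXPECTED_ANALYSIS_ERROR) and a map containing
    // original_error -> RuntimeException
    println(s"${ae.errorClass} / ${ae.messageParameters}")
}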
Error recovery patterns:
def robustSchemaProcessing(ddlStrings: List[String]): List[(String, Either[String, StructType])] = {
ddlStrings.map { ddl =>
try {
val schema = StructType.fromDDL(ddl)
ddl -> Right(schema)
} catch {
case ae: AnalysisException =>
ddl -> Left(s"Analysis error: ${ae.getSimpleMessage}")
case ex: Exception =>
ddl -> Left(s"Unexpected error: ${ex.getMessage}")
}
}
}
// Usage with error recovery
val ddlList = List(
"id BIGINT, name STRING", // Valid
"invalid syntax here", // Invalid
"score DOUBLE, active BOOLEAN" // Valid
)
val results = robustSchemaProcessing(ddlList)
results.foreach {
case (ddl, Right(schema)) =>
println(s"SUCCESS: $ddl")
println(s"Schema: ${schema.simpleString}")
case (ddl, Left(error)) =>
println(s"FAILED: $ddl")
println(s"Error: $error")
}