Error Handling

Structured exception handling for Spark SQL query analysis and execution. Each exception carries detailed, machine-readable error information (error class, message parameters, SQL state, and query context) to support error reporting and debugging.

Capabilities

Analysis Exception

Main exception type for query analysis failures.

/**
 * Thrown when query analysis fails
 */
class AnalysisException extends Exception with SparkThrowable {
  /** Add position information to the exception */
  def withPosition(origin: Origin): AnalysisException
  
  /** Get message without logical plan details */
  def getSimpleMessage: String
}

/**
 * AnalysisException constructors
 */
object AnalysisException {
  // Primary constructor
  def apply(
    message: String,
    line: Option[Int] = None,
    startPosition: Option[Int] = None,
    errorClass: Option[String] = None,
    messageParameters: Map[String, String] = Map.empty,
    context: Array[QueryContext] = Array.empty
  ): AnalysisException
  
  // Alternative constructors
  def apply(
    errorClass: String,
    messageParameters: Map[String, String],
    cause: Option[Throwable]
  ): AnalysisException
  
  def apply(
    errorClass: String,
    messageParameters: Map[String, String],
    context: Array[QueryContext],
    summary: String
  ): AnalysisException
  
  def apply(
    errorClass: String,
    messageParameters: Map[String, String]
  ): AnalysisException
  
  def apply(
    errorClass: String,
    messageParameters: Map[String, String],
    origin: Origin
  ): AnalysisException
  
  def apply(
    errorClass: String,
    messageParameters: Map[String, String],
    origin: Origin,
    cause: Option[Throwable]
  ): AnalysisException
}

SparkThrowable Interface

Base interface for Spark-specific exceptions with structured error information.

/**
 * Base interface for Spark exceptions with structured error information
 */ 
trait SparkThrowable {
  /** Error class identifier */
  def getErrorClass: String
  
  /** Error message parameters */
  def getMessageParameters: java.util.Map[String, String]
  
  /** SQL state for the error */
  def getSqlState: String
  
  /** Query context information */
  def getQueryContext: Array[QueryContext]
}

Data Type Error Utilities

Centralized error handling utilities for data type operations.

/**
 * Centralized error handling for data type operations
 */
object DataTypeErrors {
  /** Decimal precision exceeds maximum allowed precision */
  def decimalPrecisionExceedsMaxPrecisionError(
    precision: Int, 
    maxPrecision: Int
  ): SparkArithmeticException
  
  /** Decimal value is out of range */
  def outOfDecimalTypeRangeError(str: UTF8String): SparkArithmeticException
  
  /** Schema parsing failed */
  def schemaFailToParseError(schema: String, e: Throwable): Throwable
  
  /** Cannot merge incompatible data types */
  def cannotMergeIncompatibleDataTypesError(
    left: DataType, 
    right: DataType
  ): Throwable
  
  /** Invalid field name access */
  def invalidFieldName(
    fieldName: Seq[String], 
    path: Seq[String], 
    context: Origin
  ): Throwable
  
  /** Ambiguous column or field reference */
  def ambiguousColumnOrFieldError(
    name: Seq[String], 
    numMatches: Int, 
    context: Origin
  ): Throwable
  
  /** Type casting causes overflow */
  def castingCauseOverflowError(
    t: String, 
    from: DataType, 
    to: DataType
  ): ArithmeticException
}

Query Context Information

Context information for error reporting.

/**
 * Query context information for error reporting
 */
case class QueryContext(
  objectType: String,
  objectName: String,
  startIndex: Int,
  stopIndex: Int,
  fragment: String
)

Usage Examples

Basic exception handling:

import org.apache.spark.sql.{AnalysisException, SparkThrowable}
import org.apache.spark.sql.types._

try {
  // Some operation that might fail analysis
  val schema = StructType.fromDDL("invalid ddl syntax here")
} catch {
  case ae: AnalysisException =>
    println(s"Analysis failed: ${ae.getMessage}")
    println(s"Error class: ${ae.getErrorClass}")
    println(s"Simple message: ${ae.getSimpleMessage}")
    
    // Check for position information
    ae.line.foreach(line => println(s"Error at line: $line"))
    ae.startPosition.foreach(pos => println(s"Error at position: $pos"))
    
  case st: Throwable with SparkThrowable =>
    println(s"Spark error: ${st.getMessage}")
    println(s"SQL state: ${st.getSqlState}")
    
  case ex: Exception =>
    println(s"General error: ${ex.getMessage}")
}

Working with error context:

def handleAnalysisException(ae: AnalysisException): Unit = {
  println(s"Analysis Exception Details:")
  println(s"Message: ${ae.getMessage}")
  
  // Error classification
  ae.errorClass.foreach { errorClass =>
    println(s"Error Class: $errorClass")
  }
  
  // Message parameters
  if (ae.messageParameters.nonEmpty) {
    println("Message Parameters:")
    ae.messageParameters.foreach { case (key, value) =>
      println(s"  $key: $value")
    }
  }
  
  // Query context
  if (ae.context.nonEmpty) {
    println("Query Context:")
    ae.context.foreach { ctx =>
      println(s"  Object: ${ctx.objectType}.${ctx.objectName}")
      println(s"  Position: ${ctx.startIndex}-${ctx.stopIndex}")
      println(s"  Fragment: ${ctx.fragment}")
    }
  }
  
  // Position information
  (ae.line, ae.startPosition) match {
    case (Some(line), Some(pos)) =>
      println(s"Error at line $line, position $pos")
    case (Some(line), None) =>
      println(s"Error at line $line")
    case (None, Some(pos)) =>
      println(s"Error at position $pos")
    case _ =>
      println("No position information available")
  }
}

Data type error handling:

import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.types._

def handleDataTypeOperations(): Unit = {
  try {
    // Attempt to create invalid decimal type
    val invalidDecimal = DecimalType(50, 10) // Exceeds max precision
  } catch {
    case ex: Exception =>
      println(s"Decimal creation failed: ${ex.getMessage}")
  }
  
  try {
    // Simulate a failed merge of incompatible types by raising the
    // standardized error from DataTypeErrors
    throw DataTypeErrors.cannotMergeIncompatibleDataTypesError(StringType, IntegerType)
  } catch {
    case ex: Exception =>
      println(s"Type merge failed: ${ex.getMessage}")
  }
}

// Using data type error utilities
def createCustomDataTypeError(): Unit = {
  // Create precision exceeded error
  val precisionError = DataTypeErrors.decimalPrecisionExceedsMaxPrecisionError(50, 38)
  println(s"Precision error: ${precisionError.getMessage}")
  
  // Create incompatible types error
  val typeError = DataTypeErrors.cannotMergeIncompatibleDataTypesError(StringType, IntegerType)
  println(s"Type error: ${typeError.getMessage}")
}

Schema validation with error handling:

def validateSchema(ddlString: String): Either[AnalysisException, StructType] = {
  try {
    val schema = StructType.fromDDL(ddlString)
    Right(schema)
  } catch {
    case ae: AnalysisException =>
      Left(ae)
  }
}

// Usage
val validDDL = "id BIGINT, name STRING, age INT"
val invalidDDL = "id BIGINT INVALID SYNTAX name STRING"

validateSchema(validDDL) match {
  case Right(schema) =>
    println(s"Valid schema: ${schema.treeString}")
  case Left(error) =>
    println(s"Schema validation failed: ${error.getSimpleMessage}")
}

validateSchema(invalidDDL) match {
  case Right(schema) =>
    println(s"Valid schema: ${schema.treeString}")
  case Left(error) =>
    println(s"Schema validation failed: ${error.getSimpleMessage}")
    handleAnalysisException(error)
}

Row access error handling:

import org.apache.spark.sql.Row

def safeRowAccess(row: Row): Unit = {
  try {
    // Safe field access with bounds checking
    if (row.length > 0) {
      val firstField = row.get(0)
      println(s"First field: $firstField")
    }
    
    // Type-safe access with error handling
    try {
      val name = row.getAs[String]("name")
      println(s"Name: $name")
    } catch {
      case _: IllegalArgumentException =>
        println("Field 'name' not found in row")
      case _: ClassCastException =>
        println("Field 'name' is not a String type")
    }
    
  } catch {
    case _: IndexOutOfBoundsException =>
      println("Row index out of bounds")
    case ex: Exception =>
      println(s"Unexpected error accessing row: ${ex.getMessage}")
  }
}

// Robust row processing
def processRowSafely(row: Row): Map[String, Any] = {
  val result = scala.collection.mutable.Map[String, Any]()
  
  try {
    // Get schema if available
    val schema = row.schema
    if (schema != null) {
      schema.fields.zipWithIndex.foreach { case (field, index) =>
        try {
          if (!row.isNullAt(index)) {
            val value = field.dataType match {
              case StringType => row.getString(index)
              case IntegerType => row.getInt(index)
              case LongType => row.getLong(index)
              case DoubleType => row.getDouble(index)
              case BooleanType => row.getBoolean(index)
              case _ => row.get(index) // Generic access for other types
            }
            result(field.name) = value
          } else {
            result(field.name) = null
          }
        } catch {
          case ex: Exception =>
            println(s"Error accessing field ${field.name}: ${ex.getMessage}")
            result(field.name) = s"ERROR: ${ex.getMessage}"
        }
      }
    } else {
      // No schema available, process by index
      (0 until row.length).foreach { index =>
        try {
          result(s"field_$index") = row.get(index)
        } catch {
          case ex: Exception =>
            println(s"Error accessing field at index $index: ${ex.getMessage}")
            result(s"field_$index") = s"ERROR: ${ex.getMessage}"
        }
      }
    }
  } catch {
    case ex: Exception =>
      println(s"Error processing row: ${ex.getMessage}")
  }
  
  result.toMap
}

Exception chaining and context:

def chainExceptions(): Unit = {
  try {
    // Some complex operation that might fail at multiple levels
    performComplexAnalysis()
  } catch {
    case ae: AnalysisException =>
      // Add additional context to the exception
      val contextualException = AnalysisException(
        s"Failed during complex analysis: ${ae.getMessage}",
        ae.line,
        ae.startPosition,
        ae.errorClass,
        ae.messageParameters + ("operation" -> "complex_analysis"),
        ae.context
      )
      
      // Could re-throw with additional context
      throw contextualException
      
    case ex: Exception =>
      // Wrap in AnalysisException with context
      throw AnalysisException(
        s"Unexpected error during analysis: ${ex.getMessage}",
        errorClass = Some("UNEXPECTED_ANALYSIS_ERROR"),
        messageParameters = Map(
          "original_error" -> ex.getClass.getSimpleName,
          "original_message" -> ex.getMessage
        )
      )
  }
}

def performComplexAnalysis(): Unit = {
  // Simulate some operation that could fail
  throw new RuntimeException("Simulated failure")
}

Error recovery patterns:

def robustSchemaProcessing(ddlStrings: List[String]): List[(String, Either[String, StructType])] = {
  ddlStrings.map { ddl =>
    try {
      val schema = StructType.fromDDL(ddl)
      ddl -> Right(schema)
    } catch {
      case ae: AnalysisException =>
        ddl -> Left(s"Analysis error: ${ae.getSimpleMessage}")
      case ex: Exception =>
        ddl -> Left(s"Unexpected error: ${ex.getMessage}")
    }
  }
}

// Usage with error recovery
val ddlList = List(
  "id BIGINT, name STRING",           // Valid
  "invalid syntax here",              // Invalid
  "score DOUBLE, active BOOLEAN"      // Valid
)

val results = robustSchemaProcessing(ddlList)
results.foreach {
  case (ddl, Right(schema)) =>
    println(s"SUCCESS: $ddl")
    println(s"Schema: ${schema.simpleString}")
  case (ddl, Left(error)) =>
    println(s"FAILED: $ddl")
    println(s"Error: $error")
}