or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-types.md · encoders.md · error-handling.md · index.md · row-operations.md · streaming-operations.md · utilities.md

docs/error-handling.md

# Error Handling

Structured exception handling with detailed error information for query analysis and execution. Provides comprehensive error reporting and debugging support for Spark SQL operations.

## Capabilities

### Analysis Exception

Main exception type for query analysis failures.

```scala { .api }
/**
 * Thrown when query analysis fails
 */
class AnalysisException extends Exception with SparkThrowable {
  /** Add position information to the exception */
  def withPosition(origin: Origin): AnalysisException

  /** Get message without logical plan details */
  def getSimpleMessage: String
}

/**
 * AnalysisException constructors
 */
object AnalysisException {
  // Primary constructor
  def apply(
    message: String,
    line: Option[Int] = None,
    startPosition: Option[Int] = None,
    errorClass: Option[String] = None,
    messageParameters: Map[String, String] = Map.empty,
    context: Array[QueryContext] = Array.empty
  ): AnalysisException

  // Alternative constructors
  def apply(
    errorClass: String,
    messageParameters: Map[String, String],
    cause: Option[Throwable]
  ): AnalysisException

  def apply(
    errorClass: String,
    messageParameters: Map[String, String],
    context: Array[QueryContext],
    summary: String
  ): AnalysisException

  def apply(
    errorClass: String,
    messageParameters: Map[String, String]
  ): AnalysisException

  def apply(
    errorClass: String,
    messageParameters: Map[String, String],
    origin: Origin
  ): AnalysisException

  def apply(
    errorClass: String,
    messageParameters: Map[String, String],
    origin: Origin,
    cause: Option[Throwable]
  ): AnalysisException
}
```

### SparkThrowable Interface

Base interface for Spark-specific exceptions with structured error information.

```scala { .api }
/**
 * Base interface for Spark exceptions with structured error information
 */
trait SparkThrowable {
  /** Error class identifier */
  def getErrorClass: String

  /** Error message parameters */
  def getMessageParameters: java.util.Map[String, String]

  /** SQL state for the error */
  def getSqlState: String

  /** Query context information */
  def getQueryContext: Array[QueryContext]
}
```

### Data Type Error Utilities

Centralized error handling utilities for data type operations.

```scala { .api }
/**
 * Centralized error handling for data type operations
 */
object DataTypeErrors {
  /** Decimal precision exceeds maximum allowed precision */
  def decimalPrecisionExceedsMaxPrecisionError(
    precision: Int,
    maxPrecision: Int
  ): SparkArithmeticException

  /** Decimal value is out of range */
  def outOfDecimalTypeRangeError(str: UTF8String): SparkArithmeticException

  /** Schema parsing failed */
  def schemaFailToParseError(schema: String, e: Throwable): Throwable

  /** Cannot merge incompatible data types */
  def cannotMergeIncompatibleDataTypesError(
    left: DataType,
    right: DataType
  ): Throwable

  /** Invalid field name access */
  def invalidFieldName(
    fieldName: Seq[String],
    path: Seq[String],
    context: Origin
  ): Throwable

  /** Ambiguous column or field reference */
  def ambiguousColumnOrFieldError(
    name: Seq[String],
    numMatches: Int,
    context: Origin
  ): Throwable

  /** Type casting causes overflow */
  def castingCauseOverflowError(
    t: String,
    from: DataType,
    to: DataType
  ): ArithmeticException
}
```

### Query Context Information

Context information for error reporting.

```scala { .api }
/**
 * Query context information for error reporting
 */
case class QueryContext(
  objectType: String,
  objectName: String,
  startIndex: Int,
  stopIndex: Int,
  fragment: String
)
```

## Usage Examples

**Basic exception handling:**

```scala
import org.apache.spark.sql.{AnalysisException, SparkThrowable}
import org.apache.spark.sql.types._

try {
  // Some operation that might fail analysis
  val schema = StructType.fromDDL("invalid ddl syntax here")
} catch {
  case ae: AnalysisException =>
    println(s"Analysis failed: ${ae.getMessage}")
    println(s"Error class: ${ae.getErrorClass}")
    println(s"Simple message: ${ae.getSimpleMessage}")

    // Check for position information
    ae.line.foreach(line => println(s"Error at line: $line"))
    ae.startPosition.foreach(pos => println(s"Error at position: $pos"))

  case st: SparkThrowable =>
    println(s"Spark error: ${st.getMessage}")
    println(s"SQL state: ${st.getSqlState}")

  case ex: Exception =>
    println(s"General error: ${ex.getMessage}")
}
```

**Working with error context:**

```scala
def handleAnalysisException(ae: AnalysisException): Unit = {
  println(s"Analysis Exception Details:")
  println(s"Message: ${ae.getMessage}")

  // Error classification
  ae.errorClass.foreach { errorClass =>
    println(s"Error Class: $errorClass")
  }

  // Message parameters
  if (ae.messageParameters.nonEmpty) {
    println("Message Parameters:")
    ae.messageParameters.foreach { case (key, value) =>
      println(s"  $key: $value")
    }
  }

  // Query context
  if (ae.context.nonEmpty) {
    println("Query Context:")
    ae.context.foreach { ctx =>
      println(s"  Object: ${ctx.objectType}.${ctx.objectName}")
      println(s"  Position: ${ctx.startIndex}-${ctx.stopIndex}")
      println(s"  Fragment: ${ctx.fragment}")
    }
  }

  // Position information
  (ae.line, ae.startPosition) match {
    case (Some(line), Some(pos)) =>
      println(s"Error at line $line, position $pos")
    case (Some(line), None) =>
      println(s"Error at line $line")
    case (None, Some(pos)) =>
      println(s"Error at position $pos")
    case _ =>
      println("No position information available")
  }
}
```

**Data type error handling:**

```scala
import org.apache.spark.sql.errors.DataTypeErrors
import org.apache.spark.sql.types._

def handleDataTypeOperations(): Unit = {
  try {
    // Attempt to create invalid decimal type
    val invalidDecimal = DecimalType(50, 10) // Exceeds max precision
  } catch {
    case ex: Exception =>
      println(s"Decimal creation failed: ${ex.getMessage}")
  }

  try {
    // Attempt to merge incompatible types
    val stringType = StringType
    val intType = IntegerType
    // Some operation that tries to merge these incompatible types
  } catch {
    case ex: Exception =>
      println(s"Type merge failed: ${ex.getMessage}")
  }
}

// Using data type error utilities
def createCustomDataTypeError(): Unit = {
  // Create precision exceeded error
  val precisionError = DataTypeErrors.decimalPrecisionExceedsMaxPrecisionError(50, 38)
  println(s"Precision error: ${precisionError.getMessage}")

  // Create incompatible types error
  val typeError = DataTypeErrors.cannotMergeIncompatibleDataTypesError(StringType, IntegerType)
  println(s"Type error: ${typeError.getMessage}")
}
```

**Schema validation with error handling:**

```scala
def validateSchema(ddlString: String): Either[AnalysisException, StructType] = {
  try {
    val schema = StructType.fromDDL(ddlString)
    Right(schema)
  } catch {
    case ae: AnalysisException =>
      Left(ae)
  }
}

// Usage
val validDDL = "id BIGINT, name STRING, age INT"
val invalidDDL = "id BIGINT INVALID SYNTAX name STRING"

validateSchema(validDDL) match {
  case Right(schema) =>
    println(s"Valid schema: ${schema.treeString}")
  case Left(error) =>
    println(s"Schema validation failed: ${error.getSimpleMessage}")
}

validateSchema(invalidDDL) match {
  case Right(schema) =>
    println(s"Valid schema: ${schema.treeString}")
  case Left(error) =>
    println(s"Schema validation failed: ${error.getSimpleMessage}")
    handleAnalysisException(error)
}
```

**Row access error handling:**

```scala
import org.apache.spark.sql.Row

def safeRowAccess(row: Row): Unit = {
  try {
    // Safe field access with bounds checking
    if (row.length > 0) {
      val firstField = row.get(0)
      println(s"First field: $firstField")
    }

    // Type-safe access with error handling
    try {
      val name = row.getAs[String]("name")
      println(s"Name: $name")
    } catch {
      case _: IllegalArgumentException =>
        println("Field 'name' not found in row")
      case _: ClassCastException =>
        println("Field 'name' is not a String type")
    }

  } catch {
    case _: IndexOutOfBoundsException =>
      println("Row index out of bounds")
    case ex: Exception =>
      println(s"Unexpected error accessing row: ${ex.getMessage}")
  }
}

// Robust row processing
def processRowSafely(row: Row): Map[String, Any] = {
  val result = scala.collection.mutable.Map[String, Any]()

  try {
    // Get schema if available
    val schema = row.schema
    if (schema != null) {
      schema.fields.zipWithIndex.foreach { case (field, index) =>
        try {
          if (!row.isNullAt(index)) {
            val value = field.dataType match {
              case StringType => row.getString(index)
              case IntegerType => row.getInt(index)
              case LongType => row.getLong(index)
              case DoubleType => row.getDouble(index)
              case BooleanType => row.getBoolean(index)
              case _ => row.get(index) // Generic access for other types
            }
            result(field.name) = value
          } else {
            result(field.name) = null
          }
        } catch {
          case ex: Exception =>
            println(s"Error accessing field ${field.name}: ${ex.getMessage}")
            result(field.name) = s"ERROR: ${ex.getMessage}"
        }
      }
    } else {
      // No schema available, process by index
      (0 until row.length).foreach { index =>
        try {
          result(s"field_$index") = row.get(index)
        } catch {
          case ex: Exception =>
            println(s"Error accessing field at index $index: ${ex.getMessage}")
            result(s"field_$index") = s"ERROR: ${ex.getMessage}"
        }
      }
    }
  } catch {
    case ex: Exception =>
      println(s"Error processing row: ${ex.getMessage}")
  }

  result.toMap
}
```

**Exception chaining and context:**

```scala
def chainExceptions(): Unit = {
  try {
    // Some complex operation that might fail at multiple levels
    performComplexAnalysis()
  } catch {
    case ae: AnalysisException =>
      // Add additional context to the exception
      val contextualException = new AnalysisException(
        s"Failed during complex analysis: ${ae.getMessage}",
        ae.line,
        ae.startPosition,
        ae.errorClass,
        ae.messageParameters + ("operation" -> "complex_analysis"),
        ae.context
      )

      // Could re-throw with additional context
      throw contextualException

    case ex: Exception =>
      // Wrap in AnalysisException with context
      throw new AnalysisException(
        s"Unexpected error during analysis: ${ex.getMessage}",
        errorClass = Some("UNEXPECTED_ANALYSIS_ERROR"),
        messageParameters = Map(
          "original_error" -> ex.getClass.getSimpleName,
          "original_message" -> ex.getMessage
        )
      )
  }
}

def performComplexAnalysis(): Unit = {
  // Simulate some operation that could fail
  throw new RuntimeException("Simulated failure")
}
```

**Error recovery patterns:**

```scala
def robustSchemaProcessing(ddlStrings: List[String]): List[(String, Either[String, StructType])] = {
  ddlStrings.map { ddl =>
    try {
      val schema = StructType.fromDDL(ddl)
      ddl -> Right(schema)
    } catch {
      case ae: AnalysisException =>
        ddl -> Left(s"Analysis error: ${ae.getSimpleMessage}")
      case ex: Exception =>
        ddl -> Left(s"Unexpected error: ${ex.getMessage}")
    }
  }
}

// Usage with error recovery
val ddlList = List(
  "id BIGINT, name STRING",       // Valid
  "invalid syntax here",          // Invalid
  "score DOUBLE, active BOOLEAN"  // Valid
)

val results = robustSchemaProcessing(ddlList)
results.foreach {
  case (ddl, Right(schema)) =>
    println(s"SUCCESS: $ddl")
    println(s"Schema: ${schema.simpleString}")
  case (ddl, Left(error)) =>
    println(s"FAILED: $ddl")
    println(s"Error: $error")
}
```