# Analysis

Semantic analysis system for resolving unresolved references, type checking, and plan validation in Catalyst query plans.

## Capabilities

### Analyzer Class

The main analyzer for logical plan analysis and resolution.

```scala { .api }
/**
 * Logical plan analyzer that resolves unresolved relations and expressions
 */
class Analyzer(
    catalog: Catalog,
    registry: FunctionRegistry,
    conf: CatalystConf,
    maxIterations: Int = 100) extends RuleExecutor[LogicalPlan] {

  /**
   * Analyze a logical plan by applying resolution rules
   */
  def execute(plan: LogicalPlan): LogicalPlan

  /** Sequence of rule batches for analysis */
  def batches: Seq[Batch]
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._

// Create analyzer components
val catalog = new SimpleCatalog()
val functionRegistry = FunctionRegistry.builtin
val conf = new SimpleCatalystConf()

// Create analyzer
val analyzer = new Analyzer(catalog, functionRegistry, conf)

// Analyze an unresolved plan
val unresolvedPlan = Project(
  Seq(UnresolvedAttribute("name")),
  UnresolvedRelation(TableIdentifier("users"))
)

val analyzedPlan = analyzer.execute(unresolvedPlan)
// Result: resolved plan with concrete attribute references and relations
```

### Catalog Interface

Interface for looking up relations, functions, and metadata.

```scala { .api }
/**
 * Interface for looking up relations, functions, and types
 */
trait Catalog {
  /**
   * Look up a relation by name
   */
  def lookupRelation(name: Seq[String]): LogicalPlan

  /**
   * Check if a function exists
   */
  def functionExists(name: String): Boolean

  /**
   * Look up a function by name and create expression
   */
  def lookupFunction(name: String, children: Seq[Expression]): Expression

  /**
   * Get table metadata
   */
  def getTables(databaseName: Option[String]): Seq[(String, Boolean)]

  /**
   * Check if relation exists
   */
  def tableExists(name: Seq[String]): Boolean
}

/**
 * Simple in-memory catalog implementation
 */
class SimpleCatalog extends Catalog {
  private val tables = scala.collection.mutable.HashMap[String, LogicalPlan]()

  def registerTable(name: String, plan: LogicalPlan): Unit = {
    tables(name) = plan
  }

  override def lookupRelation(name: Seq[String]): LogicalPlan = {
    val tableName = name.mkString(".")
    tables.getOrElse(tableName, throw new AnalysisException(s"Table not found: $tableName"))
  }
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Create and populate catalog
val catalog = new SimpleCatalog()

val userTable = LocalRelation(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, true)(),
  AttributeReference("age", IntegerType, true)()
)

catalog.registerTable("users", userTable)

// Look up relations
val relation = catalog.lookupRelation(Seq("users"))
val exists = catalog.tableExists(Seq("users")) // true
val missing = catalog.tableExists(Seq("products")) // false

// Check functions
val functionExists = catalog.functionExists("upper") // depends on implementation
```

### Function Registry

Registry for SQL functions and user-defined functions.

```scala { .api }
/**
 * Registry for SQL functions
 */
trait FunctionRegistry {
  /**
   * Register a function
   */
  def registerFunction(
      name: String,
      info: ExpressionInfo,
      builder: Seq[Expression] => Expression): Unit

  /**
   * Look up a function and create expression
   */
  def lookupFunction(name: String, children: Seq[Expression]): Expression

  /**
   * Check if function exists
   */
  def functionExists(name: String): Boolean

  /**
   * List all registered functions
   */
  def listFunction(): Seq[String]
}

/**
 * Built-in function registry with standard SQL functions
 */
object FunctionRegistry {
  /** Get built-in function registry */
  def builtin: FunctionRegistry

  /** Create empty function registry */
  def empty: FunctionRegistry
}

/**
 * Function information for registry
 */
case class ExpressionInfo(
    className: String,
    name: String,
    usage: String,
    extended: String = "") {

  def getUsage(): String = if (usage != null) usage else ""
  def getExtended(): String = if (extended != null) extended else ""
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Get built-in registry
val registry = FunctionRegistry.builtin

// Check if functions exist
val hasUpper = registry.functionExists("upper") // true
val hasCustom = registry.functionExists("my_func") // false

// Look up and create function expressions
val upperExpr = registry.lookupFunction("upper",
  Seq(Literal("hello", StringType)))

val concatExpr = registry.lookupFunction("concat",
  Seq(Literal("Hello ", StringType), Literal("World", StringType)))

// Register custom function
val customRegistry = FunctionRegistry.empty
val customInfo = ExpressionInfo(
  "com.example.MyFunction",
  "my_func",
  "my_func(str) - Returns custom transformation of string"
)

customRegistry.registerFunction("my_func", customInfo, { expressions =>
  // Custom function builder
  MyCustomExpression(expressions.head)
})

// List all functions
val allFunctions = registry.listFunction() // Seq of function names
```

### Type Checking and Validation

Type checking results and validation utilities.

```scala { .api }
/**
 * Result of type checking with success/failure information
 */
case class TypeCheckResult(isSuccess: Boolean, errorMessage: Option[String]) {
  def isFailure: Boolean = !isSuccess
}

object TypeCheckResult {
  /** Successful type check */
  val TypeCheckSuccess: TypeCheckResult = TypeCheckResult(true, None)

  /** Create failure result */
  def TypeCheckFailure(message: String): TypeCheckResult =
    TypeCheckResult(false, Some(message))
}

/**
 * Validation rules for analyzed plans
 */
object CheckAnalysis {
  /**
   * Check that a plan is properly analyzed
   */
  def checkAnalysis(plan: LogicalPlan): Unit = {
    plan.foreachUp {
      case p if !p.analyzed =>
        // Check for unresolved references, missing attributes, etc.
      case _ => // OK
    }
  }
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Type checking in expressions
case class MyExpression(child: Expression) extends UnaryExpression {
  override def checkInputDataTypes(): TypeCheckResult = {
    if (child.dataType == StringType) {
      TypeCheckResult.TypeCheckSuccess
    } else {
      TypeCheckResult.TypeCheckFailure(
        s"Expected StringType, but got ${child.dataType}"
      )
    }
  }

  override def dataType: DataType = StringType
  override def nullable: Boolean = child.nullable
  override def eval(input: InternalRow): Any = {
    // Implementation
  }
}

// Use type checking
val validExpr = MyExpression(Literal("hello", StringType))
val invalidExpr = MyExpression(Literal(42, IntegerType))

val validCheck = validExpr.checkInputDataTypes() // Success
val invalidCheck = invalidExpr.checkInputDataTypes() // Failure

// Plan analysis checking
val plan = Project(
  Seq(AttributeReference("name", StringType, true)()),
  LocalRelation(AttributeReference("name", StringType, true)())
)

CheckAnalysis.checkAnalysis(plan) // Validates plan is properly analyzed
```

### Resolution Rules and Utilities

Common resolution patterns and utilities for analysis.

```scala { .api }
/**
 * Type coercion rules for Hive compatibility
 */
object HiveTypeCoercion {
  /** Implicit type conversions for binary operations */
  object ImplicitTypeCasts extends Rule[LogicalPlan]

  /** Boolean equality type coercion */
  object BooleanEquality extends Rule[LogicalPlan]

  /** String to integral type coercion */
  object StringToIntegralCasts extends Rule[LogicalPlan]

  /** Division type coercion */
  object Division extends Rule[LogicalPlan]
}

/**
 * Resolver function type for name resolution
 */
type Resolver = (String, String) => Boolean

/** Case insensitive name resolution */
val caseInsensitiveResolution: Resolver = (a: String, b: String) => a.equalsIgnoreCase(b)

/** Case sensitive name resolution */
val caseSensitiveResolution: Resolver = (a: String, b: String) => a == b

/**
 * Implicit class for analysis error reporting
 */
implicit class AnalysisErrorAt(t: TreeNode[_]) {
  /** Fails analysis with error message and position */
  def failAnalysis(msg: String): Nothing = {
    throw new AnalysisException(msg, t.origin.line, t.origin.startPosition)
  }
}

/**
 * Execute function with position context for error reporting
 */
def withPosition[A](t: TreeNode[_])(f: => A): A = {
  try f catch {
    case a: AnalysisException =>
      throw a.withPosition(t.origin.line, t.origin.startPosition)
  }
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

// Name resolution
val resolver = caseInsensitiveResolution
val matches1 = resolver("Name", "name") // true
val matches2 = resolver("ID", "id") // true

val strictResolver = caseSensitiveResolution
val matches3 = strictResolver("Name", "name") // false

// Error reporting with position
val expr = Add(
  AttributeReference("a", IntegerType, false)(),
  AttributeReference("b", StringType, false)() // Type mismatch
)

try {
  withPosition(expr) {
    expr.checkInputDataTypes() match {
      case TypeCheckResult(false, Some(message)) =>
        expr.failAnalysis(s"Type check failed: $message")
      case _ => // OK
    }
  }
} catch {
  case e: AnalysisException =>
    println(s"Analysis error: ${e.getMessage}")
    // Includes position information from expr.origin
}

// Type coercion example
val plan = Project(
  Seq(Add(
    AttributeReference("int_col", IntegerType, false)(),
    AttributeReference("double_col", DoubleType, false)()
  ).as("sum")),
  LocalRelation(
    AttributeReference("int_col", IntegerType, false)(),
    AttributeReference("double_col", DoubleType, false)()
  )
)

// Type coercion rules would automatically convert int_col to double for the addition
val coercedPlan = HiveTypeCoercion.ImplicitTypeCasts.apply(plan)
```

### Multi-Instance Relations

Support for handling relations with multiple instances.

```scala { .api }
/**
 * Trait for relations that can appear multiple times in a query
 */
trait MultiInstanceRelation {
  /** Create a new instance with fresh expression IDs */
  def newInstance(): LogicalPlan
}

/**
 * Plans that produce the same result but have different expression IDs
 */
object MultiInstanceRelation {
  /** Check if two plans are the same ignoring expression IDs */
  def isSame(left: LogicalPlan, right: LogicalPlan): Boolean
}
```

**Usage Examples:**

```scala
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

// Self-join requires multiple instances of the same relation
val baseRelation = LocalRelation(
  AttributeReference("id", IntegerType, false)(),
  AttributeReference("name", StringType, true)()
)

// Create new instance for self-join
val relation1 = baseRelation
val relation2 = baseRelation.newInstance() // Fresh expression IDs

val selfJoin = Join(
  relation1,
  relation2,
  Inner,
  Some(EqualTo(relation1.output(0), relation2.output(0)))
)

// Check if plans are the same (ignoring expression IDs)
val same = MultiInstanceRelation.isSame(relation1, relation2) // true
```