or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connectors.md · data-types.md · expressions.md · index.md · query-plans.md

docs/query-plans.md

# Query Plans

Catalyst represents every SQL query as a tree of plan nodes: a *logical* plan describes what the query computes, and a *physical* plan describes how it is executed. Optimizer rules rewrite these trees to implement optimizations such as predicate pushdown, join reordering, and cost-based optimization.

## Core Imports

5

6

```scala

7

import org.apache.spark.sql.catalyst.plans.logical._

8

import org.apache.spark.sql.catalyst.plans.physical._

9

import org.apache.spark.sql.catalyst.plans._

10

import org.apache.spark.sql.catalyst.expressions._

11

```

12

13

## Plan Hierarchy

14

15

### Base Plan Classes

16

17

```scala { .api }

/**
 * Base class of every Catalyst logical plan node.
 *
 * A logical plan describes what a query computes, independently of how it is
 * executed; the planner later turns it into a physical plan.
 */
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  /** Attributes (columns) produced by this node. */
  def output: Seq[Attribute]

  /** Input plans of this node; empty for leaf nodes. */
  def children: Seq[LogicalPlan]

  /** Whether the analyzer has fully resolved this node. */
  def resolved: Boolean

  /** Whether every child plan is resolved. */
  def childrenResolved: Boolean

  /** `output` as a set, for membership checks. */
  def outputSet: AttributeSet

  /** Attributes referenced by this node's expressions. */
  def references: AttributeSet

  /** Attributes available from the children's output. */
  def inputSet: AttributeSet

  /** Attributes created by this node itself rather than received from a child. */
  def producedAttributes: AttributeSet

  /** Referenced attributes that are neither in `inputSet` nor produced here. */
  def missingInput: AttributeSet

  /** `output` expressed as a `StructType`. */
  def schema: StructType

  /** Attributes output by all children. */
  def allAttributes: AttributeSet

  /** Whether this plan reads from at least one streaming source. */
  def isStreaming: Boolean

  /** Refreshes any metadata cached by this plan (and its children). */
  def refresh(): Unit
}

/**
 * Common base of logical and physical plans.
 *
 * `PlanType` is the concrete plan type (F-bounded), so tree transformations
 * return the same kind of plan they were applied to.
 */
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
  /** Attributes produced by this node. */
  def output: Seq[Attribute]

  /** `output` as a set. */
  def outputSet: AttributeSet

  /** `output` expressed as a `StructType`. */
  def schema: StructType

  /** Prints `schema` as a tree to standard output. */
  def printSchema(): Unit

  /** One-line description of this node. */
  def simpleString: String
}

```

42

43

### Plan Node Types by Arity

44

45

```scala { .api }

/** A logical plan node with no inputs, such as a table reference or an in-memory relation. */
abstract class LeafNode extends LogicalPlan {
  override final def children: Seq[LogicalPlan] = Nil
}

/** A logical plan node with exactly one input, exposed as `child`. */
abstract class UnaryNode extends LogicalPlan {
  def child: LogicalPlan

  override final def children: Seq[LogicalPlan] = child :: Nil
}

/** A logical plan node with exactly two inputs; `children` lists `left` before `right`. */
abstract class BinaryNode extends LogicalPlan {
  def left: LogicalPlan

  def right: LogicalPlan

  override final def children: Seq[LogicalPlan] = Seq(left, right)
}

```

61

62

## Leaf Nodes

63

64

### Data Sources

65

66

```scala { .api }

/** An in-memory relation whose rows (`data`) match the columns in `output`. */
case class LocalRelation(
    output: Seq[Attribute],
    data: Seq[InternalRow] = Nil,
    isStreaming: Boolean = false
) extends LeafNode

/** A relation producing a single row with no columns, e.g. for `SELECT 1` without `FROM`. */
case class OneRowRelation() extends LeafNode {
  override def output: Seq[Attribute] = Nil
}

/**
 * Generates `start`, `start + step`, ... up to but excluding `end`.
 * `numSlices` optionally fixes the number of partitions.
 */
case class Range(
    start: Long,
    end: Long,
    step: Long,
    numSlices: Option[Int],
    output: Seq[Attribute],
    isStreaming: Boolean = false
) extends LeafNode

/**
 * A table referenced by (possibly qualified) name, e.g. `Seq("db", "table")`,
 * that the analyzer has not yet resolved to a concrete relation.
 */
case class UnresolvedRelation(
    multipartIdentifier: Seq[String],
    options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty(),
    isStreaming: Boolean = false
) extends LeafNode

```

88

89

**Usage Example:**

90

```scala

// An in-memory relation with two non-nullable columns and two rows.
val attributes = Seq(
  AttributeReference("id", IntegerType, nullable = false)(),
  AttributeReference("name", StringType, nullable = false)()
)
// String values are stored as UTF8String inside an InternalRow.
val data = Seq(
  InternalRow(1, UTF8String.fromString("Alice")),
  InternalRow(2, UTF8String.fromString("Bob"))
)
val localRelation = LocalRelation(attributes, data)

// A range relation producing ids 1 to 99 (`end` is exclusive), split into 4 slices.
val rangeRelation = Range(
  start = 1,
  end = 100,
  step = 1,
  numSlices = Some(4),
  output = Seq(AttributeReference("id", LongType, nullable = false)()))

// A reference to `my_database.my_table`, resolved later by the analyzer.
val tableRef = UnresolvedRelation(Seq("my_database", "my_table"))

```

109

110

## Unary Nodes

111

112

### Projection and Filtering

113

114

```scala { .api }

/** `SELECT <projectList>`: computes one output column per named expression. */
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
}

/** `WHERE <condition>`: keeps rows for which `condition` is true; columns are unchanged. */
case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

/** `(...) AS identifier`: re-qualifies every child column with `identifier`. */
case class SubqueryAlias(identifier: String, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output.map(_.withQualifier(Seq(identifier)))
}

/** Limit applied to the result as a whole. */
case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

/** Limit applied independently within each partition. */
case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}

/**
 * `LIMIT n`. This is a factory and extractor, not a node of its own:
 * `Limit(n, child)` builds `GlobalLimit(n, LocalLimit(n, child))`, and
 * `case Limit(n, child)` matches that shape back.
 */
object Limit {
  def apply(limitExpr: Expression, child: LogicalPlan): UnaryNode =
    GlobalLimit(limitExpr, LocalLimit(limitExpr, child))

  def unapply(p: GlobalLimit): Option[(Expression, LogicalPlan)] = p match {
    case GlobalLimit(le1, LocalLimit(le2, child)) if le1 == le2 => Some((le1, child))
    case _ => None
  }
}

```

131

132

**Usage Example:**

133

```scala

val baseRelation = UnresolvedRelation(Seq("users"))
val nameAttr = AttributeReference("name", StringType, nullable = false)()
val ageAttr = AttributeReference("age", IntegerType, nullable = true)()

// SELECT name, age FROM users
val projection = Project(Seq(nameAttr, ageAttr), baseRelation)

// ... WHERE age > 18
val filterCondition = GreaterThan(ageAttr, Literal(18))
val filteredPlan = Filter(filterCondition, projection)

// (...) AS u
val aliasedPlan = SubqueryAlias("u", filteredPlan)

// ... LIMIT 100
val limitedPlan = Limit(Literal(100), aliasedPlan)

```

151

152

### Sorting and Grouping

153

154

```scala { .api }

/**
 * `ORDER BY order`. With `global = true` the whole result is sorted;
 * with `false` each partition is sorted independently.
 */
case class Sort(order: Seq[SortOrder], global: Boolean, child: LogicalPlan) extends UnaryNode

/**
 * One sort key: the expression to sort by, its direction, and where nulls go.
 * `sameOrderExpressions` lists expressions known to sort identically to `child`.
 */
case class SortOrder(
    child: Expression,
    direction: SortDirection,
    nullOrdering: NullOrdering,
    sameOrderExpressions: Set[Expression] = Set.empty)

/**
 * `GROUP BY groupingExpressions`, producing one row per group with the
 * columns described by `aggregateExpressions`.
 */
case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalPlan
) extends UnaryNode

/**
 * Emits one output row per entry of `projections` for every input row; used to
 * implement `GROUPING SETS`, `CUBE` and `ROLLUP`.
 */
case class Expand(
    projections: Seq[Seq[Expression]],
    output: Seq[Attribute],
    child: LogicalPlan
) extends UnaryNode

```

171

172

**Usage Example:**

173

```scala

val baseRelation = UnresolvedRelation(Seq("sales"))
val customerAttr = AttributeReference("customer_id", IntegerType, nullable = false)()
val amountAttr = AttributeReference("amount", DecimalType(10, 2), nullable = false)()
val dateAttr = AttributeReference("sale_date", DateType, nullable = false)()

// SELECT * FROM sales ORDER BY sale_date DESC NULLS LAST
val sortOrder = SortOrder(dateAttr, Descending, NullsLast)
val sortedPlan = Sort(Seq(sortOrder), global = true, baseRelation)

// SELECT customer_id, SUM(amount) AS total_amount FROM sales GROUP BY customer_id
val groupingExprs = Seq(customerAttr)
val aggregateExprs = Seq(
  customerAttr,
  Alias(Sum(amountAttr), "total_amount")()
)
val aggregatePlan = Aggregate(groupingExprs, aggregateExprs, baseRelation)

```

191

192

### Window Operations

193

194

```scala { .api }

/**
 * Evaluates `windowExpressions` over partitions defined by `partitionSpec`,
 * with rows ordered by `orderSpec`. Every child column is kept, and one
 * column is added per window expression.
 */
case class Window(
    windowExpressions: Seq[NamedExpression],
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    child: LogicalPlan
) extends UnaryNode

/** A window function (e.g. `rank()`, `sum(x)`) applied over `windowSpec`. */
case class WindowExpression(
    windowFunction: Expression,
    windowSpec: WindowSpecDefinition
) extends Expression

/** `OVER (PARTITION BY ... ORDER BY ... <frame>)`. */
case class WindowSpecDefinition(
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    frameSpecification: WindowFrame
)

```

213

214

**Usage Example:**

215

```scala

val baseRelation = UnresolvedRelation(Seq("employees"))
val deptAttr = AttributeReference("department", StringType, nullable = false)()
val salaryAttr = AttributeReference("salary", DecimalType(10, 2), nullable = false)()

// RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS salary_rank
// The specs are defined once because the Window node must carry exactly the
// same partitioning and ordering as the window expressions it evaluates.
val partitionSpec = Seq(deptAttr)
val orderSpec = Seq(SortOrder(salaryAttr, Descending, NullsLast))

val windowSpec = WindowSpecDefinition(
  partitionSpec = partitionSpec,
  orderSpec = orderSpec,
  frameSpecification = UnspecifiedFrame
)
val rankExpr = WindowExpression(Rank(Seq(salaryAttr)), windowSpec)
val windowExprs = Seq(Alias(rankExpr, "salary_rank")())

val windowPlan = Window(windowExprs, partitionSpec, orderSpec, baseRelation)

```

232

233

## Binary Nodes

234

235

### Join Operations

236

237

```scala { .api }

/**
 * Joins `left` and `right` according to `joinType`. `condition` is the `ON`
 * predicate (`None` for a join without one). `hint` carries user join hints.
 */
case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression],
    hint: JoinHint = JoinHint.NONE
) extends BinaryNode

/** Kind of join; `sql` is the SQL keyword(s) for the join type. */
sealed abstract class JoinType {
  def sql: String
}

case object Inner extends JoinType { override def sql: String = "INNER" }

case object LeftOuter extends JoinType { override def sql: String = "LEFT OUTER" }

case object RightOuter extends JoinType { override def sql: String = "RIGHT OUTER" }

case object FullOuter extends JoinType { override def sql: String = "FULL OUTER" }

/** Left rows that have at least one match; only left columns are returned. */
case object LeftSemi extends JoinType { override def sql: String = "LEFT SEMI" }

/** Left rows that have no match; only left columns are returned. */
case object LeftAnti extends JoinType { override def sql: String = "LEFT ANTI" }

/** Explicit Cartesian product. */
case object Cross extends JoinType { override def sql: String = "CROSS" }

```

257

258

**Usage Example:**

259

```scala

val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))

// users.user_id and orders.user_id: same name, distinct attributes (distinct exprIds).
val userIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()
val orderUserIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()

// users INNER JOIN orders ON users.user_id = orders.user_id
val joinCondition = EqualTo(userIdAttr, orderUserIdAttr)
val innerJoin = Join(usersTable, ordersTable, Inner, Some(joinCondition))

// users LEFT OUTER JOIN orders ON users.user_id = orders.user_id
val leftJoin = Join(usersTable, ordersTable, LeftOuter, Some(joinCondition))

// users CROSS JOIN orders (Cartesian product, no condition)
val crossJoin = Join(usersTable, ordersTable, Cross, None)

```

276

277

### Set Operations

278

279

```scala { .api }

/**
 * `UNION ALL` of any number of children: duplicates are kept. With
 * `byName = true` columns are matched by name rather than position, and
 * `allowMissingCol` lets children lack some columns.
 */
case class Union(
    children: Seq[LogicalPlan],
    byName: Boolean = false,
    allowMissingCol: Boolean = false
) extends LogicalPlan

/** `INTERSECT` (`isAll = false`, distinct rows) or `INTERSECT ALL` (`isAll = true`). */
case class Intersect(left: LogicalPlan, right: LogicalPlan, isAll: Boolean) extends BinaryNode

/** `EXCEPT` (`isAll = false`, distinct rows) or `EXCEPT ALL` (`isAll = true`). */
case class Except(left: LogicalPlan, right: LogicalPlan, isAll: Boolean) extends BinaryNode

```

286

287

**Usage Example:**

288

```scala

val currentUsers = UnresolvedRelation(Seq("current_users"))
val formerUsers = UnresolvedRelation(Seq("former_users"))
val activeUsers = UnresolvedRelation(Seq("active_users"))

// current_users UNION ALL former_users (duplicates kept; wrap in Distinct for SQL UNION)
val allUsers = Union(Seq(currentUsers, formerUsers))

// current_users INTERSECT active_users: users present in both
val commonUsers = Intersect(currentUsers, activeUsers, isAll = false)

// current_users EXCEPT active_users: current users who are not active
val inactiveUsers = Except(currentUsers, activeUsers, isAll = false)

```

302

303

## Advanced Logical Plans

304

305

### Subqueries

306

307

```scala { .api }

/**
 * Base class of expressions that embed a logical `plan` (a subquery).
 *
 * This must be an abstract class: Scala forbids a case class from extending
 * another case class, and the concrete subquery kinds below are case classes.
 * `children` are the outer references the subquery correlates on.
 */
abstract class SubqueryExpression(
    plan: LogicalPlan,
    children: Seq[Expression] = Seq.empty,
    exprId: ExprId = NamedExpression.newExprId,
    joinCond: Seq[Expression] = Seq.empty,
    hint: Option[HintInfo] = None
) extends PlanExpression[LogicalPlan]

/** `EXISTS (plan)`: true when `plan` returns at least one row. */
case class Exists(
    plan: LogicalPlan,
    children: Seq[Expression] = Seq.empty,
    exprId: ExprId = NamedExpression.newExprId
) extends SubqueryExpression(plan, children, exprId)

/** `(plan)` used as a value: `plan` must return a single column and at most one row. */
case class ScalarSubquery(
    plan: LogicalPlan,
    children: Seq[Expression] = Seq.empty,
    exprId: ExprId = NamedExpression.newExprId
) extends SubqueryExpression(plan, children, exprId)

/** The list side of `x IN (plan)`. */
case class ListQuery(
    plan: LogicalPlan,
    children: Seq[Expression] = Seq.empty,
    exprId: ExprId = NamedExpression.newExprId
) extends SubqueryExpression(plan, children, exprId)

```

322

323

**Usage Example:**

324

```scala

val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))
val userIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()
// orders.user_id, defined once so both subqueries refer to the same attribute
// (creating it twice would yield two different exprIds).
val orderUserIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()

// Correlation predicate: orders.user_id = users.user_id
val correlation = EqualTo(orderUserIdAttr, userIdAttr)

// SELECT * FROM users WHERE EXISTS (SELECT * FROM orders WHERE <correlation>)
val existsSubquery = Filter(correlation, ordersTable)
val usersWithOrders = Filter(Exists(existsSubquery), usersTable)

// SELECT user_id,
//        (SELECT COUNT(1) FROM orders WHERE <correlation>) AS order_count
// FROM users
val countSubquery = Aggregate(
  Seq.empty,
  Seq(Alias(Count(Literal(1)), "order_count")()),
  Filter(correlation, ordersTable)
)
val usersWithOrderCount = Project(
  Seq(userIdAttr, Alias(ScalarSubquery(countSubquery), "order_count")()),
  usersTable
)

```

347

348

### Table Modification

349

350

```scala { .api }

/**
 * `INSERT INTO|OVERWRITE table [PARTITION (...)] [(cols)] query`.
 * `partition` maps each partition column to a static value (`Some`) or to
 * `None` for a dynamic partition; `overwrite` selects `INSERT OVERWRITE`.
 */
case class InsertIntoStatement(
    table: LogicalPlan,
    partition: Map[String, Option[String]],
    userSpecifiedCols: Seq[String],
    query: LogicalPlan,
    overwrite: Boolean,
    ifPartitionNotExists: Boolean = false
) extends Command

/** `DELETE FROM table [WHERE condition]`. */
case class DeleteFromTable(table: LogicalPlan, condition: Option[Expression]) extends Command

/** `UPDATE table SET assignments [WHERE condition]`. */
case class UpdateTable(
    table: LogicalPlan,
    assignments: Seq[Assignment],
    condition: Option[Expression]
) extends Command

/**
 * `MERGE INTO targetTable USING sourceTable ON mergeCondition`, followed by
 * the `WHEN MATCHED` actions and the `WHEN NOT MATCHED` actions.
 */
case class MergeIntoTable(
    targetTable: LogicalPlan,
    sourceTable: LogicalPlan,
    mergeCondition: Expression,
    matchedActions: Seq[MergeAction],
    notMatchedActions: Seq[MergeAction]
) extends Command

```

372

373

### Data Definition

374

375

```scala { .api }

/**
 * `CREATE TABLE`, or `CREATE TABLE ... AS SELECT` when `query` is defined.
 * `mode` decides what happens if the table already exists.
 */
case class CreateTable(
    tableDesc: CatalogTable,
    mode: SaveMode,
    query: Option[LogicalPlan]
) extends Command

/** `DROP TABLE|VIEW [IF EXISTS] identifier`. */
case class DropTable(
    identifier: Seq[String],
    ifExists: Boolean,
    isView: Boolean
) extends Command

/** `ALTER TABLE`, applying `changes` to `table`. */
case class AlterTable(
    table: LogicalPlan,
    changes: Seq[TableChange]
) extends Command

```

393

394

## Physical Planning Concepts

395

396

### Physical Plan Base

397

398

```scala { .api }

/** Base class of physical operators: describes how a query is executed. */
abstract class SparkPlan extends QueryPlan[SparkPlan] {
  /** Runs this operator and returns its result as an RDD of rows. */
  def execute(): RDD[InternalRow]

  /** Runs this operator and collects all result rows to the driver. */
  def executeCollect(): Array[InternalRow]

  /** Runs this operator and returns its result as a broadcast variable. */
  def executeBroadcast[T](): broadcast.Broadcast[T]

  /** Runs this operator and returns its result as columnar batches. */
  def executeColumnar(): RDD[ColumnarBatch]

  /** How each child's output must be distributed across partitions. */
  def requiredChildDistribution: Seq[Distribution]

  /** Sort order each child's output must have within partitions. */
  def requiredChildOrdering: Seq[Seq[SortOrder]]

  /** How this operator's output is partitioned. */
  def outputPartitioning: Partitioning

  /** Sort order of this operator's output within each partition. */
  def outputOrdering: Seq[SortOrder]

  /** Execution metrics reported by this operator, keyed by name. */
  def metrics: Map[String, SQLMetric]
}

```

411

412

### Distribution and Partitioning

413

414

```scala { .api }

/** A requirement on how rows must be spread across partitions. */
sealed abstract class Distribution

/** No requirement on data placement. */
case object UnspecifiedDistribution extends Distribution

/** All rows must be in a single partition. */
case object AllTuples extends Distribution

/** The full data set must be available on every node. */
case object BroadcastDistribution extends Distribution

/** Rows with equal values of `expressions` must be co-located, placed by hashing. */
case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution

/** Rows must be range-partitioned according to `ordering`. */
case class RangeDistribution(ordering: Seq[SortOrder]) extends Distribution

/** How an operator's output is actually partitioned. */
sealed abstract class Partitioning {
  /** Number of partitions in the output. */
  def numPartitions: Int

  /** Whether this partitioning fulfils the `required` distribution. */
  def satisfies(required: Distribution): Boolean
}

/** Rows placed by the hash of `expressions`. */
case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) extends Partitioning

/** Rows placed by ranges of `ordering`. */
case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) extends Partitioning

/** Rows dealt to partitions in turn. */
case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning

/** Everything in exactly one partition. */
case object SinglePartition extends Partitioning {
  override val numPartitions: Int = 1
}

/** Partitioning scheme unknown; only the partition count is known. */
case class UnknownPartitioning(numPartitions: Int) extends Partitioning

```

434

435

## Plan Transformations

436

437

### Rule-Based Optimization

438

439

```scala { .api }

/** A transformation from a plan tree to an equivalent (typically cheaper) tree. */
abstract class Rule[TreeType <: TreeNode[TreeType]] {
  /** Name used in logs and metrics: the class name without a trailing `$`. */
  val ruleName: String = {
    val className = getClass.getName
    if (className.endsWith("$")) className.dropRight(1) else className
  }

  /** Applies the rule to `plan`, returning the rewritten plan. */
  def apply(plan: TreeType): TreeType
}

/** A named group of rules that are run together according to `strategy`. */
case class Batch(name: String, strategy: Strategy, rules: Rule[LogicalPlan]*)

/** How many times a batch may be re-run while the plan keeps changing. */
sealed abstract class Strategy {
  def maxIterations: Int
}

/** Run the batch exactly once. */
case object Once extends Strategy {
  override val maxIterations: Int = 1
}

/** Re-run the batch until the plan stops changing or `maxIterations` is reached. */
case class FixedPoint(maxIterations: Int) extends Strategy

```

451

452

**Usage Example:**

453

```scala

// Custom optimization rule (skeleton).
object PushDownFilters extends Rule[LogicalPlan] {
  override val ruleName: String = "PushDownFilters"

  // `transform` rewrites only the nodes matched below; every other node is
  // left unchanged, so no catch-all case is needed.
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(condition, join: Join) =>
      pushFilterThroughJoin(condition, join)
  }

  /**
   * Placeholder for the push-down logic. A real implementation would split
   * `filter` into conjuncts and move those that reference only one side below
   * the join. Until then the filter is kept above the join: returning just
   * `join` would silently drop the predicate and change query results.
   */
  private def pushFilterThroughJoin(filter: Expression, join: Join): LogicalPlan =
    Filter(filter, join)
}

// Rules are applied in batches. In Spark, batches are declared inside a
// RuleExecutor such as the Optimizer.
val optimizer = Seq(
  Batch("Filter Pushdown", FixedPoint(10), PushDownFilters),
  Batch("Constant Folding", Once, ConstantFolding)
)

```

477

478

## Common Query Plan Patterns

479

480

### Building Query Plans Programmatically

481

```scala

// Build a complex query plan equivalent to:
//   SELECT u.name, SUM(o.amount) AS total
//   FROM users u JOIN orders o ON u.user_id = o.user_id
//   WHERE u.active = true
//   GROUP BY u.user_id, u.name
//   HAVING SUM(o.amount) > 1000
//   ORDER BY total DESC
//   LIMIT 10
val usersTable = UnresolvedRelation(Seq("users"))
val ordersTable = UnresolvedRelation(Seq("orders"))

// users columns
val userIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()
val nameAttr = AttributeReference("name", StringType, nullable = false)()
val activeAttr = AttributeReference("active", BooleanType, nullable = false)()
// orders columns
val orderUserIdAttr = AttributeReference("user_id", IntegerType, nullable = false)()
val orderAmountAttr = AttributeReference("amount", DecimalType(10, 2), nullable = false)()

// ON u.user_id = o.user_id (the join key, not the amount column)
val joinCondition = EqualTo(userIdAttr, orderUserIdAttr)
val joinedPlan = Join(usersTable, ordersTable, Inner, Some(joinCondition))

// WHERE u.active = true
val filterCondition = EqualTo(activeAttr, Literal(true))
val filteredPlan = Filter(filterCondition, joinedPlan)

// GROUP BY u.user_id, u.name. The `total` alias is created once; HAVING and
// ORDER BY refer to its output attribute (same exprId and type). SUM itself is
// not an output of the Aggregate, so it cannot be re-evaluated above it.
val totalAlias = Alias(Sum(orderAmountAttr), "total")()
val totalAttr = totalAlias.toAttribute
val groupingExprs = Seq(userIdAttr, nameAttr)
val aggregateExprs = Seq(nameAttr, totalAlias)
val aggregatedPlan = Aggregate(groupingExprs, aggregateExprs, filteredPlan)

// HAVING SUM(o.amount) > 1000
val havingCondition = GreaterThan(totalAttr, Literal(1000))
val havingPlan = Filter(havingCondition, aggregatedPlan)

// ORDER BY total DESC
val sortOrder = SortOrder(totalAttr, Descending, NullsLast)
val sortedPlan = Sort(Seq(sortOrder), global = true, havingPlan)

// LIMIT 10
val finalPlan = Limit(Literal(10), sortedPlan)

```

Together, these node types give Catalyst a single tree representation for every query. The analyzer resolves the tree, optimizer rules rewrite it, and the planner turns the optimized logical plan into an executable physical `SparkPlan`.