or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

analysis.md, code-generation.md, data-types.md, expressions.md, index.md, optimization.md, parsing.md, query-plans.md, utilities.md

query-plans.mddocs/

0

# Query Plans

1

2

This section covers the query plan system in Spark Catalyst, including logical and physical plan representations with tree-based transformations and optimization support. Query plans form the core of Catalyst's query processing pipeline.

3

4

## Core Imports

5

6

```scala

7

import org.apache.spark.sql.catalyst.plans._

8

import org.apache.spark.sql.catalyst.plans.logical._

9

import org.apache.spark.sql.catalyst.plans.physical._

10

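// JoinType and its case objects (Inner, LeftOuter, ...) are declared directly in org.apache.spark.sql.catalyst.plans (imported above); there is no `joinTypes` package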

11

import org.apache.spark.sql.catalyst.expressions._

12

```

13

14

## Query Plan Hierarchy

15

16

### QueryPlan (abstract class)

17

18

Base class for all query plans (logical and physical), extending the tree framework.

19

20

```scala { .api }

21

abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {

22

def output: Seq[Attribute]

23

def outputSet: AttributeSet

24

def references: AttributeSet

25

def inputSet: AttributeSet

26

def schema: StructType

27

def outputOrdering: Seq[SortOrder]

28

def maxRows: Option[Long]

29

def transformExpressions(rule: PartialFunction[Expression, Expression]): PlanType

30

def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): PlanType

31

def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): PlanType

32

}

33

```

34

35

### LogicalPlan (abstract class)

36

37

Base class for all logical query plans, which represent the structure of a query before it is converted into a physical plan.

38

39

```scala { .api }

40

abstract class LogicalPlan extends QueryPlan[LogicalPlan] {

41

def resolved: Boolean

42

def childrenResolved: Boolean

43

def statistics: Statistics

44

}

45

```

46

47

#### Plan Node Types

48

49

```scala { .api }

50

abstract class LeafNode extends LogicalPlan {

51

override final def children: Seq[LogicalPlan] = Nil

52

}

53

54

abstract class UnaryNode extends LogicalPlan {

55

def child: LogicalPlan

56

override final def children: Seq[LogicalPlan] = child :: Nil

57

}

58

59

abstract class BinaryNode extends LogicalPlan {

60

def left: LogicalPlan

61

def right: LogicalPlan

62

override final def children: Seq[LogicalPlan] = left :: right :: Nil

63

}

64

```

65

66

#### Usage Example

67

68

```scala

69

import org.apache.spark.sql.catalyst.plans.logical._

70

import org.apache.spark.sql.catalyst.expressions._

71

72

// Create a simple query plan

73

val relation = UnresolvedRelation(TableIdentifier("users"))

74

val filter = Filter(EqualTo(UnresolvedAttribute("age"), Literal(25)), relation)

75

val project = Project(Seq(UnresolvedAttribute("name")), filter)

76

77

// Check if plan is resolved

78

val isResolved = project.resolved

79

80

// Get output schema

81

val schema = project.schema

82

```

83

84

## Join Operations

85

86

### Join Types

87

88

```scala { .api }

89

sealed abstract class JoinType {

90

def sql: String

91

}

92

93

case object Inner extends JoinType

94

case object LeftOuter extends JoinType

95

case object RightOuter extends JoinType

96

case object FullOuter extends JoinType

97

case object LeftSemi extends JoinType

98

case object LeftAnti extends JoinType

99

case object Cross extends JoinType

100

```

101

102

### Join Plan

103

104

```scala { .api }

105

case class Join(

106

left: LogicalPlan,

107

right: LogicalPlan,

108

joinType: JoinType,

109

condition: Option[Expression]

110

) extends BinaryNode {

111

override def output: Seq[Attribute] = {

112

joinType match {

113

case LeftSemi | LeftAnti => left.output

114

case _ => left.output ++ right.output

115

}

116

}

117

}

118

```

119

120

#### Usage Example

121

122

```scala

123

import org.apache.spark.sql.catalyst.plans.logical._

124

import org.apache.spark.sql.catalyst.plans._

125

126

// Create join operation

127

val users = UnresolvedRelation(TableIdentifier("users"))

128

val orders = UnresolvedRelation(TableIdentifier("orders"))

129

val joinCondition = EqualTo(

130

UnresolvedAttribute("users.id"),

131

UnresolvedAttribute("orders.user_id")

132

)

133

val join = Join(users, orders, Inner, Some(joinCondition))

134

```

135

136

## Basic Logical Operators

137

138

### Project (SELECT)

139

140

```scala { .api }

141

case class Project(

142

projectList: Seq[NamedExpression],

143

child: LogicalPlan

144

) extends UnaryNode {

145

override def output: Seq[Attribute] = projectList.map(_.toAttribute)

146

}

147

```

148

149

### Filter (WHERE)

150

151

```scala { .api }

152

case class Filter(

153

condition: Expression,

154

child: LogicalPlan

155

) extends UnaryNode {

156

override def output: Seq[Attribute] = child.output

157

}

158

```

159

160

### Aggregate (GROUP BY)

161

162

```scala { .api }

163

case class Aggregate(

164

groupingExpressions: Seq[Expression],

165

aggregateExpressions: Seq[NamedExpression],

166

child: LogicalPlan

167

) extends UnaryNode {

168

override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)

169

}

170

```

171

172

### Sort (ORDER BY)

173

174

```scala { .api }

175

case class Sort(

176

order: Seq[SortOrder],

177

global: Boolean,

178

child: LogicalPlan

179

) extends UnaryNode {

180

override def output: Seq[Attribute] = child.output

181

}

182

183

case class SortOrder(

184

child: Expression,

185

direction: SortDirection,

186

nullOrdering: NullOrdering,

187

sameOrderExpressions: Set[Expression] = Set.empty

188

) extends Expression with Unevaluable {

189

override def dataType: DataType = child.dataType

190

override def nullable: Boolean = child.nullable

191

}

192

193

abstract class SortDirection {

194

def sql: String

195

}

196

case object Ascending extends SortDirection

197

case object Descending extends SortDirection

198

199

abstract class NullOrdering {

200

def sql: String

201

}

202

case object NullsFirst extends NullOrdering

203

case object NullsLast extends NullOrdering

204

```

205

206

### Limit

207

208

```scala { .api }

209

case class Limit(

210

limitExpr: Expression,

211

child: LogicalPlan

212

) extends UnaryNode {

213

override def output: Seq[Attribute] = child.output

214

}

215

```

216

217

### Union

218

219

```scala { .api }

220

case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {

221

override def output: Seq[Attribute] = children.head.output

222

}

223

```

224

225

### Distinct

226

227

```scala { .api }

228

case class Distinct(child: LogicalPlan) extends UnaryNode {

229

override def output: Seq[Attribute] = child.output

230

}

231

```

232

233

#### Usage Example

234

235

```scala

236

import org.apache.spark.sql.catalyst.plans.logical._

237

import org.apache.spark.sql.catalyst.expressions._

238

239

// Build a complex query plan: SELECT name FROM users WHERE age > 18 ORDER BY name LIMIT 10

240

val relation = UnresolvedRelation(TableIdentifier("users"))

241

242

val filter = Filter(

243

GreaterThan(UnresolvedAttribute("age"), Literal(18)),

244

relation

245

)

246

247

val project = Project(

248

Seq(UnresolvedAttribute("name")),

249

filter

250

)

251

252

val sort = Sort(

253

Seq(SortOrder(UnresolvedAttribute("name"), Ascending, NullsLast)),

254

global = true,

255

project

256

)

257

258

val limit = Limit(Literal(10), sort)

259

260

// The final plan represents: SELECT name FROM users WHERE age > 18 ORDER BY name LIMIT 10

261

```

262

263

## Data Sources

264

265

### UnresolvedRelation

266

267

```scala { .api }

268

case class UnresolvedRelation(

269

tableIdentifier: TableIdentifier,

270

alias: Option[String] = None

271

) extends LeafNode {

272

override def output: Seq[Attribute] = Nil

273

}

274

275

case class TableIdentifier(

276

table: String,

277

database: Option[String] = None

278

) {

279

def identifier: String = database.map(_ + ".").getOrElse("") + table

280

def quotedString: String = database.map(quoteIdentifier).map(_ + ".").getOrElse("") + quoteIdentifier(table)

281

def unquotedString: String = identifier

282

}

283

```

284

285

### LocalRelation

286

287

```scala { .api }

288

case class LocalRelation(

289

output: Seq[Attribute],

290

data: Seq[InternalRow] = Nil,

291

isStreaming: Boolean = false

292

) extends LeafNode {

293

// Constructor for single row

294

def this(output: Seq[Attribute], data: InternalRow) = this(output, data :: Nil)

295

}

296

```

297

298

#### Usage Example

299

300

```scala

301

import org.apache.spark.sql.catalyst.plans.logical._

302

import org.apache.spark.sql.types._

303

import org.apache.spark.sql.catalyst.expressions._

304

305

// Create a local relation with data

306

val schema = Seq(

307

AttributeReference("id", IntegerType, nullable = false)(),

308

AttributeReference("name", StringType, nullable = true)()

309

)

310

311

val data = Seq(

312

InternalRow(1, UTF8String.fromString("Alice")),

313

InternalRow(2, UTF8String.fromString("Bob"))

314

)

315

316

val localRelation = LocalRelation(schema, data)

317

```

318

319

## Statistics and Cost Information

320

321

### Statistics

322

323

```scala { .api }

324

case class Statistics(

325

sizeInBytes: BigInt,

326

rowCount: Option[BigInt] = None,

327

attributeStats: AttributeMap[ColumnStat] = AttributeMap.empty,

328

hints: HintInfo = HintInfo()

329

) {

330

def simpleString: String = {

331

s"sizeInBytes=${Utils.bytesToString(sizeInBytes)}, " +

332

s"rowCount=${rowCount.map(_.toString).getOrElse("unknown")}"

333

}

334

}

335

336

case class ColumnStat(

337

distinctCount: Option[BigInt] = None,

338

min: Option[Any] = None,

339

max: Option[Any] = None,

340

nullCount: Option[BigInt] = None,

341

avgLen: Option[Long] = None,

342

maxLen: Option[Long] = None,

343

histogram: Option[Histogram] = None

344

)

345

346

case class HintInfo(

347

broadcast: Boolean = false,

348

cartesianProduct: Boolean = false

349

)

350

```

351

352

#### Usage Example

353

354

```scala

355

import org.apache.spark.sql.catalyst.plans.logical._

356

357

// Create statistics for a relation

358

val stats = Statistics(

359

sizeInBytes = 1024 * 1024, // 1MB

360

rowCount = Some(1000),

361

hints = HintInfo(broadcast = true)

362

)

363

364

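// Attach statistics to a plan (pseudocode: `schema` is the attribute list built in the LocalRelation example, and LogicalPlan has no built-in `withStats` method; statistics are normally exposed by the plan node itself)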

365

val relation = LocalRelation(schema)

366

val relationWithStats = relation.copy().withNewChildren(relation.children).transform {

367

case plan => plan.withStats(stats)

368

}

369

```

370

371

## Physical Plans

372

373

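Physical plans are implemented in the execution engine (`org.apache.spark.sql.execution`) rather than in Catalyst itself; `SparkPlan` builds on Catalyst's `QueryPlan`, and its base structure is shown here for reference: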

374

375

### Physical Plan Base

376

377

```scala { .api }

378

abstract class SparkPlan extends QueryPlan[SparkPlan] {

379

def execute(): RDD[InternalRow]

380

def executeCollect(): Array[InternalRow]

381

def executeTake(n: Int): Array[InternalRow]

382

def executeToIterator(): Iterator[InternalRow]

383

384

def metrics: Map[String, SQLMetric]

385

def longMetric(name: String): SQLMetric

386

}

387

```

388

389

## Tree Transformations

390

391

Query plans inherit tree transformation capabilities:

392

393

### Common Transformation Patterns

394

395

```scala

396

// Transform all expressions in a plan (bottom-up, so the rule is not re-applied to the child of the Upper(...) it has just created)

397

val transformedPlan = plan.transformExpressionsUp {

398

case expr if expr.dataType == StringType =>

399

Upper(expr)

400

}

401

402

// Transform the plan structure

403

val optimizedPlan = plan.transform {

404

case Filter(condition, child) if condition == Literal(true) =>

405

child // Remove always-true filters

406

}

407

408

// Collect information from the plan

409

val allFilters = plan.collect {

410

case filter: Filter => filter.condition

411

}

412

```

413

414

#### Usage Example

415

416

```scala

417

import org.apache.spark.sql.catalyst.plans.logical._

418

import org.apache.spark.sql.catalyst.expressions._

419

420

// Original plan with nested filters

421

val relation = UnresolvedRelation(TableIdentifier("users"))

422

val filter1 = Filter(GreaterThan(UnresolvedAttribute("age"), Literal(18)), relation)

423

val filter2 = Filter(EqualTo(UnresolvedAttribute("active"), Literal(true)), filter1)

424

425

// Combine filters into a single AND condition

426

val combinedPlan = filter2.transform {

427

case Filter(condition1, Filter(condition2, child)) =>

428

Filter(And(condition1, condition2), child)

429

}

430

431

// Result: Filter(And(active = true, age > 18), UnresolvedRelation(users))

432

```

433

434

## Plan Validation

435

436

### Resolution Status

437

438

```scala

439

// Check if a plan is fully resolved

440

def isFullyResolved(plan: LogicalPlan): Boolean = {

441

plan.resolved && plan.children.forall(isFullyResolved)

442

}

443

444

// Find unresolved references

445

def findUnresolvedReferences(plan: LogicalPlan): Seq[UnresolvedAttribute] = {

446

plan.flatMap(_.expressions).flatMap(_.collect {

447

case expr: UnresolvedAttribute => expr

448

})

449

}

450

```

451

452

This comprehensive query plan system enables Catalyst to represent, transform, and optimize SQL queries through a flexible tree-based structure that supports both logical planning and physical execution strategies.