or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

analysis.md expressions.md index.md optimization.md query-planning.md row-operations.md tree-operations.md types.md

docs/query-planning.md

0

# Query Planning

1

2

Logical and physical query plan representations with transformation and optimization capabilities for building and manipulating query execution trees.

3

4

## Capabilities

5

6

### QueryPlan Base Class

7

8

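Base class for all query plans, providing accessors for output, input, and referenced attributes along with methods for transforming the expressions contained in a plan node.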

9

10

```scala { .api }

11

/**

12

* Base class for all query plans, extends TreeNode

13

*/

14

abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanType] {

15

self: PlanType =>

16

17

/** Attributes output by this node */

18

def output: Seq[Attribute]

19

20

/** Set of output attributes */

21

def outputSet: AttributeSet

22

23

/** Attributes referenced in expressions */

24

def references: AttributeSet

25

26

/** Attributes produced by this node's children, i.e. the input available to this node */

27

def inputSet: AttributeSet

28

29

/** Attributes referenced by this node's expressions but not supplied by its children */

30

def missingInput: AttributeSet

31

32

/**

33

* Transform expressions in this plan using the given rule

34

*/

35

def transformExpressions(rule: PartialFunction[Expression, Expression]): this.type

36

37

/**

38

* Transform expressions in a specific traversal order: Down applies the rule top-down (pre-order), Up applies it bottom-up (post-order)

39

*/

40

def transformExpressionsDown(rule: PartialFunction[Expression, Expression]): this.type

41

def transformExpressionsUp(rule: PartialFunction[Expression, Expression]): this.type

42

}

43

```

44

45

### LogicalPlan Class

46

47

Base class for logical query plans representing query semantics.

48

49

```scala { .api }

50

/**

51

* Base class for logical query plans

52

*/

53

abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {

54

/** Whether plan has been analyzed */

55

def analyzed: Boolean

56

57

/** Mark plan as analyzed */

58

private[catalyst] def setAnalyzed(): Unit

59

60

/**

61

* Apply transformation rules to operators, skipping analyzed sub-trees

62

*/

63

def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan

64

65

/**

66

* Transform expressions, skipping analyzed sub-trees

67

*/

68

def resolveExpressions(r: PartialFunction[Expression, Expression]): LogicalPlan

69

70

/** Compute plan statistics */

71

def statistics: Statistics

72

}

73

```

74

75

**Usage Examples:**

76

77

```scala

78

import org.apache.spark.sql.catalyst.plans.logical._

79

import org.apache.spark.sql.catalyst.expressions._

80

import org.apache.spark.sql.types._

81

82

// Create a simple logical plan

83

val relation = LocalRelation(

84

AttributeReference("id", IntegerType, false)(),

85

AttributeReference("name", StringType, true)()

86

)

87

88

// Access plan properties

89

val output = relation.output // Seq of attributes

90

val outputSet = relation.outputSet // AttributeSet

91

val analyzed = relation.analyzed // Boolean

92

93

// Create filter plan

94

val filterExpr = GreaterThan(

95

AttributeReference("id", IntegerType, false)(),

96

Literal(10, IntegerType)

97

)

98

val filtered = Filter(filterExpr, relation)

99

100

// Transform expressions in plan

101

val transformed = filtered.transformExpressions {

102

case Literal(value: Int, dataType) => Literal(value * 2, dataType)

103

}

104

```

105

106

### Join Types and Operations

107

108

Join type definitions and join plan operations.

109

110

```scala { .api }

111

/** Base class for join types */

112

sealed abstract class JoinType {

113

def sql: String

114

}

115

116

/** Inner join */

117

case object Inner extends JoinType {

118

override def sql: String = "INNER"

119

}

120

121

/** Left outer join */

122

case object LeftOuter extends JoinType {

123

override def sql: String = "LEFT OUTER"

124

}

125

126

/** Right outer join */

127

case object RightOuter extends JoinType {

128

override def sql: String = "RIGHT OUTER"

129

}

130

131

/** Full outer join */

132

case object FullOuter extends JoinType {

133

override def sql: String = "FULL OUTER"

134

}

135

136

/** Left semi join */

137

case object LeftSemi extends JoinType {

138

override def sql: String = "LEFT SEMI"

139

}

140

141

/** Left anti join */

142

case object LeftAnti extends JoinType {

143

override def sql: String = "LEFT ANTI"

144

}

145

146

/**

147

* Join logical plan node

148

*/

149

case class Join(

150

left: LogicalPlan,

151

right: LogicalPlan,

152

joinType: JoinType,

153

condition: Option[Expression]) extends LogicalPlan {

154

155

override def children: Seq[LogicalPlan] = Seq(left, right)

156

157

override def output: Seq[Attribute] = {

158

joinType match {

159

case Inner | LeftOuter | RightOuter | FullOuter =>

160

left.output ++ right.output

161

case LeftSemi | LeftAnti =>

162

left.output

163

}

164

}

165

}

166

```

167

168

**Usage Examples:**

169

170

```scala

171

import org.apache.spark.sql.catalyst.plans.logical._

172

import org.apache.spark.sql.catalyst.plans._

173

import org.apache.spark.sql.catalyst.expressions._

174

175

// Create relations

176

val users = LocalRelation(

177

AttributeReference("user_id", IntegerType, false)(),

178

AttributeReference("name", StringType, true)()

179

)

180

181

val orders = LocalRelation(

182

AttributeReference("order_id", IntegerType, false)(),

183

AttributeReference("user_id", IntegerType, false)(),

184

AttributeReference("amount", DoubleType, true)()

185

)

186

187

// Create join condition

188

val joinCondition = EqualTo(

189

users.output.find(_.name == "user_id").get,

190

orders.output.find(_.name == "user_id").get

191

)

192

193

// Create different join types

194

val innerJoin = Join(users, orders, Inner, Some(joinCondition))

195

val leftJoin = Join(users, orders, LeftOuter, Some(joinCondition))

196

val rightJoin = Join(users, orders, RightOuter, Some(joinCondition))

197

val fullJoin = Join(users, orders, FullOuter, Some(joinCondition))

198

val semiJoin = Join(users, orders, LeftSemi, Some(joinCondition))

199

200

// Access join properties

201

val joinOutput = innerJoin.output // Combined output from both sides

202

val joinChildren = innerJoin.children // Seq(users, orders)

203

```

204

205

### Basic Logical Plan Operators

206

207

Fundamental logical plan operators for query construction.

208

209

```scala { .api }

210

/**

211

* Filter (WHERE clause) logical plan

212

*/

213

case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {

214

override def output: Seq[Attribute] = child.output

215

}

216

217

/**

218

* Projection (SELECT clause) logical plan

219

*/

220

case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {

221

override def output: Seq[Attribute] = projectList.map(_.toAttribute)

222

}

223

224

/**

225

* Aggregation (GROUP BY clause) logical plan

226

*/

227

case class Aggregate(

228

groupingExpressions: Seq[Expression],

229

aggregateExpressions: Seq[NamedExpression],

230

child: LogicalPlan) extends UnaryNode {

231

232

override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)

233

}

234

235

/**

236

* Sort (ORDER BY clause) logical plan

237

*/

238

case class Sort(

239

order: Seq[SortOrder],

240

global: Boolean,

241

child: LogicalPlan) extends UnaryNode {

242

243

override def output: Seq[Attribute] = child.output

244

}

245

246

/**

247

* Limit logical plan

248

*/

249

case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {

250

override def output: Seq[Attribute] = child.output

251

}

252

253

/**

254

* Union logical plan

255

*/

256

case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {

257

override def output: Seq[Attribute] = children.head.output

258

}

259

260

/**

261

* Local relation with in-memory data

262

*/

263

case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) extends LeafNode {

264

// Leaf node with no children

265

}

266

```

267

268

**Usage Examples:**

269

270

```scala

271

import org.apache.spark.sql.catalyst.plans.logical._

272

import org.apache.spark.sql.catalyst.expressions._

273

import org.apache.spark.sql.types._

274

275

// Base relation

276

val table = LocalRelation(

277

AttributeReference("id", IntegerType, false)(),

278

AttributeReference("name", StringType, true)(),

279

AttributeReference("age", IntegerType, true)(),

280

AttributeReference("score", DoubleType, true)()

281

)

282

283

// Filter: WHERE age > 18

284

val filterExpr = GreaterThan(table.output(2), Literal(18, IntegerType))

285

val filtered = Filter(filterExpr, table)

286

287

// Project: SELECT id, name

288

val projectList = Seq(table.output(0), table.output(1))

289

val projected = Project(projectList, filtered)

290

291

// Aggregate: SELECT age, COUNT(*), AVG(score) GROUP BY age

292

val groupExpr = Seq(table.output(2)) // age

293

val aggExprs = Seq(

294

table.output(2).as("age"),

295

Count(Literal(1)).as("count"),

296

Average(table.output(3)).as("avg_score")

297

)

298

val aggregated = Aggregate(groupExpr, aggExprs, table)

299

300

// Sort: ORDER BY score DESC

301

val sortOrder = Seq(SortOrder(table.output(3), Descending))

302

val sorted = Sort(sortOrder, global = true, table)

303

304

// Limit: LIMIT 10

305

val limited = Limit(Literal(10, IntegerType), sorted)

306

307

// Union: Combine two relations

308

val table2 = LocalRelation(table.output) // Same schema

309

val unioned = Union(Seq(table, table2))

310

311

// Complex query: SELECT name FROM table WHERE age > 21 ORDER BY score LIMIT 5

312

val complexQuery = Limit(

313

Literal(5, IntegerType),

314

Sort(

315

Seq(SortOrder(table.output(3), Ascending)),

316

global = true,

317

Project(

318

Seq(table.output(1)), // name

319

Filter(

320

GreaterThan(table.output(2), Literal(21, IntegerType)), // age > 21

321

table

322

)

323

)

324

)

325

)

326

```

327

328

### Physical Plan Integration

329

330

Bridge between logical and physical planning.

331

332

```scala { .api }

333

/**

334

* Base class for physical execution plans

335

*/

336

abstract class SparkPlan extends QueryPlan[SparkPlan] {

337

/** Execute this plan and return RDD of results */

338

def execute(): RDD[InternalRow]

339

340

/** Prepare this plan for execution */

341

def prepare(): Unit

342

343

/** Reset statistics and metrics */

344

def resetMetrics(): Unit

345

}

346

347

/**

348

* Estimated statistics for a plan (size in bytes and optional row count)

349

*/

350

case class Statistics(sizeInBytes: BigInt, rowCount: Option[BigInt] = None) {

351

/** Whether the plan is small enough to broadcast (sizeInBytes <= autoBroadcastJoinThreshold) */

352

def isBroadcastable: Boolean = sizeInBytes <= autoBroadcastJoinThreshold

353

}

354

```

355

356

**Usage Examples:**

357

358

```scala

359

import org.apache.spark.sql.catalyst.plans.logical._

360

import org.apache.spark.sql.catalyst.plans.physical._

361

362

// A logical plan can be converted to a physical plan

363

val logicalPlan = Project(

364

Seq(AttributeReference("id", IntegerType, false)()),

365

LocalRelation(AttributeReference("id", IntegerType, false)())

366

)

367

368

// Statistics computation

369

val stats = Statistics(sizeInBytes = 1000, rowCount = Some(100))

370

val broadcastable = stats.isBroadcastable // Check if suitable for broadcast

371

372

// Physical properties like partitioning and ordering are determined

373

// during logical to physical plan conversion

374

```