or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-parsing.md dialects.md execution.md expression-building.md index.md optimization.md schema.md utilities.md

docs/schema.md

0

# Schema Management

1

2

Database schema representation and validation for accurate parsing and optimization. Define table structures, column types, relationships, and hierarchical schema organization for multi-database environments.

3

4

## Capabilities

5

6

### Base Schema Interface

7

8

Abstract base class defining the schema interface for different implementations.

9

10

```python { .api }

11

class Schema:

12

"""Abstract base class for database schemas."""

13

14

dialect: str # SQL dialect for schema operations

15

16

def add_table(

17

self,

18

table: str | Expression,

19

column_mapping: Optional[Dict] = None,

20

dialect: str = None,

21

normalize: Optional[bool] = None,

22

match_depth: bool = True

23

) -> None:

24

"""

25

Register or update a table with column information.

26

27

Args:

28

table: Table name or Table expression

29

column_mapping: Dictionary mapping column names to types

30

dialect (str): SQL dialect for parsing table name

31

normalize (bool): Whether to normalize identifiers

32

match_depth (bool): Whether to enforce schema depth matching

33

"""

34

35

def column_names(

36

self,

37

table: str | Expression,

38

only_visible: bool = False,

39

dialect: str = None,

40

normalize: Optional[bool] = None

41

) -> List[str]:

42

"""

43

Get column names for a table.

44

45

Args:

46

table: Table name or Table expression

47

only_visible (bool): Return only visible columns

48

dialect (str): SQL dialect for parsing

49

normalize (bool): Whether to normalize identifiers

50

51

Returns:

52

List[str]: List of column names

53

"""

54

55

def get_column_type(

56

self,

57

table: str | Expression,

58

column: str | Expression,

59

dialect: str = None,

60

normalize: Optional[bool] = None

61

) -> Optional[str]:

62

"""

63

Get data type for a specific column.

64

65

Args:

66

table: Table name or expression

67

column: Column name or expression

68

dialect (str): SQL dialect for parsing

69

normalize (bool): Whether to normalize identifiers

70

71

Returns:

72

Optional[str]: Column data type or None if not found

73

"""

74

```

75

76

### Mapping Schema Implementation

77

78

Concrete schema implementation using dictionary-based storage.

79

80

```python { .api }

81

class MappingSchema(Schema):

82

"""Dictionary-based schema implementation."""

83

84

def __init__(

85

self,

86

schema: Optional[Dict] = None,

87

visible: Optional[Dict] = None,

88

dialect: str = None,

89

normalize: bool = True,

90

**kwargs

91

):

92

"""

93

Initialize schema with mapping data.

94

95

Args:

96

schema (Dict): Schema mapping in nested dictionary format

97

visible (Dict): Visibility mapping for columns

98

dialect (str): Default SQL dialect

99

normalize (bool): Whether to normalize identifiers by default

100

**kwargs: Additional schema options

101

"""

102

103

@property

104

def mapping(self) -> Dict:

105

"""Get the underlying schema mapping."""

106

107

def copy(self, **kwargs) -> MappingSchema:

108

"""Create a copy of the schema with optional modifications."""

109

110

def nested_get(self, *keys) -> Any:

111

"""Get nested value from schema mapping."""

112

113

def nested_set(self, keys: List[str], value: Any) -> None:

114

"""Set nested value in schema mapping."""

115

```

116

117

### Schema Utility Functions

118

119

Helper functions for working with schema structures and data.

120

121

```python { .api }

122

def ensure_schema(

123

schema: Optional[Union[Dict, Schema]],

124

dialect: str = None,

125

**kwargs

126

) -> Schema:

127

"""

128

Ensure input is a proper Schema instance.

129

130

Args:

131

schema: Schema dictionary or Schema instance

132

dialect (str): SQL dialect if creating new schema

133

**kwargs: Additional schema creation options

134

135

Returns:

136

Schema: Validated Schema instance

137

"""

138

139

def flatten_schema(

140

schema: Dict,

141

depth: int,

142

keys: Optional[List] = None

143

) -> List[List[str]]:

144

"""

145

Flatten nested schema dictionary to list of key paths.

146

147

Args:

148

schema (Dict): Nested schema dictionary

149

depth (int): Maximum depth to flatten

150

keys (List): Current key path (for recursion)

151

152

Returns:

153

List[List[str]]: List of key paths to leaf values

154

"""

155

156

def nested_get(

157

mapping: Dict,

158

*keys: Tuple[str, str]

159

) -> Any:

160

"""

161

Get value from nested dictionary using key path.

162

163

Args:

164

mapping (Dict): Nested dictionary

165

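*keys: Tuple pairs of (name, key) for each level; `key` is the dictionary key to look up and `name` is the label used in the error message if `key` is missing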

166

167

Returns:

168

Any: Found value or None

169

"""

170

171

def nested_set(

172

mapping: Dict,

173

keys: List[str],

174

value: Any

175

) -> None:

176

"""

177

Set value in nested dictionary using key path.

178

179

Args:

180

mapping (Dict): Nested dictionary to modify

181

keys (List[str]): Key path to set value

182

value (Any): Value to set

183

"""

184

```

185

186

## Usage Examples

187

188

### Basic Schema Definition

189

190

```python

191

from sqlglot.schema import MappingSchema

192

193

# Define schema with table and column information

194

schema = MappingSchema({

195

"users": {

196

"id": "INT",

197

"name": "VARCHAR(100)",

198

"email": "VARCHAR(255)",

199

"age": "INT",

200

"created_date": "DATE",

201

"is_active": "BOOLEAN"

202

},

203

"orders": {

204

"id": "INT",

205

"user_id": "INT",

206

"product_name": "VARCHAR(200)",

207

"quantity": "INT",

208

"price": "DECIMAL(10,2)",

209

"order_date": "TIMESTAMP"

210

},

211

"products": {

212

"id": "INT",

213

"name": "VARCHAR(200)",

214

"category": "VARCHAR(50)",

215

"price": "DECIMAL(10,2)",

216

"in_stock": "BOOLEAN"

217

}

218

})

219

220

# Use schema for operations

221

print(schema.column_names("users"))

222

# ['id', 'name', 'email', 'age', 'created_date', 'is_active']

223

224

print(schema.get_column_type("users", "email"))

225

# 'VARCHAR(255)'

226

```

227

228

### Multi-Database Schema

229

230

```python

231

from sqlglot.schema import MappingSchema

232

233

# Define multi-database schema

234

schema = MappingSchema({

235

"production": {

236

"users": {

237

"id": "INT",

238

"name": "VARCHAR(100)",

239

"email": "VARCHAR(255)"

240

},

241

"orders": {

242

"id": "INT",

243

"user_id": "INT",

244

"total": "DECIMAL(10,2)"

245

}

246

},

247

"analytics": {

248

"user_metrics": {

249

"user_id": "INT",

250

"total_orders": "INT",

251

"lifetime_value": "DECIMAL(12,2)"

252

},

253

"daily_stats": {

254

"date": "DATE",

255

"total_users": "INT",

256

"total_revenue": "DECIMAL(15,2)"

257

}

258

}

259

})

260

261

# Access qualified table names

262

print(schema.column_names("production.users"))

263

print(schema.get_column_type("analytics.user_metrics", "lifetime_value"))

264

```

265

266

### Dynamic Schema Building

267

268

```python

269

from sqlglot.schema import MappingSchema

270

271

# Start with empty schema

272

schema = MappingSchema()

273

274

# Add tables dynamically

275

schema.add_table("customers", {

276

"customer_id": "INT",

277

"first_name": "VARCHAR(50)",

278

"last_name": "VARCHAR(50)",

279

"email": "VARCHAR(100)",

280

"phone": "VARCHAR(20)"

281

})

282

283

schema.add_table("invoices", {

284

"invoice_id": "INT",

285

"customer_id": "INT",

286

"invoice_date": "DATE",

287

"due_date": "DATE",

288

"amount": "DECIMAL(10,2)",

289

"status": "VARCHAR(20)"

290

})

291

292

# Add table with qualified name

293

schema.add_table("reporting.monthly_summary", {

294

"month": "DATE",

295

"total_customers": "INT",

296

"total_revenue": "DECIMAL(15,2)",

297

"avg_invoice_amount": "DECIMAL(10,2)"

298

})

299

300

print(schema.column_names("customers"))

301

print(schema.column_names("reporting.monthly_summary"))

302

```

303

304

### Schema with Optimization

305

306

```python

307

import sqlglot

308

from sqlglot.schema import MappingSchema

309

from sqlglot.optimizer import optimize

310

311

# Define comprehensive schema

312

schema = MappingSchema({

313

"ecommerce": {

314

"customers": {

315

"id": "INT",

316

"email": "VARCHAR(255)",

317

"first_name": "VARCHAR(100)",

318

"last_name": "VARCHAR(100)",

319

"registration_date": "DATE"

320

},

321

"orders": {

322

"id": "INT",

323

"customer_id": "INT",

324

"order_date": "TIMESTAMP",

325

"status": "VARCHAR(20)",

326

"total_amount": "DECIMAL(12,2)"

327

},

328

"order_items": {

329

"id": "INT",

330

"order_id": "INT",

331

"product_id": "INT",

332

"quantity": "INT",

333

"unit_price": "DECIMAL(10,2)"

334

},

335

"products": {

336

"id": "INT",

337

"name": "VARCHAR(200)",

338

"category": "VARCHAR(100)",

339

"price": "DECIMAL(10,2)",

340

"weight": "DECIMAL(8,3)"

341

}

342

}

343

})

344

345

# Use schema for query optimization

346

sql = """

347

SELECT c.first_name, c.last_name, SUM(o.total_amount) as total_spent

348

FROM customers c

349

JOIN orders o ON c.id = o.customer_id

350

WHERE c.registration_date >= '2023-01-01'

351

AND o.status = 'completed'

352

GROUP BY c.id, c.first_name, c.last_name

353

HAVING SUM(o.total_amount) > 1000

354

"""

355

356

# Optimize with schema information

357

optimized = optimize(sql, schema=schema, dialect="postgres")

358

print(optimized.sql(pretty=True))

359

```

360

361

### Schema Validation and Type Checking

362

363

```python

364

from sqlglot.schema import MappingSchema

365

from sqlglot.optimizer.annotate_types import annotate_types

366

import sqlglot

367

368

# Define schema with specific types

369

schema = MappingSchema({

370

"sales": {

371

"transaction_id": "BIGINT",

372

"customer_id": "INT",

373

"product_id": "INT",

374

"sale_date": "DATE",

375

"sale_timestamp": "TIMESTAMP",

376

"amount": "DECIMAL(10,2)",

377

"tax_rate": "DECIMAL(5,4)",

378

"currency": "CHAR(3)",

379

"notes": "TEXT"

380

}

381

})

382

383

# Parse and type-annotate query

384

sql = """

385

SELECT

386

customer_id,

387

COUNT(*) as transaction_count,

388

SUM(amount) as total_sales,

389

AVG(amount) as avg_sale,

390

MAX(sale_date) as last_sale_date

391

FROM sales

392

WHERE sale_date >= '2023-01-01'

393

AND amount > 10.00

394

GROUP BY customer_id

395

"""

396

397

expression = sqlglot.parse_one(sql)

398

typed_expression = annotate_types(expression, schema=schema)

399

400

# The expression now has type information

401

print(typed_expression.sql(pretty=True))

402

```

403

404

### Working with Nested Schema Data

405

406

```python

407

from sqlglot.schema import flatten_schema, nested_get, nested_set

408

409

# Complex nested schema

410

complex_schema = {

411

"company_a": {

412

"production": {

413

"users": {"id": "INT", "name": "VARCHAR"},

414

"orders": {"id": "INT", "user_id": "INT"}

415

},

416

"staging": {

417

"temp_users": {"id": "INT", "name": "VARCHAR"}

418

}

419

},

420

"company_b": {

421

"production": {

422

"customers": {"id": "INT", "email": "VARCHAR"}

423

}

424

}

425

}

426

427

# Flatten schema to get all table paths

428

flattened = flatten_schema(complex_schema, depth=3)

429

print("All table paths:")

430

for path in flattened:

431

print(".".join(path))

432

433

# Get specific nested values

434

user_id_type = nested_get(complex_schema,

435

("company_a", "company_a"),

436

("production", "production"),

437

("users", "users"),

438

("id", "id"))

439

print(f"User ID type: {user_id_type}")

440

441

# Set nested values

442

nested_set(complex_schema,

443

["company_a", "production", "users", "created_date"],

444

"TIMESTAMP")

445

446

print("Updated schema:", complex_schema["company_a"]["production"]["users"])

447

```

448

449

## Types

450

451

```python { .api }

452

class Schema:

453

"""Abstract base class for database schemas."""

454

455

dialect: str # SQL dialect for schema operations

456

457

def add_table(self, table, column_mapping=None, **opts) -> None: ...

458

def column_names(self, table, **opts) -> List[str]: ...

459

def get_column_type(self, table, column, **opts) -> Optional[str]: ...

460

461

class MappingSchema(Schema):

462

"""Dictionary-based schema implementation."""

463

464

mapping: Dict # Underlying schema data

465

visible: Dict # Column visibility mapping

466

normalize: bool # Whether to normalize identifiers

467

supported_table_args: Any # Supported table arguments

468

469

def __init__(self, schema=None, visible=None, dialect=None, normalize=True, **kwargs): ...

470

def copy(self, **kwargs) -> MappingSchema: ...

471

def nested_get(self, *keys) -> Any: ...

472

def nested_set(self, keys: List[str], value: Any) -> None: ...

473

474

# Type aliases for schema data structures

475

ColumnMapping = Union[Dict, str, List] # Column definition formats

476

SchemaDict = Dict[str, Any] # Nested schema dictionary

477

```