0
# Schema Management
1
2
Database schema representation and validation for accurate parsing and optimization. Define table structures, column types, relationships, and hierarchical schema organization for multi-database environments.
3
4
## Capabilities
5
6
### Base Schema Interface
7
8
Abstract base class defining the schema interface for different implementations.
9
10
```python { .api }
11
class Schema:
12
"""Abstract base class for database schemas."""
13
14
dialect: str # SQL dialect for schema operations
15
16
def add_table(
17
self,
18
table: str | Expression,
19
column_mapping: Optional[Dict] = None,
20
dialect: str = None,
21
normalize: Optional[bool] = None,
22
match_depth: bool = True
23
) -> None:
24
"""
25
Register or update a table with column information.
26
27
Args:
28
table: Table name or Table expression
29
column_mapping: Dictionary mapping column names to types
30
dialect (str): SQL dialect for parsing table name
31
normalize (bool): Whether to normalize identifiers
32
match_depth (bool): Whether to enforce schema depth matching
33
"""
34
35
def column_names(
36
self,
37
table: str | Expression,
38
only_visible: bool = False,
39
dialect: str = None,
40
normalize: Optional[bool] = None
41
) -> List[str]:
42
"""
43
Get column names for a table.
44
45
Args:
46
table: Table name or Table expression
47
only_visible (bool): Return only visible columns
48
dialect (str): SQL dialect for parsing
49
normalize (bool): Whether to normalize identifiers
50
51
Returns:
52
List[str]: List of column names
53
"""
54
55
def get_column_type(
56
self,
57
table: str | Expression,
58
column: str | Expression,
59
dialect: str = None,
60
normalize: Optional[bool] = None
61
) -> Optional[str]:
62
"""
63
Get data type for a specific column.
64
65
Args:
66
table: Table name or expression
67
column: Column name or expression
68
dialect (str): SQL dialect for parsing
69
normalize (bool): Whether to normalize identifiers
70
71
Returns:
72
Optional[str]: Column data type or None if not found
73
"""
74
```
75
76
### Mapping Schema Implementation
77
78
Concrete schema implementation using dictionary-based storage.
79
80
```python { .api }
81
class MappingSchema(Schema):
82
"""Dictionary-based schema implementation."""
83
84
def __init__(
85
self,
86
schema: Optional[Dict] = None,
87
visible: Optional[Dict] = None,
88
dialect: str = None,
89
normalize: bool = True,
90
**kwargs
91
):
92
"""
93
Initialize schema with mapping data.
94
95
Args:
96
schema (Dict): Schema mapping in nested dictionary format
97
visible (Dict): Visibility mapping for columns
98
dialect (str): Default SQL dialect
99
normalize (bool): Whether to normalize identifiers by default
100
**kwargs: Additional schema options
101
"""
102
103
@property
104
def mapping(self) -> Dict:
105
"""Get the underlying schema mapping."""
106
107
def copy(self, **kwargs) -> MappingSchema:
108
"""Create a copy of the schema with optional modifications."""
109
110
def nested_get(self, *keys) -> Any:
111
"""Get nested value from schema mapping."""
112
113
def nested_set(self, keys: List[str], value: Any) -> None:
114
"""Set nested value in schema mapping."""
115
```
116
117
### Schema Utility Functions
118
119
Helper functions for working with schema structures and data.
120
121
```python { .api }
122
def ensure_schema(
123
schema: Optional[Union[Dict, Schema]],
124
dialect: str = None,
125
**kwargs
126
) -> Schema:
127
"""
128
Ensure input is a proper Schema instance.
129
130
Args:
131
schema: Schema dictionary or Schema instance
132
dialect (str): SQL dialect if creating new schema
133
**kwargs: Additional schema creation options
134
135
Returns:
136
Schema: Validated Schema instance
137
"""
138
139
def flatten_schema(
140
schema: Dict,
141
depth: int,
142
keys: Optional[List] = None
143
) -> List[List[str]]:
144
"""
145
Flatten nested schema dictionary to list of key paths.
146
147
Args:
148
schema (Dict): Nested schema dictionary
149
depth (int): Maximum depth to flatten
150
keys (List): Current key path (for recursion)
151
152
Returns:
153
List[List[str]]: List of key paths to leaf values
154
"""
155
156
def nested_get(
157
mapping: Dict,
158
*keys: Tuple[str, str]
159
) -> Any:
160
"""
161
Get value from nested dictionary using key path.
162
163
Args:
164
mapping (Dict): Nested dictionary
165
*keys: Tuple pairs of (key, fallback_key) for each level
166
167
Returns:
168
Any: Found value or None
169
"""
170
171
def nested_set(
172
mapping: Dict,
173
keys: List[str],
174
value: Any
175
) -> None:
176
"""
177
Set value in nested dictionary using key path.
178
179
Args:
180
mapping (Dict): Nested dictionary to modify
181
keys (List[str]): Key path to set value
182
value (Any): Value to set
183
"""
184
```
185
186
## Usage Examples
187
188
### Basic Schema Definition
189
190
```python
191
from sqlglot.schema import MappingSchema
192
193
# Define schema with table and column information
194
schema = MappingSchema({
195
"users": {
196
"id": "INT",
197
"name": "VARCHAR(100)",
198
"email": "VARCHAR(255)",
199
"age": "INT",
200
"created_date": "DATE",
201
"is_active": "BOOLEAN"
202
},
203
"orders": {
204
"id": "INT",
205
"user_id": "INT",
206
"product_name": "VARCHAR(200)",
207
"quantity": "INT",
208
"price": "DECIMAL(10,2)",
209
"order_date": "TIMESTAMP"
210
},
211
"products": {
212
"id": "INT",
213
"name": "VARCHAR(200)",
214
"category": "VARCHAR(50)",
215
"price": "DECIMAL(10,2)",
216
"in_stock": "BOOLEAN"
217
}
218
})
219
220
# Use schema for operations
221
print(schema.column_names("users"))
222
# ['id', 'name', 'email', 'age', 'created_date', 'is_active']
223
224
print(schema.get_column_type("users", "email"))
225
# 'VARCHAR(255)'
226
```
227
228
### Multi-Database Schema
229
230
```python
231
from sqlglot.schema import MappingSchema
232
233
# Define multi-database schema
234
schema = MappingSchema({
235
"production": {
236
"users": {
237
"id": "INT",
238
"name": "VARCHAR(100)",
239
"email": "VARCHAR(255)"
240
},
241
"orders": {
242
"id": "INT",
243
"user_id": "INT",
244
"total": "DECIMAL(10,2)"
245
}
246
},
247
"analytics": {
248
"user_metrics": {
249
"user_id": "INT",
250
"total_orders": "INT",
251
"lifetime_value": "DECIMAL(12,2)"
252
},
253
"daily_stats": {
254
"date": "DATE",
255
"total_users": "INT",
256
"total_revenue": "DECIMAL(15,2)"
257
}
258
}
259
})
260
261
# Access qualified table names
262
print(schema.column_names("production.users"))
263
print(schema.get_column_type("analytics.user_metrics", "lifetime_value"))
264
```
265
266
### Dynamic Schema Building
267
268
```python
269
from sqlglot.schema import MappingSchema
270
271
# Start with empty schema
272
schema = MappingSchema()
273
274
# Add tables dynamically
275
schema.add_table("customers", {
276
"customer_id": "INT",
277
"first_name": "VARCHAR(50)",
278
"last_name": "VARCHAR(50)",
279
"email": "VARCHAR(100)",
280
"phone": "VARCHAR(20)"
281
})
282
283
schema.add_table("invoices", {
284
"invoice_id": "INT",
285
"customer_id": "INT",
286
"invoice_date": "DATE",
287
"due_date": "DATE",
288
"amount": "DECIMAL(10,2)",
289
"status": "VARCHAR(20)"
290
})
291
292
# Add table with qualified name
293
schema.add_table("reporting.monthly_summary", {
294
"month": "DATE",
295
"total_customers": "INT",
296
"total_revenue": "DECIMAL(15,2)",
297
"avg_invoice_amount": "DECIMAL(10,2)"
298
})
299
300
print(schema.column_names("customers"))
301
print(schema.column_names("reporting.monthly_summary"))
302
```
303
304
### Schema with Optimization
305
306
```python
307
import sqlglot
308
from sqlglot.schema import MappingSchema
309
from sqlglot.optimizer import optimize
310
311
# Define comprehensive schema
312
schema = MappingSchema({
313
"ecommerce": {
314
"customers": {
315
"id": "INT",
316
"email": "VARCHAR(255)",
317
"first_name": "VARCHAR(100)",
318
"last_name": "VARCHAR(100)",
319
"registration_date": "DATE"
320
},
321
"orders": {
322
"id": "INT",
323
"customer_id": "INT",
324
"order_date": "TIMESTAMP",
325
"status": "VARCHAR(20)",
326
"total_amount": "DECIMAL(12,2)"
327
},
328
"order_items": {
329
"id": "INT",
330
"order_id": "INT",
331
"product_id": "INT",
332
"quantity": "INT",
333
"unit_price": "DECIMAL(10,2)"
334
},
335
"products": {
336
"id": "INT",
337
"name": "VARCHAR(200)",
338
"category": "VARCHAR(100)",
339
"price": "DECIMAL(10,2)",
340
"weight": "DECIMAL(8,3)"
341
}
342
}
343
})
344
345
# Use schema for query optimization
346
sql = """
347
SELECT c.first_name, c.last_name, SUM(o.total_amount) as total_spent
348
FROM customers c
349
JOIN orders o ON c.id = o.customer_id
350
WHERE c.registration_date >= '2023-01-01'
351
AND o.status = 'completed'
352
GROUP BY c.id, c.first_name, c.last_name
353
HAVING SUM(o.total_amount) > 1000
354
"""
355
356
# Optimize with schema information
357
optimized = optimize(sql, schema=schema, dialect="postgres")
358
print(optimized.sql(pretty=True))
359
```
360
361
### Schema Validation and Type Checking
362
363
```python
364
from sqlglot.schema import MappingSchema
365
from sqlglot.optimizer import annotate_types
366
import sqlglot
367
368
# Define schema with specific types
369
schema = MappingSchema({
370
"sales": {
371
"transaction_id": "BIGINT",
372
"customer_id": "INT",
373
"product_id": "INT",
374
"sale_date": "DATE",
375
"sale_timestamp": "TIMESTAMP",
376
"amount": "DECIMAL(10,2)",
377
"tax_rate": "DECIMAL(5,4)",
378
"currency": "CHAR(3)",
379
"notes": "TEXT"
380
}
381
})
382
383
# Parse and type-annotate query
384
sql = """
385
SELECT
386
customer_id,
387
COUNT(*) as transaction_count,
388
SUM(amount) as total_sales,
389
AVG(amount) as avg_sale,
390
MAX(sale_date) as last_sale_date
391
FROM sales
392
WHERE sale_date >= '2023-01-01'
393
AND amount > 10.00
394
GROUP BY customer_id
395
"""
396
397
expression = sqlglot.parse_one(sql)
398
typed_expression = annotate_types(expression, schema=schema)
399
400
# The expression now has type information
401
print(typed_expression.sql(pretty=True))
402
```
403
404
### Working with Nested Schema Data
405
406
```python
407
from sqlglot.schema import flatten_schema, nested_get, nested_set
408
409
# Complex nested schema
410
complex_schema = {
411
"company_a": {
412
"production": {
413
"users": {"id": "INT", "name": "VARCHAR"},
414
"orders": {"id": "INT", "user_id": "INT"}
415
},
416
"staging": {
417
"temp_users": {"id": "INT", "name": "VARCHAR"}
418
}
419
},
420
"company_b": {
421
"production": {
422
"customers": {"id": "INT", "email": "VARCHAR"}
423
}
424
}
425
}
426
427
# Flatten schema to get all table paths
428
flattened = flatten_schema(complex_schema, depth=3)
429
print("All table paths:")
430
for path in flattened:
431
print(".".join(path))
432
433
# Get specific nested values
434
user_id_type = nested_get(complex_schema,
435
("company_a", "company_a"),
436
("production", "production"),
437
("users", "users"),
438
("id", "id"))
439
print(f"User ID type: {user_id_type}")
440
441
# Set nested values
442
nested_set(complex_schema,
443
["company_a", "production", "users", "created_date"],
444
"TIMESTAMP")
445
446
print("Updated schema:", complex_schema["company_a"]["production"]["users"])
447
```
448
449
## Types
450
451
```python { .api }
452
class Schema:
453
"""Abstract base class for database schemas."""
454
455
dialect: str # SQL dialect for schema operations
456
457
def add_table(self, table, column_mapping=None, **opts) -> None: ...
458
def column_names(self, table, **opts) -> List[str]: ...
459
def get_column_type(self, table, column, **opts) -> Optional[str]: ...
460
461
class MappingSchema(Schema):
462
"""Dictionary-based schema implementation."""
463
464
mapping: Dict # Underlying schema data
465
visible: Dict # Column visibility mapping
466
normalize: bool # Whether to normalize identifiers
467
supported_table_args: Any # Supported table arguments
468
469
def __init__(self, schema=None, visible=None, dialect=None, normalize=True, **kwargs): ...
470
def copy(self, **kwargs) -> MappingSchema: ...
471
def nested_get(self, *keys) -> Any: ...
472
def nested_set(self, keys: List[str], value: Any) -> None: ...
473
474
# Type aliases for schema data structures
475
ColumnMapping = Union[Dict, str, List] # Column definition formats
476
SchemaDict = Dict[str, Any] # Nested schema dictionary
477
```