A comprehensive SQL parser, transpiler, optimizer, and engine supporting 30+ dialects.
Database schema representation and validation for accurate parsing and optimization: define table structures, column types, relationships, and hierarchical schema organization for multi-database environments.
Abstract base class defining the schema interface for different implementations.
class Schema:
    """Abstract base class for database schemas.

    Defines the lookup interface (tables, columns, column types) that
    parsing and optimization rely on; concrete subclasses supply storage.
    """

    # SQL dialect used when parsing table/column identifiers.
    dialect: str

    def add_table(
        self,
        table: str | Expression,
        column_mapping: Optional[Dict] = None,
        # `dialect` defaults to None, so the annotation must be Optional[str]
        # (was `str = None`).
        dialect: Optional[str] = None,
        normalize: Optional[bool] = None,
        match_depth: bool = True,
    ) -> None:
        """
        Register or update a table with column information.

        Args:
            table: Table name or Table expression.
            column_mapping: Dictionary mapping column names to types.
            dialect: SQL dialect for parsing the table name.
            normalize: Whether to normalize identifiers.
            match_depth: Whether to enforce schema depth matching.
        """

    def column_names(
        self,
        table: str | Expression,
        only_visible: bool = False,
        dialect: Optional[str] = None,
        normalize: Optional[bool] = None,
    ) -> List[str]:
        """
        Get column names for a table.

        Args:
            table: Table name or Table expression.
            only_visible: Return only visible columns.
            dialect: SQL dialect for parsing.
            normalize: Whether to normalize identifiers.

        Returns:
            List[str]: List of column names.
        """

    def get_column_type(
        self,
        table: str | Expression,
        column: str | Expression,
        dialect: Optional[str] = None,
        normalize: Optional[bool] = None,
    ) -> Optional[str]:
        """
        Get data type for a specific column.

        Args:
            table: Table name or expression.
            column: Column name or expression.
            dialect: SQL dialect for parsing.
            normalize: Whether to normalize identifiers.

        Returns:
            Optional[str]: Column data type or None if not found.
        """


# Concrete schema implementation using dictionary-based storage.
class MappingSchema(Schema):
    """Dictionary-based schema implementation."""

    def __init__(
        self,
        schema: Optional[Dict] = None,
        visible: Optional[Dict] = None,
        # `dialect` defaults to None, so the annotation must be Optional[str]
        # (was `str = None`).
        dialect: Optional[str] = None,
        normalize: bool = True,
        **kwargs,
    ):
        """
        Initialize schema with mapping data.

        Args:
            schema: Schema mapping in nested dictionary format.
            visible: Visibility mapping for columns.
            dialect: Default SQL dialect.
            normalize: Whether to normalize identifiers by default.
            **kwargs: Additional schema options.
        """

    @property
    def mapping(self) -> Dict:
        """Get the underlying schema mapping."""

    def copy(self, **kwargs) -> MappingSchema:
        """Create a copy of the schema with optional modifications."""

    def nested_get(self, *keys) -> Any:
        """Get nested value from schema mapping."""

    def nested_set(self, keys: List[str], value: Any) -> None:
        """Set nested value in schema mapping."""


# Helper functions for working with schema structures and data.
def ensure_schema(
    schema: Optional[Union[Dict, Schema]],
    # `dialect` defaults to None, so the annotation must be Optional[str]
    # (was `str = None`).
    dialect: Optional[str] = None,
    **kwargs,
) -> Schema:
    """
    Ensure input is a proper Schema instance.

    Args:
        schema: Schema dictionary or Schema instance.
        dialect: SQL dialect if creating a new schema.
        **kwargs: Additional schema creation options.

    Returns:
        Schema: Validated Schema instance.
    """
def flatten_schema(schema: Dict, depth: int, keys: Optional[List] = None) -> List[List[str]]:
    """Flatten a nested schema dictionary into a list of key paths.

    Args:
        schema: Nested schema dictionary.
        depth: Maximum depth to flatten.
        keys: Current key path, used internally during recursion.

    Returns:
        A list of key paths, one per leaf value.
    """
def nested_get(mapping: Dict, *keys: Tuple[str, str]) -> Any:
    """Look up a value in a nested dictionary by following a key path.

    Args:
        mapping: Nested dictionary.
        *keys: One (key, fallback_key) tuple per nesting level.

    Returns:
        The value found at the path, or None.
    """
def nested_set(mapping: Dict, keys: List[str], value: Any) -> None:
    """Write a value into a nested dictionary at the given key path.

    Args:
        mapping: Nested dictionary to modify.
        keys: Key path at which to set the value.
        value: Value to set.
    """
from sqlglot.schema import MappingSchema

# Build a single-database schema: table name -> {column name -> SQL type}.
user_columns = {
    "id": "INT",
    "name": "VARCHAR(100)",
    "email": "VARCHAR(255)",
    "age": "INT",
    "created_date": "DATE",
    "is_active": "BOOLEAN",
}
order_columns = {
    "id": "INT",
    "user_id": "INT",
    "product_name": "VARCHAR(200)",
    "quantity": "INT",
    "price": "DECIMAL(10,2)",
    "order_date": "TIMESTAMP",
}
product_columns = {
    "id": "INT",
    "name": "VARCHAR(200)",
    "category": "VARCHAR(50)",
    "price": "DECIMAL(10,2)",
    "in_stock": "BOOLEAN",
}
schema = MappingSchema(
    {"users": user_columns, "orders": order_columns, "products": product_columns}
)

# Query table metadata through the schema.
print(schema.column_names("users"))
# ['id', 'name', 'email', 'age', 'created_date', 'is_active']
print(schema.get_column_type("users", "email"))
# 'VARCHAR(255)'
from sqlglot.schema import MappingSchema

# Multi-database schema: database -> table -> {column name -> SQL type}.
schema = MappingSchema(
    {
        "production": {
            "users": {"id": "INT", "name": "VARCHAR(100)", "email": "VARCHAR(255)"},
            "orders": {"id": "INT", "user_id": "INT", "total": "DECIMAL(10,2)"},
        },
        "analytics": {
            "user_metrics": {
                "user_id": "INT",
                "total_orders": "INT",
                "lifetime_value": "DECIMAL(12,2)",
            },
            "daily_stats": {
                "date": "DATE",
                "total_users": "INT",
                "total_revenue": "DECIMAL(15,2)",
            },
        },
    }
)

# Tables are addressed with qualified "database.table" names.
print(schema.column_names("production.users"))
print(schema.get_column_type("analytics.user_metrics", "lifetime_value"))
from sqlglot.schema import MappingSchema

# Begin with an empty schema and register tables incrementally.
schema = MappingSchema()

customer_columns = {
    "customer_id": "INT",
    "first_name": "VARCHAR(50)",
    "last_name": "VARCHAR(50)",
    "email": "VARCHAR(100)",
    "phone": "VARCHAR(20)",
}
invoice_columns = {
    "invoice_id": "INT",
    "customer_id": "INT",
    "invoice_date": "DATE",
    "due_date": "DATE",
    "amount": "DECIMAL(10,2)",
    "status": "VARCHAR(20)",
}
schema.add_table("customers", customer_columns)
schema.add_table("invoices", invoice_columns)

# Register a table under a qualified (database.table) name.
schema.add_table(
    "reporting.monthly_summary",
    {
        "month": "DATE",
        "total_customers": "INT",
        "total_revenue": "DECIMAL(15,2)",
        "avg_invoice_amount": "DECIMAL(10,2)",
    },
)

print(schema.column_names("customers"))
print(schema.column_names("reporting.monthly_summary"))
import sqlglot
from sqlglot.schema import MappingSchema
from sqlglot.optimizer import optimize

# Schema covering every table referenced by the query below.
ecommerce_tables = {
    "customers": {
        "id": "INT",
        "email": "VARCHAR(255)",
        "first_name": "VARCHAR(100)",
        "last_name": "VARCHAR(100)",
        "registration_date": "DATE",
    },
    "orders": {
        "id": "INT",
        "customer_id": "INT",
        "order_date": "TIMESTAMP",
        "status": "VARCHAR(20)",
        "total_amount": "DECIMAL(12,2)",
    },
    "order_items": {
        "id": "INT",
        "order_id": "INT",
        "product_id": "INT",
        "quantity": "INT",
        "unit_price": "DECIMAL(10,2)",
    },
    "products": {
        "id": "INT",
        "name": "VARCHAR(200)",
        "category": "VARCHAR(100)",
        "price": "DECIMAL(10,2)",
        "weight": "DECIMAL(8,3)",
    },
}
schema = MappingSchema({"ecommerce": ecommerce_tables})

sql = """
SELECT c.first_name, c.last_name, SUM(o.total_amount) as total_spent
FROM customers c
JOIN orders o ON c.id = o.customer_id
WHERE c.registration_date >= '2023-01-01'
AND o.status = 'completed'
GROUP BY c.id, c.first_name, c.last_name
HAVING SUM(o.total_amount) > 1000
"""

# Optimize the query using the schema information.
optimized = optimize(sql, schema=schema, dialect="postgres")
print(optimized.sql(pretty=True))
from sqlglot.schema import MappingSchema
from sqlglot.optimizer import annotate_types
import sqlglot

# Schema with precise column types for type annotation.
schema = MappingSchema(
    {
        "sales": {
            "transaction_id": "BIGINT",
            "customer_id": "INT",
            "product_id": "INT",
            "sale_date": "DATE",
            "sale_timestamp": "TIMESTAMP",
            "amount": "DECIMAL(10,2)",
            "tax_rate": "DECIMAL(5,4)",
            "currency": "CHAR(3)",
            "notes": "TEXT",
        }
    }
)

sql = """
SELECT
customer_id,
COUNT(*) as transaction_count,
SUM(amount) as total_sales,
AVG(amount) as avg_sale,
MAX(sale_date) as last_sale_date
FROM sales
WHERE sale_date >= '2023-01-01'
AND amount > 10.00
GROUP BY customer_id
"""

# Parse the query, then attach type information using the schema.
expression = sqlglot.parse_one(sql)
typed_expression = annotate_types(expression, schema=schema)

# The annotated expression now carries type information.
print(typed_expression.sql(pretty=True))
from sqlglot.schema import flatten_schema, nested_get, nested_set

# Deeply nested schema: catalog -> database -> table -> columns.
nested = {
    "company_a": {
        "production": {
            "users": {"id": "INT", "name": "VARCHAR"},
            "orders": {"id": "INT", "user_id": "INT"},
        },
        "staging": {
            "temp_users": {"id": "INT", "name": "VARCHAR"},
        },
    },
    "company_b": {
        "production": {
            "customers": {"id": "INT", "email": "VARCHAR"},
        },
    },
}

# Enumerate every key path down to the requested depth.
flattened = flatten_schema(nested, depth=3)
print("All table paths:")
for path in flattened:
    print(".".join(path))

# nested_get takes one (key, fallback_key) pair per nesting level.
user_id_type = nested_get(
    nested,
    ("company_a", "company_a"),
    ("production", "production"),
    ("users", "users"),
    ("id", "id"),
)
print(f"User ID type: {user_id_type}")

# Insert a new leaf value along an explicit key path.
nested_set(nested, ["company_a", "production", "users", "created_date"], "TIMESTAMP")
print("Updated schema:", nested["company_a"]["production"]["users"])
class Schema:
    """Abstract base class for database schemas."""

    dialect: str  # SQL dialect used for schema operations

    def add_table(self, table, column_mapping=None, **opts) -> None: ...

    def column_names(self, table, **opts) -> List[str]: ...

    def get_column_type(self, table, column, **opts) -> Optional[str]: ...


class MappingSchema(Schema):
    """Dictionary-based schema implementation."""

    mapping: Dict              # underlying schema data
    visible: Dict              # column visibility mapping
    normalize: bool            # whether identifiers are normalized
    supported_table_args: Any  # supported table arguments

    def __init__(self, schema=None, visible=None, dialect=None, normalize=True, **kwargs): ...

    def copy(self, **kwargs) -> MappingSchema: ...

    def nested_get(self, *keys) -> Any: ...

    def nested_set(self, keys: List[str], value: Any) -> None: ...


# Type aliases for schema data structures.
ColumnMapping = Union[Dict, str, List]  # accepted column definition formats
SchemaDict = Dict[str, Any]             # nested schema dictionary
Install with the Tessl CLI: `npx tessl i tessl/pypi-sqlglot`