Snowflake SQLAlchemy dialect providing comprehensive database connectivity and ORM support for the Snowflake cloud data warehouse.
—
Advanced table types for modern data architectures including hybrid OLTP tables, dynamic tables for streaming analytics, and Iceberg tables for data lake integration.
Basic Snowflake table with clustering support and standard features.
from snowflake.sqlalchemy import SnowflakeTable
class SnowflakeTable(Table):
"""Standard Snowflake table with clustering support."""
def __init__(self, *args, **kwargs):
"""Create standard Snowflake table."""

Snowflake hybrid table optimized for OLTP workloads with primary key enforcement.
from snowflake.sqlalchemy import HybridTable
class HybridTable(SnowflakeTable):
"""Hybrid table for OLTP workloads with primary key enforcement."""
_enforce_primary_keys = True
_support_structured_types = True
def __init__(self, *args, **kwargs):
"""Create hybrid table with OLTP optimizations."""

Snowflake dynamic table for streaming and incremental data processing.
from snowflake.sqlalchemy import DynamicTable
from snowflake.sqlalchemy.sql.custom_schema.options import (
IdentifierOption, TargetLagOption
)
class DynamicTable(SnowflakeTable):
"""Dynamic table for streaming analytics and incremental processing."""
def __init__(self, warehouse, target_lag, refresh_mode=None, *args, **kwargs):
"""
Create dynamic table.
Args:
warehouse: Warehouse identifier for compute
target_lag: Target lag for refresh frequency
refresh_mode: Refresh mode (AUTO, FULL, INCREMENTAL)
"""
@property
def warehouse(self) -> IdentifierOption:
"""Get warehouse identifier option."""
@property
def target_lag(self) -> TargetLagOption:
"""Get target lag option."""

Snowflake Iceberg table for data lake integration with external catalogs and storage.
from snowflake.sqlalchemy import IcebergTable
from snowflake.sqlalchemy.sql.custom_schema.options import LiteralOption
class IcebergTable(SnowflakeTable):
"""Iceberg table for data lake integration."""
_support_structured_types = True
def __init__(self, *args, **kwargs):
"""Create Iceberg table with data lake capabilities."""
@property
def external_volume(self) -> LiteralOption:
"""Get external volume option."""
@property
def base_location(self) -> LiteralOption:
"""Get base location option."""
@property
def catalog(self) -> LiteralOption:
"""Get catalog option."""

Configuration options for customizing table behavior and properties.
from snowflake.sqlalchemy.sql.custom_schema.options import (
AsQueryOption, ClusterByOption, IdentifierOption,
KeywordOption, LiteralOption, TargetLagOption
)
class AsQueryOption:
"""AS query clause for CREATE TABLE AS SELECT."""
def __init__(self, query: Union[str, Selectable]):
"""
Create AS query option.
Args:
query: SQL query string or SQLAlchemy selectable
"""
class ClusterByOption:
"""CLUSTER BY clause for table clustering."""
def __init__(self, *expressions: Union[str, TextClause]):
"""
Create cluster by option.
Args:
*expressions: Column expressions for clustering
"""
class IdentifierOption:
"""Identifier option (unquoted)."""
def __init__(self, value: str):
"""
Create identifier option.
Args:
value: Identifier value
"""
class LiteralOption:
"""Literal option (quoted)."""
def __init__(self, value: Union[int, str]):
"""
Create literal option.
Args:
value: Literal value
"""
class TargetLagOption:
"""Target lag for dynamic tables."""
def __init__(self, time: int, unit: TimeUnit):
"""
Create target lag option.
Args:
time: Time value
unit: Time unit (SECONDS, MINUTES, HOURS, DAYS)
"""

from snowflake.sqlalchemy.sql.custom_schema.options import (
SnowflakeKeyword, TableOptionKey, TimeUnit
)
class SnowflakeKeyword(Enum):
"""Snowflake-specific keywords."""
DOWNSTREAM = "DOWNSTREAM"
AUTO = "AUTO"
FULL = "FULL"
INCREMENTAL = "INCREMENTAL"
class TableOptionKey(Enum):
"""Table option keys."""
AS_QUERY = "AS_QUERY"
BASE_LOCATION = "BASE_LOCATION"
CATALOG = "CATALOG"
CATALOG_SYNC = "CATALOG_SYNC"
CLUSTER_BY = "CLUSTER_BY"
DATA_RETENTION_TIME_IN_DAYS = "DATA_RETENTION_TIME_IN_DAYS"
DEFAULT_DDL_COLLATION = "DEFAULT_DDL_COLLATION"
EXTERNAL_VOLUME = "EXTERNAL_VOLUME"
MAX_DATA_EXTENSION_TIME_IN_DAYS = "MAX_DATA_EXTENSION_TIME_IN_DAYS"
REFRESH_MODE = "REFRESH_MODE"
STORAGE_SERIALIZATION_POLICY = "STORAGE_SERIALIZATION_POLICY"
TARGET_LAG = "TARGET_LAG"
WAREHOUSE = "WAREHOUSE"
class TimeUnit(Enum):
"""Time units for target lag."""
SECONDS = "SECONDS"
MINUTES = "MINUTES"
HOURS = "HOURS"
DAYS = "DAYS"

from sqlalchemy import Column, Integer, MetaData
from snowflake.sqlalchemy import SnowflakeTable, DECIMAL, TEXT
from snowflake.sqlalchemy.sql.custom_schema.options import ClusterByOption
metadata = MetaData()
sales = SnowflakeTable(
'sales',
metadata,
Column('id', Integer, primary_key=True),
Column('customer_id', Integer),
Column('product_name', TEXT()),
Column('amount', DECIMAL(10, 2)),
cluster_by=ClusterByOption('customer_id', 'product_name')
)

from snowflake.sqlalchemy import HybridTable, INTEGER, TEXT, VARIANT
users = HybridTable(
'users',
metadata,
Column('user_id', INTEGER, primary_key=True), # Required for hybrid tables
Column('username', TEXT(), unique=True),
Column('email', TEXT()),
Column('profile', VARIANT)
)

from snowflake.sqlalchemy import DynamicTable
from snowflake.sqlalchemy.sql.custom_schema.options import (
TargetLagOption, TimeUnit, SnowflakeKeyword
)
# Create dynamic table with auto-refresh
sales_summary = DynamicTable(
'sales_summary',
metadata,
Column('customer_id', INTEGER),
Column('total_sales', DECIMAL(12, 2)),
Column('order_count', INTEGER),
warehouse='COMPUTE_WH',
target_lag=TargetLagOption(1, TimeUnit.HOURS),
refresh_mode=SnowflakeKeyword.AUTO,
as_query='SELECT customer_id, SUM(amount), COUNT(*) FROM sales GROUP BY customer_id'
)

from snowflake.sqlalchemy import IcebergTable, TIMESTAMP_TZ
events = IcebergTable(
'events',
metadata,
Column('event_id', INTEGER),
Column('timestamp', TIMESTAMP_TZ),
Column('event_data', VARIANT),
external_volume='my_external_volume',
base_location='s3://my-bucket/events/',
catalog='my_catalog'
)

from snowflake.sqlalchemy.sql.custom_schema.options import (
AsQueryOption, ClusterByOption, IdentifierOption, LiteralOption
)
complex_table = SnowflakeTable(
'complex_table',
metadata,
Column('id', INTEGER),
Column('data', VARIANT),
cluster_by=ClusterByOption('id'),
data_retention_time_in_days=LiteralOption(30),
warehouse=IdentifierOption('ANALYTICS_WH'),
as_query=AsQueryOption('SELECT * FROM source_table WHERE active = true')
)

Install with Tessl CLI
npx tessl i tessl/pypi-snowflake-sqlalchemy