Snowflake SQLAlchemy Dialect providing comprehensive database connectivity and ORM support for the Snowflake cloud data warehouse.
Specialized SQL operations including MERGE statements for upserts and COPY INTO commands for bulk data loading from cloud storage with comprehensive formatting options.
Advanced MERGE INTO statements for complex upsert operations with conditional logic.
from snowflake.sqlalchemy import MergeInto
class MergeInto(UpdateBase):
"""MERGE INTO statement builder for upsert operations."""
def __init__(self, target, source, on):
"""
Create MERGE INTO statement.
Args:
target: Target table
source: Source table or query
on: Join condition
"""
def when_matched_then_update(self):
"""
Add WHEN MATCHED THEN UPDATE clause.
Returns:
clause: Clause object with .values(**kwargs) and .where(predicate) methods
"""
def when_matched_then_delete(self):
"""
Add WHEN MATCHED THEN DELETE clause.
Returns:
clause: Clause object with .where(predicate) method
"""
def when_not_matched_then_insert(self):
"""
Add WHEN NOT MATCHED THEN INSERT clause.
Returns:
clause: Clause object with .values(**kwargs) and .where(predicate) methods
"""

Bulk data loading operations with comprehensive formatting and configuration options.
from snowflake.sqlalchemy import CopyIntoStorage
class CopyInto(UpdateBase):
"""Base class for COPY INTO operations."""
def __init__(self, from_, into, partition_by=None, formatter=None):
"""
Create COPY INTO statement.
Args:
from_: Source location
into: Target location
partition_by: Optional partition expression
formatter: File format specification
"""
def force(self, force: bool = True):
"""
Set FORCE option to reload files.
Args:
force: Whether to force reload
Returns:
CopyInto: Self for method chaining
"""
def single(self, single_file: bool = True):
"""
Set SINGLE option to load single file.
Args:
single_file: Whether to load single file
Returns:
CopyInto: Self for method chaining
"""
def maxfilesize(self, max_size: int):
"""
Set maximum file size.
Args:
max_size: Maximum file size
Returns:
CopyInto: Self for method chaining
"""
def files(self, file_names: List[str]):
"""
Set FILES option to specify file list.
Args:
file_names: List of file names
Returns:
CopyInto: Self for method chaining
"""
def pattern(self, pattern: str):
"""
Set PATTERN option for file matching.
Args:
pattern: Regex pattern for file matching
Returns:
CopyInto: Self for method chaining
"""
# CopyIntoStorage is provided as a backward-compatible name for CopyInto
CopyIntoStorage = CopyInto
class CopyIntoStorage(CopyInto):
"""COPY INTO statement for bulk data operations (alias for CopyInto)."""
def __init__(self, table, stage_location, file_format=None):
"""
Create COPY INTO statement.
Args:
table: Target table
stage_location: Stage or external location
file_format: File format specification
"""
# Inherits all methods from CopyInto base class:
# force(), single(), maxfilesize(), files(), pattern()Specialized formatters for different file types with comprehensive options.
from snowflake.sqlalchemy import (
CopyFormatter, CSVFormatter, JSONFormatter, PARQUETFormatter
)
class CopyFormatter:
"""Base formatter for COPY commands."""
def __init__(self):
"""Initialize base formatter."""
class CSVFormatter(CopyFormatter):
"""CSV-specific formatter with extensive options."""
def __init__(self):
"""Initialize CSV formatter."""
def compression(self, compression_type: str):
"""
Set compression type.
Args:
compression_type: Compression type (GZIP, BZIP2, etc.)
"""
def record_delimiter(self, delimiter: Union[str, int]):
"""
Set record delimiter.
Args:
delimiter: Record delimiter character or ASCII code
"""
def field_delimiter(self, delimiter: Union[str, int]):
"""
Set field delimiter.
Args:
delimiter: Field delimiter character or ASCII code
"""
def skip_header(self, lines: int):
"""
Set number of header lines to skip.
Args:
lines: Number of header lines
"""
def null_if(self, values: Sequence[str]):
"""
Set NULL replacement values.
Args:
values: List of strings to treat as NULL
"""
def error_on_column_count_mismatch(self, flag: bool):
"""
Set error behavior for column count mismatch.
Args:
flag: Whether to error on mismatch
"""
class JSONFormatter(CopyFormatter):
"""JSON-specific formatter."""
def __init__(self):
"""Initialize JSON formatter."""
def compression(self, compression_type: str):
"""
Set compression type.
Args:
compression_type: Compression type
"""
def file_extension(self, extension: str):
"""
Set file extension.
Args:
extension: File extension
"""
class PARQUETFormatter(CopyFormatter):
"""Parquet-specific formatter."""
def __init__(self):
"""Initialize Parquet formatter."""
def snappy_compression(self, enabled: bool):
"""
Enable/disable Snappy compression.
Args:
enabled: Whether Snappy compression is enabled
"""
def binary_as_text(self, flag: bool):
"""
Handle binary data as text.
Args:
flag: Whether to treat binary as text
"""

DDL operations for creating and managing stages and file formats.
from snowflake.sqlalchemy import CreateStage, CreateFileFormat
class CreateStage(DDLElement):
"""CREATE STAGE DDL statement."""
def __init__(self, container, stage, replace_if_exists=False, *, temporary=False):
"""
Create stage creation statement.
Args:
container: Container (physical base for the stage)
stage: ExternalStage object
replace_if_exists: Whether to replace if exists
temporary: Whether stage is temporary
"""
class CreateFileFormat(DDLElement):
"""CREATE FILE FORMAT DDL statement."""
def __init__(self, format_name, formatter, replace_if_exists=False):
"""
Create file format creation statement.
Args:
format_name: File format name
formatter: Formatter object (CSVFormatter, JSONFormatter, etc.)
replace_if_exists: Whether to replace if exists
"""

from snowflake.sqlalchemy import MergeInto
from sqlalchemy import MetaData, Table, Column, Integer, select
# Basic merge operation
target_table = Table('users', metadata, autoload_with=engine)
source_table = Table('user_updates', metadata, autoload_with=engine)
merge = MergeInto(
target=target_table,
source=source_table,
on=target_table.c.id == source_table.c.id
)
# Add clauses
update_clause = merge.when_matched_then_update().values(
name=source_table.c.name,
email=source_table.c.email
)
insert_clause = merge.when_not_matched_then_insert().values(
id=source_table.c.id,
name=source_table.c.name,
email=source_table.c.email
)
# Execute merge
engine.execute(merge)

# Merge with conditional logic
merge = MergeInto(
target=target_table,
source=select(source_table).where(source_table.c.active == True),
on=target_table.c.id == source_table.c.id
)
# Add clauses with conditions
update_clause = merge.when_matched_then_update().values(
name=source_table.c.name,
updated_at=func.current_timestamp()
)
delete_clause = merge.when_matched_then_delete().where(
source_table.c.deleted == True
)
insert_clause = merge.when_not_matched_then_insert().values(
id=source_table.c.id,
name=source_table.c.name,
created_at=func.current_timestamp()
)

from snowflake.sqlalchemy import CopyIntoStorage, CSVFormatter
# Create CSV formatter with options
csv_format = (CSVFormatter()
.compression('GZIP')
.field_delimiter(',')
.record_delimiter('\n')
.skip_header(1)
.null_if(['', 'NULL', 'null'])
.error_on_column_count_mismatch(True)
)
# Copy from external stage
copy_stmt = (CopyIntoStorage(
table=users_table,
stage_location='@my_stage/users/',
file_format=csv_format
)
.files(['users_001.csv.gz', 'users_002.csv.gz'])
.force(True)
)
engine.execute(copy_stmt)

from snowflake.sqlalchemy import JSONFormatter
# JSON formatter
json_format = (JSONFormatter()
.compression('GZIP')
.file_extension('.json')
)
# Copy JSON data
copy_json = CopyIntoStorage(
table=events_table,
stage_location='s3://my-bucket/events/',
file_format=json_format
).pattern(r'.*events.*\.json\.gz')
engine.execute(copy_json)

from snowflake.sqlalchemy import PARQUETFormatter
# Parquet formatter
parquet_format = (PARQUETFormatter()
.snappy_compression(True)
.binary_as_text(False)
)
# Copy Parquet files
copy_parquet = CopyIntoStorage(
table=analytics_table,
stage_location='@analytics_stage/',
file_format=parquet_format
).single(False)
engine.execute(copy_parquet)

from snowflake.sqlalchemy import CreateStage, CreateFileFormat
# Create CSV formatter first
csv_formatter = (CSVFormatter()
.field_delimiter(',')
.skip_header(1)
.null_if(['NULL', ''])
)
# Create file format
create_csv_format = CreateFileFormat(
format_name='my_csv_format',
formatter=csv_formatter
)
# Create AWS bucket container
aws_container = (AWSBucket('my-bucket', 'data/')
.credentials(aws_key_id='key', aws_secret_key='secret')
)
# Create external stage
external_stage = ExternalStage('my_external_stage')
create_stage = CreateStage(
container=aws_container,
stage=external_stage
)
engine.execute(create_csv_format)
engine.execute(create_stage)

Install with Tessl CLI
npx tessl i tessl/pypi-snowflake-sqlalchemy