Ctrl+K
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-snowflake-sqlalchemy

Snowflake SQLAlchemy Dialect providing comprehensive database connectivity and ORM support for Snowflake cloud data warehouse

Pending
Overview
Eval results
Files

data-operations.mddocs/

Data Operations

Specialized SQL operations including MERGE statements for upserts and COPY INTO commands for bulk data loading from cloud storage with comprehensive formatting options.

Capabilities

MERGE Operations

Advanced MERGE INTO statements for complex upsert operations with conditional logic.

from snowflake.sqlalchemy import MergeInto

class MergeInto(UpdateBase):
    """MERGE INTO statement builder for upsert operations.

    Compiles to a Snowflake ``MERGE INTO <target> USING <source> ON <on>``
    statement. Action clauses are attached with the ``when_*`` methods below;
    each returns a clause object that is configured further with
    ``.values(**kwargs)`` and/or ``.where(predicate)`` before the statement
    is executed (see the usage examples in this document).
    """

    def __init__(self, target, source, on):
        """
        Create MERGE INTO statement.

        Args:
            target: Target table rows are merged into.
            source: Source table or query supplying candidate rows.
            on: Join condition matching source rows against target rows.
        """

    def when_matched_then_update(self):
        """
        Add WHEN MATCHED THEN UPDATE clause.

        Returns:
            clause: Clause object with .values(**kwargs) and .where(predicate)
                methods — ``values`` sets the columns to update, ``where``
                adds an optional extra match condition.
        """

    def when_matched_then_delete(self):
        """
        Add WHEN MATCHED THEN DELETE clause.

        Returns:
            clause: Clause object with .where(predicate) method for an
                optional extra match condition.
        """

    def when_not_matched_then_insert(self):
        """
        Add WHEN NOT MATCHED THEN INSERT clause.

        Returns:
            clause: Clause object with .values(**kwargs) and .where(predicate)
                methods — ``values`` sets the columns to insert, ``where``
                adds an optional extra condition.
        """

COPY INTO Operations

Bulk data loading operations with comprehensive formatting and configuration options.

from snowflake.sqlalchemy import CopyIntoStorage

class CopyInto(UpdateBase):
    """Base class for COPY INTO operations.

    Compiles to a Snowflake ``COPY INTO <into> FROM <from_>`` statement for
    bulk data loading/unloading. Copy options are attached with the chainable
    methods below; each returns the statement itself so calls can be strung
    together.
    """

    def __init__(self, from_, into, partition_by=None, formatter=None):
        """
        Create COPY INTO statement.

        Args:
            from_: Source location the data is read from.
            into: Target location the data is copied into.
            partition_by: Optional partition expression.
            formatter: File format specification — a CopyFormatter subclass
                instance (CSVFormatter, JSONFormatter, PARQUETFormatter).
        """

    def force(self, force: bool = True):
        """
        Set FORCE option to reload files even if previously loaded.

        Args:
            force: Whether to force reload.

        Returns:
            CopyInto: Self for method chaining.
        """

    def single(self, single_file: bool = True):
        """
        Set SINGLE option to load single file.

        Args:
            single_file: Whether to load single file.

        Returns:
            CopyInto: Self for method chaining.
        """

    def maxfilesize(self, max_size: int):
        """
        Set MAX_FILE_SIZE option (maximum size of produced files).

        Args:
            max_size: Maximum file size.

        Returns:
            CopyInto: Self for method chaining.
        """

    def files(self, file_names: List[str]):
        """
        Set FILES option to restrict the copy to an explicit file list.

        Args:
            file_names: List of file names.

        Returns:
            CopyInto: Self for method chaining.
        """

    def pattern(self, pattern: str):
        """
        Set PATTERN option for regex-based file matching.

        Args:
            pattern: Regex pattern for file matching.

        Returns:
            CopyInto: Self for method chaining.
        """

# CopyIntoStorage is an alias for CopyInto: both names refer to the same
# class, so the constructor and every option are those documented on
# CopyInto above — __init__(from_, into, partition_by=None, formatter=None)
# plus the chainable force(), single(), maxfilesize(), files() and
# pattern() methods.
#
# NOTE(review): an earlier revision of this page re-declared CopyIntoStorage
# as a subclass with a different (table, stage_location, file_format)
# constructor. That redefinition shadowed and contradicted this alias; the
# alias below is the single source of truth.
CopyIntoStorage = CopyInto

File Format Classes

Specialized formatters for different file types with comprehensive options.

from snowflake.sqlalchemy import (
    CopyFormatter, CSVFormatter, JSONFormatter, PARQUETFormatter
)

class CopyFormatter:
    """Base formatter for COPY commands.

    Subclasses (CSVFormatter, JSONFormatter, PARQUETFormatter) expose
    file-type-specific options; instances are passed as the ``formatter``
    argument of COPY INTO statements and to CreateFileFormat.
    """

    def __init__(self):
        """Initialize base formatter with no options set."""

class CSVFormatter(CopyFormatter):
    """CSV-specific formatter with extensive options.

    Each option method returns the formatter, so calls can be chained
    (as shown in the usage examples in this document).
    """

    def __init__(self):
        """Initialize CSV formatter."""

    def compression(self, compression_type: str):
        """
        Set compression type.

        Args:
            compression_type: Compression type (GZIP, BZIP2, etc.)

        Returns:
            CSVFormatter: Self for method chaining.
        """

    def record_delimiter(self, delimiter: Union[str, int]):
        """
        Set record delimiter.

        Args:
            delimiter: Record delimiter character or ASCII code.

        Returns:
            CSVFormatter: Self for method chaining.
        """

    def field_delimiter(self, delimiter: Union[str, int]):
        """
        Set field delimiter.

        Args:
            delimiter: Field delimiter character or ASCII code.

        Returns:
            CSVFormatter: Self for method chaining.
        """

    def skip_header(self, lines: int):
        """
        Set number of header lines to skip.

        Args:
            lines: Number of header lines.

        Returns:
            CSVFormatter: Self for method chaining.
        """

    def null_if(self, values: Sequence[str]):
        """
        Set NULL replacement values.

        Args:
            values: List of strings to treat as NULL.

        Returns:
            CSVFormatter: Self for method chaining.
        """

    def error_on_column_count_mismatch(self, flag: bool):
        """
        Set error behavior for column count mismatch.

        Args:
            flag: Whether to error on mismatch.

        Returns:
            CSVFormatter: Self for method chaining.
        """

class JSONFormatter(CopyFormatter):
    """JSON-specific formatter.

    Option methods return the formatter, so calls can be chained
    (as shown in the usage examples in this document).
    """

    def __init__(self):
        """Initialize JSON formatter."""

    def compression(self, compression_type: str):
        """
        Set compression type.

        Args:
            compression_type: Compression type.

        Returns:
            JSONFormatter: Self for method chaining.
        """

    def file_extension(self, extension: str):
        """
        Set file extension for unloaded files.

        Args:
            extension: File extension.

        Returns:
            JSONFormatter: Self for method chaining.
        """

class PARQUETFormatter(CopyFormatter):
    """Parquet-specific formatter.

    Option methods return the formatter, so calls can be chained
    (as shown in the usage examples in this document).
    """

    def __init__(self):
        """Initialize Parquet formatter."""

    def snappy_compression(self, enabled: bool):
        """
        Enable/disable Snappy compression.

        Args:
            enabled: Whether Snappy compression is enabled.

        Returns:
            PARQUETFormatter: Self for method chaining.
        """

    def binary_as_text(self, flag: bool):
        """
        Handle binary data as text.

        Args:
            flag: Whether to treat binary as text.

        Returns:
            PARQUETFormatter: Self for method chaining.
        """

Stage Management

DDL operations for creating and managing stages and file formats.

from snowflake.sqlalchemy import CreateStage, CreateFileFormat

class CreateStage(DDLElement):
    """CREATE STAGE DDL statement.

    Compiles to ``CREATE [OR REPLACE] [TEMPORARY] STAGE ...``, binding a
    stage name to a physical container (e.g. a cloud-storage bucket).
    Executed like any other DDL element (see the usage examples).
    """

    def __init__(self, container, stage, replace_if_exists=False, *, temporary=False):
        """
        Create stage creation statement.

        Args:
            container: Container providing the physical base for the stage
                (e.g. a cloud-storage bucket object).
            stage: ExternalStage object naming the stage to create.
            replace_if_exists: Emit CREATE OR REPLACE instead of failing
                when the stage already exists.
            temporary: Whether the stage is temporary (keyword-only).
        """

class CreateFileFormat(DDLElement):
    """CREATE FILE FORMAT DDL statement.

    Compiles to ``CREATE [OR REPLACE] FILE FORMAT <name> ...`` using the
    options carried by the supplied formatter object.
    """

    def __init__(self, format_name, formatter, replace_if_exists=False):
        """
        Create file format creation statement.

        Args:
            format_name: Name of the file format to create.
            formatter: Formatter object (CSVFormatter, JSONFormatter, etc.)
                supplying the format options.
            replace_if_exists: Emit CREATE OR REPLACE instead of failing
                when the format already exists.
        """

Usage Examples

MERGE Operations

from snowflake.sqlalchemy import MergeInto
from sqlalchemy import MetaData, Table, Column, Integer, select

# A MetaData collection is required for reflection; the original example
# used `metadata` without ever creating it.
metadata = MetaData()

# Reflect target and source tables from the live database.
target_table = Table('users', metadata, autoload_with=engine)
source_table = Table('user_updates', metadata, autoload_with=engine)

# Basic merge operation: match rows on primary key.
merge = MergeInto(
    target=target_table,
    source=source_table,
    on=target_table.c.id == source_table.c.id
)

# WHEN MATCHED: refresh mutable columns from the source row.
update_clause = merge.when_matched_then_update().values(
    name=source_table.c.name,
    email=source_table.c.email
)

# WHEN NOT MATCHED: insert the source row wholesale.
insert_clause = merge.when_not_matched_then_insert().values(
    id=source_table.c.id,
    name=source_table.c.name,
    email=source_table.c.email
)

# Execute merge
engine.execute(merge)

Complex MERGE with Conditions

# `func` was used below but never imported in the original example.
from sqlalchemy import func

# Merge with conditional logic: only consider active source rows.
merge = MergeInto(
    target=target_table,
    source=select(source_table).where(source_table.c.active == True),
    on=target_table.c.id == source_table.c.id
)

# WHEN MATCHED: update name and stamp the modification time.
update_clause = merge.when_matched_then_update().values(
    name=source_table.c.name,
    updated_at=func.current_timestamp()
)

# WHEN MATCHED and the source row is flagged deleted: remove the target row.
delete_clause = merge.when_matched_then_delete().where(
    source_table.c.deleted == True
)

# WHEN NOT MATCHED: insert with a creation timestamp.
insert_clause = merge.when_not_matched_then_insert().values(
    id=source_table.c.id,
    name=source_table.c.name,
    created_at=func.current_timestamp()
)

CSV Copy Operations

from snowflake.sqlalchemy import CopyIntoStorage, CSVFormatter

# Create CSV formatter with options
csv_format = (CSVFormatter()
    .compression('GZIP')
    .field_delimiter(',')
    .record_delimiter('\n')
    .skip_header(1)
    .null_if(['', 'NULL', 'null'])
    .error_on_column_count_mismatch(True)
)

# Copy from an external stage into the table. CopyIntoStorage is an alias
# for CopyInto, so the constructor arguments are CopyInto's
# (from_, into, partition_by, formatter) — the original example used
# table=/stage_location=/file_format= kwargs that do not exist on CopyInto.
copy_stmt = (CopyIntoStorage(
        from_='@my_stage/users/',
        into=users_table,
        formatter=csv_format
    )
    .files(['users_001.csv.gz', 'users_002.csv.gz'])
    .force(True)
)

engine.execute(copy_stmt)

JSON Copy Operations

from snowflake.sqlalchemy import JSONFormatter

# JSON formatter
json_format = (JSONFormatter()
    .compression('GZIP')
    .file_extension('.json')
)

# Copy JSON data. CopyIntoStorage is an alias for CopyInto, so the
# constructor arguments are (from_, into, partition_by, formatter).
# The PATTERN regex is a raw string so its backslashes reach Snowflake
# unmangled (the original non-raw literal relied on invalid escape
# sequences, which newer Pythons warn about).
copy_json = CopyIntoStorage(
    from_='s3://my-bucket/events/',
    into=events_table,
    formatter=json_format
).pattern(r'.*events.*\.json\.gz')

engine.execute(copy_json)

Parquet Copy Operations

from snowflake.sqlalchemy import PARQUETFormatter

# Parquet formatter
parquet_format = (PARQUETFormatter()
    .snappy_compression(True)
    .binary_as_text(False)
)

# Copy Parquet files. CopyIntoStorage is an alias for CopyInto, so the
# constructor arguments are (from_, into, partition_by, formatter).
copy_parquet = CopyIntoStorage(
    from_='@analytics_stage/',
    into=analytics_table,
    formatter=parquet_format
).single(False)

engine.execute(copy_parquet)

Stage and File Format Creation

# AWSBucket and ExternalStage were used below but never imported in the
# original example.
from snowflake.sqlalchemy import (
    CreateStage, CreateFileFormat, AWSBucket, ExternalStage
)

# Create CSV formatter first
csv_formatter = (CSVFormatter()
    .field_delimiter(',')
    .skip_header(1)
    .null_if(['NULL', ''])
)

# Create file format
create_csv_format = CreateFileFormat(
    format_name='my_csv_format',
    formatter=csv_formatter
)

# Create AWS bucket container with credentials for the stage to use.
aws_container = (AWSBucket('my-bucket', 'data/')
    .credentials(aws_key_id='key', aws_secret_key='secret')
)

# Create an external stage backed by the bucket.
external_stage = ExternalStage('my_external_stage')
create_stage = CreateStage(
    container=aws_container,
    stage=external_stage
)

engine.execute(create_csv_format)
engine.execute(create_stage)

Install with Tessl CLI

npx tessl i tessl/pypi-snowflake-sqlalchemy

docs

cloud-storage.md

connection-config.md

data-operations.md

data-types.md

index.md

table-types.md

tile.json