
tessl/pypi-apache-airflow-providers-snowflake

Provider package apache-airflow-providers-snowflake for Apache Airflow


docs/transfers.md

Data Transfer Operations

Specialized operators for efficient bulk data loading from cloud storage services into Snowflake using COPY INTO operations. These operators provide optimized data ingestion capabilities with support for multiple cloud storage platforms, file formats, and advanced loading options.

Capabilities

External Stage Copy Operator

Primary operator for loading data from external cloud storage stages (S3, GCS, Azure Blob) into Snowflake tables using COPY INTO commands with comprehensive configuration options.

class CopyFromExternalStageToSnowflakeOperator(BaseOperator):
    """
    Execute COPY INTO command to load files from external stage to Snowflake.
    Supports S3, GCS, and Azure Blob Storage with flexible file pattern matching
    and comprehensive loading options.
    """
    
    template_fields: Sequence[str] = ("files",)
    template_fields_renderers = {"files": "json"}
    
    def __init__(
        self,
        *,
        files: list | None = None,
        table: str,
        stage: str,
        prefix: str | None = None,
        file_format: str,
        schema: str | None = None,
        columns_array: list | None = None,
        pattern: str | None = None,
        warehouse: str | None = None,
        database: str | None = None,
        autocommit: bool = True,
        snowflake_conn_id: str = "snowflake_default",
        role: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        copy_options: str | None = None,
        validation_mode: str | None = None,
        **kwargs,
    ):
        """
        Initialize external stage copy operator.
        
        Parameters:
        - files: List of specific files to copy (optional, can use pattern instead)
        - table: Target Snowflake table name
        - stage: External stage name (e.g., '@my_s3_stage')
        - prefix: File path prefix within stage
        - file_format: Named file format or inline format specification
        - schema: Target schema name (optional, uses default if not specified)
        - columns_array: List of column names for selective loading
        - pattern: File pattern for matching files (alternative to files list)
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - autocommit: Enable autocommit for the COPY operation
        - snowflake_conn_id: Snowflake connection ID
        - role: Snowflake role name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        - copy_options: Additional COPY INTO options (e.g., 'ON_ERROR=CONTINUE')
        - validation_mode: Validation mode ('RETURN_ERRORS', 'RETURN_ALL_ERRORS')
        """

Execution Method

def execute(self, context: Any) -> None:
    """
    Execute the COPY INTO command to load data from external stage.
    
    Parameters:
    - context: Airflow task execution context
    
    Returns:
    None. Load results are logged and captured for OpenLineage reporting.
    """

OpenLineage Integration

def get_openlineage_facets_on_complete(self, task_instance):
    """
    Get OpenLineage facets after COPY operation completion.
    Provides data lineage information for the copy operation.
    
    Parameters:
    - task_instance: Airflow TaskInstance object
    
    Returns:
    OpenLineage facets dictionary with lineage metadata
    """

Internal Methods

def _extract_openlineage_unique_dataset_paths(
    self, 
    query_result: list[dict[str, Any]]
) -> tuple[list[tuple[str, str]], list[str]]:
    """
    Extract unique dataset paths for OpenLineage tracking.
    
    Parameters:
    - query_result: COPY command result data
    
    Returns:
    Tuple of (dataset_paths, file_paths) for lineage tracking
    """
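A pure-Python sketch of one plausible way such path extraction could work, keyed on the `file` column of a COPY result. This is illustrative only; the provider's real implementation differs in details such as Azure URI handling:

```python
from urllib.parse import urlparse

def extract_unique_dataset_paths(query_result):
    """Derive unique (namespace, directory) pairs from COPY result file URIs.

    Returns a sorted list of unique paths plus the URIs that could not be parsed.
    """
    paths = set()
    extraction_errors = []
    for row in query_result:
        uri = row.get("file", "")
        parsed = urlparse(uri)
        if not parsed.scheme or not parsed.netloc:
            extraction_errors.append(uri)
            continue
        # Namespace is scheme://bucket; the dataset path is the file's parent directory.
        namespace = f"{parsed.scheme}://{parsed.netloc}"
        directory = parsed.path.rsplit("/", 1)[0] or "/"
        paths.add((namespace, directory))
    return sorted(paths), extraction_errors
```

Two files in the same prefix collapse to a single dataset path, which keeps lineage graphs compact.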

Validation Function

def _validate_parameter(param_name: str, value: str | None) -> str | None:
    """
    Validate parameter to ensure it doesn't contain invalid patterns.
    Prevents SQL injection by checking for semicolons and other dangerous patterns.
    
    Parameters:
    - param_name: Name of parameter being validated
    - value: Parameter value to validate
    
    Returns:
    Validated parameter value or None
    
    Raises:
    ValueError: If parameter contains invalid patterns
    """

Usage Examples

Basic File Copy from S3

from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from datetime import datetime

with DAG(
    'data_ingestion_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:
    
    # Copy CSV files from S3 to Snowflake
    load_sales_data = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_daily_sales',
        snowflake_conn_id='snowflake_prod',
        table='raw.sales_transactions',
        stage='@s3_data_stage',
        prefix='sales/daily/{{ ds }}/',
        file_format='csv_format',
        warehouse='LOADING_WH',
        database='RAW_DATA',
        schema='PUBLIC',
        copy_options='ON_ERROR=CONTINUE FORCE=TRUE',
        autocommit=True
    )

Pattern-Based File Loading

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Load files matching specific pattern
load_with_pattern = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_json_files',
    snowflake_conn_id='snowflake_prod',
    table='staging.json_events',
    stage='@gcs_events_stage',
    pattern='events/year={{ macros.ds_format(ds, "%Y-%m-%d", "%Y") }}/month={{ macros.ds_format(ds, "%Y-%m-%d", "%m") }}/.*\\.json',
    file_format='json_format',
    warehouse='ETL_WH',
    database='STAGING',
    copy_options='MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE'
)

Specific File List Loading

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Load specific files by name
load_specific_files = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_critical_files',
    snowflake_conn_id='snowflake_prod',
    table='critical.financial_data',
    stage='@azure_secure_stage',
    files=[
        'financial/transactions/{{ ds }}/morning_batch.parquet',
        'financial/transactions/{{ ds }}/evening_batch.parquet',
        'financial/reconciliation/{{ ds }}/daily_summary.parquet'
    ],
    file_format='parquet_format',
    warehouse='SECURE_WH',
    database='FINANCIAL',
    schema='SECURE',
    copy_options='ON_ERROR=ABORT_STATEMENT',
    validation_mode='RETURN_ERRORS'
)

Selective Column Loading

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Load only specific columns from source files
load_partial_columns = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_customer_subset',
    snowflake_conn_id='snowflake_prod',
    table='analytics.customer_subset',
    stage='@s3_customer_stage',
    prefix='customers/export/{{ ds }}/',
    file_format='csv_with_header',
    columns_array=[
        'customer_id',
        'customer_name', 
        'email',
        'registration_date',
        'lifetime_value'
    ],
    warehouse='ANALYTICS_WH',
    database='ANALYTICS',
    # SKIP_HEADER and FIELD_OPTIONALLY_ENCLOSED_BY are file format options;
    # define them in the csv_with_header format rather than in copy_options.
    copy_options='ON_ERROR=CONTINUE'
)

Advanced Loading with Custom Options

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Advanced configuration with error handling and transformation
advanced_load = CopyFromExternalStageToSnowflakeOperator(
    task_id='advanced_data_load',
    snowflake_conn_id='snowflake_prod',
    table='staging.raw_events',
    stage='@s3_events_stage',
    prefix='events/{{ ds }}/',
    file_format='json_auto_detect',
    warehouse='HEAVY_LOADING_WH',
    database='STAGING',
    schema='RAW',
    copy_options='''
        ON_ERROR=CONTINUE
        SIZE_LIMIT=1000000000
        RETURN_FAILED_ONLY=TRUE
        MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
        ENFORCE_LENGTH=FALSE
        TRUNCATECOLUMNS=TRUE
    ''',
    validation_mode='RETURN_ALL_ERRORS',
    session_parameters={
        'QUERY_TAG': 'airflow_bulk_load_{{ ds }}',
        'MULTI_STATEMENT_COUNT': 1
    }
)

Multi-Stage Loading Pipeline

from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

with DAG(
    'multi_stage_ingestion',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly',
    catchup=False
) as dag:
    
    # Stage 1: Load raw transaction data
    load_transactions = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_raw_transactions',
        table='raw.transactions_staging',
        stage='@s3_transaction_stage',
        prefix='transactions/{{ ds }}/{{ logical_date.strftime("%H") }}/',
        file_format='csv_transactions',
        warehouse='LOADING_WH'
    )
    
    # Stage 2: Load customer reference data
    load_customers = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_customer_data',
        table='raw.customers_staging',
        stage='@s3_reference_stage',
        prefix='customers/daily/{{ ds }}/',
        file_format='json_customers',
        warehouse='LOADING_WH',
        copy_options='ON_ERROR=ABORT_STATEMENT'  # Strict loading for reference data
    )
    
    # Stage 3: Load product catalog
    load_products = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_product_catalog',
        table='raw.products_staging',
        stage='@s3_catalog_stage',
        pattern='catalog/products_{{ ds }}\\.parquet',
        file_format='parquet_products',
        warehouse='LOADING_WH'
    )
    
    # Stage 4: Data quality validation and promotion
    validate_and_promote = SnowflakeSqlApiOperator(
        task_id='validate_and_promote_data',
        sql='''
            -- Validate transaction data
            CREATE OR REPLACE TABLE raw.transactions AS 
            SELECT * FROM raw.transactions_staging 
            WHERE customer_id IS NOT NULL 
              AND transaction_amount > 0 
              AND transaction_date = '{{ ds }}';
            
            -- Validate and merge customer data
            MERGE INTO raw.customers c 
            USING raw.customers_staging s ON c.customer_id = s.customer_id
            WHEN MATCHED THEN UPDATE SET 
                customer_name = s.customer_name,
                email = s.email,
                updated_at = CURRENT_TIMESTAMP()
            WHEN NOT MATCHED THEN INSERT (customer_id, customer_name, email, created_at)
                VALUES (s.customer_id, s.customer_name, s.email, CURRENT_TIMESTAMP());
            
            -- Update product catalog
            CREATE OR REPLACE TABLE raw.products AS 
            SELECT * FROM raw.products_staging;
            
            -- Clean up staging tables
            DROP TABLE raw.transactions_staging;
            DROP TABLE raw.customers_staging;
            DROP TABLE raw.products_staging;
        ''',
        statement_count=6,
        warehouse='PROCESSING_WH'
    )
    
    # Define dependencies
    [load_transactions, load_customers, load_products] >> validate_and_promote

Error Handling and Monitoring

from datetime import timedelta

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeCheckOperator

# Data loading with comprehensive error handling
resilient_load = CopyFromExternalStageToSnowflakeOperator(
    task_id='resilient_data_load',
    snowflake_conn_id='snowflake_prod',
    table='staging.resilient_load',
    stage='@s3_variable_quality_stage',
    prefix='data/{{ ds }}/',
    file_format='csv_with_errors',
    warehouse='RESILIENT_WH',
    # MAX_FILE_SIZE applies only to unloading and SKIP_BLANK_LINES is a file
    # format option, so neither belongs in copy_options for a load.
    copy_options='''
        ON_ERROR=CONTINUE
        RETURN_FAILED_ONLY=TRUE
        SIZE_LIMIT=100000000
    ''',
    validation_mode='RETURN_ALL_ERRORS',
    # Airflow task retry configuration
    retries=3,
    retry_delay=timedelta(minutes=5)
)

# Post-load data quality check
quality_check = SnowflakeCheckOperator(
    task_id='verify_load_quality',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT 
            CASE 
                WHEN COUNT(*) > 0 AND 
                     COUNT(*) * 0.95 <= (SELECT COUNT(*) FROM staging.resilient_load WHERE error_column IS NULL)
                THEN TRUE 
                ELSE FALSE 
            END as quality_passed
        FROM staging.resilient_load
    ''',
    warehouse='ANALYTICS_WH'
)

resilient_load >> quality_check

File Format Support

The transfer operators support various file formats through Snowflake's file format objects:

CSV Files

CREATE FILE FORMAT csv_format
TYPE = 'CSV'
FIELD_DELIMITER = ','
RECORD_DELIMITER = '\n'
SKIP_HEADER = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
ESCAPE_UNENCLOSED_FIELD = NONE;

JSON Files

CREATE FILE FORMAT json_format
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE
DATE_FORMAT = 'YYYY-MM-DD'
TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS';

Parquet Files

CREATE FILE FORMAT parquet_format
TYPE = 'PARQUET'
COMPRESSION = 'AUTO';

Copy Options

Common COPY INTO options supported:

  • ON_ERROR: Error handling (CONTINUE, SKIP_FILE, ABORT_STATEMENT)
  • SIZE_LIMIT: Maximum data size to load per file
  • FORCE: Force loading even if files were previously loaded
  • MATCH_BY_COLUMN_NAME: Match columns by name instead of position
  • ENFORCE_LENGTH: Enforce column length constraints
  • TRUNCATECOLUMNS: Truncate columns that exceed target length
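Because the operator accepts `copy_options` as a plain string passed through to the generated statement, a small helper (hypothetical, not part of the provider) can keep the options in a dict for readability:

```python
def render_copy_options(options: dict[str, str]) -> str:
    """Render a dict of COPY INTO options as the space-separated string the operator accepts."""
    return " ".join(f"{key}={value}" for key, value in options.items())
```

`render_copy_options({"ON_ERROR": "CONTINUE", "FORCE": "TRUE"})` produces `'ON_ERROR=CONTINUE FORCE=TRUE'`, matching the format used in the examples above.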

Performance Optimization

Warehouse Sizing

  • Use larger warehouses (L, XL, 2XL) for bulk loading operations
  • Consider multi-cluster warehouses for concurrent loads
  • Auto-suspend warehouses after completion to control costs

File Organization

  • Organize files in uniform sizes (100-250MB optimal)
  • Use compressed formats when possible (GZIP, BROTLI)
  • Partition files by date or logical boundaries

Parallel Loading

  • Load from multiple stages or prefixes in parallel
  • Use separate tasks for independent table loads
  • Leverage Snowflake's automatic parallelization within single COPY operations
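One way to apply the parallel-loading advice is to split a file manifest into batches and hand each batch to its own operator instance via `files=`. The helper below is an illustrative sketch, not provider code:

```python
def partition_files(files: list[str], num_tasks: int) -> list[list[str]]:
    """Distribute files round-robin across independent COPY tasks."""
    batches: list[list[str]] = [[] for _ in range(num_tasks)]
    for index, name in enumerate(files):
        batches[index % num_tasks].append(name)
    # Drop empty batches when there are fewer files than tasks.
    return [batch for batch in batches if batch]
```

Each returned batch becomes the `files=` argument of one `CopyFromExternalStageToSnowflakeOperator`, letting Airflow run the resulting COPY statements concurrently.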

Monitoring and Troubleshooting

Load History Queries

-- Check recent COPY operations
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'RAW.SALES_TRANSACTIONS',
    START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())
));

-- Check for load errors (LOAD_HISTORY is a view, not a table function)
SELECT *
FROM INFORMATION_SCHEMA.LOAD_HISTORY
WHERE SCHEMA_NAME = 'RAW'
  AND TABLE_NAME = 'SALES_TRANSACTIONS'
  AND LAST_LOAD_TIME >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
  AND STATUS = 'LOAD_FAILED';
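Query results like the COPY_HISTORY output above can be post-processed in a task to drive alerting. The sketch below assumes rows shaped like COPY_HISTORY's `FILE_NAME`, `ROW_COUNT`, and `STATUS` columns; wiring to a Snowflake hook is omitted:

```python
def summarize_copy_history(rows: list[dict]) -> dict:
    """Reduce COPY_HISTORY rows to a compact load report."""
    report = {"files": 0, "rows_loaded": 0, "failed_files": []}
    for row in rows:
        report["files"] += 1
        report["rows_loaded"] += row.get("ROW_COUNT") or 0
        if row.get("STATUS") == "LOAD_FAILED":
            report["failed_files"].append(row.get("FILE_NAME"))
    return report
```

A downstream task can then fail the run or page on-call when `failed_files` is non-empty.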

Common Issues and Solutions

  • Authentication Errors: Verify stage credentials and permissions
  • File Format Mismatches: Validate file format definition against actual files
  • Schema Mismatches: Ensure target table structure matches source data
  • Permission Issues: Verify role has USAGE on stage and INSERT on target table

Error Handling

The transfer operators provide comprehensive error handling:

  • Stage Access Errors: Invalid credentials, missing permissions, network issues
  • File Format Errors: Schema mismatches, encoding issues, malformed data
  • Target Table Errors: Missing tables, permission issues, constraint violations
  • Resource Errors: Warehouse capacity limits, query complexity limits

All errors include detailed Snowflake error codes, file-specific information, and troubleshooting guidance.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-snowflake
