Provider package apache-airflow-providers-snowflake for Apache Airflow
This provider includes specialized operators for bulk-loading data from cloud storage into Snowflake using COPY INTO. The operators support multiple cloud storage platforms, a range of file formats, and advanced loading options.
CopyFromExternalStageToSnowflakeOperator is the primary operator: it loads data from external cloud storage stages (S3, GCS, Azure Blob Storage) into Snowflake tables via COPY INTO, with comprehensive configuration options.
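At a glance, a minimal load looks like the sketch below; the connection, table, stage, and file format names are placeholders, and the full reference and richer examples follow.
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

with DAG("copy_into_minimal", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    copy = CopyFromExternalStageToSnowflakeOperator(
        task_id="copy_into_table",
        snowflake_conn_id="snowflake_default",
        table="my_table",          # placeholder target table
        stage="@my_stage",         # placeholder external stage
        file_format="csv_format",  # placeholder named file format defined in Snowflake
    )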
class CopyFromExternalStageToSnowflakeOperator(BaseOperator):
"""
Execute COPY INTO command to load files from external stage to Snowflake.
Supports S3, GCS, and Azure Blob Storage with flexible file pattern matching
and comprehensive loading options.
"""
template_fields: Sequence[str] = ("files",)
template_fields_renderers = {"files": "json"}
def __init__(
self,
*,
files: list | None = None,
table: str,
stage: str,
prefix: str | None = None,
file_format: str,
schema: str | None = None,
columns_array: list | None = None,
pattern: str | None = None,
warehouse: str | None = None,
database: str | None = None,
autocommit: bool = True,
snowflake_conn_id: str = "snowflake_default",
role: str | None = None,
authenticator: str | None = None,
session_parameters: dict | None = None,
copy_options: str | None = None,
validation_mode: str | None = None,
**kwargs,
):
"""
Initialize external stage copy operator.
Parameters:
- files: List of specific files to copy (optional, can use pattern instead)
- table: Target Snowflake table name
- stage: External stage name (e.g., '@my_s3_stage')
- prefix: File path prefix within stage
- file_format: Named file format or inline format specification
- schema: Target schema name (optional, uses default if not specified)
- columns_array: List of column names for selective loading
- pattern: File pattern for matching files (alternative to files list)
- warehouse: Snowflake warehouse name
- database: Snowflake database name
- autocommit: Enable autocommit for the COPY operation
- snowflake_conn_id: Snowflake connection ID
- role: Snowflake role name
- authenticator: Authentication method
- session_parameters: Session-level parameters
- copy_options: Additional COPY INTO options (e.g., 'ON_ERROR=CONTINUE')
- validation_mode: Validation mode ('RETURN_ERRORS', 'RETURN_ALL_ERRORS')
"""def execute(self, context: Any) -> None:
"""
Execute the COPY INTO command to load data from external stage.
Parameters:
- context: Airflow task execution context
Returns:
Copy operation results and statistics
"""def get_openlineage_facets_on_complete(self, task_instance):
"""
Get OpenLineage facets after COPY operation completion.
Provides data lineage information for the copy operation.
Parameters:
- task_instance: Airflow TaskInstance object
Returns:
OpenLineage facets dictionary with lineage metadata
"""def _extract_openlineage_unique_dataset_paths(
self,
query_result: list[dict[str, Any]]
) -> tuple[list[tuple[str, str]], list[str]]:
"""
Extract unique dataset paths for OpenLineage tracking.
Parameters:
- query_result: COPY command result data
Returns:
Tuple of (dataset_paths, file_paths) for lineage tracking
"""def _validate_parameter(param_name: str, value: str | None) -> str | None:
"""
Validate parameter to ensure it doesn't contain invalid patterns.
Prevents SQL injection by checking for semicolons and other dangerous patterns.
Parameters:
- param_name: Name of parameter being validated
- value: Parameter value to validate
Returns:
Validated parameter value or None
Raises:
ValueError: If parameter contains invalid patterns
"""from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from datetime import datetime
with DAG(
'data_ingestion_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False
) as dag:
# Copy CSV files from S3 to Snowflake
load_sales_data = CopyFromExternalStageToSnowflakeOperator(
task_id='load_daily_sales',
snowflake_conn_id='snowflake_prod',
table='raw.sales_transactions',
stage='@s3_data_stage',
prefix='sales/daily/{{ ds }}/',
file_format='csv_format',
warehouse='LOADING_WH',
database='RAW_DATA',
schema='PUBLIC',
copy_options='ON_ERROR=CONTINUE FORCE=TRUE',
autocommit=True
)

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
# Load files matching specific pattern
load_with_pattern = CopyFromExternalStageToSnowflakeOperator(
task_id='load_json_files',
snowflake_conn_id='snowflake_prod',
table='staging.json_events',
stage='@gcs_events_stage',
pattern='events/year={{ macros.ds_format(ds, "%Y-%m-%d", "%Y") }}/month={{ macros.ds_format(ds, "%Y-%m-%d", "%m") }}/.*\\.json',
file_format='json_format',
warehouse='ETL_WH',
database='STAGING',
copy_options='MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE'
)

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
# Load specific files by name
load_specific_files = CopyFromExternalStageToSnowflakeOperator(
task_id='load_critical_files',
snowflake_conn_id='snowflake_prod',
table='critical.financial_data',
stage='@azure_secure_stage',
files=[
'financial/transactions/{{ ds }}/morning_batch.parquet',
'financial/transactions/{{ ds }}/evening_batch.parquet',
'financial/reconciliation/{{ ds }}/daily_summary.parquet'
],
file_format='parquet_format',
warehouse='SECURE_WH',
database='FINANCIAL',
schema='SECURE',
copy_options='ON_ERROR=ABORT_STATEMENT',
validation_mode='RETURN_ERRORS'
)

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
# Load only specific columns from source files
load_partial_columns = CopyFromExternalStageToSnowflakeOperator(
task_id='load_customer_subset',
snowflake_conn_id='snowflake_prod',
table='analytics.customer_subset',
stage='@s3_customer_stage',
prefix='customers/export/{{ ds }}/',
file_format='csv_with_header',
columns_array=[
'customer_id',
'customer_name',
'email',
'registration_date',
'lifetime_value'
],
warehouse='ANALYTICS_WH',
database='ANALYTICS',
# SKIP_HEADER and FIELD_OPTIONALLY_ENCLOSED_BY are file format options and
# belong in the csv_with_header file format definition, not in copy_options
copy_options='ON_ERROR=CONTINUE'
)

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
# Advanced configuration with error handling and transformation
advanced_load = CopyFromExternalStageToSnowflakeOperator(
task_id='advanced_data_load',
snowflake_conn_id='snowflake_prod',
table='staging.raw_events',
stage='@s3_events_stage',
prefix='events/{{ ds }}/',
file_format='json_auto_detect',
warehouse='HEAVY_LOADING_WH',
database='STAGING',
schema='RAW',
copy_options='''
ON_ERROR=CONTINUE
SIZE_LIMIT=1000000000
RETURN_FAILED_ONLY=TRUE
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
ENFORCE_LENGTH=FALSE
TRUNCATECOLUMNS=TRUE
''',
validation_mode='RETURN_ALL_ERRORS',
session_parameters={
'QUERY_TAG': 'airflow_bulk_load_{{ ds }}',
'MULTI_STATEMENT_COUNT': 1
}
)

from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
with DAG(
'multi_stage_ingestion',
start_date=datetime(2024, 1, 1),
schedule_interval='@hourly',
catchup=False
) as dag:
# Stage 1: Load raw transaction data
load_transactions = CopyFromExternalStageToSnowflakeOperator(
task_id='load_raw_transactions',
table='raw.transactions_staging',
stage='@s3_transaction_stage',
prefix='transactions/{{ ds }}/{{ logical_date.strftime("%H") }}/',
file_format='csv_transactions',
warehouse='LOADING_WH'
)
# Stage 2: Load customer reference data
load_customers = CopyFromExternalStageToSnowflakeOperator(
task_id='load_customer_data',
table='raw.customers_staging',
stage='@s3_reference_stage',
prefix='customers/daily/{{ ds }}/',
file_format='json_customers',
warehouse='LOADING_WH',
copy_options='ON_ERROR=ABORT_STATEMENT' # Strict loading for reference data
)
# Stage 3: Load product catalog
load_products = CopyFromExternalStageToSnowflakeOperator(
task_id='load_product_catalog',
table='raw.products_staging',
stage='@s3_catalog_stage',
pattern='catalog/products_{{ ds }}\\.parquet',
file_format='parquet_products',
warehouse='LOADING_WH'
)
# Stage 4: Data quality validation and promotion
validate_and_promote = SnowflakeSqlApiOperator(
task_id='validate_and_promote_data',
sql='''
-- Validate transaction data
CREATE OR REPLACE TABLE raw.transactions AS
SELECT * FROM raw.transactions_staging
WHERE customer_id IS NOT NULL
AND transaction_amount > 0
AND transaction_date = '{{ ds }}';
-- Validate and merge customer data
MERGE INTO raw.customers c
USING raw.customers_staging s ON c.customer_id = s.customer_id
WHEN MATCHED THEN UPDATE SET
customer_name = s.customer_name,
email = s.email,
updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (customer_id, customer_name, email, created_at)
VALUES (s.customer_id, s.customer_name, s.email, CURRENT_TIMESTAMP());
-- Update product catalog
CREATE OR REPLACE TABLE raw.products AS
SELECT * FROM raw.products_staging;
-- Clean up staging tables
DROP TABLE raw.transactions_staging;
DROP TABLE raw.customers_staging;
DROP TABLE raw.products_staging;
''',
statement_count=6,
warehouse='PROCESSING_WH'
)
# Define dependencies
[load_transactions, load_customers, load_products] >> validate_and_promote

from datetime import timedelta

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeCheckOperator
# Data loading with comprehensive error handling
resilient_load = CopyFromExternalStageToSnowflakeOperator(
task_id='resilient_data_load',
snowflake_conn_id='snowflake_prod',
table='staging.resilient_load',
stage='@s3_variable_quality_stage',
prefix='data/{{ ds }}/',
file_format='csv_with_errors',
warehouse='RESILIENT_WH',
# MAX_FILE_SIZE applies to unloads and SKIP_BLANK_LINES is a file format
# option; for loads, cap input size with SIZE_LIMIT and handle blank lines
# in the file format definition
copy_options='''
ON_ERROR=CONTINUE
RETURN_FAILED_ONLY=TRUE
SIZE_LIMIT=100000000
''',
validation_mode='RETURN_ALL_ERRORS',
# Airflow task retry configuration
retries=3,
retry_delay=timedelta(minutes=5)
)
# Post-load data quality check
quality_check = SnowflakeCheckOperator(
task_id='verify_load_quality',
snowflake_conn_id='snowflake_prod',
sql='''
SELECT
CASE
WHEN COUNT(*) > 0 AND
COUNT(*) * 0.95 <= (SELECT COUNT(*) FROM staging.resilient_load WHERE error_column IS NULL)
THEN TRUE
ELSE FALSE
END as quality_passed
FROM staging.resilient_load
''',
warehouse='ANALYTICS_WH'
)
resilient_load >> quality_check

The transfer operators support various file formats through Snowflake's file format objects:
CREATE FILE FORMAT csv_format
TYPE = 'CSV'
FIELD_DELIMITER = ','
RECORD_DELIMITER = '\n'
SKIP_HEADER = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
ESCAPE_UNENCLOSED_FIELD = NONE;

CREATE FILE FORMAT json_format
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE
DATE_FORMAT = 'YYYY-MM-DD'
TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS';

CREATE FILE FORMAT parquet_format
TYPE = 'PARQUET'
COMPRESSION = 'AUTO';

Commonly used COPY INTO options (a sketch of how they combine follows the list):
- ON_ERROR: error handling behavior (CONTINUE, SKIP_FILE, ABORT_STATEMENT)
- SIZE_LIMIT: maximum total data size (in bytes) to load per COPY statement
- FORCE: force loading even if files were previously loaded
- MATCH_BY_COLUMN_NAME: match columns by name instead of position
- ENFORCE_LENGTH: enforce column length constraints
- TRUNCATECOLUMNS: truncate column values that exceed the target length
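To make the mapping concrete, the sketch below shows roughly how these options and the operator's parameters combine into the COPY INTO statement that gets executed. It is illustrative only, not the provider's actual SQL builder, and the table, stage, and format names are placeholders.
# Rough sketch of how operator parameters map onto a COPY INTO statement.
# The provider's real SQL generation also handles FILES lists, PATTERN
# matching, column lists, and VALIDATION_MODE.
table = "RAW_DATA.PUBLIC.sales_transactions"
stage = "@s3_data_stage"
prefix = "sales/daily/2024-01-01/"
file_format = "(FORMAT_NAME = csv_format)"
copy_options = "ON_ERROR=CONTINUE FORCE=TRUE"

sql = (
    f"COPY INTO {table} "
    f"FROM {stage}/{prefix} "
    f"FILE_FORMAT = {file_format} "
    f"{copy_options}"
)
# -> COPY INTO RAW_DATA.PUBLIC.sales_transactions FROM @s3_data_stage/sales/daily/2024-01-01/ FILE_FORMAT = (FORMAT_NAME = csv_format) ON_ERROR=CONTINUE FORCE=TRUE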
Load operations can be monitored through Snowflake's information schema:

-- Check recent COPY operations
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'RAW.SALES_TRANSACTIONS',
START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())
));
-- Check for load errors (LOAD_HISTORY is a view, not a table function)
SELECT *
FROM INFORMATION_SCHEMA.LOAD_HISTORY
WHERE SCHEMA_NAME = 'RAW'
AND TABLE_NAME = 'SALES_TRANSACTIONS'
AND LAST_LOAD_TIME >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
AND STATUS = 'LOAD_FAILED';
The transfer operators provide comprehensive error handling: failures surface detailed Snowflake error codes, file-specific information, and troubleshooting guidance.
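When a failed load must trigger alerting, standard Airflow hooks apply. Below is a minimal sketch using on_failure_callback, which is stock BaseOperator behavior; the alerting logic and object names are hypothetical.
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

def alert_on_load_failure(context):
    # Called by Airflow when the task fails; the exception carries
    # Snowflake's error message for the failed COPY INTO.
    ti = context["task_instance"]
    print(f"COPY INTO failed in task {ti.task_id}: {context.get('exception')}")
    # Hypothetical: route this to Slack, PagerDuty, etc.

# Define inside a DAG, as in the examples above
guarded_load = CopyFromExternalStageToSnowflakeOperator(
    task_id="guarded_load",
    table="raw.sales_transactions",
    stage="@s3_data_stage",
    file_format="csv_format",
    copy_options="ON_ERROR=ABORT_STATEMENT",  # fail fast on any bad record
    on_failure_callback=alert_on_load_failure,
)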
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-snowflake