# Data Transfer Operations

Specialized operators for efficient bulk data loading from cloud storage services into Snowflake using COPY INTO operations. These operators provide optimized data ingestion with support for multiple cloud storage platforms, file formats, and advanced loading options.

## Capabilities

### External Stage Copy Operator

Primary operator for loading data from external cloud storage stages (S3, GCS, Azure Blob Storage) into Snowflake tables using COPY INTO commands, with comprehensive configuration options.
```python { .api }
class CopyFromExternalStageToSnowflakeOperator(BaseOperator):
    """
    Execute a COPY INTO command to load files from an external stage into Snowflake.

    Supports S3, GCS, and Azure Blob Storage with flexible file pattern matching
    and comprehensive loading options.
    """

    template_fields: Sequence[str] = ("files",)
    template_fields_renderers = {"files": "json"}

    def __init__(
        self,
        *,
        files: list | None = None,
        table: str,
        stage: str,
        prefix: str | None = None,
        file_format: str,
        schema: str | None = None,
        columns_array: list | None = None,
        pattern: str | None = None,
        warehouse: str | None = None,
        database: str | None = None,
        autocommit: bool = True,
        snowflake_conn_id: str = "snowflake_default",
        role: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        copy_options: str | None = None,
        validation_mode: str | None = None,
        **kwargs,
    ):
        """
        Initialize the external stage copy operator.

        Parameters:
        - files: List of specific files to copy (optional; a pattern can be used instead)
        - table: Target Snowflake table name
        - stage: External stage name (e.g., '@my_s3_stage')
        - prefix: File path prefix within the stage
        - file_format: Named file format or inline format specification
        - schema: Target schema name (optional; uses the connection default if not specified)
        - columns_array: List of column names for selective loading
        - pattern: File-matching pattern (alternative to the files list)
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - autocommit: Enable autocommit for the COPY operation
        - snowflake_conn_id: Snowflake connection ID
        - role: Snowflake role name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        - copy_options: Additional COPY INTO options (e.g., 'ON_ERROR=CONTINUE')
        - validation_mode: Validation mode ('RETURN_ERRORS', 'RETURN_ALL_ERRORS'); when set,
          COPY validates the staged files instead of loading them
        """
```

#### Execution Method

```python { .api }
def execute(self, context: Any) -> None:
    """
    Execute the COPY INTO command to load data from the external stage.

    Parameters:
    - context: Airflow task execution context

    Returns:
    None
    """
```

#### OpenLineage Integration

```python { .api }
def get_openlineage_facets_on_complete(self, task_instance):
    """
    Get OpenLineage facets after the COPY operation completes.
    Provides data lineage information for the copy operation.

    Parameters:
    - task_instance: Airflow TaskInstance object

    Returns:
    OpenLineage facets dictionary with lineage metadata
    """
```

#### Internal Methods

```python { .api }
def _extract_openlineage_unique_dataset_paths(
    self,
    query_result: list[dict[str, Any]],
) -> tuple[list[tuple[str, str]], list[str]]:
    """
    Extract unique dataset paths for OpenLineage tracking.

    Parameters:
    - query_result: COPY command result data

    Returns:
    Tuple of (dataset_paths, file_paths) for lineage tracking
    """
```

### Validation Function

```python { .api }
def _validate_parameter(param_name: str, value: str | None) -> str | None:
    """
    Validate a parameter to ensure it doesn't contain invalid patterns.
    Helps prevent SQL injection by rejecting semicolons and other dangerous patterns.

    Parameters:
    - param_name: Name of the parameter being validated
    - value: Parameter value to validate

    Returns:
    Validated parameter value, or None if the value is None

    Raises:
    ValueError: If the parameter contains invalid patterns
    """
```
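
The check is conceptually simple. A minimal sketch of this style of validation (illustrative only, not the provider's actual implementation):

```python
# Illustrative sketch of semicolon-style validation; the provider's real
# _validate_parameter may reject additional patterns.
def validate_no_statement_break(param_name: str, value: str | None) -> str | None:
    """Reject values that could terminate the generated SQL statement early."""
    if value is None:
        return None
    if ";" in value:
        raise ValueError(f"Parameter {param_name!r} contains ';', which is not allowed")
    return value


validate_no_statement_break("copy_options", "ON_ERROR=CONTINUE")  # returns the value unchanged
# validate_no_statement_break("copy_options", "FORCE=TRUE; DROP TABLE t")  # raises ValueError
```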

## Usage Examples

### Basic File Copy from S3

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

with DAG(
    'data_ingestion_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Copy CSV files from S3 to Snowflake
    load_sales_data = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_daily_sales',
        snowflake_conn_id='snowflake_prod',
        table='raw.sales_transactions',
        stage='@s3_data_stage',
        prefix='sales/daily/{{ ds }}/',
        file_format='csv_format',
        warehouse='LOADING_WH',
        database='RAW_DATA',
        schema='PUBLIC',
        copy_options='ON_ERROR=CONTINUE FORCE=TRUE',
        autocommit=True
    )
```

### Pattern-Based File Loading

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Load files matching a specific pattern
load_with_pattern = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_json_files',
    snowflake_conn_id='snowflake_prod',
    table='staging.json_events',
    stage='@gcs_events_stage',
    pattern='events/year={{ macros.ds_format(ds, "%Y-%m-%d", "%Y") }}/month={{ macros.ds_format(ds, "%Y-%m-%d", "%m") }}/.*\\.json',
    file_format='json_format',
    warehouse='ETL_WH',
    database='STAGING',
    copy_options='MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE'
)
```

### Specific File List Loading

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Load specific files by name
load_specific_files = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_critical_files',
    snowflake_conn_id='snowflake_prod',
    table='critical.financial_data',
    stage='@azure_secure_stage',
    files=[
        'financial/transactions/{{ ds }}/morning_batch.parquet',
        'financial/transactions/{{ ds }}/evening_batch.parquet',
        'financial/reconciliation/{{ ds }}/daily_summary.parquet'
    ],
    file_format='parquet_format',
    warehouse='SECURE_WH',
    database='FINANCIAL',
    schema='SECURE',
    copy_options='ON_ERROR=ABORT_STATEMENT'  # fail fast on any bad record
)
```

### Selective Column Loading

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Load only specific columns from the source files; header skipping and field
# enclosure are defined on the csv_with_header file format, not in copy_options
load_partial_columns = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_customer_subset',
    snowflake_conn_id='snowflake_prod',
    table='analytics.customer_subset',
    stage='@s3_customer_stage',
    prefix='customers/export/{{ ds }}/',
    file_format='csv_with_header',
    columns_array=[
        'customer_id',
        'customer_name',
        'email',
        'registration_date',
        'lifetime_value'
    ],
    warehouse='ANALYTICS_WH',
    database='ANALYTICS'
)
```

### Advanced Loading with Custom Options

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Advanced configuration with error handling and column matching
advanced_load = CopyFromExternalStageToSnowflakeOperator(
    task_id='advanced_data_load',
    snowflake_conn_id='snowflake_prod',
    table='staging.raw_events',
    stage='@s3_events_stage',
    prefix='events/{{ ds }}/',
    file_format='json_auto_detect',
    warehouse='HEAVY_LOADING_WH',
    database='STAGING',
    schema='RAW',
    copy_options='''
        ON_ERROR=CONTINUE
        SIZE_LIMIT=1000000000
        RETURN_FAILED_ONLY=TRUE
        MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
        TRUNCATECOLUMNS=TRUE
    ''',
    session_parameters={
        'QUERY_TAG': 'airflow_bulk_load_{{ ds }}',
        'MULTI_STATEMENT_COUNT': 1
    }
)
```

### Multi-Stage Loading Pipeline

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

with DAG(
    'multi_stage_ingestion',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly',
    catchup=False
) as dag:

    # Stage 1: Load raw transaction data
    load_transactions = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_raw_transactions',
        table='raw.transactions_staging',
        stage='@s3_transaction_stage',
        prefix='transactions/{{ ds }}/{{ logical_date.strftime("%H") }}/',
        file_format='csv_transactions',
        warehouse='LOADING_WH'
    )

    # Stage 2: Load customer reference data
    load_customers = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_customer_data',
        table='raw.customers_staging',
        stage='@s3_reference_stage',
        prefix='customers/daily/{{ ds }}/',
        file_format='json_customers',
        warehouse='LOADING_WH',
        copy_options='ON_ERROR=ABORT_STATEMENT'  # strict loading for reference data
    )

    # Stage 3: Load product catalog
    load_products = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_product_catalog',
        table='raw.products_staging',
        stage='@s3_catalog_stage',
        pattern='catalog/products_{{ ds }}\\.parquet',
        file_format='parquet_products',
        warehouse='LOADING_WH'
    )

    # Stage 4: Data quality validation and promotion
    validate_and_promote = SnowflakeSqlApiOperator(
        task_id='validate_and_promote_data',
        sql='''
            -- Validate transaction data
            CREATE OR REPLACE TABLE raw.transactions AS
            SELECT * FROM raw.transactions_staging
            WHERE customer_id IS NOT NULL
              AND transaction_amount > 0
              AND transaction_date = '{{ ds }}';

            -- Validate and merge customer data
            MERGE INTO raw.customers c
            USING raw.customers_staging s ON c.customer_id = s.customer_id
            WHEN MATCHED THEN UPDATE SET
                customer_name = s.customer_name,
                email = s.email,
                updated_at = CURRENT_TIMESTAMP()
            WHEN NOT MATCHED THEN INSERT (customer_id, customer_name, email, created_at)
            VALUES (s.customer_id, s.customer_name, s.email, CURRENT_TIMESTAMP());

            -- Update product catalog
            CREATE OR REPLACE TABLE raw.products AS
            SELECT * FROM raw.products_staging;

            -- Clean up staging tables
            DROP TABLE raw.transactions_staging;
            DROP TABLE raw.customers_staging;
            DROP TABLE raw.products_staging;
        ''',
        statement_count=6,
        warehouse='PROCESSING_WH'
    )

    # Define dependencies
    [load_transactions, load_customers, load_products] >> validate_and_promote
```

### Error Handling and Monitoring

```python
from datetime import timedelta

from airflow.providers.snowflake.operators.snowflake import SnowflakeCheckOperator
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Data loading with comprehensive error handling
resilient_load = CopyFromExternalStageToSnowflakeOperator(
    task_id='resilient_data_load',
    snowflake_conn_id='snowflake_prod',
    table='staging.resilient_load',
    stage='@s3_variable_quality_stage',
    prefix='data/{{ ds }}/',
    file_format='csv_with_errors',
    warehouse='RESILIENT_WH',
    copy_options='''
        ON_ERROR=CONTINUE
        RETURN_FAILED_ONLY=TRUE
        SIZE_LIMIT=100000000
    ''',
    # Airflow task retry configuration
    retries=3,
    retry_delay=timedelta(minutes=5)
)

# Post-load data quality check
quality_check = SnowflakeCheckOperator(
    task_id='verify_load_quality',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT
            CASE
                WHEN COUNT(*) > 0 AND
                     COUNT(*) * 0.95 <= (SELECT COUNT(*) FROM staging.resilient_load WHERE error_column IS NULL)
                THEN TRUE
                ELSE FALSE
            END AS quality_passed
        FROM staging.resilient_load
    ''',
    warehouse='ANALYTICS_WH'
)

resilient_load >> quality_check
```

## File Format Support

The transfer operators support various file formats through Snowflake's file format objects; the `file_format` parameter takes either the name of such an object or an inline format specification.
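
A minimal sketch of both forms (table, stage, and format names are placeholders; the exact quoting an inline specification requires may vary by provider version):

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Reference a named file format object created in Snowflake (see definitions below)
load_named_format = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_with_named_format',
    table='raw.events',
    stage='@s3_data_stage',
    file_format='csv_format'
)

# Pass an inline format specification instead of a named object
load_inline_format = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_with_inline_format',
    table='raw.events',
    stage='@s3_data_stage',
    file_format="(TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1)"
)
```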
### CSV Files

```sql
CREATE FILE FORMAT csv_format
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    RECORD_DELIMITER = '\n'
    SKIP_HEADER = 1
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    ESCAPE_UNENCLOSED_FIELD = NONE;
```

### JSON Files

```sql
CREATE FILE FORMAT json_format
    TYPE = 'JSON'
    STRIP_OUTER_ARRAY = TRUE
    DATE_FORMAT = 'YYYY-MM-DD'
    TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS';
```

### Parquet Files

```sql
CREATE FILE FORMAT parquet_format
    TYPE = 'PARQUET'
    COMPRESSION = 'AUTO';
```

## Copy Options

Common COPY INTO options supported (a small helper for composing them follows this list):

- `ON_ERROR`: Error handling (`CONTINUE`, `SKIP_FILE`, `ABORT_STATEMENT`)
- `SIZE_LIMIT`: Maximum size (in bytes) of data to load for the COPY statement
- `FORCE`: Force loading even if files were previously loaded
- `MATCH_BY_COLUMN_NAME`: Match columns by name instead of position
- `ENFORCE_LENGTH`: Enforce column length constraints
- `TRUNCATECOLUMNS`: Truncate columns that exceed the target length
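
Options are passed to the operator as a single `copy_options` string. A hypothetical helper (not part of the provider) can keep longer option sets readable:

```python
# Hypothetical helper: build the single string that copy_options expects.
def build_copy_options(**options) -> str:
    parts = []
    for name, value in options.items():
        if isinstance(value, bool):
            value = str(value).upper()  # force=True -> FORCE=TRUE
        parts.append(f"{name.upper()}={value}")
    return " ".join(parts)


copy_options = build_copy_options(
    on_error="CONTINUE",
    force=True,
    match_by_column_name="CASE_INSENSITIVE",
)
# -> "ON_ERROR=CONTINUE FORCE=TRUE MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE"
```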

## Performance Optimization

### Warehouse Sizing

- Use larger warehouses (L, XL, 2XL) for bulk loading operations
- Consider multi-cluster warehouses for concurrent loads
- Auto-suspend warehouses after completion to control costs (see the sketch below)
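
A sketch of sizing the loading warehouse around a bulk COPY with `SnowflakeSqlApiOperator` (warehouse, connection, and task names are placeholders):

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Scale the loading warehouse up just before the bulk COPY ...
scale_up_wh = SnowflakeSqlApiOperator(
    task_id='scale_up_loading_wh',
    snowflake_conn_id='snowflake_prod',
    sql="ALTER WAREHOUSE LOADING_WH SET WAREHOUSE_SIZE = 'XLARGE' AUTO_SUSPEND = 60;",
    statement_count=1
)

# ... and suspend it once loading finishes (SUSPEND errors if already suspended)
suspend_wh = SnowflakeSqlApiOperator(
    task_id='suspend_loading_wh',
    snowflake_conn_id='snowflake_prod',
    sql='ALTER WAREHOUSE LOADING_WH SUSPEND;',
    statement_count=1,
    trigger_rule='all_done'  # run even if the load task fails
)

# scale_up_wh >> load_sales_data >> suspend_wh
```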

### File Organization

- Organize files in uniform sizes (100-250 MB compressed is optimal)
- Use compressed formats when possible (GZIP, BROTLI)
- Partition files by date or logical boundaries

### Parallel Loading

- Load from multiple stages or prefixes in parallel (see the sketch below)
- Use separate tasks for independent table loads
- Leverage Snowflake's automatic parallelization within a single COPY operation
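
For example, independent COPY tasks with no dependencies between them run concurrently under the same DAG (stage, table, and format names below are placeholders):

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# One COPY task per prefix; with no dependencies between them, Airflow
# schedules these tasks in parallel (subject to pool and concurrency limits).
regions = ['us', 'eu', 'apac']
load_tasks = [
    CopyFromExternalStageToSnowflakeOperator(
        task_id=f'load_events_{region}',
        table=f'raw.events_{region}',
        stage='@s3_events_stage',
        prefix=f'events/{region}/{{{{ ds }}}}/',
        file_format='json_format',
        warehouse='LOADING_WH'
    )
    for region in regions
]
```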

## Monitoring and Troubleshooting

### Load History Queries

```sql
-- Check recent COPY operations
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'RAW.SALES_TRANSACTIONS',
    START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())
));

-- Check for load errors (LOAD_HISTORY is a view, not a table function)
SELECT *
FROM INFORMATION_SCHEMA.LOAD_HISTORY
WHERE TABLE_NAME = 'SALES_TRANSACTIONS'
  AND SCHEMA_NAME = 'RAW'
  AND LAST_LOAD_TIME >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
  AND STATUS = 'LOAD_FAILED';
```

### Common Issues and Solutions

- **Authentication Errors**: Verify stage credentials and permissions
- **File Format Mismatches**: Validate the file format definition against the actual files
- **Schema Mismatches**: Ensure the target table structure matches the source data
- **Permission Issues**: Verify the role has USAGE on the stage and INSERT on the target table (see the snippet below)
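
For permission issues, a quick diagnostic is to inspect grants while connected as the loading role; a minimal sketch using `SnowflakeHook` (role, stage, and table names are placeholders):

```python
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Check what the loading role can actually do on the stage and target table.
hook = SnowflakeHook(snowflake_conn_id='snowflake_prod', role='LOADER_ROLE')
stage_grants = hook.get_records('SHOW GRANTS ON STAGE s3_data_stage')
table_grants = hook.get_records('SHOW GRANTS ON TABLE raw.sales_transactions')
for grant in stage_grants + table_grants:
    print(grant)
```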

## Error Handling

The transfer operators surface the full range of load failures:

- **Stage Access Errors**: Invalid credentials, missing permissions, network issues
- **File Format Errors**: Schema mismatches, encoding issues, malformed data
- **Target Table Errors**: Missing tables, permission issues, constraint violations
- **Resource Errors**: Warehouse capacity limits, query complexity limits

Errors include the underlying Snowflake error code and, where available, file-specific detail to aid troubleshooting.