# Data Transfer Operations

Specialized operators for efficient bulk data loading from cloud storage services into Snowflake using COPY INTO operations. These operators provide optimized data ingestion capabilities with support for multiple cloud storage platforms, file formats, and advanced loading options.

## Capabilities

### External Stage Copy Operator

Primary operator for loading data from external cloud storage stages (S3, GCS, Azure Blob) into Snowflake tables using COPY INTO commands with comprehensive configuration options.

```python { .api }
class CopyFromExternalStageToSnowflakeOperator(BaseOperator):
    """
    Execute COPY INTO command to load files from external stage to Snowflake.

    Supports S3, GCS, and Azure Blob Storage with flexible file pattern matching
    and comprehensive loading options.
    """

    template_fields: Sequence[str] = ("files",)
    template_fields_renderers = {"files": "json"}

    def __init__(
        self,
        *,
        files: list | None = None,
        table: str,
        stage: str,
        prefix: str | None = None,
        file_format: str,
        schema: str | None = None,
        columns_array: list | None = None,
        pattern: str | None = None,
        warehouse: str | None = None,
        database: str | None = None,
        autocommit: bool = True,
        snowflake_conn_id: str = "snowflake_default",
        role: str | None = None,
        authenticator: str | None = None,
        session_parameters: dict | None = None,
        copy_options: str | None = None,
        validation_mode: str | None = None,
        **kwargs,
    ):
        """
        Initialize external stage copy operator.

        Parameters:
        - files: List of specific files to copy (optional, can use pattern instead)
        - table: Target Snowflake table name
        - stage: External stage name (e.g., '@my_s3_stage')
        - prefix: File path prefix within stage
        - file_format: Named file format or inline format specification
        - schema: Target schema name (optional, uses default if not specified)
        - columns_array: List of column names for selective loading
        - pattern: File pattern for matching files (alternative to files list)
        - warehouse: Snowflake warehouse name
        - database: Snowflake database name
        - autocommit: Enable autocommit for the COPY operation
        - snowflake_conn_id: Snowflake connection ID
        - role: Snowflake role name
        - authenticator: Authentication method
        - session_parameters: Session-level parameters
        - copy_options: Additional COPY INTO options (e.g., 'ON_ERROR=CONTINUE')
        - validation_mode: Validation mode ('RETURN_ERRORS', 'RETURN_ALL_ERRORS')
        """
```

#### Execution Method

```python { .api }
def execute(self, context: Any) -> None:
    """
    Execute the COPY INTO command to load data from external stage.

    Parameters:
    - context: Airflow task execution context

    Returns:
    Copy operation results and statistics
    """
```

#### OpenLineage Integration

```python { .api }
def get_openlineage_facets_on_complete(self, task_instance):
    """
    Get OpenLineage facets after COPY operation completion.
    Provides data lineage information for the copy operation.

    Parameters:
    - task_instance: Airflow TaskInstance object

    Returns:
    OpenLineage facets dictionary with lineage metadata
    """
```

#### Internal Methods

```python { .api }
def _extract_openlineage_unique_dataset_paths(
    self,
    query_result: list[dict[str, Any]]
) -> tuple[list[tuple[str, str]], list[str]]:
    """
    Extract unique dataset paths for OpenLineage tracking.

    Parameters:
    - query_result: COPY command result data

    Returns:
    Tuple of (dataset_paths, file_paths) for lineage tracking
    """
```

### Validation Function

```python { .api }
def _validate_parameter(param_name: str, value: str | None) -> str | None:
    """
    Validate parameter to ensure it doesn't contain invalid patterns.
    Prevents SQL injection by checking for semicolons and other dangerous patterns.

    Parameters:
    - param_name: Name of parameter being validated
    - value: Parameter value to validate

    Returns:
    Validated parameter value or None

    Raises:
    ValueError: If parameter contains invalid patterns
    """
```
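As a rough illustration of the kind of check described above, the sketch below rejects values containing a semicolon or comment marker. The helper name `validate_copy_parameter` and the exact patterns checked are assumptions for illustration, not the provider's actual implementation.

```python
# Illustrative sketch only -- not the provider's actual implementation.
def validate_copy_parameter(param_name: str, value: str | None) -> str | None:
    """Reject values that could smuggle extra SQL statements into COPY INTO."""
    if value is None:
        return None
    if ";" in value or "--" in value:
        raise ValueError(
            f"Invalid value for {param_name!r}: {value!r} contains a forbidden pattern"
        )
    return value

# Example: a stray semicolon in copy_options is rejected before any SQL is built.
validate_copy_parameter("copy_options", "ON_ERROR=CONTINUE")  # returns the value unchanged
# validate_copy_parameter("copy_options", "PURGE=TRUE; DROP TABLE t")  # raises ValueError
```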


## Usage Examples

### Basic File Copy from S3

```python
from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from datetime import datetime

with DAG(
    'data_ingestion_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Copy CSV files from S3 to Snowflake
    load_sales_data = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_daily_sales',
        snowflake_conn_id='snowflake_prod',
        table='raw.sales_transactions',
        stage='@s3_data_stage',
        prefix='sales/daily/{{ ds }}/',
        file_format='csv_format',
        warehouse='LOADING_WH',
        database='RAW_DATA',
        schema='PUBLIC',
        copy_options='ON_ERROR=CONTINUE FORCE=TRUE',
        autocommit=True
    )
```

### Pattern-Based File Loading

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Load files matching specific pattern
load_with_pattern = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_json_files',
    snowflake_conn_id='snowflake_prod',
    table='staging.json_events',
    stage='@gcs_events_stage',
    pattern='events/year={{ macros.ds_format(ds, "%Y-%m-%d", "%Y") }}/month={{ macros.ds_format(ds, "%Y-%m-%d", "%m") }}/.*\\.json',
    file_format='json_format',
    warehouse='ETL_WH',
    database='STAGING',
    copy_options='MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE'
)
```

### Specific File List Loading

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Load specific files by name
load_specific_files = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_critical_files',
    snowflake_conn_id='snowflake_prod',
    table='critical.financial_data',
    stage='@azure_secure_stage',
    files=[
        'financial/transactions/{{ ds }}/morning_batch.parquet',
        'financial/transactions/{{ ds }}/evening_batch.parquet',
        'financial/reconciliation/{{ ds }}/daily_summary.parquet'
    ],
    file_format='parquet_format',
    warehouse='SECURE_WH',
    database='FINANCIAL',
    schema='SECURE',
    copy_options='ON_ERROR=ABORT_STATEMENT',
    validation_mode='RETURN_ERRORS'
)
```
### Selective Column Loading

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Load only specific columns from source files
load_partial_columns = CopyFromExternalStageToSnowflakeOperator(
    task_id='load_customer_subset',
    snowflake_conn_id='snowflake_prod',
    table='analytics.customer_subset',
    stage='@s3_customer_stage',
    prefix='customers/export/{{ ds }}/',
    # Header skipping and field enclosure are defined in the named file format,
    # not in copy_options
    file_format='csv_with_header',
    columns_array=[
        'customer_id',
        'customer_name',
        'email',
        'registration_date',
        'lifetime_value'
    ],
    warehouse='ANALYTICS_WH',
    database='ANALYTICS',
    copy_options='ON_ERROR=CONTINUE'
)
```
### Advanced Loading with Custom Options

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Advanced configuration with error handling and transformation
advanced_load = CopyFromExternalStageToSnowflakeOperator(
    task_id='advanced_data_load',
    snowflake_conn_id='snowflake_prod',
    table='staging.raw_events',
    stage='@s3_events_stage',
    prefix='events/{{ ds }}/',
    file_format='json_auto_detect',
    warehouse='HEAVY_LOADING_WH',
    database='STAGING',
    schema='RAW',
    copy_options='''
        ON_ERROR=CONTINUE
        SIZE_LIMIT=1000000000
        RETURN_FAILED_ONLY=TRUE
        MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
        ENFORCE_LENGTH=FALSE
        TRUNCATECOLUMNS=TRUE
    ''',
    validation_mode='RETURN_ALL_ERRORS',
    session_parameters={
        'QUERY_TAG': 'airflow_bulk_load_{{ ds }}',
        'MULTI_STATEMENT_COUNT': 1
    }
)
```
### Multi-Stage Loading Pipeline

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

with DAG(
    'multi_stage_ingestion',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly',
    catchup=False
) as dag:

    # Stage 1: Load raw transaction data
    load_transactions = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_raw_transactions',
        table='raw.transactions_staging',
        stage='@s3_transaction_stage',
        prefix='transactions/{{ ds }}/{{ logical_date.strftime("%H") }}/',
        file_format='csv_transactions',
        warehouse='LOADING_WH'
    )

    # Stage 2: Load customer reference data
    load_customers = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_customer_data',
        table='raw.customers_staging',
        stage='@s3_reference_stage',
        prefix='customers/daily/{{ ds }}/',
        file_format='json_customers',
        warehouse='LOADING_WH',
        copy_options='ON_ERROR=ABORT_STATEMENT'  # Strict loading for reference data
    )

    # Stage 3: Load product catalog
    load_products = CopyFromExternalStageToSnowflakeOperator(
        task_id='load_product_catalog',
        table='raw.products_staging',
        stage='@s3_catalog_stage',
        pattern='catalog/products_{{ ds }}\\.parquet',
        file_format='parquet_products',
        warehouse='LOADING_WH'
    )

    # Stage 4: Data quality validation and promotion
    validate_and_promote = SnowflakeSqlApiOperator(
        task_id='validate_and_promote_data',
        sql='''
            -- Validate transaction data
            CREATE OR REPLACE TABLE raw.transactions AS
            SELECT * FROM raw.transactions_staging
            WHERE customer_id IS NOT NULL
              AND transaction_amount > 0
              AND transaction_date = '{{ ds }}';

            -- Validate and merge customer data
            MERGE INTO raw.customers c
            USING raw.customers_staging s ON c.customer_id = s.customer_id
            WHEN MATCHED THEN UPDATE SET
                customer_name = s.customer_name,
                email = s.email,
                updated_at = CURRENT_TIMESTAMP()
            WHEN NOT MATCHED THEN INSERT (customer_id, customer_name, email, created_at)
            VALUES (s.customer_id, s.customer_name, s.email, CURRENT_TIMESTAMP());

            -- Update product catalog
            CREATE OR REPLACE TABLE raw.products AS
            SELECT * FROM raw.products_staging;

            -- Clean up staging tables
            DROP TABLE raw.transactions_staging;
            DROP TABLE raw.customers_staging;
            DROP TABLE raw.products_staging;
        ''',
        statement_count=6,
        warehouse='PROCESSING_WH'
    )

    # Define dependencies
    [load_transactions, load_customers, load_products] >> validate_and_promote
```
### Error Handling and Monitoring

```python
from datetime import timedelta

from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeCheckOperator

# Data loading with comprehensive error handling
resilient_load = CopyFromExternalStageToSnowflakeOperator(
    task_id='resilient_data_load',
    snowflake_conn_id='snowflake_prod',
    table='staging.resilient_load',
    stage='@s3_variable_quality_stage',
    prefix='data/{{ ds }}/',
    # Parsing rules such as blank-line skipping belong in the named file format
    file_format='csv_with_errors',
    warehouse='RESILIENT_WH',
    copy_options='''
        ON_ERROR=CONTINUE
        RETURN_FAILED_ONLY=TRUE
        SIZE_LIMIT=100000000
    ''',
    validation_mode='RETURN_ALL_ERRORS',
    # Airflow task retry configuration
    retries=3,
    retry_delay=timedelta(minutes=5)
)

# Post-load data quality check
quality_check = SnowflakeCheckOperator(
    task_id='verify_load_quality',
    snowflake_conn_id='snowflake_prod',
    sql='''
        SELECT
            CASE
                WHEN COUNT(*) > 0 AND
                     COUNT(*) * 0.95 <= (SELECT COUNT(*) FROM staging.resilient_load WHERE error_column IS NULL)
                THEN TRUE
                ELSE FALSE
            END AS quality_passed
        FROM staging.resilient_load
    ''',
    warehouse='ANALYTICS_WH'
)

resilient_load >> quality_check
```
## File Format Support

The transfer operators support various file formats through Snowflake's file format objects:

### CSV Files

```sql
CREATE FILE FORMAT csv_format
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    RECORD_DELIMITER = '\n'
    SKIP_HEADER = 1
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    ESCAPE_UNENCLOSED_FIELD = NONE;
```

### JSON Files

```sql
CREATE FILE FORMAT json_format
    TYPE = 'JSON'
    STRIP_OUTER_ARRAY = TRUE
    DATE_FORMAT = 'YYYY-MM-DD'
    TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS';
```

### Parquet Files

```sql
CREATE FILE FORMAT parquet_format
    TYPE = 'PARQUET'
    COMPRESSION = 'AUTO';
```
## Copy Options

Common COPY INTO options supported (see the combined example after this list):

- `ON_ERROR`: Error handling (`CONTINUE`, `SKIP_FILE`, `ABORT_STATEMENT`)
- `SIZE_LIMIT`: Maximum data size to load per COPY statement
- `FORCE`: Force loading even if files were previously loaded
- `MATCH_BY_COLUMN_NAME`: Match columns by name instead of position
- `ENFORCE_LENGTH`: Enforce column length constraints
- `TRUNCATECOLUMNS`: Truncate columns that exceed target length
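
These options are passed to the operator as a single `copy_options` string. As a rough sketch, a dict of options can be flattened into that string; the `build_copy_options` helper below is illustrative only and not part of the provider.

```python
# Illustrative helper (not part of the provider): flatten COPY options into the
# single string that CopyFromExternalStageToSnowflakeOperator expects.
def build_copy_options(options: dict[str, str]) -> str:
    return " ".join(f"{name}={value}" for name, value in options.items())

copy_options = build_copy_options({
    "ON_ERROR": "CONTINUE",
    "MATCH_BY_COLUMN_NAME": "CASE_INSENSITIVE",
    "FORCE": "TRUE",
})
# -> "ON_ERROR=CONTINUE MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE FORCE=TRUE"
```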

## Performance Optimization

### Warehouse Sizing

- Use larger warehouses (L, XL, 2XL) for bulk loading operations
- Consider multi-cluster warehouses for concurrent loads
- Auto-suspend warehouses after completion to control costs (see the sketch below)
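
For example, warehouse size and auto-suspend can be adjusted with plain SQL before a heavy load. This is a sketch only; the warehouse name, size, and AUTO_SUSPEND threshold below are assumptions, not values from the provider.

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# Sketch: enlarge the loading warehouse and ensure it suspends itself afterwards.
# Warehouse name, size, and AUTO_SUSPEND value are illustrative assumptions.
resize_loading_warehouse = SnowflakeSqlApiOperator(
    task_id='resize_loading_warehouse',
    snowflake_conn_id='snowflake_prod',
    sql="ALTER WAREHOUSE LOADING_WH SET WAREHOUSE_SIZE = 'XLARGE' AUTO_SUSPEND = 60",
    statement_count=1
)

# resize_loading_warehouse >> load_sales_data  # run before the bulk load task
```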

### File Organization

- Organize files in uniform sizes (100-250MB optimal)
- Use compressed formats when possible (GZIP, BROTLI)
- Partition files by date or logical boundaries

### Parallel Loading

- Load from multiple stages or prefixes in parallel
- Use separate tasks for independent table loads (see the sketch after this list)
- Leverage Snowflake's automatic parallelization within single COPY operations
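
One way to apply this is to fan out one operator per independent table inside a single DAG, so Airflow can schedule the loads concurrently. The table, stage, and file format names below are placeholders for illustration.

```python
from airflow.providers.snowflake.transfers.copy_into_snowflake import CopyFromExternalStageToSnowflakeOperator

# Sketch: one independent load task per target table; Airflow runs them in parallel.
# Table, stage, and file format names are illustrative assumptions.
# Place this inside a `with DAG(...)` block.
parallel_loads = [
    CopyFromExternalStageToSnowflakeOperator(
        task_id=f'load_{table.split(".")[-1]}',
        table=table,
        stage=stage,
        prefix='{{ ds }}/',
        file_format=file_format,
        warehouse='LOADING_WH',
    )
    for table, stage, file_format in [
        ('raw.orders', '@s3_orders_stage', 'csv_format'),
        ('raw.customers', '@s3_customers_stage', 'json_format'),
        ('raw.products', '@s3_products_stage', 'parquet_format'),
    ]
]
```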

## Monitoring and Troubleshooting

### Load History Queries

```sql
-- Check recent COPY operations
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'RAW.SALES_TRANSACTIONS',
    START_TIME => DATEADD(hour, -24, CURRENT_TIMESTAMP())
));

-- Check for load errors (LOAD_HISTORY is a view, not a table function)
SELECT *
FROM INFORMATION_SCHEMA.LOAD_HISTORY
WHERE SCHEMA_NAME = 'RAW'
  AND TABLE_NAME = 'SALES_TRANSACTIONS'
  AND LAST_LOAD_TIME >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
  AND STATUS = 'LOAD_FAILED';
```
### Common Issues and Solutions

- **Authentication Errors**: Verify stage credentials and permissions
- **File Format Mismatches**: Validate file format definition against actual files
- **Schema Mismatches**: Ensure target table structure matches source data
- **Permission Issues**: Verify role has USAGE on stage and INSERT on target table

## Error Handling

The transfer operators provide comprehensive error handling:

- **Stage Access Errors**: Invalid credentials, missing permissions, network issues
- **File Format Errors**: Schema mismatches, encoding issues, malformed data
- **Target Table Errors**: Missing tables, permission issues, constraint violations
- **Resource Errors**: Warehouse capacity limits, query complexity limits

All errors include detailed Snowflake error codes, file-specific information, and troubleshooting guidance.