# Data Operations

Specialized SQL operations including MERGE statements for upserts and COPY INTO commands for bulk data loading from cloud storage with comprehensive formatting options.

## Capabilities

### MERGE Operations

Advanced MERGE INTO statements for complex upsert operations with conditional logic.

```python { .api }
from snowflake.sqlalchemy import MergeInto

class MergeInto(UpdateBase):
    """MERGE INTO statement builder for upsert operations."""

    def __init__(self, target, source, on):
        """
        Create MERGE INTO statement.

        Args:
            target: Target table
            source: Source table or query
            on: Join condition
        """

    def when_matched_then_update(self):
        """
        Add WHEN MATCHED THEN UPDATE clause.

        Returns:
            clause: Clause object with .values(**kwargs) and .where(predicate) methods
        """

    def when_matched_then_delete(self):
        """
        Add WHEN MATCHED THEN DELETE clause.

        Returns:
            clause: Clause object with .where(predicate) method
        """

    def when_not_matched_then_insert(self):
        """
        Add WHEN NOT MATCHED THEN INSERT clause.

        Returns:
            clause: Clause object with .values(**kwargs) and .where(predicate) methods
        """
```

### COPY INTO Operations

Bulk data loading operations with comprehensive formatting and configuration options.

```python { .api }
from snowflake.sqlalchemy import CopyIntoStorage

class CopyInto(UpdateBase):
    """Base class for COPY INTO operations."""

    def __init__(self, from_, into, partition_by=None, formatter=None):
        """
        Create COPY INTO statement.

        Args:
            from_: Source location (table, stage, or query)
            into: Target location (table or stage)
            partition_by: Optional partition expression
            formatter: File format specification
        """

    def force(self, force: bool = True):
        """
        Set FORCE option to reload files even if they were loaded before.

        Args:
            force: Whether to force reload

        Returns:
            CopyInto: Self for method chaining
        """

    def single(self, single_file: bool = True):
        """
        Set SINGLE option to unload into a single file.

        Args:
            single_file: Whether to produce a single output file

        Returns:
            CopyInto: Self for method chaining
        """

    def maxfilesize(self, max_size: int):
        """
        Set MAX_FILE_SIZE option for unloaded files.

        Args:
            max_size: Maximum file size in bytes

        Returns:
            CopyInto: Self for method chaining
        """

    def files(self, file_names: List[str]):
        """
        Set FILES option to specify an explicit file list.

        Args:
            file_names: List of file names

        Returns:
            CopyInto: Self for method chaining
        """

    def pattern(self, pattern: str):
        """
        Set PATTERN option for file matching.

        Args:
            pattern: Regex pattern matched against stage file paths

        Returns:
            CopyInto: Self for method chaining
        """

# CopyIntoStorage is an alias for CopyInto and therefore shares its
# constructor and methods: force(), single(), maxfilesize(), files(), pattern()
CopyIntoStorage = CopyInto
```

### File Format Classes

Specialized formatters for different file types with comprehensive options.

```python { .api }
from snowflake.sqlalchemy import (
    CopyFormatter, CSVFormatter, JSONFormatter, PARQUETFormatter
)

class CopyFormatter:
    """Base formatter for COPY commands."""

    def __init__(self):
        """Initialize base formatter."""

class CSVFormatter(CopyFormatter):
    """CSV-specific formatter with extensive options."""

    def __init__(self):
        """Initialize CSV formatter."""

    def compression(self, compression_type: str):
        """
        Set compression type.

        Args:
            compression_type: Compression type (GZIP, BZIP2, etc.)
        """

    def record_delimiter(self, delimiter: Union[str, int]):
        """
        Set record delimiter.

        Args:
            delimiter: Record delimiter character or ASCII code
        """

    def field_delimiter(self, delimiter: Union[str, int]):
        """
        Set field delimiter.

        Args:
            delimiter: Field delimiter character or ASCII code
        """

    def skip_header(self, lines: int):
        """
        Set number of header lines to skip.

        Args:
            lines: Number of header lines
        """

    def null_if(self, values: Sequence[str]):
        """
        Set NULL replacement values.

        Args:
            values: List of strings to treat as NULL
        """

    def error_on_column_count_mismatch(self, flag: bool):
        """
        Set error behavior for column count mismatch.

        Args:
            flag: Whether to error on mismatch
        """

class JSONFormatter(CopyFormatter):
    """JSON-specific formatter."""

    def __init__(self):
        """Initialize JSON formatter."""

    def compression(self, compression_type: str):
        """
        Set compression type.

        Args:
            compression_type: Compression type
        """

    def file_extension(self, extension: str):
        """
        Set file extension.

        Args:
            extension: File extension
        """

class PARQUETFormatter(CopyFormatter):
    """Parquet-specific formatter."""

    def __init__(self):
        """Initialize Parquet formatter."""

    def snappy_compression(self, enabled: bool):
        """
        Enable/disable Snappy compression.

        Args:
            enabled: Whether Snappy compression is enabled
        """

    def binary_as_text(self, flag: bool):
        """
        Handle binary data as text.

        Args:
            flag: Whether to treat binary as text
        """
```

### Stage Management

DDL operations for creating and managing stages and file formats.

```python { .api }
from snowflake.sqlalchemy import CreateStage, CreateFileFormat

class CreateStage(DDLElement):
    """CREATE STAGE DDL statement."""

    def __init__(self, container, stage, replace_if_exists=False, *, temporary=False):
        """
        Create stage creation statement.

        Args:
            container: Container (physical base for the stage)
            stage: ExternalStage object
            replace_if_exists: Whether to replace if exists
            temporary: Whether stage is temporary
        """

class CreateFileFormat(DDLElement):
    """CREATE FILE FORMAT DDL statement."""

    def __init__(self, format_name, formatter, replace_if_exists=False):
        """
        Create file format creation statement.

        Args:
            format_name: File format name
            formatter: Formatter object (CSVFormatter, JSONFormatter, etc.)
            replace_if_exists: Whether to replace if exists
        """
```

## Usage Examples

### MERGE Operations

```python
from snowflake.sqlalchemy import MergeInto
from sqlalchemy import MetaData, Table

metadata = MetaData()

# Basic merge operation (assumes an existing `engine`)
target_table = Table('users', metadata, autoload_with=engine)
source_table = Table('user_updates', metadata, autoload_with=engine)

merge = MergeInto(
    target=target_table,
    source=source_table,
    on=target_table.c.id == source_table.c.id
)

# Add clauses
merge.when_matched_then_update().values(
    name=source_table.c.name,
    email=source_table.c.email
)

merge.when_not_matched_then_insert().values(
    id=source_table.c.id,
    name=source_table.c.name,
    email=source_table.c.email
)

# Execute merge
with engine.begin() as conn:
    conn.execute(merge)
```
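
The combined effect of the WHEN clauses can be illustrated with a small in-memory emulation (plain Python, no Snowflake connection; the `emulate_merge` helper is purely illustrative and not part of the library):

```python
def emulate_merge(target, source):
    """Illustrative MERGE semantics: update matched rows by 'id', insert the rest."""
    by_id = {row['id']: row for row in target}
    for src in source:
        if src['id'] in by_id:          # WHEN MATCHED THEN UPDATE
            by_id[src['id']].update(name=src['name'], email=src['email'])
        else:                           # WHEN NOT MATCHED THEN INSERT
            by_id[src['id']] = dict(src)
    return sorted(by_id.values(), key=lambda r: r['id'])

target = [{'id': 1, 'name': 'Ann', 'email': 'old@x.com'}]
source = [{'id': 1, 'name': 'Ann B.', 'email': 'new@x.com'},
          {'id': 2, 'name': 'Bob', 'email': 'bob@x.com'}]
result = emulate_merge(target, source)
# Row 1 is updated in place; row 2 is inserted
```

Snowflake applies the same logic set-wise in a single statement, which is what makes MERGE preferable to separate UPDATE and INSERT round trips.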

### Complex MERGE with Conditions

```python
from sqlalchemy import func, select

# Merge with conditional logic
merge = MergeInto(
    target=target_table,
    source=select(source_table).where(source_table.c.active == True),
    on=target_table.c.id == source_table.c.id
)

# Add clauses with conditions
merge.when_matched_then_update().values(
    name=source_table.c.name,
    updated_at=func.current_timestamp()
)

merge.when_matched_then_delete().where(
    source_table.c.deleted == True
)

merge.when_not_matched_then_insert().values(
    id=source_table.c.id,
    name=source_table.c.name,
    created_at=func.current_timestamp()
)
```

### CSV Copy Operations

```python
from snowflake.sqlalchemy import CopyIntoStorage, CSVFormatter

# Create CSV formatter with options
csv_format = (CSVFormatter()
    .compression('GZIP')
    .field_delimiter(',')
    .record_delimiter('\n')
    .skip_header(1)
    .null_if(['', 'NULL', 'null'])
    .error_on_column_count_mismatch(True)
)

# Copy from an external stage into a table
copy_stmt = (CopyIntoStorage(
    from_='@my_stage/users/',
    into=users_table,
    formatter=csv_format
)
    .files(['users_001.csv.gz', 'users_002.csv.gz'])
    .force(True)
)

with engine.begin() as conn:
    conn.execute(copy_stmt)
```
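
What SKIP_HEADER and NULL_IF do at load time can be sketched locally: leading lines are dropped, and any field matching a NULL_IF string becomes SQL NULL. A stdlib-only sketch (the `parse_like_snowflake` function is illustrative, not library code):

```python
import csv
import io

def parse_like_snowflake(text, skip_header=1, null_if=('', 'NULL', 'null')):
    """Rough local emulation of SKIP_HEADER and NULL_IF for a CSV payload."""
    rows = list(csv.reader(io.StringIO(text)))[skip_header:]
    return [[None if field in null_if else field for field in row] for row in rows]

data = "id,name\n1,Ann\n2,NULL\n"
print(parse_like_snowflake(data))
# → [['1', 'Ann'], ['2', None]]
```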

### JSON Copy Operations

```python
from snowflake.sqlalchemy import JSONFormatter

# JSON formatter
json_format = (JSONFormatter()
    .compression('GZIP')
    .file_extension('.json')
)

# Copy JSON data from an external location
copy_json = CopyIntoStorage(
    from_='s3://my-bucket/events/',
    into=events_table,
    formatter=json_format
).pattern(r'.*events.*\.json\.gz')

with engine.begin() as conn:
    conn.execute(copy_json)
```
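
Snowflake applies PATTERN as a regular expression matched against the full stage path of each file, so a pattern can be sanity-checked with Python's `re` module before running the copy (file names below are made up for illustration):

```python
import re

# Same pattern as in the COPY statement above, as a raw string
pattern = re.compile(r'.*events.*\.json\.gz')

candidates = [
    'events/2024/raw_events_01.json.gz',
    'events/2024/raw_events_01.json',   # wrong extension
    'logs/other.json.gz',               # no 'events' in the path
]
matches = [name for name in candidates if pattern.fullmatch(name)]
print(matches)
# → ['events/2024/raw_events_01.json.gz']
```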

### Parquet Copy Operations

```python
from snowflake.sqlalchemy import PARQUETFormatter

# Parquet formatter
parquet_format = (PARQUETFormatter()
    .snappy_compression(True)
    .binary_as_text(False)
)

# Copy Parquet files from a stage
copy_parquet = CopyIntoStorage(
    from_='@analytics_stage/',
    into=analytics_table,
    formatter=parquet_format
)

with engine.begin() as conn:
    conn.execute(copy_parquet)
```
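
Note that FORCE, FILES and PATTERN apply when loading, while SINGLE and MAX_FILE_SIZE matter mainly when unloading (COPY INTO a stage from a table): MAX_FILE_SIZE caps the byte size of each generated file. The chunking behavior can be illustrated with a small size-capped grouping sketch (plain Python, illustrative only):

```python
def split_by_max_size(rows, max_size):
    """Illustrative MAX_FILE_SIZE emulation: group serialized rows into
    chunks whose byte size stays at or under max_size."""
    files, current, size = [], [], 0
    for row in rows:
        line = row + '\n'
        if current and size + len(line) > max_size:
            files.append(''.join(current))   # start a new "file"
            current, size = [], 0
        current.append(line)
        size += len(line)
    if current:
        files.append(''.join(current))
    return files

chunks = split_by_max_size(['a' * 10] * 5, max_size=25)
print([len(c) for c in chunks])
# → [22, 22, 11]; every chunk stays at or under max_size
```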

### Stage and File Format Creation

```python
from snowflake.sqlalchemy import (
    CreateStage, CreateFileFormat, CSVFormatter, AWSBucket, ExternalStage
)

# Create CSV formatter first
csv_formatter = (CSVFormatter()
    .field_delimiter(',')
    .skip_header(1)
    .null_if(['NULL', ''])
)

# Create file format
create_csv_format = CreateFileFormat(
    format_name='my_csv_format',
    formatter=csv_formatter
)

# Create AWS bucket container
aws_container = (AWSBucket.from_uri('s3://my-bucket/data/')
    .credentials(aws_key_id='key', aws_secret_key='secret')
)

# Create external stage
external_stage = ExternalStage('my_external_stage')
create_stage = CreateStage(
    container=aws_container,
    stage=external_stage
)

with engine.begin() as conn:
    conn.execute(create_csv_format)
    conn.execute(create_stage)
```