or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cloud-storage.md · connection-config.md · data-operations.md · data-types.md · index.md · table-types.md

docs/data-operations.md

0

# Data Operations

1

2

Specialized SQL operations including MERGE statements for upserts and COPY INTO commands for bulk data loading from cloud storage with comprehensive formatting options.

3

4

## Capabilities

5

6

### MERGE Operations

7

8

Advanced MERGE INTO statements for complex upsert operations with conditional logic.

9

10

```python { .api }

11

from snowflake.sqlalchemy import MergeInto

12

13

class MergeInto(UpdateBase):

14

"""MERGE INTO statement builder for upsert operations."""

15

16

def __init__(self, target, source, on):

17

"""

18

Create MERGE INTO statement.

19

20

Args:

21

target: Target table

22

source: Source table or query

23

on: Join condition

24

"""

25

26

def when_matched_then_update(self):

27

"""

28

Add WHEN MATCHED THEN UPDATE clause.

29

30

Returns:

31

clause: Clause object with .values(**kwargs) and .where(predicate) methods

32

"""

33

34

def when_matched_then_delete(self):

35

"""

36

Add WHEN MATCHED THEN DELETE clause.

37

38

Returns:

39

clause: Clause object with .where(predicate) method

40

"""

41

42

def when_not_matched_then_insert(self):

43

"""

44

Add WHEN NOT MATCHED THEN INSERT clause.

45

46

Returns:

47

clause: Clause object with .values(**kwargs) and .where(predicate) methods

48

"""

49

```

50

51

### COPY INTO Operations

52

53

Bulk data loading operations with comprehensive formatting and configuration options.

54

55

```python { .api }

56

from snowflake.sqlalchemy import CopyIntoStorage

57

58

class CopyInto(UpdateBase):

59

"""Base class for COPY INTO operations."""

60

61

def __init__(self, from_, into, partition_by=None, formatter=None):

62

"""

63

Create COPY INTO statement.

64

65

Args:

66

from_: Source location

67

into: Target location

68

partition_by: Optional partition expression

69

formatter: File format specification

70

"""

71

72

def force(self, force: bool = True):

73

"""

74

Set FORCE option to reload files.

75

76

Args:

77

force: Whether to force reload

78

79

Returns:

80

CopyInto: Self for method chaining

81

"""

82

83

def single(self, single_file: bool = True):

84

"""

85

Set SINGLE option to load single file.

86

87

Args:

88

single_file: Whether to load single file

89

90

Returns:

91

CopyInto: Self for method chaining

92

"""

93

94

def maxfilesize(self, max_size: int):

95

"""

96

Set maximum file size.

97

98

Args:

99

max_size: Maximum file size

100

101

Returns:

102

CopyInto: Self for method chaining

103

"""

104

105

def files(self, file_names: List[str]):

106

"""

107

Set FILES option to specify file list.

108

109

Args:

110

file_names: List of file names

111

112

Returns:

113

CopyInto: Self for method chaining

114

"""

115

116

def pattern(self, pattern: str):

117

"""

118

Set PATTERN option for file matching.

119

120

Args:

121

pattern: Regex pattern for file matching

122

123

Returns:

124

CopyInto: Self for method chaining

125

"""

126

127

# CopyIntoStorage is an alias for CopyInto

128

CopyIntoStorage = CopyInto

129

130

# Because CopyIntoStorage is the same class as CopyInto, it is constructed

131

# with the CopyInto arguments documented above:

132

133

#     CopyIntoStorage(from_=source, into=target, partition_by=None, formatter=None)

134

135

# Args:

136

137

#     from_: Source location (table or stage)

138

139

#     into: Target location (stage or table)

140

141

#     formatter: File format specification

142

143

# The alias provides all methods from the CopyInto base class:

144

# force(), single(), maxfilesize(), files(), pattern()

145

```

146

147

### File Format Classes

148

149

Specialized formatters for different file types with comprehensive options.

150

151

```python { .api }

152

from snowflake.sqlalchemy import (

153

CopyFormatter, CSVFormatter, JSONFormatter, PARQUETFormatter

154

)

155

156

class CopyFormatter:

157

"""Base formatter for COPY commands."""

158

159

def __init__(self):

160

"""Initialize base formatter."""

161

162

class CSVFormatter(CopyFormatter):

163

"""CSV-specific formatter with extensive options."""

164

165

def __init__(self):

166

"""Initialize CSV formatter."""

167

168

def compression(self, compression_type: str):

169

"""

170

Set compression type.

171

172

Args:

173

compression_type: Compression type (GZIP, BZIP2, etc.)

174

"""

175

176

def record_delimiter(self, delimiter: Union[str, int]):

177

"""

178

Set record delimiter.

179

180

Args:

181

delimiter: Record delimiter character or ASCII code

182

"""

183

184

def field_delimiter(self, delimiter: Union[str, int]):

185

"""

186

Set field delimiter.

187

188

Args:

189

delimiter: Field delimiter character or ASCII code

190

"""

191

192

def skip_header(self, lines: int):

193

"""

194

Set number of header lines to skip.

195

196

Args:

197

lines: Number of header lines

198

"""

199

200

def null_if(self, values: Sequence[str]):

201

"""

202

Set NULL replacement values.

203

204

Args:

205

values: List of strings to treat as NULL

206

"""

207

208

def error_on_column_count_mismatch(self, flag: bool):

209

"""

210

Set error behavior for column count mismatch.

211

212

Args:

213

flag: Whether to error on mismatch

214

"""

215

216

class JSONFormatter(CopyFormatter):

217

"""JSON-specific formatter."""

218

219

def __init__(self):

220

"""Initialize JSON formatter."""

221

222

def compression(self, compression_type: str):

223

"""

224

Set compression type.

225

226

Args:

227

compression_type: Compression type

228

"""

229

230

def file_extension(self, extension: str):

231

"""

232

Set file extension.

233

234

Args:

235

extension: File extension

236

"""

237

238

class PARQUETFormatter(CopyFormatter):

239

"""Parquet-specific formatter."""

240

241

def __init__(self):

242

"""Initialize Parquet formatter."""

243

244

def snappy_compression(self, enabled: bool):

245

"""

246

Enable/disable Snappy compression.

247

248

Args:

249

enabled: Whether Snappy compression is enabled

250

"""

251

252

def binary_as_text(self, flag: bool):

253

"""

254

Handle binary data as text.

255

256

Args:

257

flag: Whether to treat binary as text

258

"""

259

```

260

261

### Stage Management

262

263

DDL operations for creating and managing stages and file formats.

264

265

```python { .api }

266

from snowflake.sqlalchemy import CreateStage, CreateFileFormat

267

268

class CreateStage(DDLElement):

269

"""CREATE STAGE DDL statement."""

270

271

def __init__(self, container, stage, replace_if_exists=False, *, temporary=False):

272

"""

273

Create stage creation statement.

274

275

Args:

276

container: Container (physical base for the stage)

277

stage: ExternalStage object

278

replace_if_exists: Whether to replace if exists

279

temporary: Whether stage is temporary

280

"""

281

282

class CreateFileFormat(DDLElement):

283

"""CREATE FILE FORMAT DDL statement."""

284

285

def __init__(self, format_name, formatter, replace_if_exists=False):

286

"""

287

Create file format creation statement.

288

289

Args:

290

format_name: File format name

291

formatter: Formatter object (CSVFormatter, JSONFormatter, etc.)

292

replace_if_exists: Whether to replace if exists

293

"""

294

```

295

296

## Usage Examples

297

298

### MERGE Operations

299

300

```python

301

from snowflake.sqlalchemy import MergeInto

302

from sqlalchemy import MetaData, Table, Column, Integer, select

303

304

# Basic merge operation

305

target_table = Table('users', metadata, autoload_with=engine)

306

source_table = Table('user_updates', metadata, autoload_with=engine)

307

308

merge = MergeInto(

309

target=target_table,

310

source=source_table,

311

on=target_table.c.id == source_table.c.id

312

)

313

314

# Add clauses

315

update_clause = merge.when_matched_then_update().values(

316

name=source_table.c.name,

317

email=source_table.c.email

318

)

319

320

insert_clause = merge.when_not_matched_then_insert().values(

321

id=source_table.c.id,

322

name=source_table.c.name,

323

email=source_table.c.email

324

)

325

326

# Execute merge

327

engine.execute(merge)  # NOTE: legacy SQLAlchemy 1.x style; on 2.0 use `with engine.connect() as conn: conn.execute(merge)`

328

```

329

330

### Complex MERGE with Conditions

331

332

```python

333

# Merge with conditional logic

334

merge = MergeInto(

335

target=target_table,

336

source=select(source_table).where(source_table.c.active == True),

337

on=target_table.c.id == source_table.c.id

338

)

339

340

# Add clauses with conditions

341

update_clause = merge.when_matched_then_update().values(

342

name=source_table.c.name,

343

updated_at=func.current_timestamp()

344

)

345

346

delete_clause = merge.when_matched_then_delete().where(

347

source_table.c.deleted == True

348

)

349

350

insert_clause = merge.when_not_matched_then_insert().values(

351

id=source_table.c.id,

352

name=source_table.c.name,

353

created_at=func.current_timestamp()

354

)

355

```

356

357

### CSV Copy Operations

358

359

```python

360

from snowflake.sqlalchemy import CopyIntoStorage, CSVFormatter

361

362

# Create CSV formatter with options

363

csv_format = (CSVFormatter()

364

.compression('GZIP')

365

.field_delimiter(',')

366

.record_delimiter('\n')

367

.skip_header(1)

368

.null_if(['', 'NULL', 'null'])

369

.error_on_column_count_mismatch(True)

370

)

371

372

# Copy from external stage

373

copy_stmt = (CopyIntoStorage(

374

table=users_table,

375

stage_location='@my_stage/users/',

376

file_format=csv_format

377

)

378

.files(['users_001.csv.gz', 'users_002.csv.gz'])

379

.force(True)

380

)

381

382

engine.execute(copy_stmt)

383

```

384

385

### JSON Copy Operations

386

387

```python

388

from snowflake.sqlalchemy import JSONFormatter

389

390

# JSON formatter

391

json_format = (JSONFormatter()

392

.compression('GZIP')

393

.file_extension('.json')

394

)

395

396

# Copy JSON data

397

copy_json = CopyIntoStorage(

398

table=events_table,

399

stage_location='s3://my-bucket/events/',

400

file_format=json_format

401

).pattern(r'.*events.*\.json\.gz')  # raw string avoids invalid-escape warnings for \. in modern Python

402

403

engine.execute(copy_json)

404

```

405

406

### Parquet Copy Operations

407

408

```python

409

from snowflake.sqlalchemy import PARQUETFormatter

410

411

# Parquet formatter

412

parquet_format = (PARQUETFormatter()

413

.snappy_compression(True)

414

.binary_as_text(False)

415

)

416

417

# Copy Parquet files

418

copy_parquet = CopyIntoStorage(

419

table=analytics_table,

420

stage_location='@analytics_stage/',

421

file_format=parquet_format

422

).single(False)

423

424

engine.execute(copy_parquet)

425

```

426

427

### Stage and File Format Creation

428

429

```python

430

from snowflake.sqlalchemy import CreateStage, CreateFileFormat

431

432

# Create CSV formatter first

433

csv_formatter = (CSVFormatter()

434

.field_delimiter(',')

435

.skip_header(1)

436

.null_if(['NULL', ''])

437

)

438

439

# Create file format

440

create_csv_format = CreateFileFormat(

441

format_name='my_csv_format',

442

formatter=csv_formatter

443

)

444

445

# Create AWS bucket container

446

aws_container = (AWSBucket('my-bucket', 'data/')

447

.credentials(aws_key_id='key', aws_secret_key='secret')

448

)

449

450

# Create external stage

451

external_stage = ExternalStage('my_external_stage')

452

create_stage = CreateStage(

453

container=aws_container,

454

stage=external_stage

455

)

456

457

engine.execute(create_csv_format)

458

engine.execute(create_stage)

459

```