# SQL Dialect and Database-Specific Operations

PostgreSQL-specific SQL dialect implementation providing optimized operations such as UPSERT statement generation, primary key introspection, and schema-aware query generation for tighter PostgreSQL integration.

## Capabilities

### PostgresDialect Class

Database dialect implementation for PostgreSQL-specific operations and query generation.

```python { .api }
class PostgresDialect(Dialect):
    """
    PostgreSQL-specific SQL dialect implementation.
    Provides database-specific operations and query generation.
    """

    @property
    def name(self) -> str:
        """
        Dialect name identifier.

        Returns:
            str: "postgresql"
        """
```
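
The `name` property is useful as a quick sanity check or to guard PostgreSQL-only behavior. A minimal sketch, assuming `hook` is a configured `PostgresHook` as in the usage examples below:

```python
# Branch on the resolved dialect before emitting PostgreSQL-only SQL
if hook.dialect.name == "postgresql":
    print("Using the PostgreSQL dialect")
```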

### Primary Key Introspection

Retrieve primary key information from PostgreSQL's information_schema, with LRU caching for performance.

```python { .api }
@lru_cache(maxsize=None)
def get_primary_keys(
    self,
    table: str,
    schema: str | None = None
) -> list[str] | None:
    """
    Get primary key columns using information_schema queries.
    Uses LRU cache for performance optimization.

    Parameters:
    - table: str, table name (may include schema as "schema.table")
    - schema: str or None, schema name (extracted from table if None)

    Returns:
    list[str] or None: Primary key column names, None if no primary key exists

    Implementation:
    - Extracts schema from table name if schema parameter is None
    - Queries information_schema.table_constraints and key_column_usage
    - Caches results to avoid repeated database queries
    """
```
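
For reference, primary-key introspection over information_schema typically joins `table_constraints` to `key_column_usage`. A sketch of such a query (illustrative, not necessarily the provider's exact SQL):

```sql
SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
  ON kcu.constraint_name = tc.constraint_name
 AND kcu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
  AND tc.table_name = 'users'
  AND tc.table_schema = 'public'
ORDER BY kcu.ordinal_position;
```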

### UPSERT SQL Generation

Generate PostgreSQL-specific UPSERT statements using the ON CONFLICT clause for efficient conflict resolution.

```python { .api }
def generate_replace_sql(
    self,
    table,
    values,
    target_fields,
    **kwargs
) -> str:
    """
    Generate PostgreSQL UPSERT statement using ON CONFLICT clause.

    Parameters:
    - table: str, target table name
    - values: list, row values for insertion
    - target_fields: list, column names for insertion
    - **kwargs: additional parameters including replace_index

    Kwargs:
    - replace_index: str or list, column(s) to use for conflict detection

    Returns:
    str: Generated UPSERT SQL with ON CONFLICT DO UPDATE clause

    Example Output:
    INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
    ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email
    """
```

## Usage Examples

### Direct Dialect Usage

```python
from airflow.providers.postgres.dialects.postgres import PostgresDialect
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Get dialect instance
hook = PostgresHook(postgres_conn_id="postgres_default")
dialect = hook.dialect  # Returns a PostgresDialect instance

# Or construct one directly from a hook (the dialect runs its
# introspection queries through the hook's connection)
dialect = PostgresDialect(hook)
```

### Primary Key Introspection

```python
# Get primary keys for table
pk_columns = dialect.get_primary_keys("users", "public")
print(f"Primary key columns: {pk_columns}")  # ["id"]

# Handle table with schema in name
pk_columns = dialect.get_primary_keys("sales.orders")
print(f"Primary key columns: {pk_columns}")  # ["order_id"]

# Composite primary key
pk_columns = dialect.get_primary_keys("order_items")
print(f"Composite key: {pk_columns}")  # ["order_id", "product_id"]
```

### UPSERT SQL Generation

```python
# Generate UPSERT for single-column primary key
upsert_sql = dialect.generate_replace_sql(
    table="users",
    values=[(1, "John", "john@example.com")],
    target_fields=["id", "name", "email"],
    replace_index="id"
)
print(upsert_sql)
# INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
# ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email

# Generate UPSERT for composite key
upsert_sql = dialect.generate_replace_sql(
    table="user_preferences",
    values=[(1, "theme", "dark")],
    target_fields=["user_id", "setting_key", "setting_value"],
    replace_index=["user_id", "setting_key"]
)
print(upsert_sql)
# INSERT INTO user_preferences (user_id, setting_key, setting_value) VALUES (%s, %s, %s)
# ON CONFLICT (user_id, setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value
```

### Integration with PostgresHook

The dialect is automatically used by PostgresHook for database-specific operations:

```python
hook = PostgresHook(postgres_conn_id="postgres_default")

# insert_rows automatically uses dialect for UPSERT generation
hook.insert_rows(
    table="products",
    rows=[
        (1, "Widget", 19.99),
        (2, "Gadget", 29.99)
    ],
    target_fields=["id", "name", "price"],
    replace=True,  # Triggers UPSERT generation
    replace_index="id"
)
```

## Advanced Usage

### Custom Schema Handling

```python
def analyze_table_structure(table_name, schema_name=None):
    """Analyze table structure using dialect capabilities."""

    # The dialect runs its introspection queries through a hook's connection
    hook = PostgresHook(postgres_conn_id="postgres_default")
    dialect = hook.dialect

    # Get primary key information
    pk_columns = dialect.get_primary_keys(table_name, schema_name)

    if pk_columns:
        print(f"Table {table_name} has primary key: {pk_columns}")

        # Generate sample UPSERT
        target_fields = ["col1", "col2", "col3"]
        sample_values = [tuple(f"value_{i}" for i in range(len(target_fields)))]
        upsert_sql = dialect.generate_replace_sql(
            table=table_name,
            values=sample_values,
            target_fields=target_fields,
            replace_index=pk_columns
        )
        print(f"Sample UPSERT SQL:\n{upsert_sql}")
    else:
        print(f"Table {table_name} has no primary key")

# Analyze different tables
analyze_table_structure("users", "public")
analyze_table_structure("sales.orders")
```

### Bulk UPSERT Operations

```python
def bulk_upsert_with_dialect():
    """Perform bulk upsert using dialect-generated SQL."""

    hook = PostgresHook()
    dialect = hook.dialect

    # Large dataset for upsert
    data_rows = [
        (i, f"user_{i}", f"user_{i}@example.com")
        for i in range(1, 10001)
    ]

    # Get primary key for target table
    pk_columns = dialect.get_primary_keys("users", "public")

    if pk_columns:
        # Use hook's built-in upsert (uses dialect internally)
        hook.insert_rows(
            table="users",
            rows=data_rows,
            target_fields=["id", "name", "email"],
            replace=True,
            replace_index=pk_columns,
            commit_every=1000
        )
    else:
        # Fallback to regular insert
        hook.insert_rows(
            table="users",
            rows=data_rows,
            target_fields=["id", "name", "email"],
            commit_every=1000
        )
```

### Dynamic UPSERT Generation

```python
def dynamic_upsert_handler(table_name, data_dict_list, schema="public"):
    """
    Handle upsert operations dynamically based on table structure.
    """
    # Bail out early to avoid an unnecessary introspection query
    if not data_dict_list:
        return

    hook = PostgresHook()
    dialect = hook.dialect

    # Get table primary key
    pk_columns = dialect.get_primary_keys(table_name, schema)

    # Extract fields from data
    target_fields = list(data_dict_list[0].keys())
    rows = [list(row.values()) for row in data_dict_list]

    if pk_columns:
        # Check if all primary key columns are present
        missing_pk_cols = set(pk_columns) - set(target_fields)
        if missing_pk_cols:
            raise ValueError(f"Missing primary key columns: {missing_pk_cols}")

        # Perform upsert
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields,
            replace=True,
            replace_index=pk_columns
        )
        print(f"Upserted {len(rows)} rows into {table_name}")
    else:
        # Regular insert for tables without primary key
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields
        )
        print(f"Inserted {len(rows)} rows into {table_name}")

# Usage
user_data = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"}
]

dynamic_upsert_handler("users", user_data)
```

## Performance Optimization

### Primary Key Caching

The dialect uses LRU caching to optimize primary key lookups:

```python
# First call queries the database
pk1 = dialect.get_primary_keys("users")  # Database query

# Subsequent calls use the cache
pk2 = dialect.get_primary_keys("users")  # From cache
pk3 = dialect.get_primary_keys("users")  # From cache

# Clear the cache if needed (rare)
dialect.get_primary_keys.cache_clear()
```
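
Note that `@lru_cache` keys on the raw call arguments, so the same table addressed two different ways occupies two cache entries. This follows from standard `functools.lru_cache` semantics rather than provider-specific behavior:

```python
# Separate cache entries, even though both resolve to public.users
dialect.get_primary_keys("public.users")
dialect.get_primary_keys("users", "public")
```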

### UPSERT vs INSERT Performance

```python
import time

def performance_comparison():
    """Compare UPSERT vs INSERT performance."""

    hook = PostgresHook()
    test_data = [(i, f"user_{i}") for i in range(1000)]

    # Regular INSERT
    start_time = time.time()
    hook.insert_rows("test_users", test_data, ["id", "name"])
    insert_time = time.time() - start_time

    # UPSERT (with conflict resolution)
    start_time = time.time()
    hook.insert_rows(
        "test_users",
        test_data,
        ["id", "name"],
        replace=True,
        replace_index="id"
    )
    upsert_time = time.time() - start_time

    print(f"INSERT time: {insert_time:.2f}s")
    print(f"UPSERT time: {upsert_time:.2f}s")
```

## SQL Generation Details

### ON CONFLICT Clause Structure

The dialect generates UPSERT statements with the following structure; as the examples above show, columns used for conflict detection are omitted from the `DO UPDATE SET` list:

```sql
INSERT INTO table_name (column1, column2, ...)
VALUES (%s, %s, ...)
ON CONFLICT (conflict_columns)
DO UPDATE SET
    column1 = EXCLUDED.column1,
    column2 = EXCLUDED.column2,
    ...
```

### Conflict Resolution Options

```python
# Single column conflict
upsert_sql = dialect.generate_replace_sql(
    table="users",
    values=data,
    target_fields=["id", "name", "email"],
    replace_index="id"  # Single column
)

# Multi-column conflict (composite key)
upsert_sql = dialect.generate_replace_sql(
    table="user_settings",
    values=data,
    target_fields=["user_id", "setting_name", "value"],
    replace_index=["user_id", "setting_name"]  # Multiple columns
)
```

### Integration with PostgreSQL Features

The dialect leverages PostgreSQL-specific features:

- **ON CONFLICT**: Native upsert support (PostgreSQL 9.5+)
- **EXCLUDED**: Reference to the values of the row proposed for insertion
- **information_schema**: Standard metadata queries
- **System catalogs**: pg_* tables for advanced introspection
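
For introspection beyond what information_schema exposes, the pg_* catalogs can be queried directly. A sketch of a catalog-based primary-key lookup (illustrative only, not part of the dialect's API):

```sql
-- Primary-key columns of public.users via pg_catalog
SELECT a.attname
FROM pg_index i
JOIN pg_class c ON c.oid = i.indrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_attribute a ON a.attrelid = c.oid AND a.attnum = ANY(i.indkey)
WHERE i.indisprimary
  AND c.relname = 'users'
  AND n.nspname = 'public';
```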