# SQL Dialect and Database-Specific Operations

PostgreSQL-specific SQL dialect implementation providing optimized operations such as UPSERT statement generation, primary key introspection, and schema-aware query generation for tighter PostgreSQL integration.

## Capabilities

### PostgresDialect Class

Database dialect implementation for PostgreSQL-specific operations and query generation.

```python { .api }
class PostgresDialect(Dialect):
    """
    PostgreSQL-specific SQL dialect implementation.

    Provides database-specific operations and query generation.
    """

    @property
    def name(self) -> str:
        """
        Dialect name identifier.

        Returns:
            str: "postgresql"
        """
```

### Primary Key Introspection

Retrieve primary key information using `information_schema` queries, with LRU caching for performance.

```python { .api }
@lru_cache(maxsize=None)
def get_primary_keys(
    self,
    table: str,
    schema: str | None = None,
) -> list[str] | None:
    """
    Get primary key columns using information_schema queries.

    Uses an LRU cache for performance optimization.

    Parameters:
    - table: str, table name (may include schema as "schema.table")
    - schema: str or None, schema name (extracted from table if None)

    Returns:
    - list[str] or None: primary key column names, or None if no primary key exists

    Implementation:
    - Extracts the schema from the table name if the schema parameter is None
    - Queries information_schema.table_constraints and key_column_usage
    - Caches results to avoid repeated database queries
    """
```
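
The lookup above can be sketched as a parameterized `information_schema` query plus the schema-splitting step. This is illustrative only: the exact SQL the provider issues, the `split_table_name` helper, and the `"public"` default are assumptions, not Airflow's actual code.

```python
from __future__ import annotations

# Illustrative sketch: the exact SQL Airflow issues may differ.
PRIMARY_KEY_QUERY = """
SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
  ON tc.constraint_name = kcu.constraint_name
 AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
  AND tc.table_schema = %s
  AND tc.table_name = %s
ORDER BY kcu.ordinal_position
"""


def split_table_name(table: str, schema: str | None = None) -> tuple[str, str]:
    """Split a "schema.table" name; the "public" default is an assumption."""
    if schema is None and "." in table:
        schema, _, table = table.partition(".")
    return schema or "public", table


print(split_table_name("sales.orders"))  # ('sales', 'orders')
print(split_table_name("users"))         # ('public', 'users')
```

Ordering by `ordinal_position` matters for composite keys, since the column order of the key is part of its definition.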

### UPSERT SQL Generation

Generate PostgreSQL-specific UPSERT statements using the ON CONFLICT clause for efficient conflict resolution.

```python { .api }
def generate_replace_sql(
    self,
    table,
    values,
    target_fields,
    **kwargs,
) -> str:
    """
    Generate PostgreSQL UPSERT statement using ON CONFLICT clause.

    Parameters:
    - table: str, target table name
    - values: list, row values for insertion
    - target_fields: list, column names for insertion
    - **kwargs: additional parameters, including replace_index

    Kwargs:
    - replace_index: str or list, column(s) to use for conflict detection

    Returns:
    - str: generated UPSERT SQL with ON CONFLICT DO UPDATE clause

    Example Output:
        INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email
    """
```

## Usage Examples

### Direct Dialect Usage

```python
from airflow.providers.postgres.dialects.postgres import PostgresDialect
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Get dialect instance
hook = PostgresHook(postgres_conn_id="postgres_default")
dialect = hook.dialect  # Returns PostgresDialect instance

# Or create directly
dialect = PostgresDialect()
```

### Primary Key Introspection

```python
# Get primary keys for table
pk_columns = dialect.get_primary_keys("users", "public")
print(f"Primary key columns: {pk_columns}")  # ["id"]

# Handle table with schema in name
pk_columns = dialect.get_primary_keys("sales.orders")
print(f"Primary key columns: {pk_columns}")  # ["order_id"]

# Composite primary key
pk_columns = dialect.get_primary_keys("order_items")
print(f"Composite key: {pk_columns}")  # ["order_id", "product_id"]
```

### UPSERT SQL Generation

```python
# Generate UPSERT for single-column primary key
upsert_sql = dialect.generate_replace_sql(
    table="users",
    values=[(1, "John", "john@example.com")],
    target_fields=["id", "name", "email"],
    replace_index="id",
)
print(upsert_sql)
# INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
# ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email

# Generate UPSERT for composite key
upsert_sql = dialect.generate_replace_sql(
    table="user_preferences",
    values=[(1, "theme", "dark")],
    target_fields=["user_id", "setting_key", "setting_value"],
    replace_index=["user_id", "setting_key"],
)
print(upsert_sql)
# INSERT INTO user_preferences (user_id, setting_key, setting_value) VALUES (%s, %s, %s)
# ON CONFLICT (user_id, setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value
```

### Integration with PostgresHook

The dialect is automatically used by PostgresHook for database-specific operations:

```python
hook = PostgresHook(postgres_conn_id="postgres_default")

# insert_rows automatically uses the dialect for UPSERT generation
hook.insert_rows(
    table="products",
    rows=[
        (1, "Widget", 19.99),
        (2, "Gadget", 29.99),
    ],
    target_fields=["id", "name", "price"],
    replace=True,  # Triggers UPSERT generation
    replace_index="id",
)
```

## Advanced Usage

### Custom Schema Handling

```python
def analyze_table_structure(table_name, schema_name=None):
    """Analyze table structure using dialect capabilities."""
    dialect = PostgresDialect()

    # Get primary key information
    pk_columns = dialect.get_primary_keys(table_name, schema_name)

    if pk_columns:
        print(f"Table {table_name} has primary key: {pk_columns}")

        # Generate a sample UPSERT for three placeholder columns
        sample_values = [("value_0", "value_1", "value_2")]
        upsert_sql = dialect.generate_replace_sql(
            table=table_name,
            values=sample_values,
            target_fields=["col1", "col2", "col3"],
            replace_index=pk_columns,
        )
        print(f"Sample UPSERT SQL:\n{upsert_sql}")
    else:
        print(f"Table {table_name} has no primary key")

# Analyze different tables
analyze_table_structure("users", "public")
analyze_table_structure("sales.orders")
```

### Bulk UPSERT Operations

```python
def bulk_upsert_with_dialect():
    """Perform bulk upsert using dialect-generated SQL."""
    hook = PostgresHook()
    dialect = hook.dialect

    # Large dataset for upsert
    data_rows = [
        (i, f"user_{i}", f"user_{i}@example.com")
        for i in range(1, 10001)
    ]

    # Get primary key for target table
    pk_columns = dialect.get_primary_keys("users", "public")

    if pk_columns:
        # Use hook's built-in upsert (uses the dialect internally)
        hook.insert_rows(
            table="users",
            rows=data_rows,
            target_fields=["id", "name", "email"],
            replace=True,
            replace_index=pk_columns,
            commit_every=1000,
        )
    else:
        # Fall back to regular insert
        hook.insert_rows(
            table="users",
            rows=data_rows,
            target_fields=["id", "name", "email"],
            commit_every=1000,
        )
```

### Dynamic UPSERT Generation

```python
def dynamic_upsert_handler(table_name, data_dict_list, schema="public"):
    """
    Handle upsert operations dynamically based on table structure.
    """
    if not data_dict_list:
        return

    hook = PostgresHook()
    dialect = hook.dialect

    # Get table primary key
    pk_columns = dialect.get_primary_keys(table_name, schema)

    # Extract fields from data
    target_fields = list(data_dict_list[0].keys())
    rows = [list(row.values()) for row in data_dict_list]

    if pk_columns:
        # Check that all primary key columns are present
        missing_pk_cols = set(pk_columns) - set(target_fields)
        if missing_pk_cols:
            raise ValueError(f"Missing primary key columns: {missing_pk_cols}")

        # Perform upsert
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields,
            replace=True,
            replace_index=pk_columns,
        )
        print(f"Upserted {len(rows)} rows into {table_name}")
    else:
        # Regular insert for tables without a primary key
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields,
        )
        print(f"Inserted {len(rows)} rows into {table_name}")

# Usage
user_data = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
]

dynamic_upsert_handler("users", user_data)
```

## Performance Optimization

### Primary Key Caching

The dialect uses LRU caching to optimize primary key lookups:

```python
# First call queries database
pk1 = dialect.get_primary_keys("users")  # Database query

# Subsequent calls use cache
pk2 = dialect.get_primary_keys("users")  # From cache
pk3 = dialect.get_primary_keys("users")  # From cache

# Clear cache if needed (rare)
dialect.get_primary_keys.cache_clear()
```
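
The effect is standard `functools.lru_cache` behavior. A self-contained sketch with a stand-in loader (no database involved; `lookup_primary_keys` is a hypothetical substitute for the real method) shows repeated lookups being served from the cache:

```python
from functools import lru_cache

calls = 0

@lru_cache(maxsize=None)
def lookup_primary_keys(table: str) -> tuple[str, ...]:
    """Stand-in for the database lookup; counts how often it actually runs."""
    global calls
    calls += 1
    return ("id",)

lookup_primary_keys("users")  # miss: runs the loader
lookup_primary_keys("users")  # hit: served from cache
lookup_primary_keys("users")  # hit: served from cache
print(calls)                                  # 1
print(lookup_primary_keys.cache_info().hits)  # 2

lookup_primary_keys.cache_clear()
lookup_primary_keys("users")  # loader runs again after the cache is cleared
print(calls)                                  # 2
```

Note that the cache keys on the argument values, so `get_primary_keys("users")` and `get_primary_keys("users", "public")` would be cached as separate entries.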

### UPSERT vs INSERT Performance

```python
def performance_comparison():
    """Compare UPSERT vs INSERT performance."""
    import time

    hook = PostgresHook()
    test_data = [(i, f"user_{i}") for i in range(1000)]

    # Regular INSERT
    start_time = time.time()
    hook.insert_rows("test_users", test_data, ["id", "name"])
    insert_time = time.time() - start_time

    # UPSERT (with conflict resolution)
    start_time = time.time()
    hook.insert_rows(
        "test_users",
        test_data,
        ["id", "name"],
        replace=True,
        replace_index="id",
    )
    upsert_time = time.time() - start_time

    print(f"INSERT time: {insert_time:.2f}s")
    print(f"UPSERT time: {upsert_time:.2f}s")
```

## SQL Generation Details

### ON CONFLICT Clause Structure

The dialect generates UPSERT statements with the following structure:

```sql
INSERT INTO table_name (column1, column2, ...)
VALUES (%s, %s, ...)
ON CONFLICT (conflict_columns)
DO UPDATE SET
    column1 = EXCLUDED.column1,
    column2 = EXCLUDED.column2,
    ...
```
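
The shape above can be reproduced with a short standalone sketch. This is a simplified illustration of the statement structure, not the provider's actual generator (which also handles identifier quoting and parameter binding), and `sketch_upsert_sql` is a hypothetical name:

```python
def sketch_upsert_sql(table: str, target_fields: list[str], replace_index) -> str:
    """Build an UPSERT statement with the shape shown above (illustrative only)."""
    if isinstance(replace_index, str):
        replace_index = [replace_index]
    placeholders = ", ".join(["%s"] * len(target_fields))
    # Non-conflict columns are overwritten from the proposed row via EXCLUDED
    updates = ", ".join(
        f"{col} = EXCLUDED.{col}" for col in target_fields if col not in replace_index
    )
    return (
        f"INSERT INTO {table} ({', '.join(target_fields)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(replace_index)}) DO UPDATE SET {updates}"
    )

print(sketch_upsert_sql("users", ["id", "name", "email"], "id"))
# INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
#   ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email
```

Excluding the conflict columns from the SET list matters: assigning a key column to itself is harmless but wasteful, and the conflict target already guarantees those values match.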

### Conflict Resolution Options

```python
# Single column conflict
upsert_sql = dialect.generate_replace_sql(
    table="users",
    values=data,
    target_fields=["id", "name", "email"],
    replace_index="id",  # Single column
)

# Multi-column conflict (composite key)
upsert_sql = dialect.generate_replace_sql(
    table="user_settings",
    values=data,
    target_fields=["user_id", "setting_name", "value"],
    replace_index=["user_id", "setting_name"],  # Multiple columns
)
```

### Integration with PostgreSQL Features

The dialect leverages PostgreSQL-specific features:

- **ON CONFLICT**: Native upsert support (PostgreSQL 9.5+)
- **EXCLUDED**: Reference to conflicting row values
- **information_schema**: Standard metadata queries
- **System catalogs**: pg_* tables for advanced introspection
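
As a point of comparison for the last bullet, primary keys can also be read directly from the `pg_*` catalogs instead of `information_schema`. A commonly used form (with `public.users` as a placeholder table name) looks like:

```sql
-- 'public.users' is a placeholder; substitute the table to inspect
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a
  ON a.attrelid = i.indrelid
 AND a.attnum = ANY (i.indkey)
WHERE i.indrelid = 'public.users'::regclass
  AND i.indisprimary;
```

The catalog route avoids the joins over `information_schema` views but ties the query to PostgreSQL's internal schema, which is why portable code tends to prefer `information_schema`.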