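# SQL Dialects

SQL dialects provide database-specific SQL formatting and operations. They abstract the differences between SQL databases, including query formatting (identifier escaping, parameter placeholders, INSERT/REPLACE statement generation), data type handling, and schema operations (looking up column names, target fields, and primary keys).
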
## Capabilities

### Generic SQL Dialect

Base dialect implementation providing common SQL formatting and operations.

```python { .api }
class Dialect:
    """Generic SQL dialect that database-specific dialects build upon.

    Attributes:
        placeholder (str): Token used for bound query parameters.
        inspector: SQLAlchemy inspector used for schema introspection.
        insert_statement_format (str): Template for INSERT statements.
        replace_statement_format (str): Template for REPLACE statements.
        escape_word_format (str): Template used to quote an identifier.
        escape_column_names (bool): Whether column names are quoted by default.
    """

    def __init__(self, **kwargs):
        ...

    def escape_word(self, word):
        """Quote ``word`` if it is reserved or otherwise requires escaping.

        Args:
            word (str): Identifier that may need quoting.

        Returns:
            str: The quoted identifier when escaping is required, else ``word``.
        """
        ...

    def unescape_word(self, word):
        """Strip escaping from a possibly quoted identifier.

        Args:
            word (str): Identifier that may be quoted.

        Returns:
            str: The identifier without its escaping.
        """
        ...

    def extract_schema_from_table(self, table):
        """Split a table identifier into its schema and table parts.

        Args:
            table (str): Table identifier, optionally prefixed with a schema.

        Returns:
            tuple: ``(schema, table_name)``; ``schema`` is ``None`` when the
            identifier carries no schema prefix.
        """
        ...

    def get_column_names(self, table, schema=None):
        """List the columns of a table.

        Args:
            table (str): Table name.
            schema (str, optional): Schema containing the table.

        Returns:
            list: Column names of the table.
        """
        ...

    def get_target_fields(self, table, schema=None):
        """List the fields a table operation should target.

        Args:
            table (str): Table name.
            schema (str, optional): Schema containing the table.

        Returns:
            list: Names of the target fields.
        """
        ...

    def get_primary_keys(self, table, schema=None):
        """List the primary-key columns of a table.

        Args:
            table (str): Table name.
            schema (str, optional): Schema containing the table.

        Returns:
            list: Names of the primary-key columns.
        """
        ...

    def generate_insert_sql(self, table, values, target_fields=None, replace=False, **kwargs):
        """Build an INSERT statement, or a REPLACE statement when requested.

        Args:
            table (str): Table to write into.
            values (list): Value tuples to be inserted.
            target_fields (list, optional): Column names to fill.
            replace (bool): Produce REPLACE rather than INSERT.
            **kwargs: Extra formatting options.

        Returns:
            str: The generated SQL statement.
        """
        ...

    def generate_replace_sql(self, table, values, target_fields=None, **kwargs):
        """Build a REPLACE statement.

        Args:
            table (str): Table to write into.
            values (list): Value tuples to be replaced.
            target_fields (list, optional): Column names to fill.
            **kwargs: Extra formatting options.

        Returns:
            str: The generated REPLACE statement.
        """
        ...
```

## Usage Examples

### Basic Dialect Usage

```python
from airflow.providers.common.sql.dialects.dialect import Dialect

# Instantiate the generic dialect
dialect = Dialect()

# escape_word quotes reserved words (and other identifiers that need it)
escaped_table = dialect.escape_word('order')  # e.g. `order` or "order"
escaped_column = dialect.escape_word('select')  # e.g. `select` or "select"

# extract_schema_from_table splits an optional schema prefix off a table name
schema, table = dialect.extract_schema_from_table('myschema.mytable')
# -> schema == 'myschema', table == 'mytable'

schema, table = dialect.extract_schema_from_table('mytable')
# -> schema is None, table == 'mytable'
```

### Schema Operations

```python
# Look up table metadata
columns = dialect.get_column_names('users')
# e.g. ['id', 'name', 'email', 'created_at']

public_columns = dialect.get_column_names('users', schema='public')
# column names of the public.users table

primary_keys = dialect.get_primary_keys('users')
# e.g. ['id']

target_fields = dialect.get_target_fields('users')
# fields suitable for INSERT statements
```

### SQL Generation

```python
# Rows to write
values = [
    (1, 'John Doe', 'john@example.com'),
    (2, 'Jane Smith', 'jane@example.com'),
]

# Arguments shared by every call below
users_kwargs = {
    'table': 'users',
    'values': values,
    'target_fields': ['id', 'name', 'email'],
}

# Plain INSERT
insert_sql = dialect.generate_insert_sql(**users_kwargs)
# -> INSERT INTO users (id, name, email) VALUES (%s, %s, %s)

# Dedicated REPLACE generator (for databases that support it)
replace_sql = dialect.generate_replace_sql(**users_kwargs)
# -> REPLACE INTO users (id, name, email) VALUES (%s, %s, %s)

# Alternatively, request REPLACE through generate_insert_sql's replace flag
replace_sql = dialect.generate_insert_sql(**users_kwargs, replace=True)
```

### Custom Dialect Implementation

```python
class PostgreSQLDialect(Dialect):
    """PostgreSQL-specific dialect.

    PostgreSQL uses UPSERT (``INSERT ... ON CONFLICT``) instead of REPLACE, so
    ``generate_replace_sql`` emits an ``ON CONFLICT (<primary keys>)`` statement.
    """

    def __init__(self):
        super().__init__()
        self.placeholder = '%s'
        self.escape_word_format = '"{}"'
        self.insert_statement_format = 'INSERT INTO {table} ({fields}) VALUES {values}'
        # Built by implicit concatenation so that no source indentation or
        # stray newlines end up inside the generated SQL.
        self.replace_statement_format = (
            'INSERT INTO {table} ({fields}) VALUES {values} '
            'ON CONFLICT ({primary_keys}) DO UPDATE SET {updates}'
        )

    def generate_replace_sql(self, table, values, target_fields=None, **kwargs):
        """Generate an UPSERT statement for ``table``.

        Args:
            table (str): Target table name.
            values (list): Rows to upsert. Only forwarded to
                ``generate_insert_sql`` in the no-primary-key fallback; the
                UPSERT itself contains a single row of placeholders.
            target_fields (list, optional): Column names. When omitted, the
                result of ``get_target_fields(table)`` is used.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            str: The UPSERT statement. When the table has no primary key, the
            result of ``generate_insert_sql`` is returned instead. When every
            column is part of the key, conflicting rows are skipped with
            ``ON CONFLICT (...) DO NOTHING``.

        Raises:
            ValueError: If no target fields are given and none can be looked up.
        """
        # PostgreSQL uses UPSERT instead of REPLACE
        primary_keys = self.get_primary_keys(table)

        if not primary_keys:
            # Without a key there is no conflict target; fall back to a plain INSERT.
            return self.generate_insert_sql(table, values, target_fields)

        # Resolve the column list once, so that the field list, the placeholder
        # count and the SET clause all agree. Previously, target_fields=None
        # produced "VALUES ()" and then crashed while building the SET clause.
        fields = list(target_fields or self.get_target_fields(table) or [])
        if not fields:
            raise ValueError(f'No target fields given or found for table {table!r}')

        fields_str = ', '.join(fields)
        values_str = '({})'.format(', '.join([self.placeholder] * len(fields)))
        keys_str = ', '.join(primary_keys)

        key_set = set(primary_keys)
        updates = ', '.join(
            f'{field} = EXCLUDED.{field}' for field in fields if field not in key_set
        )

        if not updates:
            # Every column is part of the key. "DO UPDATE SET" with an empty
            # assignment list is a syntax error, so skip conflicting rows instead.
            insert_sql = self.insert_statement_format.format(
                table=table, fields=fields_str, values=values_str
            )
            return f'{insert_sql} ON CONFLICT ({keys_str}) DO NOTHING'

        return self.replace_statement_format.format(
            table=table,
            fields=fields_str,
            values=values_str,
            primary_keys=keys_str,
            updates=updates,
        )


class MySQLDialect(Dialect):
    """Dialect for MySQL, which has a native ``REPLACE INTO`` statement."""

    def __init__(self):
        super().__init__()
        # Statement templates: a plain INSERT plus MySQL's REPLACE INTO.
        self.insert_statement_format = 'INSERT INTO {table} ({fields}) VALUES {values}'
        self.replace_statement_format = 'REPLACE INTO {table} ({fields}) VALUES {values}'
        # Backtick-quoted identifiers and %s-style parameters.
        self.escape_word_format = '`{}`'
        self.placeholder = '%s'
```

### Advanced Dialect Features

```python
class AdvancedDialect(Dialect):
    """Example of advanced dialect features.

    On top of ``Dialect``, this adds reserved-word escaping, column
    introspection through a connection object, and multi-row (batch) INSERT
    generation.
    """

    def __init__(self, connection, **kwargs):
        super().__init__(**kwargs)
        # Connection used for introspection; only ``.cursor()`` is called on it.
        self.connection = connection
        # Lower-case words that escape_word always quotes (matching is case-insensitive).
        self.reserved_words = {'select', 'from', 'where', 'order', 'group'}

    def escape_word(self, word):
        """Quote ``word`` if it is a reserved word or contains a space.

        Args:
            word (str): Identifier to check.

        Returns:
            str: ``escape_word_format`` applied to ``word`` when escaping is
            needed, otherwise ``word`` unchanged.
        """
        # NOTE(review): this class never sets escape_word_format. It is assumed
        # to be provided by the base Dialect (documented there as an attribute)
        # or by a subclass - confirm.
        if word.lower() in self.reserved_words or ' ' in word:
            return self.escape_word_format.format(word)
        return word

    def get_column_names(self, table, schema=None):
        """Return column names by running ``DESCRIBE`` on the connection.

        Args:
            table (str): Table name.
            schema (str, optional): Schema name; the table is addressed as
                ``schema.table`` when given.

        Returns:
            list: The first value of every row returned by ``DESCRIBE``.
        """
        # Identifiers cannot be bound as query parameters, so each part is
        # escaped instead. This keeps reserved words such as ``order`` from
        # breaking the statement, consistent with generate_insert_sql.
        parts = [schema, table] if schema else [table]
        full_table = '.'.join(self.escape_word(part) for part in parts)

        cursor = self.connection.cursor()
        try:
            cursor.execute(f"DESCRIBE {full_table}")
            return [row[0] for row in cursor.fetchall()]
        finally:
            # Close the cursor even when execute()/fetchall() raises.
            cursor.close()

    def generate_insert_sql(self, table, values, target_fields=None, replace=False, **kwargs):
        """Generate an INSERT (or REPLACE) statement with one placeholder group per row.

        Args:
            table (str): Target table name (escaped via ``escape_word``).
            values (list): Rows to insert. Only their count is used here.
            target_fields (list, optional): Column names. When omitted, the
                result of ``get_target_fields(table)`` is used.
            replace (bool): Use ``replace_statement_format`` instead of
                ``insert_statement_format``.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            str | None: The statement, or ``None`` when ``values`` is empty.

        Raises:
            ValueError: If no target fields are given and none can be looked up.
        """
        if not values:
            return None

        # Escape table and field names
        escaped_table = self.escape_word(table)

        # Resolve the columns once, because the field list and the per-row
        # placeholder count must come from the same list. Previously the count
        # used ``target_fields`` only, which produced "()" groups whenever the
        # fields were looked up.
        fields = list(target_fields or self.get_target_fields(table) or [])
        if not fields:
            raise ValueError(f"No target fields given or found for table {table!r}")
        fields_str = ', '.join(self.escape_word(field) for field in fields)

        # Batch placeholders: one group per row. For example, two 2-column rows
        # with a "%s" placeholder produce "(%s, %s), (%s, %s)".
        single_row_placeholder = f"({', '.join([self.placeholder] * len(fields))})"
        values_placeholder = ', '.join([single_row_placeholder] * len(values))

        statement_format = self.replace_statement_format if replace else self.insert_statement_format

        return statement_format.format(
            table=escaped_table,
            fields=fields_str,
            values=values_placeholder,
        )
```

### Dialect Integration with Hooks

```python
from airflow.providers.common.sql.hooks.sql import DbApiHook


class CustomDatabaseHook(DbApiHook):
    """Custom hook with dialect support."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize dialect based on database type
        self.dialect = self._get_dialect()

    def _get_dialect(self):
        """Return the dialect matching the connection's ``conn_type``.

        A ``conn_type`` containing ``postgres`` maps to ``PostgreSQLDialect``,
        and one containing ``mysql`` maps to ``MySQLDialect``. Anything else,
        including a missing ``conn_type``, gets the generic ``Dialect``.
        """
        conn = self.get_connection(self.conn_id)
        # Guard against conn_type being None so the substring checks cannot raise.
        conn_type = (conn.conn_type or '').lower()

        if 'postgres' in conn_type:
            return PostgreSQLDialect()
        if 'mysql' in conn_type:
            return MySQLDialect()
        return Dialect()  # Generic dialect

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False):
        """Insert ``rows`` into ``table`` using dialect-generated SQL.

        The dialect generates the statement once. It is then executed with the
        DB-API ``cursor.executemany`` in chunks of ``commit_every`` rows, with
        a commit after each chunk.

        Args:
            table (str): Target table name.
            rows (iterable): Row tuples to insert.
            target_fields (list, optional): Column names to insert into.
            commit_every (int): Maximum number of rows per transaction. ``0``
                (or a negative value) inserts all rows in a single transaction.
            replace (bool): Ask the dialect for a REPLACE/UPSERT statement.

        NOTE(review): this assumes the dialect emits a single row of
        placeholders, as in the generic
        ``INSERT INTO users (id, name, email) VALUES (%s, %s, %s)`` example.
        A dialect that emits one placeholder group per row would need
        flattened parameters instead.
        """
        from contextlib import closing

        # Materialize so that emptiness checks and chunk slicing also work for generators.
        rows = list(rows)
        if not rows:
            return

        # Use dialect to generate appropriate SQL
        insert_sql = self.dialect.generate_insert_sql(
            table=table,
            values=rows,
            target_fields=target_fields,
            replace=replace,
        )

        chunk_size = commit_every if commit_every and commit_every > 0 else len(rows)

        # Previously the whole row list was passed as the parameters of one
        # single-row statement, and commit_every was ignored. executemany binds
        # each row to the statement in turn.
        with closing(self.get_conn()) as conn, closing(conn.cursor()) as cursor:
            for start in range(0, len(rows), chunk_size):
                cursor.executemany(insert_sql, rows[start:start + chunk_size])
                # Commit per chunk, so at most chunk_size rows share a transaction.
                conn.commit()
```

## Dialect Properties

### Standard Properties

- `placeholder`: Parameter placeholder (e.g., `'%s'`, `'?'`, `':1'`)
- `escape_word_format`: Format string for escaping identifiers (e.g., ``'`{}`'``, `'"{}"'`)
- `insert_statement_format`: Template for INSERT statements
- `replace_statement_format`: Template for REPLACE/UPSERT statements
- `escape_column_names`: Whether to escape column names by default

### Database-Specific Examples

**PostgreSQL**: `placeholder='%s'`, `escape_word_format='"{}"'`

**MySQL**: `placeholder='%s'`, ``escape_word_format='`{}`'``

**SQLite**: `placeholder='?'`, `escape_word_format='[{}]'`

**SQL Server**: `placeholder='?'`, `escape_word_format='[{}]'`