or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

dialects.md · hooks.md · index.md · operators.md · sensors.md · triggers.md

docs/dialects.md

0

# SQL Dialects

1

2

SQL dialects provide database-specific SQL formatting and operations, abstracting differences between SQL databases including query formatting, data type handling, and schema operations.

3

4

## Capabilities

5

6

### Generic SQL Dialect

7

8

Base dialect implementation providing common SQL formatting and operations.

9

10

```python { .api }

11

class Dialect:
    """
    Generic SQL dialect implementation.

    This listing is an interface summary: every method body is a stub
    (``pass``); only the signatures and documented contracts are shown.

    Attributes:
        placeholder (str): SQL placeholder character for parameters
        inspector: SQLAlchemy inspector for schema operations
        insert_statement_format (str): Format string for INSERT statements
        replace_statement_format (str): Format string for REPLACE statements
        escape_word_format (str): Format string for escaping identifiers
        escape_column_names (bool): Whether to escape column names by default
    """

    def __init__(self, **kwargs):
        pass

    def escape_word(self, word):
        """
        Escape word if it's a reserved word or needs escaping.

        Args:
            word (str): Word to potentially escape

        Returns:
            str: Escaped word if necessary, original word otherwise
        """
        pass

    def unescape_word(self, word):
        """
        Unescape escaped word.

        Args:
            word (str): Potentially escaped word

        Returns:
            str: Unescaped word
        """
        pass

    def extract_schema_from_table(self, table):
        """
        Extract schema name from table identifier.

        Args:
            table (str): Table identifier (may include schema)

        Returns:
            tuple: (schema, table_name) or (None, table_name)
        """
        pass

    def get_column_names(self, table, schema=None):
        """
        Get column names for specified table.

        Args:
            table (str): Table name
            schema (str, optional): Schema name

        Returns:
            list: List of column names
        """
        pass

    def get_target_fields(self, table, schema=None):
        """
        Get target fields for table operations.

        Args:
            table (str): Table name
            schema (str, optional): Schema name

        Returns:
            list: List of target field names
        """
        pass

    def get_primary_keys(self, table, schema=None):
        """
        Get primary key columns for table.

        Args:
            table (str): Table name
            schema (str, optional): Schema name

        Returns:
            list: List of primary key column names
        """
        pass

    def generate_insert_sql(self, table, values, target_fields=None, replace=False, **kwargs):
        """
        Generate INSERT or REPLACE SQL statement.

        Args:
            table (str): Target table name
            values (list): List of value tuples to insert
            target_fields (list, optional): Target column names
            replace (bool): Use REPLACE instead of INSERT
            **kwargs: Additional formatting options

        Returns:
            str: Generated SQL statement
        """
        pass

    def generate_replace_sql(self, table, values, target_fields=None, **kwargs):
        """
        Generate REPLACE SQL statement.

        Args:
            table (str): Target table name
            values (list): List of value tuples to replace
            target_fields (list, optional): Target column names
            **kwargs: Additional formatting options

        Returns:
            str: Generated REPLACE SQL statement
        """
        pass

132

```

133

134

## Usage Examples

135

136

### Basic Dialect Usage

137

138

```python

139

from airflow.providers.common.sql.dialects.dialect import Dialect

140

141

# Create dialect instance
dialect = Dialect()

# Escape reserved words or identifiers
escaped_table = dialect.escape_word('order')  # May become `order` or "order"
escaped_column = dialect.escape_word('select')  # May become `select` or "select"

# Extract schema from table identifier
schema, table = dialect.extract_schema_from_table('myschema.mytable')
# schema = 'myschema', table = 'mytable'

schema, table = dialect.extract_schema_from_table('mytable')
# schema = None, table = 'mytable'

154

```

155

156

### Schema Operations

157

158

```python

159

# Get table metadata
columns = dialect.get_column_names('users')
# Returns: ['id', 'name', 'email', 'created_at']

columns_with_schema = dialect.get_column_names('users', schema='public')
# Returns column names for public.users table

primary_keys = dialect.get_primary_keys('users')
# Returns: ['id']

target_fields = dialect.get_target_fields('users')
# Returns fields suitable for INSERT operations

171

```

172

173

### SQL Generation

174

175

```python

176

# Generate INSERT statement
values = [
    (1, 'John Doe', 'john@example.com'),
    (2, 'Jane Smith', 'jane@example.com'),
]

insert_sql = dialect.generate_insert_sql(
    table='users',
    values=values,
    target_fields=['id', 'name', 'email'],
)
# Returns: INSERT INTO users (id, name, email) VALUES (%s, %s, %s)

# Generate REPLACE statement (if supported)
replace_sql = dialect.generate_replace_sql(
    table='users',
    values=values,
    target_fields=['id', 'name', 'email'],
)
# Returns: REPLACE INTO users (id, name, email) VALUES (%s, %s, %s)

# Use replace=True flag in generate_insert_sql
replace_sql = dialect.generate_insert_sql(
    table='users',
    values=values,
    target_fields=['id', 'name', 'email'],
    replace=True,
)

204

```

205

206

### Custom Dialect Implementation

207

208

```python

209

class PostgreSQLDialect(Dialect):
    """PostgreSQL-specific dialect.

    Replacement is expressed as an upsert
    (``INSERT ... ON CONFLICT ... DO UPDATE``) rather than ``REPLACE``.
    """

    def __init__(self, **kwargs):
        # Forward keyword arguments so the subclass accepts whatever the
        # base ``Dialect.__init__(**kwargs)`` accepts.
        super().__init__(**kwargs)
        self.placeholder = '%s'
        self.escape_word_format = '"{}"'
        self.insert_statement_format = 'INSERT INTO {table} ({fields}) VALUES {values}'
        self.replace_statement_format = '''
            INSERT INTO {table} ({fields}) VALUES {values}
            ON CONFLICT ({primary_keys}) DO UPDATE SET {updates}
        '''

    def generate_replace_sql(self, table, values, target_fields=None, **kwargs):
        """Generate an upsert statement for ``table``.

        Args:
            table (str): Target table name
            values (list): List of value tuples to upsert
            target_fields (list, optional): Target column names; when omitted
                they are looked up with ``get_target_fields(table)``
            **kwargs: Additional options, forwarded to ``generate_insert_sql``
                when falling back to a plain INSERT

        Returns:
            str: ``INSERT ... ON CONFLICT`` statement, or the plain INSERT
            produced by ``generate_insert_sql`` when the table has no
            primary keys
        """
        # PostgreSQL uses UPSERT instead of REPLACE
        primary_keys = self.get_primary_keys(table)

        if not primary_keys:
            # Fallback to regular INSERT if no primary keys
            return self.generate_insert_sql(table, values, target_fields, **kwargs)

        # Resolve the column list once so the field list, the placeholder
        # count and the SET clause always describe the same columns, even
        # when ``target_fields`` is None.
        fields = list(target_fields or self.get_target_fields(table))
        fields_str = ', '.join(fields)
        row_placeholder = f"({', '.join([self.placeholder] * len(fields))})"
        conflict_target = ', '.join(primary_keys)

        updates = ', '.join(
            f'{field} = EXCLUDED.{field}'
            for field in fields
            if field not in primary_keys
        )

        if not updates:
            # Every column is part of the key: an empty SET clause would be
            # invalid SQL, so skip conflicting rows instead.
            insert_sql = self.insert_statement_format.format(
                table=table,
                fields=fields_str,
                values=row_placeholder,
            )
            return f'{insert_sql} ON CONFLICT ({conflict_target}) DO NOTHING'

        return self.replace_statement_format.format(
            table=table,
            fields=fields_str,
            values=row_placeholder,
            primary_keys=conflict_target,
            updates=updates,
        )

247

248

class MySQLDialect(Dialect):
    """MySQL-specific dialect."""

    def __init__(self, **kwargs):
        # Forward keyword arguments so the subclass accepts whatever the
        # base ``Dialect.__init__(**kwargs)`` accepts.
        super().__init__(**kwargs)
        self.placeholder = '%s'
        self.escape_word_format = '`{}`'
        self.insert_statement_format = 'INSERT INTO {table} ({fields}) VALUES {values}'
        self.replace_statement_format = 'REPLACE INTO {table} ({fields}) VALUES {values}'

257

```

258

259

### Advanced Dialect Features

260

261

```python

262

class AdvancedDialect(Dialect):
    """Example of advanced dialect features."""

    def __init__(self, connection, **kwargs):
        """Store the DB-API connection used for introspection.

        Args:
            connection: Object exposing ``cursor()``; used by
                ``get_column_names``
            **kwargs: Forwarded to ``Dialect.__init__``
        """
        super().__init__(**kwargs)
        self.connection = connection
        self.reserved_words = {'select', 'from', 'where', 'order', 'group'}

    def escape_word(self, word):
        """Custom escaping logic.

        Escapes ``word`` with ``escape_word_format`` when it is one of
        ``reserved_words`` (case-insensitive) or contains a space; otherwise
        returns it unchanged.
        """
        if word.lower() in self.reserved_words or ' ' in word:
            return self.escape_word_format.format(word)
        return word

    def get_column_names(self, table, schema=None):
        """Get columns using database introspection.

        Args:
            table (str): Table name
            schema (str, optional): Schema name, prefixed as ``schema.table``

        Returns:
            list: First element of every row returned by ``DESCRIBE``
        """
        full_table = f'{schema}.{table}' if schema else table

        cursor = self.connection.cursor()
        try:
            # NOTE: the identifier is interpolated into the statement, so
            # ``table``/``schema`` must come from trusted input.
            cursor.execute(f"DESCRIBE {full_table}")
            return [row[0] for row in cursor.fetchall()]
        finally:
            # Release the cursor even when the query fails.
            cursor.close()

    def generate_insert_sql(self, table, values, target_fields=None, replace=False, **kwargs):
        """Generate optimized INSERT with batch handling.

        Args:
            table (str): Target table name
            values (list): List of value tuples; one placeholder group is
                emitted per element
            target_fields (list, optional): Target column names; when omitted
                they are looked up with ``get_target_fields(table)``
            replace (bool): Use ``replace_statement_format`` instead of
                ``insert_statement_format``
            **kwargs: Accepted for interface compatibility; unused here

        Returns:
            str | None: The generated statement, or ``None`` when ``values``
            is empty
        """
        if not values:
            return None

        # Escape table and field names
        escaped_table = self.escape_word(table)

        # Resolve the column list once so the placeholder count matches the
        # field list even when ``target_fields`` is None.
        fields = list(target_fields or self.get_target_fields(table))
        fields_str = ', '.join(self.escape_word(field) for field in fields)

        # Generate placeholders for batch insert
        single_row_placeholder = f"({', '.join([self.placeholder] * len(fields))})"
        values_placeholder = ', '.join([single_row_placeholder] * len(values))

        statement_format = self.replace_statement_format if replace else self.insert_statement_format

        return statement_format.format(
            table=escaped_table,
            fields=fields_str,
            values=values_placeholder,
        )

312

```

313

314

### Dialect Integration with Hooks

315

316

```python

317

from airflow.providers.common.sql.hooks.sql import DbApiHook

318

319

class CustomDatabaseHook(DbApiHook):
    """Custom hook with dialect support."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Initialize dialect based on database type
        self.dialect = self._get_dialect()

    def _get_dialect(self):
        """Get appropriate dialect for the database.

        Returns:
            Dialect: ``PostgreSQLDialect`` or ``MySQLDialect`` when the
            connection type contains ``postgres`` / ``mysql``, otherwise the
            generic ``Dialect``
        """
        # Resolve the id through the hook's accessor rather than assuming an
        # attribute literally named ``conn_id`` exists on the instance.
        conn = self.get_connection(self.get_conn_id())

        if 'postgres' in conn.conn_type:
            return PostgreSQLDialect()
        if 'mysql' in conn.conn_type:
            return MySQLDialect()
        return Dialect()  # Generic dialect

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs):
        """Insert rows using dialect-specific SQL generation.

        Args:
            table (str): Target table name
            rows (list): Rows to insert; nothing is executed when empty
            target_fields (list, optional): Target column names
            commit_every (int): Accepted for signature compatibility; this
                example issues a single statement and does not use it
            replace (bool): Generate the REPLACE form instead of INSERT
            **kwargs: Extra options accepted for compatibility with the
                parent signature; unused here
        """
        if not rows:
            return

        # Use dialect to generate appropriate SQL
        insert_sql = self.dialect.generate_insert_sql(
            table=table,
            values=rows,
            target_fields=target_fields,
            replace=replace,
        )

        # Execute the generated SQL
        # NOTE(review): assumes the placeholder layout produced by the
        # dialect matches the shape of ``rows`` — confirm per dialect.
        self.run(insert_sql, parameters=rows, autocommit=True)

353

```

354

355

## Dialect Properties

356

357

### Standard Properties

358

359

- `placeholder`: Parameter placeholder (e.g., '%s', '?', ':1')

360

- `escape_word_format`: Format for escaping identifiers (e.g., `` '`{}`' ``, `'"{}"'`)

361

- `insert_statement_format`: Template for INSERT statements

362

- `replace_statement_format`: Template for REPLACE/UPSERT statements

363

- `escape_column_names`: Whether to escape column names by default

364

365

### Database-Specific Examples

366

367

**PostgreSQL**: `placeholder='%s'`, `escape_word_format='"{}"'`

368

**MySQL**: `placeholder='%s'`, `` escape_word_format='`{}`' ``

369

**SQLite**: `placeholder='?'`, `escape_word_format='[{}]'`

370

**SQL Server**: `placeholder='?'`, `escape_word_format='[{}]'`