# Database Hooks

Database hooks provide the foundation for connecting to and interacting with SQL databases in Airflow. `DbApiHook` is an abstract base class that implements common database operations on top of a DB-API 2.0 (PEP 249) connection; database-specific providers subclass it for their particular systems.

## Capabilities

### Core Database Hook

The abstract base class for all SQL database hooks, providing connection management, query execution, and data operations.

```python { .api }
class DbApiHook:
    """
    Abstract base class for SQL hooks that provides common database operations.

    Attributes:
        conn_name_attr (str): Name of the constructor keyword argument (and
            instance attribute) that holds the connection ID
        default_conn_name (str): Connection ID used when none is given
        supports_autocommit (bool): Whether the database supports autocommit
        supports_executemany (bool): Whether the driver supports executemany
        placeholder (str): Parameter placeholder used in generated SQL,
            one of SQL_PLACEHOLDERS
        connection: Airflow Connection object for this hook
        sqlalchemy_url: SQLAlchemy URL object
        inspector: SQLAlchemy inspector for schema operations
        dialect: Database dialect object for SQL formatting
        reserved_words: Set of database reserved words
    """

    def get_conn(self):
        """
        Get a database connection.

        Returns:
            DB-API 2.0 connection object
        """

    def get_uri(self):
        """
        Build the connection URI from the Airflow connection.

        Returns:
            str: Database connection URI
        """

    def get_sqlalchemy_engine(self, engine_kwargs=None):
        """
        Get a SQLAlchemy engine for the connection.

        Args:
            engine_kwargs (dict, optional): Additional arguments passed to
                sqlalchemy.create_engine

        Returns:
            sqlalchemy.engine.Engine: SQLAlchemy engine
        """

    def get_df(self, sql, parameters=None, **kwargs):
        """
        Execute a SQL query and return the results as a DataFrame.

        Args:
            sql (str): SQL query to execute
            parameters (iterable or mapping, optional): Query parameters
            **kwargs: Additional arguments for DataFrame creation

        Returns:
            pandas.DataFrame or polars.DataFrame: Query results
        """

    def get_df_by_chunks(self, sql, parameters=None, chunksize=None, **kwargs):
        """
        Execute a SQL query and return the results as DataFrame chunks.

        Args:
            sql (str): SQL query to execute
            parameters (iterable or mapping, optional): Query parameters
            chunksize (int, optional): Number of rows per chunk
            **kwargs: Additional arguments for DataFrame creation

        Returns:
            Generator yielding DataFrame chunks
        """

    def get_records(self, sql, parameters=None):
        """
        Execute a SQL query and return all rows as a list of tuples.

        Args:
            sql (str): SQL query to execute
            parameters (iterable or mapping, optional): Query parameters

        Returns:
            list: List of result tuples
        """

    def get_first(self, sql, parameters=None):
        """
        Execute a SQL query and return the first row.

        Args:
            sql (str): SQL query to execute
            parameters (iterable or mapping, optional): Query parameters

        Returns:
            tuple or None: First result row, or None if there are no results
        """

    def run(self, sql, autocommit=False, parameters=None, handler=None, split_statements=False, return_last=True):
        """
        Execute one or more SQL statements.

        Args:
            sql (str or list): SQL statement(s) to execute
            autocommit (bool): Whether to enable autocommit on the connection
            parameters (iterable or mapping, optional): Query parameters
            handler (callable, optional): Called with the cursor after each
                statement; its return value becomes that statement's result
            split_statements (bool): Whether to split a string into separate
                statements before executing them
            return_last (bool): Whether to return only the last statement's result

        Returns:
            Any: None if no handler is given; otherwise a single handler
            result or a list of them (see return_single_query_results)
        """

    def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False):
        """
        Insert rows into a database table.

        Args:
            table (str): Target table name
            rows (iterable): Row tuples to insert
            target_fields (list, optional): Target column names
            commit_every (int): Maximum rows per transaction; 0 inserts all
                rows in a single transaction
            replace (bool): Whether to use REPLACE instead of INSERT
        """

    def bulk_dump(self, table, tmp_file):
        """
        Dump table contents to a file. Not implemented in the base class;
        providers that support it override this method.

        Args:
            table (str): Table name to dump
            tmp_file (str): Target file path
        """

    def bulk_load(self, table, tmp_file):
        """
        Load file contents into a table. Not implemented in the base class;
        providers that support it override this method.

        Args:
            table (str): Target table name
            tmp_file (str): Source file path
        """

    def test_connection(self):
        """
        Test the database connection by running a simple query.

        Returns:
            tuple: (success: bool, message: str)
        """
```
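
In the base class, `get_records` and `get_first` come down to executing the query on a DB-API 2.0 cursor and calling `fetchall()` or `fetchone()`. `insert_rows` commits in batches of `commit_every` rows. The following is a minimal stand-alone sketch of those ideas using the standard-library `sqlite3` module, so placeholders are `?`. The functions are hypothetical stand-ins that take an explicit connection. `iter_chunks` yields lists where `get_df_by_chunks` yields DataFrames. The real `insert_rows` may build its SQL and batch rows differently.

```python
from __future__ import annotations

import sqlite3
from collections.abc import Iterable, Iterator, Sequence


def get_records(conn: sqlite3.Connection, sql: str, parameters: Sequence = ()) -> list[tuple]:
    # Stand-in for get_records: execute the query and fetch every row
    cur = conn.cursor()
    try:
        cur.execute(sql, parameters)
        return cur.fetchall()
    finally:
        cur.close()


def get_first(conn: sqlite3.Connection, sql: str, parameters: Sequence = ()) -> tuple | None:
    # Stand-in for get_first: fetch only the first row, or None if there is none
    cur = conn.cursor()
    try:
        cur.execute(sql, parameters)
        return cur.fetchone()
    finally:
        cur.close()


def iter_chunks(
    conn: sqlite3.Connection, sql: str, parameters: Sequence = (), chunksize: int = 1000
) -> Iterator[list[tuple]]:
    # Rough analogue of get_df_by_chunks: yield lists of at most `chunksize` rows
    cur = conn.cursor()
    try:
        cur.execute(sql, parameters)
        while batch := cur.fetchmany(chunksize):
            yield batch
    finally:
        cur.close()


def insert_rows(
    conn: sqlite3.Connection,
    table: str,
    rows: Iterable[tuple],
    target_fields: Sequence[str],
    commit_every: int = 1000,
) -> int:
    # Simplified insert_rows: commit every `commit_every` rows (0 means a
    # single transaction), then once at the end. `table` and `target_fields`
    # are formatted into the SQL text, so they must come from trusted code.
    columns = ", ".join(target_fields)
    values = ", ".join(["?"] * len(target_fields))
    sql = f"INSERT INTO {table} ({columns}) VALUES ({values})"
    count = 0
    for row in rows:
        conn.execute(sql, row)
        count += 1
        if commit_every and count % commit_every == 0:
            conn.commit()
    conn.commit()
    return count


# Usage against an in-memory database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, email TEXT)")
rows = [("John", "john@example.com"), ("Jane", "jane@example.com"), ("Ann", "ann@example.com")]
inserted = insert_rows(conn, "users", rows, ["name", "email"], commit_every=2)
names = get_records(conn, "SELECT name FROM users ORDER BY name")
first = get_first(conn, "SELECT email FROM users WHERE name = ?", ("Jane",))
chunks = list(iter_chunks(conn, "SELECT name FROM users ORDER BY name", chunksize=2))
```

The chunked reader keeps at most `chunksize` rows in memory at a time. That is the main reason to prefer chunked fetching over `get_records` for large result sets.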

### Database Connection Protocol

A `typing.Protocol` describing the `connect` method a database connector is expected to provide.

```python { .api }
from typing import Any, Protocol


class ConnectorProtocol(Protocol):
    """
    Protocol defining the database connection interface.
    """

    def connect(self, host: str, port: int, username: str, schema: str) -> Any:
        """
        Connect to the database.

        Args:
            host (str): Database host
            port (int): Database port
            username (str): Username for the connection
            schema (str): Database schema/name

        Returns:
            Authorized database connection object
        """
```
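
`ConnectorProtocol` is checked structurally. A connector does not need to inherit from it; it only needs a `connect` method with a compatible signature. The sketch below illustrates this with a hypothetical `SqliteConnector` (not part of the provider) and a hypothetical `open_connection` helper. The protocol is redeclared locally so the block runs without Airflow installed.

```python
from __future__ import annotations

import sqlite3
from typing import Any, Protocol


class ConnectorProtocol(Protocol):
    # Local copy of the documented protocol so this sketch runs on its own
    def connect(self, host: str, port: int, username: str, schema: str) -> Any: ...


class SqliteConnector:
    # Hypothetical connector that does not inherit from ConnectorProtocol.
    # SQLite has no server, so host, port and username are ignored and
    # `schema` is treated as the database path.
    def connect(self, host: str, port: int, username: str, schema: str) -> sqlite3.Connection:
        return sqlite3.connect(schema)


def open_connection(connector: ConnectorProtocol, schema: str) -> Any:
    # A static type checker accepts any object whose connect() matches
    return connector.connect(host="localhost", port=0, username="airflow", schema=schema)


conn = open_connection(SqliteConnector(), ":memory:")
```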

### Result Handler Functions

Built-in handlers that `run()` applies to the cursor to process query results, plus the helper that decides whether `run()` returns a single result or a list.

```python { .api }
def fetch_all_handler(cursor):
    """
    Handler to fetch all query results.

    Args:
        cursor: Database cursor object

    Returns:
        list or None: All result rows, or None if the statement
        produced no result set (cursor.description is None)
    """


def fetch_one_handler(cursor):
    """
    Handler to fetch the first query result.

    Args:
        cursor: Database cursor object

    Returns:
        tuple or None: First result row, or None if there are no rows
        or the statement produced no result set
    """


def return_single_query_results(sql, return_last, split_statements):
    """
    Determine whether run() returns a single query result or a list of results.

    Args:
        sql (str or list): Original SQL statement(s)
        return_last (bool): Whether to return only the last result
        split_statements (bool or None): Whether a string statement was split

    Returns:
        bool: True when sql is a string and either return_last is True or
        split_statements is False; False otherwise (always False for a list)
    """
```
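
A handler is any callable that receives the cursor after a statement executes, and `run()` passes its return value back. The sketch below re-implements the two built-in handlers against `sqlite3`, assuming they rely on `cursor.description` as the docstrings describe. `first_column` is a hypothetical custom handler showing that any one-argument callable works.

```python
import sqlite3


def fetch_all_handler(cursor):
    # All rows, or None when the statement has no result set
    # (cursor.description is None for DDL, INSERT, UPDATE, ...)
    if cursor.description is not None:
        return cursor.fetchall()
    return None


def fetch_one_handler(cursor):
    # First row only, with the same None convention
    if cursor.description is not None:
        return cursor.fetchone()
    return None


def first_column(cursor):
    # Hypothetical custom handler: flatten a one-column result set
    return [row[0] for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE t (x INTEGER)")
ddl_result = fetch_all_handler(cur)
cur.executemany("INSERT INTO t VALUES (?)", [(1,), (2,)])
cur.execute("SELECT x FROM t ORDER BY x")
all_rows = fetch_all_handler(cur)
cur.execute("SELECT x FROM t ORDER BY x")
first_row = fetch_one_handler(cur)
cur.execute("SELECT x FROM t ORDER BY x")
xs = first_column(cur)
```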

### Constants

```python { .api }
# Set of supported SQL placeholders
SQL_PLACEHOLDERS: frozenset[str] = frozenset({"%s", "?"})
```
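
`%s` is the placeholder of the DB-API `format` paramstyle, and `?` is the placeholder of the `qmark` paramstyle (used by `sqlite3`). A hook writes its `placeholder` into the statements it generates, such as those built by `insert_rows`. The hypothetical `build_insert` helper below sketches that step, assuming one placeholder per target column. It is an illustration, not the provider's statement builder.

```python
from __future__ import annotations

SQL_PLACEHOLDERS = frozenset({"%s", "?"})


def build_insert(table: str, target_fields: list[str], placeholder: str = "%s") -> str:
    # Reject placeholders outside the supported set
    if placeholder not in SQL_PLACEHOLDERS:
        raise ValueError(f"unsupported placeholder: {placeholder!r}")
    # One placeholder per column; the driver binds the actual values later
    columns = ", ".join(target_fields)
    values = ", ".join([placeholder] * len(target_fields))
    return f"INSERT INTO {table} ({columns}) VALUES ({values})"
```

Keeping values out of the SQL text and letting the driver bind them is what protects against SQL injection. Table and column names, however, are still interpolated and must be trusted.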

## Usage Examples

### Basic Database Operations

```python
from airflow.providers.common.sql.hooks.sql import DbApiHook


# Custom hook extending DbApiHook
class MyDatabaseHook(DbApiHook):
    # The connection ID is passed under this keyword name
    conn_name_attr = 'my_conn_id'
    default_conn_name = 'my_default_conn'
    supports_autocommit = True

    def get_conn(self):
        # Resolve the Airflow connection whose ID is stored under conn_name_attr
        conn = self.get_connection(getattr(self, self.conn_name_attr))
        # Open a DB-API 2.0 connection with your driver; `my_db_driver` is a
        # hypothetical placeholder for the real driver module
        import my_db_driver
        return my_db_driver.connect(host=conn.host, port=conn.port, user=conn.login,
                                    password=conn.password, database=conn.schema)


# Use the hook; the keyword must match conn_name_attr
hook = MyDatabaseHook(my_conn_id='my_database')

# Execute query and get records
results = hook.get_records('SELECT * FROM users WHERE active = %s', parameters=[True])

# Execute query and get DataFrame
df = hook.get_df('SELECT name, email FROM users LIMIT 10')

# Insert data
rows = [('John', 'john@example.com'), ('Jane', 'jane@example.com')]
hook.insert_rows('users', rows, target_fields=['name', 'email'])

# Test connection
success, message = hook.test_connection()
```
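
A hook picks its connection ID from one of two places: the keyword named by `conn_name_attr` (here `my_conn_id`), or a single positional argument. If neither is given, it falls back to `default_conn_name`. The class below is a simplified, Airflow-free model of that lookup. It is written under the assumption that the base constructor follows these three rules, and `ConnIdResolver` is a hypothetical name.

```python
class ConnIdResolver:
    # Hypothetical model of how a DbApiHook subclass picks its connection ID
    conn_name_attr = "my_conn_id"
    default_conn_name = "my_default_conn"

    def __init__(self, *args, **kwargs):
        if len(args) == 1:
            # A single positional argument is taken as the connection ID
            setattr(self, self.conn_name_attr, args[0])
        elif self.conn_name_attr in kwargs:
            # Otherwise use the keyword named after conn_name_attr
            setattr(self, self.conn_name_attr, kwargs[self.conn_name_attr])
        else:
            # Fall back to the default; unrelated keywords such as conn_id
            # do not set the connection ID in this model
            setattr(self, self.conn_name_attr, self.default_conn_name)


positional = ConnIdResolver("my_database")
by_keyword = ConnIdResolver(my_conn_id="my_database")
wrong_keyword = ConnIdResolver(conn_id="my_database")
```

Under this model, passing a generic `conn_id=` keyword silently connects to the default connection instead of the intended one.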

### Using Result Handlers

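```python
from airflow.providers.common.sql.hooks.handlers import fetch_all_handler, fetch_one_handler

# `hook` is the MyDatabaseHook instance from the previous example

# Execute with a handler; a single string statement returns the handler's
# result directly (here the first row, or None)
result = hook.run(
    'SELECT COUNT(*) FROM orders WHERE date = %s',
    parameters=['2023-01-01'],
    handler=fetch_one_handler,
)

# Get only the count value
count = result[0] if result else 0

# A list of statements returns a list with one handler result per statement
per_statement = hook.run(
    ['SELECT COUNT(*) FROM orders', 'SELECT COUNT(*) FROM users'],
    handler=fetch_all_handler,
)
```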
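
Indexing `result[0]` works because a single string statement run with the default `return_last=True` yields one handler result rather than a list. The function below sketches that decision rule in plain Python, assuming it matches `return_single_query_results` taking `(sql, return_last, split_statements)`. Treat it as a model for predicting the return shape, not as the provider source.

```python
from __future__ import annotations

from collections.abc import Iterable


def return_single_query_results(
    sql: str | Iterable[str], return_last: bool, split_statements: bool | None
) -> bool:
    # A list of statements always produces a list of results
    if not isinstance(sql, str):
        return False
    # A string yields one unwrapped result if only the last result is wanted,
    # or if it was executed as a single (unsplit) statement
    if split_statements is None:
        return return_last
    return return_last or not split_statements
```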