# Database Connection and Query Execution

Core database connectivity, query execution, and transaction management for PostgreSQL databases through Apache Airflow. Provides connection management, SSL support, custom cursor types, and query execution with parameter binding.

## Capabilities

### PostgresHook Class

Main database hook class. It extends `DbApiHook` with PostgreSQL-specific functionality.

```python { .api }
class PostgresHook(DbApiHook):
    """
    Interact with Postgres.

    Parameters:
    - postgres_conn_id: str, connection ID reference to PostgreSQL database
    - options: str | None, command-line options to send to server at connection start
    - enable_log_db_messages: bool, enable logging of database messages sent to client
    """

    # Class attributes
    conn_name_attr = "postgres_conn_id"
    default_conn_name = "postgres_default"
    conn_type = "postgres"
    hook_name = "Postgres"
    supports_autocommit = True
    supports_executemany = True
    ignored_extra_options = {
        "iam", "redshift", "redshift-serverless", "cursor",
        "cluster-identifier", "workgroup-name", "aws_conn_id",
        "sqlalchemy_scheme", "sqlalchemy_query"
    }

    def __init__(
        self,
        *args,
        options: str | None = None,
        enable_log_db_messages: bool = False,
        **kwargs
    ) -> None: ...
```
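
As a minimal sketch of how the constructor parameters above might be combined, the helper below assembles keyword arguments and defers the Airflow import until a hook is actually built. `build_hook_kwargs` and `make_hook` are hypothetical names, the `-c statement_timeout=...` value is only one example of a server option string, and the import path assumes the standard Postgres provider package layout.

```python
from typing import Optional


def build_hook_kwargs(
    conn_id: str = "postgres_default",
    statement_timeout_ms: Optional[int] = None,
    log_db_messages: bool = False,
) -> dict:
    """Assemble keyword arguments for PostgresHook (hypothetical helper)."""
    kwargs = {
        "postgres_conn_id": conn_id,
        "enable_log_db_messages": log_db_messages,
    }
    if statement_timeout_ms is not None:
        # Sent to the server at connection start through the `options` parameter.
        kwargs["options"] = f"-c statement_timeout={statement_timeout_ms}"
    return kwargs


def make_hook(**kwargs):
    """Build the hook lazily so this module imports without Airflow installed."""
    # Assumed import path for the Postgres provider package.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    return PostgresHook(**kwargs)
```

Inside a task, this might be used as `make_hook(**build_hook_kwargs(statement_timeout_ms=3000))`.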

### Connection Management

Establishes and manages database connections, drawing configuration from the Airflow connection and its extras.

```python { .api }
def get_conn(self) -> connection:
    """
    Establishes database connection with support for:
    - SSL parameters from connection extras
    - Custom cursor types (dictcursor, realdictcursor, namedtuplecursor)
    - AWS IAM authentication for RDS/Redshift
    - Connection options and logging configuration

    Returns:
        psycopg2 connection object
    """

def get_uri(self) -> str:
    """
    Extract connection URI from connection configuration.

    Returns:
        str: Connection URI in SQLAlchemy format
    """
```
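
The object returned by `get_conn()` is a DB-API connection, so it is typically the caller's job to close it. The sketch below shows one way to do that with `contextlib.closing`. `fetch_scalar` is a hypothetical helper, and it only assumes that the object passed in exposes `get_conn()`.

```python
from contextlib import closing


def fetch_scalar(hook, sql, parameters=None):
    """Return the first column of the first row, or None if there are no rows."""
    # closing() guarantees close() runs even if execute() raises.
    with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cur:
        if parameters is None:
            cur.execute(sql)
        else:
            cur.execute(sql, parameters)
        row = cur.fetchone()
        return row[0] if row else None
```

With a real hook this could be called as `fetch_scalar(hook, "SELECT count(*) FROM my_table")`, where `my_table` is a placeholder name.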

### SQLAlchemy Integration

Exposes a SQLAlchemy URL object and dialect information for framework integration.

```python { .api }
@property
def sqlalchemy_url(self) -> URL:
    """
    Constructs SQLAlchemy URL object for PostgreSQL connection.
    Includes query parameters from extra configuration.

    Returns:
        sqlalchemy.engine.URL: SQLAlchemy URL object
    """

@property
def dialect_name(self) -> str:
    """
    Database dialect name identifier.

    Returns:
        str: "postgresql"
    """

@property
def dialect(self) -> Dialect:
    """
    Returns dialect implementation for PostgreSQL.

    Returns:
        PostgresDialect: Dialect implementation instance
    """
```

### Query Execution

Executes SQL statements with parameter binding and transaction control.

```python { .api }
def run(
    self,
    sql,
    autocommit: bool = False,
    parameters=None,
    handler=None
):
    """
    Execute SQL statement(s).

    Parameters:
    - sql: str or list of str, SQL statement(s) to execute
    - autocommit: bool, whether to autocommit the transaction
    - parameters: list/tuple/dict, query parameters for binding
    - handler: callable, optional result handler function

    Returns:
        Query results (if any)
    """

def get_records(self, sql, parameters=None):
    """
    Execute SQL query and return all records.

    Parameters:
    - sql: str, SQL query to execute
    - parameters: list/tuple/dict, query parameters for binding

    Returns:
        list: All records from query result
    """

def get_first(self, sql, parameters=None):
    """
    Execute SQL query and return first record.

    Parameters:
    - sql: str, SQL query to execute
    - parameters: list/tuple/dict, query parameters for binding

    Returns:
        tuple or None: First record from query result
    """
```
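
The following sketch shows how `get_first` and `run` might be combined in a task, using psycopg2-style named placeholders (`%(name)s`) so that values are bound rather than interpolated into the SQL string. The `orders` table, its `created_at` column, and the `archive_old_orders` function are hypothetical. `hook` is assumed to be a `PostgresHook` instance or any object with the same methods.

```python
def archive_old_orders(hook, cutoff):
    """Count, then delete, orders older than `cutoff` (hypothetical schema)."""
    params = {"cutoff": cutoff}

    # get_first returns a single row (or None), so index into it for the count.
    row = hook.get_first(
        "SELECT count(*) FROM orders WHERE created_at < %(cutoff)s",
        parameters=params,
    )
    stale = row[0] if row else 0

    if stale:
        # autocommit=False matches the documented default; spelled out for clarity.
        hook.run(
            "DELETE FROM orders WHERE created_at < %(cutoff)s",
            autocommit=False,
            parameters=params,
        )
    return stale
```

Passing the same `params` dictionary to both calls keeps the count and the delete aligned on one cutoff value.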

### Cursor Management

Manages database cursors with support for different cursor types.

```python { .api }
def get_cursor(self):
    """
    Get database cursor from current connection.
    Supports custom cursor types configured in connection extras.

    Returns:
        Database cursor object (DictCursor, RealDictCursor, NamedTupleCursor, or default)
    """

def _get_cursor(self, raw_cursor: str) -> CursorType:
    """
    Internal method to get specific cursor type from string name.

    Parameters:
    - raw_cursor: str, cursor type name ("dictcursor", "realdictcursor", "namedtuplecursor")

    Returns:
        CursorType: Configured cursor class
    """

@staticmethod
def _serialize_cell(cell: object, conn: connection | None = None) -> Any:
    """
    Internal static method to serialize cell values for database operations.
    Handles special data type conversions for PostgreSQL compatibility.

    Parameters:
    - cell: object, data value to serialize
    - conn: connection | None, optional database connection for context

    Returns:
        Any: Serialized value suitable for database insertion
    """
```
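
To illustrate the name-to-class lookup that `_get_cursor` describes, here is a standalone sketch. It maps names to dotted class paths as plain strings so that it runs without psycopg2 installed. Case-insensitive matching and the `ValueError` raised for unknown names are assumptions of this sketch, not confirmed provider behavior, and `resolve_cursor_path` is a hypothetical helper.

```python
# Cursor names accepted in the `cursor` extra, mapped to psycopg2 class paths.
CURSOR_PATHS = {
    "dictcursor": "psycopg2.extras.DictCursor",
    "realdictcursor": "psycopg2.extras.RealDictCursor",
    "namedtuplecursor": "psycopg2.extras.NamedTupleCursor",
}


def resolve_cursor_path(raw_cursor: str) -> str:
    """Return the dotted class path for a cursor name (hypothetical helper)."""
    key = raw_cursor.strip().lower()
    try:
        return CURSOR_PATHS[key]
    except KeyError:
        valid = ", ".join(sorted(CURSOR_PATHS))
        raise ValueError(
            f"Invalid cursor {raw_cursor!r}. Valid options: {valid}"
        ) from None
```

Failing fast on an unknown name surfaces a typo in the connection extras before any query runs.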

### Transaction Control

Controls transaction behavior and autocommit settings.

```python { .api }
def set_autocommit(self, conn, autocommit: bool):
    """
    Set autocommit mode for connection.

    Parameters:
    - conn: database connection object
    - autocommit: bool, autocommit mode setting
    """

def get_autocommit(self, conn) -> bool:
    """
    Get current autocommit status for connection.

    Parameters:
    - conn: database connection object

    Returns:
        bool: Current autocommit status
    """
```
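
One pattern these two methods enable is switching autocommit on temporarily, for example around a statement such as `VACUUM` that cannot run inside a transaction block, and then restoring the previous mode. The context manager below is a sketch of that pattern. `autocommit_mode` is a hypothetical helper, not part of the hook.

```python
from contextlib import contextmanager


@contextmanager
def autocommit_mode(hook, conn, enabled=True):
    """Temporarily set autocommit on `conn`, restoring the old value on exit."""
    previous = hook.get_autocommit(conn)
    hook.set_autocommit(conn, enabled)
    try:
        yield conn
    finally:
        # Restore the earlier mode even if the body raised.
        hook.set_autocommit(conn, previous)
```

A call site might look like `with autocommit_mode(hook, conn): cursor.execute("VACUUM")`, where `conn` came from `hook.get_conn()`.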

### Database Logging

Manages database message logging for debugging and monitoring.

```python { .api }
def get_db_log_messages(self, conn) -> None:
    """
    Log database messages sent to client during session.
    Requires enable_log_db_messages=True in constructor.

    Parameters:
    - conn: database connection object
    """
```
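
psycopg2 collects server notices (for example `RAISE NOTICE` output) in the connection's `notices` list. The sketch below shows one way such messages could be forwarded to a logger. `log_notices` is a hypothetical helper and is not necessarily how `get_db_log_messages` is implemented.

```python
import logging


def log_notices(conn, logger=None):
    """Log and clear pending notices on a psycopg2-style connection."""
    logger = logger or logging.getLogger(__name__)
    pending = getattr(conn, "notices", None)
    if not pending:
        return 0
    messages = list(pending)
    for message in messages:
        logger.info(message.strip())
    # Remove only what was logged so the same messages are not emitted twice.
    del pending[: len(messages)]
    return len(messages)
```

The function returns the number of messages it logged, which makes it easy to check in a test whether the server produced any output.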

### UI Integration

Provides Airflow UI field behavior configuration for connection forms.

```python { .api }
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """
    Returns Airflow UI field behavior configuration for PostgreSQL connections.
    Defines form field visibility, requirements, and placeholders.

    Returns:
        dict: UI field configuration dictionary
    """
```

## Types

```python { .api }
from typing import TypeAlias
from psycopg2.extras import DictCursor, RealDictCursor, NamedTupleCursor

CursorType: TypeAlias = DictCursor | RealDictCursor | NamedTupleCursor
```

## Connection Configuration

### Connection Extra Parameters

Configure connection behavior through the connection's extra JSON field:

```json
{
  "sslmode": "require",
  "sslcert": "/path/to/cert.pem",
  "sslkey": "/path/to/key.pem",
  "sslrootcert": "/path/to/ca.pem",
  "cursor": "dictcursor",
  "iam": true,
  "redshift": true,
  "cluster-identifier": "my-cluster",
  "aws_conn_id": "aws_default"
}
```

### Supported Extra Options

- **SSL Configuration**: `sslmode`, `sslcert`, `sslkey`, `sslrootcert`, `sslcrl`
- **Cursor Type**: `cursor` - "dictcursor", "realdictcursor", "namedtuplecursor"
- **AWS IAM**: `iam` (bool), `aws_conn_id` (str)
- **Redshift**: `redshift` (bool), `cluster-identifier` (str)
- **Redshift Serverless**: `redshift-serverless` (bool), `workgroup-name` (str)
- **SQLAlchemy**: `sqlalchemy_scheme` (str), `sqlalchemy_query` (dict)
- **Connection Options**: Any valid PostgreSQL connection parameter
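
Because the extra field is plain JSON, it can be built and inspected with the standard library. The sketch below separates hook-level options from parameters that would reach the driver, assuming that names listed in `ignored_extra_options` are consumed by the hook and that everything else is forwarded as a connection parameter. `split_extras` is a hypothetical helper.

```python
import json

# Mirrors PostgresHook.ignored_extra_options as documented in this section.
IGNORED_EXTRA_OPTIONS = {
    "iam", "redshift", "redshift-serverless", "cursor",
    "cluster-identifier", "workgroup-name", "aws_conn_id",
    "sqlalchemy_scheme", "sqlalchemy_query",
}


def split_extras(extra_json):
    """Split an extra JSON string into (hook_options, driver_parameters)."""
    extra = json.loads(extra_json) if extra_json else {}
    hook_options = {
        k: v for k, v in extra.items() if k in IGNORED_EXTRA_OPTIONS
    }
    driver_parameters = {
        k: v for k, v in extra.items() if k not in IGNORED_EXTRA_OPTIONS
    }
    return hook_options, driver_parameters


extra = json.dumps({"sslmode": "require", "cursor": "dictcursor", "iam": True})
hook_options, driver_parameters = split_extras(extra)
```

Here `sslmode` lands in `driver_parameters`, while `cursor` and `iam` stay in `hook_options`.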

### Default Values

- **Connection Type**: "postgres"
- **Default Connection**: "postgres_default"
- **Default Port**: 5432 (Redshift: 5439)
- **Default Schema**: "public"