# Apache Airflow PostgreSQL Provider

PostgreSQL integration provider for Apache Airflow that enables database connectivity, query execution, and data manipulation through hooks, assets, and SQL dialect support. The package provides synchronous and asynchronous database connections, bulk data operations, schema introspection, AWS IAM authentication, Amazon Redshift support, and OpenLineage integration for data lineage tracking.

## Package Information

- **Package Name**: apache-airflow-providers-postgres
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-postgres`
- **Minimum Airflow Version**: 2.10.0
## Core Imports

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook
```

Asset/dataset handling:

```python
from airflow.providers.postgres.assets.postgres import sanitize_uri
```

SQL dialect:

```python
from airflow.providers.postgres.dialects.postgres import PostgresDialect
```
## Basic Usage

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Initialize hook with connection
hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# Execute queries
records = hook.get_records("SELECT * FROM users WHERE active = %s", parameters=[True])

# Get data as DataFrame
df = hook.get_df("SELECT * FROM sales_data", df_type="pandas")

# Insert rows with upsert capability
hook.insert_rows(
    table="users",
    rows=[(1, "john", "john@example.com"), (2, "jane", "jane@example.com")],
    target_fields=["id", "name", "email"],
    replace=True,
    replace_index="id"
)

# Bulk load from file
hook.bulk_load("user_imports", "/path/to/data.tsv")
```
## Architecture

The provider is built around several key components:

- **PostgresHook**: Main database interface extending DbApiHook with PostgreSQL-specific features
- **PostgresDialect**: SQL dialect implementation for PostgreSQL-specific operations such as UPSERT
- **Asset Handler**: URI sanitization and validation for PostgreSQL datasets/assets
- **Provider Info**: Metadata registration for Airflow integration

Together these components integrate with the broader Airflow ecosystem while providing PostgreSQL-specific optimizations and features.
## Capabilities

### Database Connection and Query Execution

Core database connectivity, query execution, and transaction management with support for multiple cursor types, SSL configuration, and connection pooling.

```python { .api }
class PostgresHook:
    def get_conn(self) -> connection: ...
    def run(self, sql, autocommit=False, parameters=None, handler=None): ...
    def get_records(self, sql, parameters=None): ...
    def get_first(self, sql, parameters=None): ...
```
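A minimal sketch of the query methods above, assuming an existing Airflow connection named `my_postgres_conn` and a hypothetical `users` table:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical connection id

# get_first returns a single row (or None); get_records returns all rows
row = hook.get_first("SELECT count(*) FROM users")
rows = hook.get_records("SELECT id, name FROM users WHERE active = %s", parameters=[True])

# run executes one or more statements; psycopg2-style %s placeholders
hook.run("UPDATE users SET active = %s WHERE id = %s", parameters=[False, 42])

# For raw psycopg2 access, use the underlying connection directly
with hook.get_conn() as conn, conn.cursor() as cur:
    cur.execute("SELECT version()")
    print(cur.fetchone())
```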
[Database Connection](./database-connection.md)

### Data Retrieval and DataFrame Operations

Advanced data retrieval with DataFrame support for both pandas and polars, providing efficient data manipulation and analysis capabilities.

```python { .api }
def get_df(
    self,
    sql: str | list[str],
    parameters: list | tuple | Mapping[str, Any] | None = None,
    *,
    df_type: Literal["pandas", "polars"] = "pandas",
    **kwargs: Any
) -> PandasDataFrame | PolarsDataFrame: ...
```
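A short sketch of `get_df`; it assumes pandas and polars are installed and that a `sales_data` table and `my_postgres_conn` connection exist:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical connection id

# Default df_type returns a pandas DataFrame
pdf = hook.get_df("SELECT * FROM sales_data WHERE region = %s", parameters=["emea"])
print(pdf.head())

# Request a polars DataFrame instead
pldf = hook.get_df("SELECT * FROM sales_data", df_type="polars")
print(pldf.schema)
```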
[Data Retrieval](./data-retrieval.md)

### Bulk Operations and Data Loading

High-performance bulk data operations including file-based loading, dumping, and PostgreSQL COPY command support for efficient data transfer.

```python { .api }
def bulk_load(self, table: str, tmp_file: str) -> None: ...
def bulk_dump(self, table: str, tmp_file: str) -> None: ...
def copy_expert(self, sql: str, filename: str) -> None: ...
def insert_rows(
    self,
    table,
    rows,
    target_fields=None,
    commit_every=1000,
    replace=False,
    **kwargs
): ...
```
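A brief sketch of the bulk-transfer methods; the table name and file paths are hypothetical:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical connection id

# bulk_load / bulk_dump wrap COPY ... FROM STDIN / TO STDOUT with tab-separated data
hook.bulk_load("user_imports", "/tmp/user_imports.tsv")
hook.bulk_dump("user_imports", "/tmp/user_imports_backup.tsv")

# copy_expert runs an arbitrary COPY statement against a local file
hook.copy_expert("COPY user_imports TO STDOUT WITH CSV HEADER", "/tmp/user_imports.csv")
```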
[Bulk Operations](./bulk-operations.md)

### Schema Operations and Introspection

Database schema introspection and metadata operations for analyzing table structures, primary keys, and database organization.

```python { .api }
def get_table_primary_key(
    self,
    table: str,
    schema: str | None = "public"
) -> list[str] | None: ...
```
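A minimal sketch of primary key introspection, assuming a hypothetical `users` table in the `public` schema:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical connection id

# Returns the primary key column names, or None if the table has no primary key
pk_columns = hook.get_table_primary_key("users", schema="public")
print(pk_columns)  # e.g. ["id"]
```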
[Schema Operations](./schema-operations.md)

### AWS Integration and Authentication

AWS IAM authentication support for RDS PostgreSQL and Amazon Redshift with automatic token management and cross-provider integration.

```python { .api }
def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: ...
```
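In practice, IAM authentication is enabled through the connection's `extra` field rather than by calling `get_iam_token` directly. A hedged sketch, assuming the `iam` and `aws_conn_id` extra keys described in the provider documentation and a made-up RDS endpoint:

```python
import json

from airflow.models.connection import Connection

# Hypothetical RDS PostgreSQL connection; the hook swaps the password for an IAM token at connect time
rds_conn = Connection(
    conn_id="rds_postgres_iam",
    conn_type="postgres",
    host="mydb.abc123.eu-west-1.rds.amazonaws.com",  # hypothetical endpoint
    login="db_user",
    schema="mydb",
    port=5432,
    extra=json.dumps({"iam": True, "aws_conn_id": "aws_default"}),
)

# Register e.g. via the AIRFLOW_CONN_RDS_POSTGRES_IAM environment variable
print(rds_conn.get_uri())
```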
[AWS Integration](./aws-integration.md)

### Asset and Dataset Management

PostgreSQL asset/dataset URI handling with validation, sanitization, and integration with Airflow's data lineage and dependency management systems.

```python { .api }
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
```
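A small sketch of `sanitize_uri`, assuming the `postgres://host:port/database/schema/table` URI form and hypothetical host and table names:

```python
from urllib.parse import urlsplit, urlunsplit

from airflow.providers.postgres.assets.postgres import sanitize_uri

# Asset URIs are assumed to follow postgres://host:port/database/schema/table
raw = urlsplit("postgres://db.example.com:5432/analytics/public/orders")
clean = sanitize_uri(raw)  # validates and normalizes the URI parts
print(urlunsplit(clean))
```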
[Asset Management](./asset-management.md)

### SQL Dialect and Database-Specific Operations

PostgreSQL-specific SQL dialect implementation providing optimized operations like UPSERT statements and database-specific query generation.

```python { .api }
class PostgresDialect:
    def generate_replace_sql(self, table, values, target_fields, **kwargs) -> str: ...
    def get_primary_keys(self, table: str, schema: str | None = None) -> list[str] | None: ...
```
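The dialect is normally exercised indirectly, for example by `insert_rows(..., replace=True)`. The sketch below calls it directly and makes two assumptions not confirmed here: that the dialect is constructed with its owning hook, and that `values` is a single row used to size the placeholder list; it also needs a reachable database for the primary key lookup.

```python
from airflow.providers.postgres.dialects.postgres import PostgresDialect
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical connection id
dialect = PostgresDialect(hook)  # assumption: constructed with the owning hook

# Primary key lookup queries the database through the hook
print(dialect.get_primary_keys("users", schema="public"))

# Assumption: values is one row, used to determine the number of placeholders;
# the result is an INSERT ... ON CONFLICT (<pk>) DO UPDATE statement
print(
    dialect.generate_replace_sql(
        "users",
        (1, "john", "john@example.com"),
        ["id", "name", "email"],
    )
)
```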
[SQL Dialect](./sql-dialect.md)

### OpenLineage Integration

Data lineage tracking integration with OpenLineage for comprehensive data flow monitoring and compliance requirements.

```python { .api }
def get_openlineage_database_info(self, connection) -> DatabaseInfo: ...
def get_openlineage_database_dialect(self, connection) -> str: ...
def get_openlineage_default_schema(self) -> str | None: ...
```
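These methods are normally invoked by the OpenLineage SQL extractors rather than by user code. A minimal sketch of calling them directly, assuming a `my_postgres_conn` connection:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")  # hypothetical connection id
conn = hook.get_connection("my_postgres_conn")

info = hook.get_openlineage_database_info(conn)        # connection details used for lineage namespaces
dialect = hook.get_openlineage_database_dialect(conn)  # "postgres"
schema = hook.get_openlineage_default_schema()         # default schema, typically "public"
print(info.scheme, dialect, schema)
```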
[OpenLineage Integration](./openlineage-integration.md)

## Types

```python { .api }
from typing import TypeAlias, Literal, Mapping, Any
from psycopg2.extras import DictCursor, RealDictCursor, NamedTupleCursor
from psycopg2.extensions import connection

# Type aliases used throughout the provider
CursorType: TypeAlias = DictCursor | RealDictCursor | NamedTupleCursor

# DataFrame types (conditional imports)
try:
    from pandas import DataFrame as PandasDataFrame
except ImportError:
    PandasDataFrame = None

try:
    from polars import DataFrame as PolarsDataFrame
except ImportError:
    PolarsDataFrame = None
```