# Data Retrieval and DataFrame Operations

Data retrieval with support for both pandas and polars DataFrames, so query results can be manipulated, analyzed, and handed directly to the rest of the Python data science ecosystem.

## Capabilities

### DataFrame Operations

Execute SQL queries and return the results as a DataFrame, using either pandas or polars.

```python { .api }
def get_df(
    self,
    sql: str | list[str],
    parameters: list | tuple | Mapping[str, Any] | None = None,
    *,
    df_type: Literal["pandas", "polars"] = "pandas",
    **kwargs: Any
) -> PandasDataFrame | PolarsDataFrame:
    """
    Execute SQL and return results as DataFrame.

    Parameters:
    - sql: str or list of str, SQL statement(s) to execute
    - parameters: list/tuple/dict, query parameters for binding
    - df_type: str, type of DataFrame to return ("pandas" or "polars")
    - **kwargs: additional arguments passed to pandas/polars read methods

    Returns:
        pandas.DataFrame or polars.DataFrame: Query results as DataFrame

    Raises:
        AirflowOptionalProviderFeatureException: If required DataFrame library not installed
    """
```
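The `Raises` entry suggests that pandas and polars are imported only when the matching `df_type` is requested. The sketch below shows one way such a lazy dispatch could look. `OptionalFeatureError` and `load_frame_library` are hypothetical names standing in for `AirflowOptionalProviderFeatureException` and the hook's internals; this is an illustration of the pattern, not the provider's actual code.

```python
import importlib
from types import ModuleType


class OptionalFeatureError(RuntimeError):
    """Hypothetical stand-in for AirflowOptionalProviderFeatureException."""


def load_frame_library(df_type: str) -> ModuleType:
    """Import the DataFrame library named by df_type only when it is needed."""
    if df_type not in ("pandas", "polars"):
        raise ValueError(f"Unsupported df_type: {df_type!r}")
    try:
        # Deferred import: users who never ask for polars do not need it installed.
        return importlib.import_module(df_type)
    except ImportError as exc:
        raise OptionalFeatureError(
            f"{df_type} is not installed; install the matching provider extra"
        ) from exc
```

Deferring the import keeps the base installation small: the error surfaces only when a caller actually requests a DataFrame type whose library is missing.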

### Pandas DataFrame Support

Returns query results as a pandas DataFrame. Extra keyword arguments are forwarded to pandas.

```python { .api }
def get_df(
    self,
    sql: str | list[str],
    parameters: list | tuple | Mapping[str, Any] | None = None,
    *,
    df_type: Literal["pandas"] = "pandas",
    **kwargs: Any
) -> PandasDataFrame:
    """
    Execute SQL and return pandas DataFrame.

    Additional pandas-specific kwargs:
    - index_col: str or list, column(s) to use as row index
    - coerce_float: bool, convert non-string, non-numeric values (such as decimal.Decimal) to floating point
    - parse_dates: list or dict, column(s) to parse as dates
    - chunksize: int, return iterator yielding DataFrames
    """
```

### Polars DataFrame Support

Returns query results as polars DataFrames for high-performance data processing.

```python { .api }
def get_df(
    self,
    sql: str | list[str],
    parameters: list | tuple | Mapping[str, Any] | None = None,
    *,
    df_type: Literal["polars"],
    **kwargs: Any
) -> PolarsDataFrame:
    """
    Execute SQL and return polars DataFrame.

    Additional polars-specific kwargs:
    - schema_overrides: dict, override column data types
    - try_parse_dates: bool, try to parse string columns as dates
    """
```

### Legacy DataFrame Method (Deprecated)

Deprecated method maintained for backward compatibility.

```python { .api }
def get_pandas_df(self, sql, parameters=None, **kwargs):
    """
    Execute SQL query and return pandas DataFrame.

    DEPRECATED: Use get_df(df_type="pandas") instead.

    Parameters:
    - sql: str, SQL query to execute
    - parameters: list/tuple/dict, query parameters
    - **kwargs: additional arguments passed to pandas.read_sql

    Returns:
        pandas.DataFrame: Query results
    """
```
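A deprecated method kept for compatibility typically becomes a thin wrapper that warns and delegates to its replacement. The sketch below illustrates that pattern with a toy `ExampleHook` class (a hypothetical name); its `get_df` returns a plain dict instead of running a query, and the warning text is an assumption rather than the provider's actual message.

```python
import warnings


class ExampleHook:
    """Toy hook used only to illustrate the warn-and-delegate pattern."""

    def get_df(self, sql, parameters=None, *, df_type="pandas", **kwargs):
        # Placeholder: a real hook would execute the query and build a DataFrame.
        return {"sql": sql, "parameters": parameters, "df_type": df_type, "kwargs": kwargs}

    def get_pandas_df(self, sql, parameters=None, **kwargs):
        warnings.warn(
            "get_pandas_df is deprecated; use get_df(df_type='pandas') instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller's line
        )
        return self.get_df(sql, parameters, df_type="pandas", **kwargs)


# Calling the legacy method still works, but records a DeprecationWarning.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = ExampleHook().get_pandas_df("SELECT 1", index_col="id")
```

Migrating a call site is then mechanical: `hook.get_pandas_df(sql, params)` becomes `hook.get_df(sql, params, df_type="pandas")`, with any extra keyword arguments passed through unchanged.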

## Usage Examples

### Basic DataFrame Retrieval

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# Get a pandas DataFrame (the default df_type)
df = hook.get_df("SELECT * FROM sales_data WHERE date >= %s", parameters=["2024-01-01"])

# Get a polars DataFrame instead
df = hook.get_df(
    "SELECT * FROM large_dataset",
    df_type="polars"
)
```

### Advanced DataFrame Options

```python
import polars as pl

# Pandas with custom options
df = hook.get_df(
    "SELECT date_col, amount FROM transactions",
    df_type="pandas",
    parse_dates=["date_col"],
    index_col="date_col"
)

# Polars with schema overrides (maps column names to polars dtypes)
df = hook.get_df(
    "SELECT id, price, created_at FROM products",
    df_type="polars",
    schema_overrides={"price": pl.Float64},
    try_parse_dates=True
)
```

### Multiple Query Processing

```python
# Execute multiple queries and combine results
queries = [
    "SELECT * FROM users WHERE region = 'US'",
    "SELECT * FROM users WHERE region = 'EU'"
]

df = hook.get_df(queries, df_type="pandas")
```

### Parameterized Queries

```python
# Positional parameters, bound in order to the %s placeholders
df = hook.get_df(
    "SELECT * FROM orders WHERE status = %s AND amount > %s",
    parameters=["completed", 100.0]
)

# Named parameters, bound by key to the %(name)s placeholders
df = hook.get_df(
    "SELECT * FROM products WHERE category = %(cat)s",
    parameters={"cat": "electronics"}
)
```
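Passing values through `parameters` rather than formatting them into the SQL string means the driver treats them as data, never as SQL text. The sketch below demonstrates the effect with the standard-library `sqlite3` module as a stand-in, so it runs without a PostgreSQL server. Note the assumption this introduces: `sqlite3` uses `?` and `:name` placeholders, whereas the PostgreSQL examples above use `%s` and `%(name)s`. The table contents are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "completed", 250.0), (2, "pending", 80.0), (3, "completed", 40.0)],
)

# A hostile value that would alter the query if pasted into the SQL text.
status = "completed' OR '1'='1"

# Bound as a parameter, the value is compared literally and matches no row.
bound_rows = conn.execute(
    "SELECT id FROM orders WHERE status = ? AND amount > ?", (status, 100.0)
).fetchall()

# Named parameters behave the same way (sqlite3: :name, psycopg2: %(name)s).
named_rows = conn.execute(
    "SELECT id FROM orders WHERE status = :status AND amount > :minimum",
    {"status": "completed", "minimum": 100.0},
).fetchall()
conn.close()
```

Here `bound_rows` is empty because no order has that literal status, while `named_rows` contains only the completed order above the threshold.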

### Chunked Processing

```python
# Process large datasets in chunks (pandas only)
for chunk_df in hook.get_df(
    "SELECT * FROM massive_table",
    df_type="pandas",
    chunksize=10000
):
    # process_chunk is a placeholder for your own per-chunk handler
    process_chunk(chunk_df)
```
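Chunked retrieval only saves memory if the per-chunk work is incremental, for example a running total rather than a list that collects every chunk. The sketch below shows that shape using the standard-library `sqlite3` module and plain row lists as stand-ins for PostgreSQL and pandas; `iter_chunks` is a hypothetical helper and the table contents are invented for the example.

```python
import sqlite3
from collections.abc import Iterator


def iter_chunks(cursor: sqlite3.Cursor, chunksize: int) -> Iterator[list[tuple]]:
    """Yield lists of at most chunksize rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(chunksize)
        if not rows:
            break
        yield rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE massive_table (amount INTEGER)")
conn.executemany("INSERT INTO massive_table VALUES (?)", [(n,) for n in range(1, 26)])

total = 0
chunk_sizes = []
for chunk in iter_chunks(conn.execute("SELECT amount FROM massive_table"), chunksize=10):
    chunk_sizes.append(len(chunk))
    # Running aggregate: only the current chunk is held in memory.
    total += sum(amount for (amount,) in chunk)
conn.close()
```

With 25 rows and a chunk size of 10, the loop sees chunks of 10, 10, and 5 rows, and the aggregate is built up without ever materializing the full result set.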

## Dependencies

### Package Dependencies

- **Base**: `psycopg2-binary` for PostgreSQL connectivity
- **Pandas** (optional extra): Install with `pip install "apache-airflow-providers-postgres[pandas]"`
- **Polars** (optional extra): Install with `pip install "apache-airflow-providers-postgres[polars]"`

### Version Compatibility

- **pandas**: `>=2.1.2` (Python < 3.13), `>=2.2.3` (Python >= 3.13)
- **polars**: `>=1.26.0`
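The version floors above can be checked at runtime before requesting a DataFrame. The following is a minimal sketch using only the standard library; `parse_version` and `check_optional` are hypothetical helpers, and the parser deliberately simplifies by reading only the leading numeric part of each version segment (pre-release markers such as `rc1` are ignored).

```python
import sys
from importlib import metadata

# Minimums copied from the list above; the pandas floor depends on the interpreter.
PANDAS_MIN = "2.2.3" if sys.version_info >= (3, 13) else "2.1.2"
MINIMUMS = {"pandas": PANDAS_MIN, "polars": "1.26.0"}


def parse_version(text: str) -> tuple[int, ...]:
    """Parse the leading numeric release segments, e.g. '2.2.3rc1' -> (2, 2, 3)."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    return tuple(parts)


def check_optional(name: str) -> str:
    """Return 'missing', 'too old', or 'ok' for an optional DataFrame library."""
    try:
        installed = metadata.version(name)
    except metadata.PackageNotFoundError:
        return "missing"
    if parse_version(installed) >= parse_version(MINIMUMS[name]):
        return "ok"
    return "too old"
```

Calling `check_optional("pandas")` or `check_optional("polars")` reports the state of the current environment; for production use, a full version parser such as the one in the `packaging` library is a more robust choice than this simplified comparison.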

## Type Annotations

```python { .api }
from typing import TYPE_CHECKING, Any, Literal, Mapping, overload

if TYPE_CHECKING:
    from pandas import DataFrame as PandasDataFrame
    from polars import DataFrame as PolarsDataFrame

@overload
def get_df(
    self,
    sql: str | list[str],
    parameters: list | tuple | Mapping[str, Any] | None = None,
    *,
    df_type: Literal["pandas"] = "pandas",
    **kwargs: Any
) -> PandasDataFrame: ...

@overload
def get_df(
    self,
    sql: str | list[str],
    parameters: list | tuple | Mapping[str, Any] | None = None,
    *,
    df_type: Literal["polars"],
    **kwargs: Any
) -> PolarsDataFrame: ...
```
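The overloads let a type checker infer the return type from the literal value of `df_type`, while a single implementation handles both cases at runtime. The runnable sketch below reproduces that arrangement on a toy class; `PandasLike`, `PolarsLike`, and `ExampleHook` are hypothetical placeholders so the example does not require pandas, polars, or a database, and the signature is shortened for brevity.

```python
from typing import Any, Literal, Union, overload


class PandasLike:
    """Hypothetical placeholder for pandas.DataFrame."""


class PolarsLike:
    """Hypothetical placeholder for polars.DataFrame."""


class ExampleHook:
    @overload
    def get_df(
        self, sql: str, *, df_type: Literal["pandas"] = "pandas", **kwargs: Any
    ) -> PandasLike: ...

    @overload
    def get_df(
        self, sql: str, *, df_type: Literal["polars"], **kwargs: Any
    ) -> PolarsLike: ...

    def get_df(
        self, sql: str, *, df_type: str = "pandas", **kwargs: Any
    ) -> Union[PandasLike, PolarsLike]:
        # Single runtime implementation; the overloads exist only for type checkers.
        if df_type == "pandas":
            return PandasLike()
        if df_type == "polars":
            return PolarsLike()
        raise ValueError(f"Unsupported df_type: {df_type!r}")


hook = ExampleHook()
default_frame = hook.get_df("SELECT 1")                   # checker infers PandasLike
polars_frame = hook.get_df("SELECT 1", df_type="polars")  # checker infers PolarsLike
```

Because only the pandas overload carries a default for `df_type`, a call that omits the argument resolves to the pandas return type, which matches the runtime default.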