PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Advanced data retrieval capabilities with support for both pandas and polars DataFrames, enabling efficient data manipulation, analysis, and integration with the Python data science ecosystem.
Execute SQL queries and return results as DataFrames with support for multiple DataFrame libraries.
def get_df(
self,
sql: str | list[str],
parameters: list | tuple | Mapping[str, Any] | None = None,
*,
df_type: Literal["pandas", "polars"] = "pandas",
**kwargs: Any
) -> PandasDataFrame | PolarsDataFrame:
"""
Execute SQL and return results as DataFrame.
Parameters:
- sql: str or list of str, SQL statement(s) to execute
- parameters: list/tuple/dict, query parameters for binding
- df_type: str, type of DataFrame to return ("pandas" or "polars")
- **kwargs: additional arguments passed to pandas/polars read methods
Returns:
pandas.DataFrame or polars.DataFrame: Query results as DataFrame
Raises:
AirflowOptionalProviderFeatureException: If required DataFrame library not installed
"""Returns query results as pandas DataFrames with full pandas integration.
def get_df(
self,
sql: str | list[str],
parameters: list | tuple | Mapping[str, Any] | None = None,
*,
df_type: Literal["pandas"],
**kwargs: Any
) -> PandasDataFrame:
"""
Execute SQL and return pandas DataFrame.
Additional pandas-specific kwargs:
- index_col: str or list, column(s) to use as row index
- coerce_float: bool, convert non-string, non-numeric values (e.g. Decimal) to float
- parse_dates: bool or list, parse date columns
- chunksize: int, return iterator yielding DataFrames
"""Returns query results as polars DataFrames for high-performance data processing.
def get_df(
self,
sql: str | list[str],
parameters: list | tuple | Mapping[str, Any] | None = None,
*,
df_type: Literal["polars"],
**kwargs: Any
) -> PolarsDataFrame:
"""
Execute SQL and return polars DataFrame.
Additional polars-specific kwargs:
- schema_overrides: dict, override column data types
- try_parse_dates: bool, try to parse string columns as dates
"""Deprecated method maintained for backward compatibility.
def get_pandas_df(self, sql, parameters=None, **kwargs):
"""
Execute SQL query and return pandas DataFrame.
DEPRECATED: Use get_df(df_type="pandas") instead.
Parameters:
- sql: str, SQL query to execute
- parameters: list/tuple/dict, query parameters
- **kwargs: additional arguments passed to pandas.read_sql
Returns:
pandas.DataFrame: Query results
"""from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="postgres_default")
# Get pandas DataFrame
df = hook.get_df("SELECT * FROM sales_data WHERE date >= %s", parameters=["2024-01-01"])
# Get polars DataFrame for better performance
df = hook.get_df(
"SELECT * FROM large_dataset",
df_type="polars"
)

# Pandas with custom options
df = hook.get_df(
"SELECT date_col, amount FROM transactions",
df_type="pandas",
parse_dates=["date_col"],
index_col="date_col"
)
# Polars with schema overrides
df = hook.get_df(
"SELECT id, price, created_at FROM products",
df_type="polars",
schema_overrides={"price": "Float64"},
try_parse_dates=True
)

# Execute multiple queries and combine results
queries = [
"SELECT * FROM users WHERE region = 'US'",
"SELECT * FROM users WHERE region = 'EU'"
]
df = hook.get_df(queries, df_type="pandas")

# Using list parameters
df = hook.get_df(
"SELECT * FROM orders WHERE status = %s AND amount > %s",
parameters=["completed", 100.0]
)
# Using dictionary parameters
df = hook.get_df(
"SELECT * FROM products WHERE category = %(cat)s",
parameters={"cat": "electronics"}
)

# Process large datasets in chunks (pandas only)
for chunk_df in hook.get_df(
"SELECT * FROM massive_table",
df_type="pandas",
chunksize=10000
):
# Process each chunk
process_chunk(chunk_df)

Requirements:
- psycopg2-binary for PostgreSQL connectivity
- pandas support: pip install apache-airflow-providers-postgres[pandas]
- polars support: pip install apache-airflow-providers-postgres[polars]
- pandas >=2.1.2 (Python < 3.13), >=2.2.3 (Python >= 3.13)
- numpy >=1.26.0

from typing import TYPE_CHECKING, Any, Literal, Mapping, overload
if TYPE_CHECKING:
from pandas import DataFrame as PandasDataFrame
from polars import DataFrame as PolarsDataFrame
@overload
def get_df(
self,
sql: str | list[str],
parameters: list | tuple | Mapping[str, Any] | None = None,
*,
df_type: Literal["pandas"] = "pandas",
**kwargs: Any
) -> PandasDataFrame: ...
@overload
def get_df(
self,
sql: str | list[str],
parameters: list | tuple | Mapping[str, Any] | None = None,
*,
df_type: Literal["polars"],
**kwargs: Any
) -> PolarsDataFrame: ...

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-postgres