tessl/pypi-apache-airflow-providers-postgres

PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.

docs/data-retrieval.md

Data Retrieval and DataFrame Operations

Advanced data retrieval capabilities with support for both pandas and polars DataFrames, enabling efficient data manipulation, analysis, and integration with the Python data science ecosystem.

Capabilities

DataFrame Operations

Execute SQL queries and return results as DataFrames with support for multiple DataFrame libraries.

def get_df(
    self, 
    sql: str | list[str], 
    parameters: list | tuple | Mapping[str, Any] | None = None, 
    *, 
    df_type: Literal["pandas", "polars"] = "pandas", 
    **kwargs: Any
) -> PandasDataFrame | PolarsDataFrame:
    """
    Execute SQL and return results as DataFrame.
    
    Parameters:
    - sql: str or list of str, SQL statement(s) to execute
    - parameters: list/tuple/dict, query parameters for binding
    - df_type: str, type of DataFrame to return ("pandas" or "polars")
    - **kwargs: additional arguments passed to pandas/polars read methods
    
    Returns:
    pandas.DataFrame or polars.DataFrame: Query results as DataFrame
    
    Raises:
    AirflowOptionalProviderFeatureException: If required DataFrame library not installed
    """

Pandas DataFrame Support

Returns query results as pandas DataFrames, with pandas-specific read options passed through via **kwargs.

def get_df(
    self, 
    sql: str | list[str], 
    parameters: list | tuple | Mapping[str, Any] | None = None, 
    *, 
    df_type: Literal["pandas"], 
    **kwargs: Any
) -> PandasDataFrame:
    """
    Execute SQL and return pandas DataFrame.
    
    Additional pandas-specific kwargs:
    - index_col: str or list, column(s) to use as row index
    - coerce_float: bool, attempt to convert non-string, non-numeric values (e.g. decimal.Decimal) to float
    - parse_dates: bool or list, parse date columns
    - chunksize: int, return iterator yielding DataFrames
    """

Polars DataFrame Support

Returns query results as polars DataFrames for high-performance data processing.

def get_df(
    self, 
    sql: str | list[str], 
    parameters: list | tuple | Mapping[str, Any] | None = None, 
    *, 
    df_type: Literal["polars"], 
    **kwargs: Any
) -> PolarsDataFrame:
    """
    Execute SQL and return polars DataFrame.
    
    Additional polars-specific kwargs:
    - schema_overrides: dict, override column data types
    - try_parse_dates: bool, try to parse string columns as dates
    """

Legacy DataFrame Method (Deprecated)

Deprecated method maintained for backward compatibility.

def get_pandas_df(self, sql, parameters=None, **kwargs):
    """
    Execute SQL query and return pandas DataFrame.
    
    DEPRECATED: Use get_df(df_type="pandas") instead.
    
    Parameters:
    - sql: str, SQL query to execute
    - parameters: list/tuple/dict, query parameters
    - **kwargs: additional arguments passed to pandas.read_sql
    
    Returns:
    pandas.DataFrame: Query results
    """
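
The deprecation-shim pattern (old name delegating to the new API while emitting a DeprecationWarning) can be sketched in isolation. The function bodies below are toy stand-ins, not the provider's actual implementation:

```python
import warnings

def get_df(sql, parameters=None, *, df_type="pandas", **kwargs):
    # Stand-in for the real method; just echoes what it would run
    return f"{df_type}:{sql}"

def get_pandas_df(sql, parameters=None, **kwargs):
    # Deprecated entry point: warn, then delegate to the new API
    warnings.warn(
        "get_pandas_df is deprecated; use get_df(df_type='pandas')",
        DeprecationWarning,
        stacklevel=2,
    )
    return get_df(sql, parameters, df_type="pandas", **kwargs)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = get_pandas_df("SELECT 1")

print(result)                        # pandas:SELECT 1
print(caught[0].category.__name__)   # DeprecationWarning
```

Callers migrating to `get_df` keep the same return value; only the warning disappears.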

Usage Examples

Basic DataFrame Retrieval

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# Get pandas DataFrame
df = hook.get_df("SELECT * FROM sales_data WHERE date >= %s", parameters=["2024-01-01"])

# Get polars DataFrame for better performance
df = hook.get_df(
    "SELECT * FROM large_dataset", 
    df_type="polars"
)

Advanced DataFrame Options

# Pandas with custom options
df = hook.get_df(
    "SELECT date_col, amount FROM transactions",
    df_type="pandas",
    parse_dates=["date_col"],
    index_col="date_col"
)

# Polars with schema overrides
df = hook.get_df(
    "SELECT id, price, created_at FROM products",
    df_type="polars",
    schema_overrides={"price": "Float64"},
    try_parse_dates=True
)

Multiple Query Processing

# Execute multiple queries and combine results
queries = [
    "SELECT * FROM users WHERE region = 'US'",
    "SELECT * FROM users WHERE region = 'EU'"
]

df = hook.get_df(queries, df_type="pandas")

Parameterized Queries

# Using list parameters
df = hook.get_df(
    "SELECT * FROM orders WHERE status = %s AND amount > %s",
    parameters=["completed", 100.0]
)

# Using dictionary parameters  
df = hook.get_df(
    "SELECT * FROM products WHERE category = %(cat)s",
    parameters={"cat": "electronics"}
)
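
The `%s` and `%(cat)s` placeholders above are psycopg2's `pyformat` paramstyle. The positional-vs-named distinction can be sketched with the standard-library `sqlite3` driver, which uses `?` and `:name` instead; the placeholder syntax is the only difference here, and the `orders` table is illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (status TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("completed", 250.0), ("completed", 50.0), ("pending", 300.0)],
)

# Positional parameters (sqlite3 uses ?, psycopg2 uses %s)
positional = conn.execute(
    "SELECT status, amount FROM orders WHERE status = ? AND amount > ?",
    ("completed", 100.0),
).fetchall()
print(positional)  # [('completed', 250.0)]

# Named parameters (sqlite3 uses :name, psycopg2 uses %(name)s)
named = conn.execute(
    "SELECT COUNT(*) FROM orders WHERE status = :st",
    {"st": "completed"},
).fetchone()
print(named[0])  # 2
```

In both styles the driver binds values server-side, so user input never needs to be interpolated into the SQL string.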

Chunked Processing

# Process large datasets in chunks (pandas only)
for chunk_df in hook.get_df(
    "SELECT * FROM massive_table",
    df_type="pandas",
    chunksize=10000
):
    # Process each chunk
    process_chunk(chunk_df)
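
Chunked reads avoid materializing the full result set at once. The underlying principle can be sketched with a plain DB-API cursor and `fetchmany` (using the standard-library `sqlite3` here for a runnable sketch; the real method delegates to `pandas.read_sql` with `chunksize`):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE massive_table (id INTEGER)")
conn.executemany("INSERT INTO massive_table VALUES (?)", [(i,) for i in range(25)])

cursor = conn.execute("SELECT id FROM massive_table")
chunk_sizes = []
while True:
    # fetchmany pulls at most `size` rows, keeping memory bounded
    batch = cursor.fetchmany(10)
    if not batch:
        break
    chunk_sizes.append(len(batch))

print(chunk_sizes)  # [10, 10, 5]
```

Each iteration holds only one batch in memory, which is the same trade-off `chunksize` gives you at the DataFrame level.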

Dependencies

Required Dependencies

  • Base: psycopg2-binary for PostgreSQL connectivity
  • Pandas: Install with pip install apache-airflow-providers-postgres[pandas]
  • Polars: Install with pip install apache-airflow-providers-postgres[polars]

Version Compatibility

  • pandas: >=2.1.2 (Python < 3.13), >=2.2.3 (Python >= 3.13)
  • polars: >=1.26.0
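
`get_df` raises `AirflowOptionalProviderFeatureException` when the requested DataFrame library is missing. A minimal sketch of that guard pattern, using `importlib.util.find_spec` and a stand-in exception class (the real check lives inside the provider):

```python
import importlib.util

class OptionalFeatureMissing(ImportError):
    """Stand-in for AirflowOptionalProviderFeatureException."""

def require_df_library(df_type: str) -> None:
    # Map the requested DataFrame type to its importable module name;
    # unknown types raise KeyError before any import check happens
    module = {"pandas": "pandas", "polars": "polars"}[df_type]
    if importlib.util.find_spec(module) is None:
        raise OptionalFeatureMissing(
            f"{module} is not installed; run "
            f"pip install apache-airflow-providers-postgres[{df_type}]"
        )
```

`find_spec` checks availability without importing the (potentially heavy) library, so the guard is cheap to run on every call.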

Type Annotations

from typing import TYPE_CHECKING, Any, Literal, Mapping, overload

if TYPE_CHECKING:
    from pandas import DataFrame as PandasDataFrame
    from polars import DataFrame as PolarsDataFrame

@overload
def get_df(
    self, 
    sql: str | list[str], 
    parameters: list | tuple | Mapping[str, Any] | None = None, 
    *, 
    df_type: Literal["pandas"] = "pandas", 
    **kwargs: Any
) -> PandasDataFrame: ...

@overload  
def get_df(
    self, 
    sql: str | list[str], 
    parameters: list | tuple | Mapping[str, Any] | None = None, 
    *, 
    df_type: Literal["polars"], 
    **kwargs: Any
) -> PolarsDataFrame: ...
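
The `@overload` stubs above only affect static type checking; at runtime a single implementation dispatches on `df_type`. A toy version of the same pattern, with plain `str`/`bytes` standing in for the two DataFrame classes:

```python
from typing import Any, Literal, overload

@overload
def fetch(df_type: Literal["pandas"] = "pandas") -> str: ...
@overload
def fetch(df_type: Literal["polars"]) -> bytes: ...

def fetch(df_type: str = "pandas") -> Any:
    # Single runtime implementation; the stubs above let a type checker
    # narrow the return type from the df_type literal argument
    if df_type == "pandas":
        return "pandas-result"
    return b"polars-result"

print(fetch())           # pandas-result
print(fetch("polars"))   # b'polars-result'
```

A type checker sees `fetch()` as `str` and `fetch("polars")` as `bytes`, which is exactly how `get_df` resolves to `PandasDataFrame` or `PolarsDataFrame`.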

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-postgres
