PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
```shell
npx @tessl/cli install tessl/pypi-apache-airflow-providers-postgres@6.2.0
```

PostgreSQL integration provider for Apache Airflow that enables database connectivity, query execution, and data manipulation through hooks, assets, and SQL dialect support. The package covers synchronous and asynchronous database connections, bulk data operations, schema introspection, AWS IAM authentication, Amazon Redshift support, and OpenLineage integration for data lineage tracking.
Install the package:

```shell
pip install apache-airflow-providers-postgres
```

Database hook:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook
```

Asset/dataset handling:

```python
from airflow.providers.postgres.assets.postgres import sanitize_uri
```

SQL dialect:

```python
from airflow.providers.postgres.dialects.postgres import PostgresDialect
```

A typical hook workflow:

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Initialize hook with connection
hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# Execute queries
records = hook.get_records("SELECT * FROM users WHERE active = %s", parameters=[True])

# Get data as DataFrame
df = hook.get_df("SELECT * FROM sales_data", df_type="pandas")

# Insert rows with upsert capability
hook.insert_rows(
    table="users",
    rows=[(1, "john", "john@example.com"), (2, "jane", "jane@example.com")],
    target_fields=["id", "name", "email"],
    replace=True,
    replace_index="id",
)

# Bulk load from file
hook.bulk_load("user_imports", "/path/to/data.tsv")
```

The provider is built around several key components:
This architecture enables seamless integration with the broader Airflow ecosystem while providing PostgreSQL-specific optimizations and features.
Core database connectivity, query execution, and transaction management with support for multiple cursor types, SSL configuration, and connection pooling.
```python
class PostgresHook:
    def get_conn(self) -> connection: ...
    def run(self, sql, autocommit=False, parameters=None, handler=None): ...
    def get_records(self, sql, parameters=None): ...
    def get_first(self, sql, parameters=None): ...
```

Advanced data retrieval with DataFrame support for both pandas and polars, providing efficient data manipulation and analysis capabilities.
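The hook's query methods accept DB-API-style bound parameters rather than string-formatted SQL. A minimal sketch of the same pattern using the standard-library sqlite3 driver (psycopg2 uses `%s` placeholders where sqlite3 uses `?`; the table and data here are invented for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, active INTEGER)")
conn.executemany(
    "INSERT INTO users (name, active) VALUES (?, ?)",
    [("john", 1), ("jane", 0)],
)

# Bound parameters keep values out of the SQL text, as with
# hook.get_records("... WHERE active = %s", parameters=[True])
records = conn.execute(
    "SELECT name FROM users WHERE active = ?", (1,)
).fetchall()
print(records)  # [('john',)]
```

The driver, not the caller, handles quoting and escaping, which is why `get_records`/`run` take `parameters` separately from `sql`.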
```python
def get_df(
    self,
    sql: str | list[str],
    parameters: list | tuple | Mapping[str, Any] | None = None,
    *,
    df_type: Literal["pandas", "polars"] = "pandas",
    **kwargs: Any,
) -> PandasDataFrame | PolarsDataFrame: ...
```

High-performance bulk data operations including file-based loading, dumping, and PostgreSQL COPY command support for efficient data transfer.
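Conceptually, `get_df` executes the SQL and hands the cursor's rows and column names to the chosen DataFrame constructor. A dependency-free sketch of that hand-off, using sqlite3 and a plain dict of columns as a stand-in for a real pandas/polars frame (the table is invented for illustration):

```python
import sqlite3

def fetch_columnar(conn, sql, parameters=()):
    """Run a query and return {column_name: [values]} — a stand-in for
    what get_df would pass to pandas.DataFrame / polars.DataFrame."""
    cur = conn.execute(sql, parameters)
    names = [d[0] for d in cur.description]  # column names from the cursor
    rows = cur.fetchall()
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales_data (region TEXT, amount REAL)")
conn.executemany("INSERT INTO sales_data VALUES (?, ?)", [("eu", 10.0), ("us", 20.0)])

cols = fetch_columnar(conn, "SELECT region, amount FROM sales_data ORDER BY region")
print(cols)  # {'region': ['eu', 'us'], 'amount': [10.0, 20.0]}
```

The `df_type` parameter only selects which constructor receives this columnar data, which is why pandas and polars can both be optional dependencies.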
```python
def bulk_load(self, table: str, tmp_file: str) -> None: ...
def bulk_dump(self, table: str, tmp_file: str) -> None: ...
def copy_expert(self, sql: str, filename: str) -> None: ...

def insert_rows(
    self,
    table,
    rows,
    target_fields=None,
    commit_every=1000,
    replace=False,
    **kwargs,
): ...
```

Database schema introspection and metadata operations for analyzing table structures, primary keys, and database organization.
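The `replace=True` / `replace_index` path of `insert_rows` maps onto PostgreSQL's `INSERT ... ON CONFLICT ... DO UPDATE`. A hedged sketch of how such a statement can be assembled — this is illustrative, not the provider's actual generator, which also handles identifier quoting and edge cases:

```python
def build_upsert_sql(table, target_fields, replace_index):
    """Illustrative ON CONFLICT upsert builder (not the provider's real code)."""
    index_cols = [replace_index] if isinstance(replace_index, str) else list(replace_index)
    columns = ", ".join(target_fields)
    placeholders = ", ".join(["%s"] * len(target_fields))
    # Non-key columns are updated from the rejected row via the "excluded" alias
    updates = ", ".join(
        f"{c} = excluded.{c}" for c in target_fields if c not in index_cols
    )
    return (
        f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(index_cols)}) DO UPDATE SET {updates}"
    )

sql = build_upsert_sql("users", ["id", "name", "email"], "id")
print(sql)
```

With the quick-start arguments this yields an upsert keyed on `id`, so re-running the insert updates `name` and `email` instead of failing on a duplicate key.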
```python
def get_table_primary_key(
    self,
    table: str,
    schema: str | None = "public",
) -> list[str] | None: ...
```

AWS IAM authentication support for RDS PostgreSQL and Amazon Redshift with automatic token management and cross-provider integration.
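`get_table_primary_key` answers its question by consulting the database's own catalog. As an analogy that runs without a PostgreSQL server, the sketch below reads primary-key columns from sqlite3's catalog via `PRAGMA table_info` (the provider queries PostgreSQL's system catalogs instead):

```python
import sqlite3

def table_primary_key(conn, table):
    """Return primary-key column names, analogous to get_table_primary_key."""
    rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    # PRAGMA table_info columns: cid, name, type, notnull, dflt_value, pk
    # pk is the 1-based position within the primary key, or 0 if not part of it
    pk_cols = [(row[5], row[1]) for row in rows if row[5] > 0]
    return [name for _, name in sorted(pk_cols)] or None

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE memberships (user_id INTEGER, group_id INTEGER, "
    "PRIMARY KEY (user_id, group_id))"
)
print(table_primary_key(conn, "memberships"))  # ['user_id', 'group_id']
```

Returning `None` when no primary key exists matches the `list[str] | None` return annotation above.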
```python
def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: ...
```

PostgreSQL asset/dataset URI handling with validation, sanitization, and integration with Airflow's data lineage and dependency management systems.
```python
def sanitize_uri(uri: SplitResult) -> SplitResult: ...
```

PostgreSQL-specific SQL dialect implementation providing optimized operations like UPSERT statements and database-specific query generation.
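`sanitize_uri` normalizes and validates asset URIs before Airflow stores them. The sketch below shows the general shape of such normalization with `urllib.parse` — the default-port rule and the required `database/schema/table` path here are illustrative assumptions, not the provider's exact behavior:

```python
from urllib.parse import SplitResult, urlsplit

def sanitize_postgres_uri(uri: SplitResult) -> SplitResult:
    """Illustrative sanitizer: require a host and a database/schema/table
    path, and pin a default port (5432 assumed here)."""
    if not uri.hostname:
        raise ValueError("URI must include a host")
    parts = uri.path.strip("/").split("/")
    if len(parts) != 3:
        raise ValueError("URI path must be /database/schema/table")
    netloc = f"{uri.hostname}:{uri.port or 5432}"
    return uri._replace(scheme="postgres", netloc=netloc, path="/" + "/".join(parts))

clean = sanitize_postgres_uri(urlsplit("postgres://db.example.com/sales/public/orders"))
print(clean.geturl())  # postgres://db.example.com:5432/sales/public/orders
```

Normalizing URIs this way means two spellings of the same table resolve to one asset identity in scheduling and lineage.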
```python
class PostgresDialect:
    def generate_replace_sql(self, table, values, target_fields, **kwargs) -> str: ...
    def get_primary_keys(self, table: str, schema: str | None = None) -> list[str] | None: ...
```

Data lineage tracking integration with OpenLineage for comprehensive data flow monitoring and compliance requirements.
```python
def get_openlineage_database_info(self, connection) -> DatabaseInfo: ...
def get_openlineage_database_dialect(self, connection) -> str: ...
def get_openlineage_default_schema(self) -> str | None: ...
```

Types used throughout the provider:

```python
from typing import TypeAlias, Literal, Mapping, Any

from psycopg2.extensions import connection
from psycopg2.extras import DictCursor, NamedTupleCursor, RealDictCursor

# Type aliases used throughout the provider
CursorType: TypeAlias = DictCursor | RealDictCursor | NamedTupleCursor

# DataFrame types (conditional imports)
try:
    from pandas import DataFrame as PandasDataFrame
except ImportError:
    PandasDataFrame = None

try:
    from polars import DataFrame as PolarsDataFrame
except ImportError:
    PolarsDataFrame = None
```
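The conditional imports above let the provider annotate DataFrame return types without making pandas or polars hard dependencies. The same pattern in isolation, with a deliberately nonexistent module (a made-up name) to show the fallback:

```python
# Optional-dependency pattern: fall back to None when the library is absent.
try:
    from nonexistent_dataframe_lib import DataFrame as OptionalDataFrame  # assumed missing
except ImportError:
    OptionalDataFrame = None

def describe_support() -> str:
    # Callers check the sentinel before using the optional feature
    if OptionalDataFrame is None:
        return "DataFrame support disabled"
    return "DataFrame support enabled"

print(describe_support())  # DataFrame support disabled
```

This is why `get_df(df_type="polars")` can raise only at call time rather than at import time when polars is not installed.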