tessl/pypi-apache-airflow-providers-postgres

PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-postgres@6.2.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-postgres@6.2.0


Apache Airflow PostgreSQL Provider

PostgreSQL integration provider for Apache Airflow that enables database connectivity, query execution, and data manipulation through hooks, assets, and SQL dialect support. It covers synchronous and asynchronous database connections, bulk data operations, schema introspection, AWS IAM authentication, Amazon Redshift support, and OpenLineage integration for data lineage tracking.

Package Information

  • Package Name: apache-airflow-providers-postgres
  • Language: Python
  • Installation: pip install apache-airflow-providers-postgres
  • Minimum Airflow Version: 2.10.0

Core Imports

from airflow.providers.postgres.hooks.postgres import PostgresHook

Asset/dataset handling:

from airflow.providers.postgres.assets.postgres import sanitize_uri

SQL dialect:

from airflow.providers.postgres.dialects.postgres import PostgresDialect

Basic Usage

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Initialize hook with connection
hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# Execute queries
records = hook.get_records("SELECT * FROM users WHERE active = %s", parameters=[True])

# Get data as DataFrame
df = hook.get_df("SELECT * FROM sales_data", df_type="pandas")

# Insert rows with upsert capability
hook.insert_rows(
    table="users",
    rows=[(1, "john", "john@example.com"), (2, "jane", "jane@example.com")],
    target_fields=["id", "name", "email"],
    replace=True,
    replace_index="id"
)

# Bulk load from file
hook.bulk_load("user_imports", "/path/to/data.tsv")

Architecture

The provider is built around several key components:

  • PostgresHook: Main database interface extending DbApiHook with PostgreSQL-specific features
  • PostgresDialect: SQL dialect implementation for PostgreSQL-specific operations like UPSERT
  • Asset Handler: URI sanitization and validation for PostgreSQL datasets/assets
  • Provider Info: Metadata registration for Airflow integration

This architecture enables seamless integration with the broader Airflow ecosystem while providing PostgreSQL-specific optimizations and features.
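
The following is a minimal sketch of how the hook sits inside an Airflow DAG; the connection ID, table name, and DAG structure are illustrative, and the dialect and asset handler are invoked behind the scenes by the hook and by Airflow rather than called directly.

from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def postgres_example():
    @task
    def count_active_users() -> int:
        # PostgresHook resolves the Airflow connection and opens a psycopg2 connection
        hook = PostgresHook(postgres_conn_id="my_postgres_conn")
        return hook.get_first("SELECT count(*) FROM users WHERE active")[0]

    count_active_users()

postgres_example()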

Capabilities

Database Connection and Query Execution

Core database connectivity, query execution, and transaction management with support for multiple cursor types, SSL configuration, and connection pooling.

class PostgresHook:
    def get_conn(self) -> connection: ...
    def run(self, sql, autocommit=False, parameters=None, handler=None): ...
    def get_records(self, sql, parameters=None): ...
    def get_first(self, sql, parameters=None): ...

Database Connection
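
A minimal usage sketch, assuming an Airflow connection named my_postgres_conn and an illustrative audit_log table:

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# run() executes statements; autocommit is off by default
hook.run("CREATE TABLE IF NOT EXISTS audit_log (id SERIAL PRIMARY KEY, note TEXT)")

# get_first() returns a single row
row = hook.get_first("SELECT count(*) FROM audit_log")

# get_conn() exposes the underlying psycopg2 connection when finer control is needed
with hook.get_conn() as conn, conn.cursor() as cur:
    cur.execute("SELECT version()")
    print(cur.fetchone()[0])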

Data Retrieval and DataFrame Operations

Advanced data retrieval with DataFrame support for both pandas and polars, providing efficient data manipulation and analysis capabilities.

def get_df(
    self, 
    sql: str | list[str], 
    parameters: list | tuple | Mapping[str, Any] | None = None, 
    *, 
    df_type: Literal["pandas", "polars"] = "pandas", 
    **kwargs: Any
) -> PandasDataFrame | PolarsDataFrame: ...

Data Retrieval
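
A short sketch of DataFrame retrieval, reusing the sales_data table from the earlier example; the second call assumes polars is installed:

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# pandas is the default df_type
pandas_df = hook.get_df("SELECT id, amount FROM sales_data", df_type="pandas")

# named parameters use psycopg2's %(name)s placeholder style
polars_df = hook.get_df(
    "SELECT id, amount FROM sales_data WHERE amount > %(min)s",
    parameters={"min": 100},
    df_type="polars",
)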

Bulk Operations and Data Loading

High-performance bulk data operations including file-based loading, dumping, and PostgreSQL COPY command support for efficient data transfer.

def bulk_load(self, table: str, tmp_file: str) -> None: ...
def bulk_dump(self, table: str, tmp_file: str) -> None: ...
def copy_expert(self, sql: str, filename: str) -> None: ...
def insert_rows(
    self, 
    table, 
    rows, 
    target_fields=None, 
    commit_every=1000, 
    replace=False, 
    **kwargs
): ...

Bulk Operations
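
A sketch of the bulk APIs; the table name and file paths are illustrative. bulk_load and bulk_dump work with tab-separated files, while copy_expert accepts any COPY statement:

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# Load a tab-separated file into a table via COPY ... FROM STDIN
hook.bulk_load("user_imports", "/tmp/users.tsv")

# Dump a table back out to a tab-separated file
hook.bulk_dump("user_imports", "/tmp/users_export.tsv")

# copy_expert gives full control over the COPY statement, e.g. CSV with a header row
hook.copy_expert("COPY user_imports FROM STDIN WITH CSV HEADER", "/tmp/users.csv")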

Schema Operations and Introspection

Database schema introspection and metadata operations for analyzing table structures, primary keys, and database organization.

def get_table_primary_key(
    self, 
    table: str, 
    schema: str | None = "public"
) -> list[str] | None: ...

Schema Operations
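
For example, primary-key introspection (table and schema names here are illustrative):

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# Primary key columns of a table in the default "public" schema
pk_columns = hook.get_table_primary_key("users")

# Or in an explicit schema
order_pk = hook.get_table_primary_key("orders", schema="sales")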

AWS Integration and Authentication

AWS IAM authentication support for RDS PostgreSQL and Amazon Redshift with automatic token management and cross-provider integration.

def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: ...

AWS Integration
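
IAM authentication is driven by the Airflow connection's extras rather than by calling get_iam_token directly. A sketch, assuming a connection rds_iam_conn whose extras enable IAM (for example {"iam": true, "aws_conn_id": "aws_default"}; add {"redshift": true} for an Amazon Redshift cluster):

from airflow.providers.postgres.hooks.postgres import PostgresHook

# On connect, the hook exchanges the stored password for a short-lived IAM token
hook = PostgresHook(postgres_conn_id="rds_iam_conn")
print(hook.get_first("SELECT current_user"))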

Asset and Dataset Management

PostgreSQL asset/dataset URI handling with validation, sanitization, and integration with Airflow's data lineage and dependency management systems.

def sanitize_uri(uri: SplitResult) -> SplitResult: ...

Asset Management
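
sanitize_uri operates on a urllib.parse.SplitResult; the URI below (host, port, database, schema, table) is illustrative:

from urllib.parse import urlsplit

from airflow.providers.postgres.assets.postgres import sanitize_uri

raw = urlsplit("postgres://db.example.com:5432/mydb/public/users")
normalized = sanitize_uri(raw)  # validates the URI and normalizes it (e.g. default port)
print(normalized.geturl())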

SQL Dialect and Database-Specific Operations

PostgreSQL-specific SQL dialect implementation providing optimized operations like UPSERT statements and database-specific query generation.

class PostgresDialect:
    def generate_replace_sql(self, table, values, target_fields, **kwargs) -> str: ...
    def get_primary_keys(self, table: str, schema: str | None = None) -> list[str] | None: ...

SQL Dialect
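
A sketch of upsert SQL generation. It assumes the dialect is reachable through the hook's dialect property (provided by the common SQL provider in recent versions) and that replace_index is accepted as a keyword argument, as it is by insert_rows; values is a single row of values:

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")
dialect = hook.dialect  # resolves to PostgresDialect for PostgreSQL connections

# Builds an INSERT ... ON CONFLICT ... DO UPDATE statement for one row
sql = dialect.generate_replace_sql(
    table="users",
    values=(1, "john", "john@example.com"),
    target_fields=["id", "name", "email"],
    replace_index="id",
)
print(sql)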

OpenLineage Integration

Data lineage tracking integration with OpenLineage for comprehensive data flow monitoring and compliance requirements.

def get_openlineage_database_info(self, connection) -> DatabaseInfo: ...
def get_openlineage_database_dialect(self, connection) -> str: ...
def get_openlineage_default_schema(self) -> str | None: ...

OpenLineage Integration
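
These methods are normally invoked by the OpenLineage listener rather than by DAG code. A sketch of what they expose, assuming the my_postgres_conn connection:

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")
connection = hook.get_connection("my_postgres_conn")

info = hook.get_openlineage_database_info(connection)             # DatabaseInfo (scheme, authority, database)
dialect_name = hook.get_openlineage_database_dialect(connection)  # dialect name, e.g. "postgres"
default_schema = hook.get_openlineage_default_schema()            # current schema, typically "public"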

Types

from typing import TypeAlias, Literal, Mapping, Any
from psycopg2.extras import DictCursor, RealDictCursor, NamedTupleCursor
from psycopg2.extensions import connection

# Type aliases used throughout the provider
CursorType: TypeAlias = DictCursor | RealDictCursor | NamedTupleCursor

# DataFrame types (conditional imports)
try:
    from pandas import DataFrame as PandasDataFrame
except ImportError:
    PandasDataFrame = None

try:
    from polars import DataFrame as PolarsDataFrame  
except ImportError:
    PolarsDataFrame = None
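
The cursor classes above can be selected per connection. A sketch, assuming the connection's extras contain {"cursor": "realdictcursor"} so that rows come back keyed by column name:

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# With RealDictCursor, each row behaves like a dict
for row in hook.get_records("SELECT id, name FROM users LIMIT 5"):
    print(row["id"], row["name"])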