CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-postgres

PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

openlineage-integration.mddocs/

OpenLineage Integration

Data lineage tracking integration with OpenLineage for comprehensive data flow monitoring, compliance requirements, and observability across PostgreSQL and Redshift data operations within Apache Airflow workflows.

Capabilities

Database Information Provider

Provide PostgreSQL/Redshift-specific information for OpenLineage data lineage tracking.

def get_openlineage_database_info(self, connection) -> DatabaseInfo:
    """
    Return PostgreSQL/Redshift-specific information for OpenLineage.
    
    Parameters:
    - connection: database connection object
    
    Returns:
    DatabaseInfo: Database information object containing connection details
    
    Dependencies:
    Requires apache-airflow-providers-openlineage package
    """

Database Dialect Detection

Detect and report the specific database dialect for OpenLineage integration.

def get_openlineage_database_dialect(self, connection) -> str:
    """
    Return database dialect for OpenLineage tracking.
    
    Parameters:
    - connection: database connection object
    
    Returns:
    str: "postgres" for PostgreSQL, "redshift" for Redshift connections
    """

Default Schema Detection

Determine the current default schema for OpenLineage dataset identification.

def get_openlineage_default_schema(self) -> str | None:
    """
    Return current default schema, typically from SEARCH_PATH.
    
    Returns:
    str or None: Current schema name, usually "public" for PostgreSQL
    """

Usage Examples

Basic OpenLineage Integration

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Hook automatically integrates with OpenLineage when package is installed
hook = PostgresHook(postgres_conn_id="postgres_default")

# Database operations are automatically tracked
records = hook.get_records("SELECT * FROM users WHERE active = true")

# OpenLineage captures:
# - Database connection info
# - SQL query executed  
# - Tables accessed
# - Schema information

Explicit Lineage Information

def get_lineage_info():
    """Get OpenLineage information explicitly."""
    hook = PostgresHook(postgres_conn_id="postgres_default")
    
    with hook.get_conn() as conn:
        # Get database info for lineage
        db_info = hook.get_openlineage_database_info(conn)
        dialect = hook.get_openlineage_database_dialect(conn)
        default_schema = hook.get_openlineage_default_schema()
        
        print(f"Database dialect: {dialect}")
        print(f"Default schema: {default_schema}")
        print(f"Database info: {db_info}")

get_lineage_info()

Redshift vs PostgreSQL Detection

def detect_database_type():
    """Detect whether connected to PostgreSQL or Redshift."""
    
    # PostgreSQL connection
    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    with pg_hook.get_conn() as conn:
        pg_dialect = pg_hook.get_openlineage_database_dialect(conn)
        print(f"PostgreSQL dialect: {pg_dialect}")  # "postgres"
    
    # Redshift connection  
    rs_hook = PostgresHook(postgres_conn_id="redshift_default")
    with rs_hook.get_conn() as conn:
        rs_dialect = rs_hook.get_openlineage_database_dialect(conn)
        print(f"Redshift dialect: {rs_dialect}")  # "redshift"

detect_database_type()

OpenLineage-Aware DAG Development

Data Pipeline with Lineage Tracking

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime

with DAG(
    "data_pipeline_with_lineage",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily"
) as dag:
    
    @task
    def extract_raw_data():
        """Extract raw data - automatically tracked by OpenLineage."""
        hook = PostgresHook(postgres_conn_id="source_db")
        
        # This query is automatically tracked
        data = hook.get_df("""
            SELECT user_id, event_type, timestamp, properties
            FROM raw_events 
            WHERE date = '{{ ds }}'
        """)
        
        return len(data)
    
    @task
    def transform_data():
        """Transform data - lineage captures read/write operations."""
        hook = PostgresHook(postgres_conn_id="warehouse_db")
        
        # OpenLineage tracks both the read and write operations
        hook.run("""
            INSERT INTO processed_events (user_id, event_type, event_date, processed_at)
            SELECT 
                user_id,
                event_type,
                DATE(timestamp) as event_date,
                CURRENT_TIMESTAMP as processed_at
            FROM raw_events 
            WHERE date = '{{ ds }}'
        """)
    
    @task
    def create_aggregates():
        """Create aggregated data - full lineage chain captured."""
        hook = PostgresHook(postgres_conn_id="warehouse_db")
        
        # Complex query with multiple table dependencies
        hook.run("""
            INSERT INTO daily_user_metrics (user_id, event_date, event_count, unique_event_types)
            SELECT 
                user_id,
                event_date,
                COUNT(*) as event_count,
                COUNT(DISTINCT event_type) as unique_event_types
            FROM processed_events
            WHERE event_date = '{{ ds }}'
            GROUP BY user_id, event_date
        """)
    
    # Define task dependencies
    extract_raw_data() >> transform_data() >> create_aggregates()

Cross-Database Lineage

@task
def cross_database_etl():
    """ETL across multiple databases with full lineage tracking."""
    
    # Source database
    source_hook = PostgresHook(postgres_conn_id="app_db")
    
    # Target database  
    target_hook = PostgresHook(postgres_conn_id="analytics_db")
    
    # Extract from source - lineage captures read
    user_data = source_hook.get_df("""
        SELECT u.id, u.name, u.email, u.created_at,
               COUNT(o.id) as order_count,
               SUM(o.total) as lifetime_value
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.updated_at >= '{{ ds }}'
        GROUP BY u.id, u.name, u.email, u.created_at
    """)
    
    # Load to target - lineage captures write
    target_hook.insert_rows(
        table="customer_analytics",
        rows=user_data.values.tolist(),
        target_fields=list(user_data.columns),
        replace=True,
        replace_index="id"
    )
    
    # OpenLineage automatically captures:
    # - Source tables: app_db.public.users, app_db.public.orders
    # - Target table: analytics_db.public.customer_analytics
    # - Data transformation logic
    # - Cross-database relationship

Configuration and Setup

OpenLineage Provider Installation

# Install OpenLineage provider
pip install apache-airflow-providers-openlineage

# Or install with PostgreSQL provider extras
pip install apache-airflow-providers-postgres[openlineage]

Airflow Configuration

Configure OpenLineage in airflow.cfg:

[openlineage]
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_data_pipeline
extractors = airflow.providers.postgres.hooks.postgres

Environment Variables

# OpenLineage configuration
export OPENLINEAGE_URL=http://marquez:5000
export OPENLINEAGE_NAMESPACE=production

# Airflow OpenLineage integration
export AIRFLOW__OPENLINEAGE__TRANSPORT={"type": "http", "url": "http://marquez:5000"}

Advanced Lineage Scenarios

Complex Query Lineage

@task  
def complex_analytics_query():
    """Complex query with multiple table joins and CTEs."""
    hook = PostgresHook(postgres_conn_id="analytics_db")
    
    # OpenLineage parses and tracks all table dependencies
    hook.run("""
        WITH user_metrics AS (
            SELECT 
                u.id as user_id,
                u.segment,
                COUNT(DISTINCT o.id) as order_count,
                AVG(o.total) as avg_order_value
            FROM users u
            JOIN orders o ON u.id = o.user_id
            WHERE o.created_at >= '{{ ds }}'
            GROUP BY u.id, u.segment
        ),
        product_metrics AS (
            SELECT 
                p.category,
                COUNT(DISTINCT oi.order_id) as orders_with_category,
                SUM(oi.quantity * oi.price) as category_revenue
            FROM products p
            JOIN order_items oi ON p.id = oi.product_id
            JOIN orders o ON oi.order_id = o.id
            WHERE o.created_at >= '{{ ds }}'
            GROUP BY p.category
        )
        INSERT INTO daily_analytics (
            date, user_segment, product_category,
            total_orders, avg_order_value, category_revenue
        )
        SELECT 
            '{{ ds }}' as date,
            um.segment,
            pm.category,
            um.order_count,
            um.avg_order_value,
            pm.category_revenue
        FROM user_metrics um
        CROSS JOIN product_metrics pm
    """)
    
    # OpenLineage captures dependencies on:
    # - users table
    # - orders table  
    # - products table
    # - order_items table
    # And writes to daily_analytics table

Stored Procedure Lineage

@task
def call_stored_procedure():
    """Call stored procedure - lineage tracks inputs/outputs."""
    hook = PostgresHook(postgres_conn_id="postgres_default")
    
    # OpenLineage can track stored procedure calls
    hook.run("""
        CALL update_customer_segments(
            p_calculation_date := '{{ ds }}',
            p_recalculate_all := false
        )
    """)
    
    # Lineage tracking depends on OpenLineage configuration
    # and stored procedure analysis capabilities

Lineage Data Flow

Automatic Data Capture

OpenLineage integration automatically captures:

  1. Input Datasets: Tables read by queries
  2. Output Datasets: Tables written/updated
  3. Transformation Logic: SQL queries and operations
  4. Schema Information: Column-level lineage where possible
  5. Job Information: Airflow DAG and task context
  6. Connection Details: Database and schema information

Manual Lineage Enhancement

from openlineage.airflow.extractors import OperatorLineage

@task
def enhanced_lineage_task():
    """Task with manually enhanced lineage information."""
    hook = PostgresHook(postgres_conn_id="postgres_default")
    
    # Perform database operations
    result = hook.run("INSERT INTO summary_table SELECT * FROM detail_table")
    
    # Optional: Add custom lineage metadata
    # (Implementation depends on OpenLineage setup)
    
    return result

Dependencies and Requirements

Required Packages

# Core dependencies
apache-airflow-providers-postgres
apache-airflow-providers-openlineage

# Optional for enhanced features
sqlparse  # For SQL parsing and analysis

Version Compatibility

  • OpenLineage Provider: Latest version recommended
  • PostgreSQL Provider: 6.2.3+
  • Airflow: 2.10.0+ (required by PostgreSQL provider)

OpenLineage Backend

Compatible with OpenLineage-compatible backends:

  • Marquez: Open-source lineage backend
  • DataHub: LinkedIn's metadata platform
  • Custom HTTP: Any OpenLineage-compatible API
  • File: Local JSON file output for development

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-postgres

docs

asset-management.md

aws-integration.md

bulk-operations.md

data-retrieval.md

database-connection.md

index.md

openlineage-integration.md

schema-operations.md

sql-dialect.md

tile.json