PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Data lineage tracking integration with OpenLineage for comprehensive data flow monitoring, compliance requirements, and observability across PostgreSQL and Redshift data operations within Apache Airflow workflows.
Provide PostgreSQL/Redshift-specific information for OpenLineage data lineage tracking.
def get_openlineage_database_info(self, connection) -> DatabaseInfo:
    """
    Return PostgreSQL/Redshift-specific information for OpenLineage.

    Parameters:
    - connection: database connection object

    Returns:
    DatabaseInfo: Database information object containing connection details

    Dependencies:
    Requires apache-airflow-providers-openlineage package
    """

# Detect and report the specific database dialect for OpenLineage integration.
def get_openlineage_database_dialect(self, connection) -> str:
    """
    Return database dialect for OpenLineage tracking.

    Parameters:
    - connection: database connection object

    Returns:
    str: "postgres" for PostgreSQL, "redshift" for Redshift connections
    """

# Determine the current default schema for OpenLineage dataset identification.
def get_openlineage_default_schema(self) -> str | None:
    """
    Return current default schema, typically from SEARCH_PATH.

    Returns:
    str or None: Current schema name, usually "public" for PostgreSQL
    """

from airflow.providers.postgres.hooks.postgres import PostgresHook
# Hook automatically integrates with OpenLineage when package is installed
hook = PostgresHook(postgres_conn_id="postgres_default")

# Database operations are automatically tracked
records = hook.get_records("SELECT * FROM users WHERE active = true")

# OpenLineage captures:
# - Database connection info
# - SQL query executed
# - Tables accessed
# - Schema information


def get_lineage_info():
    """Print OpenLineage metadata (database info, dialect, default schema) for the default connection."""
    hook = PostgresHook(postgres_conn_id="postgres_default")
    with hook.get_conn() as conn:
        # Get database info for lineage
        db_info = hook.get_openlineage_database_info(conn)
        dialect = hook.get_openlineage_database_dialect(conn)
        default_schema = hook.get_openlineage_default_schema()

        print(f"Database dialect: {dialect}")
        print(f"Default schema: {default_schema}")
        print(f"Database info: {db_info}")
get_lineage_info()


def detect_database_type():
    """Detect whether connected to PostgreSQL or Redshift via the OpenLineage dialect."""
    # PostgreSQL connection
    pg_hook = PostgresHook(postgres_conn_id="postgres_default")
    with pg_hook.get_conn() as conn:
        pg_dialect = pg_hook.get_openlineage_database_dialect(conn)
        print(f"PostgreSQL dialect: {pg_dialect}")  # "postgres"

    # Redshift connection
    rs_hook = PostgresHook(postgres_conn_id="redshift_default")
    with rs_hook.get_conn() as conn:
        rs_dialect = rs_hook.get_openlineage_database_dialect(conn)
        print(f"Redshift dialect: {rs_dialect}")  # "redshift"
detect_database_type()from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
with DAG(
"data_pipeline_with_lineage",
start_date=datetime(2024, 1, 1),
schedule_interval="@daily"
) as dag:
@task
def extract_raw_data():
"""Extract raw data - automatically tracked by OpenLineage."""
hook = PostgresHook(postgres_conn_id="source_db")
# This query is automatically tracked
data = hook.get_df("""
SELECT user_id, event_type, timestamp, properties
FROM raw_events
WHERE date = '{{ ds }}'
""")
return len(data)
@task
def transform_data():
"""Transform data - lineage captures read/write operations."""
hook = PostgresHook(postgres_conn_id="warehouse_db")
# OpenLineage tracks both the read and write operations
hook.run("""
INSERT INTO processed_events (user_id, event_type, event_date, processed_at)
SELECT
user_id,
event_type,
DATE(timestamp) as event_date,
CURRENT_TIMESTAMP as processed_at
FROM raw_events
WHERE date = '{{ ds }}'
""")
@task
def create_aggregates():
"""Create aggregated data - full lineage chain captured."""
hook = PostgresHook(postgres_conn_id="warehouse_db")
# Complex query with multiple table dependencies
hook.run("""
INSERT INTO daily_user_metrics (user_id, event_date, event_count, unique_event_types)
SELECT
user_id,
event_date,
COUNT(*) as event_count,
COUNT(DISTINCT event_type) as unique_event_types
FROM processed_events
WHERE event_date = '{{ ds }}'
GROUP BY user_id, event_date
""")
# Define task dependencies
extract_raw_data() >> transform_data() >> create_aggregates()@task
def cross_database_etl(ds=None):
    """ETL across multiple databases with full lineage tracking.

    NOTE: ``{{ ds }}`` is not Jinja-rendered inside @task callables, so
    the execution date comes from the ``ds`` context argument and is
    bound as a query parameter.
    """
    # Source database
    source_hook = PostgresHook(postgres_conn_id="app_db")
    # Target database
    target_hook = PostgresHook(postgres_conn_id="analytics_db")

    # Extract from source - lineage captures read
    user_data = source_hook.get_df(
        """
        SELECT u.id, u.name, u.email, u.created_at,
               COUNT(o.id) as order_count,
               SUM(o.total) as lifetime_value
        FROM users u
        LEFT JOIN orders o ON u.id = o.user_id
        WHERE u.updated_at >= %s
        GROUP BY u.id, u.name, u.email, u.created_at
        """,
        parameters=(ds,),
    )

    # Load to target - lineage captures write
    target_hook.insert_rows(
        table="customer_analytics",
        rows=user_data.values.tolist(),
        target_fields=list(user_data.columns),
        replace=True,
        replace_index="id",
    )

    # OpenLineage automatically captures:
    # - Source tables: app_db.public.users, app_db.public.orders
    # - Target table: analytics_db.public.customer_analytics
    # - Data transformation logic
    # - Cross-database relationship

# Install OpenLineage provider
pip install apache-airflow-providers-openlineage
# Or install with PostgreSQL provider extras
pip install apache-airflow-providers-postgres[openlineage]

Configure OpenLineage in airflow.cfg:
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = production_data_pipeline
extractors = airflow.providers.postgres.hooks.postgres

# OpenLineage configuration
export OPENLINEAGE_URL=http://marquez:5000
export OPENLINEAGE_NAMESPACE=production
# Airflow OpenLineage integration
export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000"}'

@task
def complex_analytics_query():
"""Complex query with multiple table joins and CTEs."""
hook = PostgresHook(postgres_conn_id="analytics_db")
# OpenLineage parses and tracks all table dependencies
hook.run("""
WITH user_metrics AS (
SELECT
u.id as user_id,
u.segment,
COUNT(DISTINCT o.id) as order_count,
AVG(o.total) as avg_order_value
FROM users u
JOIN orders o ON u.id = o.user_id
WHERE o.created_at >= '{{ ds }}'
GROUP BY u.id, u.segment
),
product_metrics AS (
SELECT
p.category,
COUNT(DISTINCT oi.order_id) as orders_with_category,
SUM(oi.quantity * oi.price) as category_revenue
FROM products p
JOIN order_items oi ON p.id = oi.product_id
JOIN orders o ON oi.order_id = o.id
WHERE o.created_at >= '{{ ds }}'
GROUP BY p.category
)
INSERT INTO daily_analytics (
date, user_segment, product_category,
total_orders, avg_order_value, category_revenue
)
SELECT
'{{ ds }}' as date,
um.segment,
pm.category,
um.order_count,
um.avg_order_value,
pm.category_revenue
FROM user_metrics um
CROSS JOIN product_metrics pm
""")
# OpenLineage captures dependencies on:
# - users table
# - orders table
# - products table
# - order_items table
# And writes to daily_analytics table@task
def call_stored_procedure(ds=None):
    """Call stored procedure - lineage tracks inputs/outputs.

    NOTE: ``{{ ds }}`` is not Jinja-rendered inside @task callables, so
    the calculation date is bound from the ``ds`` context argument.
    """
    hook = PostgresHook(postgres_conn_id="postgres_default")
    # OpenLineage can track stored procedure calls
    hook.run(
        """
        CALL update_customer_segments(
            p_calculation_date := %s,
            p_recalculate_all := false
        )
        """,
        parameters=(ds,),
    )
    # Lineage tracking depends on OpenLineage configuration
    # and stored procedure analysis capabilities

OpenLineage integration automatically captures:
from openlineage.airflow.extractors import OperatorLineage


@task
def enhanced_lineage_task():
    """Task with manually enhanced lineage information."""
    hook = PostgresHook(postgres_conn_id="postgres_default")
    # Perform database operations
    # NOTE(review): hook.run returns None unless a result handler is
    # supplied — confirm the intended return value here.
    result = hook.run("INSERT INTO summary_table SELECT * FROM detail_table")
    # Optional: Add custom lineage metadata
    # (Implementation depends on OpenLineage setup)
    return result

# Core dependencies
apache-airflow-providers-postgres
apache-airflow-providers-openlineage
# Optional for enhanced features
sqlparse  # For SQL parsing and analysis

Compatible with OpenLineage-compatible backends:
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-postgres