CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-postgres

PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

asset-management.mddocs/

Asset and Dataset Management

PostgreSQL asset/dataset URI handling with validation, sanitization, and integration with Airflow's data lineage and dependency management systems. Supports both legacy dataset terminology and modern asset terminology.

Capabilities

URI Sanitization

Sanitize and validate PostgreSQL asset/dataset URIs for consistency and proper format.

def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Sanitize and validate a PostgreSQL asset/dataset URI.

    Only the signature is shown here; the function is importable from
    airflow.providers.postgres.assets.postgres.

    Parameters:
    - uri: SplitResult, parsed URI components from urllib.parse.urlsplit

    Returns:
    SplitResult: Sanitized URI components with validated structure

    URI Format:
    postgres://host:port/database/schema/table
    postgresql://host:port/database/schema/table

    Validation:
    - Ensures URI contains host, database, schema, and table components
    - Adds default port 5432 if not specified
    - Validates path structure matches /database/schema/table format

    Raises:
    ValueError: If URI structure is invalid or missing required components
    """

Usage Examples

Basic URI Sanitization

import urllib.parse
from airflow.providers.postgres.assets.postgres import sanitize_uri

# Parse the URI string into components, then sanitize them.
# No port is given, so sanitize_uri is expected to add the default (5432).
uri_str = "postgres://localhost/mydb/public/users"
parsed_uri = urllib.parse.urlsplit(uri_str)
sanitized_uri = sanitize_uri(parsed_uri)

print(f"Original: {uri_str}")
print(f"Sanitized: {urllib.parse.urlunsplit(sanitized_uri)}")
# Expected output:
#   Original: postgres://localhost/mydb/public/users
#   Sanitized: postgres://localhost:5432/mydb/public/users

Asset Definition in DAGs

import urllib.parse

from airflow import DAG
from airflow.datasets import Dataset
from airflow.decorators import task
from airflow.providers.postgres.assets.postgres import sanitize_uri


# Define PostgreSQL dataset/asset
def create_postgres_asset(host, database, schema, table, port=5432):
    """Build a Dataset for one PostgreSQL table.

    Parameters:
    - host: database hostname or IP address
    - database: database name
    - schema: schema name
    - table: table name
    - port: database port (defaults to 5432)

    Returns:
    Dataset whose URI is the sanitized form of
    postgres://host:port/database/schema/table

    Raises:
    ValueError: propagated from sanitize_uri when the URI is invalid
    """
    uri_str = f"postgres://{host}:{port}/{database}/{schema}/{table}"
    parsed_uri = urllib.parse.urlsplit(uri_str)
    sanitized_uri = sanitize_uri(parsed_uri)
    return Dataset(urllib.parse.urlunsplit(sanitized_uri))


# Create assets for data pipeline
users_table = create_postgres_asset("db.example.com", "app_db", "public", "users")
orders_table = create_postgres_asset("db.example.com", "app_db", "public", "orders")
analytics_table = create_postgres_asset("warehouse.example.com", "analytics", "marts", "user_orders")

# Use in DAG with data dependencies
with DAG("data_pipeline", schedule=None) as dag:

    @task(outlets=[users_table])
    def extract_users():
        # Extract user data
        pass

    @task(inlets=[users_table], outlets=[orders_table])
    def extract_orders():
        # Extract order data
        pass

    @task(inlets=[users_table, orders_table], outlets=[analytics_table])
    def create_analytics():
        # Create analytics table
        pass

    # A @task-decorated function only becomes a task in the DAG once it is
    # called; chain the calls so execution order follows the asset flow.
    extract_users() >> extract_orders() >> create_analytics()

Asset URI Validation

def validate_postgres_asset(uri_string):
    """Check a URI string against PostgreSQL asset URI sanitization.

    Prints a "✓" line with the sanitized URI on success, or a "✗" line with
    the error message when parsing or sanitization raises.

    Parameters:
    - uri_string: URI to check

    Returns:
    bool: True if the URI was parsed and sanitized without error, else False
    """
    try:
        components = sanitize_uri(urllib.parse.urlsplit(uri_string))
        rendered = urllib.parse.urlunsplit(components)
        print(f"✓ Valid PostgreSQL asset URI: {rendered}")
    except Exception as e:
        print(f"✗ Invalid PostgreSQL asset URI: {e}")
        return False
    return True

# Test various URI formats
test_uris = [
    "postgres://localhost/mydb/public/users",           # Valid
    "postgresql://host:5432/db/schema/table",           # Valid
    "postgres://host/database/schema",                  # Invalid - missing table
    "postgres://host/database",                         # Invalid - missing schema/table
    "mysql://host/database/table"                       # Invalid - wrong scheme, and the path has only two of the three required segments
]

# Print a ✓/✗ verdict for every sample URI.
for uri in test_uris:
    validate_postgres_asset(uri)

Cross-Database Asset Dependencies

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Define assets across different databases
# (create_postgres_asset is the helper from "Asset Definition in DAGs")
source_table = create_postgres_asset("source-db", "raw_data", "public", "events")
staging_table = create_postgres_asset("staging-db", "staging", "events", "processed_events")
warehouse_table = create_postgres_asset("warehouse-db", "analytics", "facts", "event_summary")

# The DAG is triggered whenever the source asset is updated.
with DAG("cross_db_pipeline", schedule=[source_table]) as dag:

    @task(inlets=[source_table], outlets=[staging_table])
    def stage_events():
        """Copy unprocessed events from the source database into staging."""
        # Move data from source to staging
        source_hook = PostgresHook(postgres_conn_id="source_db")
        staging_hook = PostgresHook(postgres_conn_id="staging_db")

        # Extract from source
        data = source_hook.get_df("SELECT * FROM events WHERE processed = false")

        # Load to staging. The table name is schema-qualified so the rows land
        # in the "events" schema that the staging_table asset URI declares.
        staging_hook.insert_rows(
            "events.processed_events",
            data.values.tolist(),
            target_fields=list(data.columns),
        )

    @task(inlets=[staging_table], outlets=[warehouse_table])
    def aggregate_events():
        # Aggregate staging data to warehouse
        pass

    # A @task-decorated function only becomes a task in the DAG once it is
    # called; staging must finish before aggregation starts.
    stage_events() >> aggregate_events()

Asset URI Formats

Supported Schemes

  • postgres: PostgreSQL URI scheme
  • postgresql: Alternative PostgreSQL URI scheme

Both schemes are handled identically by the sanitize_uri function.

URI Structure

postgres://[user[:password]@]host[:port]/database/schema/table

Components

  • scheme: postgres or postgresql
  • user: Optional database username
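  • password: Optional database password (prefer omitting credentials: an asset URI is an identifier, not a connection string, and recent Airflow versions warn about and drop auth info found in asset URIs)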
  • host: Database hostname or IP address (required)
  • port: Database port (defaults to 5432)
  • database: Database name (required)
  • schema: Schema name (required)
  • table: Table name (required)

Examples of Valid URIs

# Each entry follows postgres://[user[:password]@]host[:port]/database/schema/table
valid_uris = [
    "postgres://localhost:5432/myapp/public/users",  # explicit port
    "postgresql://db.example.com/warehouse/sales/orders",  # "postgresql" scheme, no port given
    "postgres://user:pass@host:5433/db/schema/table",  # user, password and a non-default port
    "postgres://readonly@analytics-db/reports/monthly/revenue"  # user only, no port given
]

Integration with Airflow Assets

Asset-Aware Task Definition

from airflow.datasets import Dataset
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Define PostgreSQL assets
customer_data = Dataset("postgres://db/crm/public/customers")
order_data = Dataset("postgres://db/sales/public/orders")
report_data = Dataset("postgres://warehouse/reports/public/daily_summary")


# Declares customer_data as an outlet of this task.
@task(outlets=[customer_data])
def sync_customers():
    """Sync customer data from external source."""
    hook = PostgresHook()
    # Sync logic here


# Declares both source assets as inlets and the report asset as an outlet.
@task(inlets=[customer_data, order_data], outlets=[report_data])
def generate_daily_report():
    """Generate daily report from customer and order data."""
    hook = PostgresHook()
    # Report generation logic here

Dataset-Triggered DAGs

# DAG scheduled on PostgreSQL assets instead of a time interval: the schedule
# lists the orders and customers assets.
with DAG(
    dag_id="report_generator",
    schedule=[
        Dataset("postgres://db/sales/public/orders"),
        Dataset("postgres://db/crm/public/customers"),
    ],
) as report_dag:

    # Calling the decorated function registers the task in report_dag.
    generate_reports_task = generate_daily_report()

Asset Lineage Tracking

def track_data_lineage():
    """Example of tracking data lineage with PostgreSQL assets.

    Declares source, intermediate and output assets, then builds a DAG whose
    tasks name those assets as inlets/outlets so each step's inputs and
    outputs are explicit.

    Returns:
    DAG: the "event_processing_pipeline" DAG, so the caller can bind it to a
    module-level name.
    """

    # Source data assets
    raw_events = Dataset("postgres://source/raw/public/events")
    raw_users = Dataset("postgres://source/raw/public/users")

    # Intermediate processing assets
    clean_events = Dataset("postgres://staging/clean/public/events")
    enriched_events = Dataset("postgres://staging/enriched/public/events")

    # Final output assets
    user_metrics = Dataset("postgres://warehouse/metrics/public/user_activity")

    # Define processing pipeline with clear lineage
    with DAG("event_processing_pipeline") as dag:

        @task(inlets=[raw_events], outlets=[clean_events])
        def clean_events_task():
            # Data cleaning logic
            pass

        @task(inlets=[clean_events, raw_users], outlets=[enriched_events])
        def enrich_events_task():
            # Data enrichment logic
            pass

        @task(inlets=[enriched_events], outlets=[user_metrics])
        def calculate_metrics_task():
            # Metrics calculation logic
            pass

        # A @task-decorated function only becomes a task in the DAG once it
        # is called; chain the calls in lineage order.
        clean_events_task() >> enrich_events_task() >> calculate_metrics_task()

    return dag

Provider Registration

The PostgreSQL provider automatically registers asset URI handlers with Airflow:

Asset URI Registration

# From provider.yaml - automatically registered
# Maps the postgres and postgresql URI schemes to the sanitize_uri handler.
"asset-uris":
  - schemes: [postgres, postgresql]
    handler: airflow.providers.postgres.assets.postgres.sanitize_uri

# Legacy dataset URI support (backward compatibility)
# Same schemes and handler, registered under the older "dataset" key.
"dataset-uris":
  - schemes: [postgres, postgresql]
    handler: airflow.providers.postgres.assets.postgres.sanitize_uri

Handler Function

The sanitize_uri function is automatically called by Airflow when:

  • Creating Dataset/Asset objects with postgres:// or postgresql:// URIs
  • Validating asset dependencies in DAGs
  • Processing asset lineage information
  • Comparing asset URIs for dependency resolution

Error Handling

Common URI Validation Errors

def handle_uri_errors():
    """Handle common PostgreSQL asset URI errors.

    Runs each sample URI through urllib.parse.urlsplit and sanitize_uri and
    prints one line per URI: "✓ <uri>" when sanitization succeeds, or
    "✗ <uri>: <error>" when it raises.
    """

    problematic_uris = [
        "postgres://host/db",                    # Missing schema/table
        "postgres://host/db/schema",             # Missing table
        "postgres:///db/schema/table",           # Missing host
        # Wrong scheme. NOTE(review): the documented sanitize_uri validation
        # covers host, port and path only - confirm this URI is rejected.
        "mysql://host/db/schema/table",
    ]

    for uri_str in problematic_uris:
        try:
            # Only the raise/no-raise outcome matters; the result is discarded.
            sanitize_uri(urllib.parse.urlsplit(uri_str))
            print(f"✓ {uri_str}")
        except Exception as e:
            print(f"✗ {uri_str}: {e}")

Best Practices

def create_safe_postgres_asset(host, database, schema, table, port=5432):
    """Safely create PostgreSQL asset with validation.

    Parameters:
    - host: database hostname or IP address (required, non-empty)
    - database: database name (required, non-empty)
    - schema: schema name (required, non-empty)
    - table: table name (required, non-empty)
    - port: database port (defaults to 5432)

    Returns:
    Dataset built from the sanitized postgres:// URI

    Raises:
    ValueError: if any required component is empty, or if parsing,
    sanitizing or Dataset creation fails (the original error is attached
    as the exception's cause)
    """

    # Validate inputs
    if not all([host, database, schema, table]):
        raise ValueError("All components (host, database, schema, table) are required")

    # Construct URI
    uri_str = f"postgres://{host}:{port}/{database}/{schema}/{table}"

    try:
        # Parse and sanitize
        parsed_uri = urllib.parse.urlsplit(uri_str)
        sanitized_uri = sanitize_uri(parsed_uri)

        # Create Dataset
        return Dataset(urllib.parse.urlunsplit(sanitized_uri))

    except Exception as e:
        # Chain explicitly so the traceback shows the underlying failure as
        # the direct cause of this ValueError.
        raise ValueError(f"Failed to create PostgreSQL asset: {e}") from e

# Safe usage
# create_safe_postgres_asset reports every failure as ValueError, so that is
# the only exception type handled here.
try:
    asset = create_safe_postgres_asset("db.example.com", "myapp", "public", "users")
    print(f"Created asset: {asset.uri}")
except ValueError as e:
    print(f"Error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-postgres

docs

asset-management.md

aws-integration.md

bulk-operations.md

data-retrieval.md

database-connection.md

index.md

openlineage-integration.md

schema-operations.md

sql-dialect.md

tile.json