CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-postgres

PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

schema-operations.mddocs/

Schema Operations and Introspection

Database schema introspection and metadata operations for analyzing table structures, primary keys, constraints, and database organization. Provides programmatic access to PostgreSQL system catalogs and information schema.

Capabilities

Primary Key Introspection

Retrieve primary key information for tables using PostgreSQL system catalogs.

def get_table_primary_key(
    self, 
    table: str, 
    schema: str | None = "public"
) -> list[str] | None:
    """
    Get primary key columns for specified table.
    
    Parameters:
    - table: str, table name to inspect
    - schema: str or None, schema name (defaults to "public")
    
    Returns:
    list[str] or None: List of primary key column names, None if no primary key
    
    NOTE(review): passing schema=None overrides the "public" default; whether
    that means "no schema filter" or "current search_path" is not shown here —
    confirm against the hook implementation before relying on it.
    
    Example:
    pk_cols = hook.get_table_primary_key("users", "public")  # ["id"]
    composite_pk = hook.get_table_primary_key("user_roles")  # ["user_id", "role_id"]
    """

Usage Examples

Basic Primary Key Lookup

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Hook bound to the Airflow connection id "postgres_default".
hook = PostgresHook(postgres_conn_id="postgres_default")

# Get primary key for table in default schema
pk_columns = hook.get_table_primary_key("users")
print(f"Primary key columns: {pk_columns}")  # ["id"]

# Get primary key for table in specific schema
pk_columns = hook.get_table_primary_key("customers", "sales")
print(f"Primary key columns: {pk_columns}")  # ["customer_id"]

Handling Composite Primary Keys

# Tables with composite primary keys
composite_pk = hook.get_table_primary_key("order_items")
if composite_pk:
    print(f"Composite key: {composite_pk}")  # ["order_id", "product_id"]
    
    # Use in upsert operations
    # replace=True turns the insert into an upsert; replace_index names the
    # conflict-resolution columns, here the discovered composite key.
    hook.insert_rows(
        table="order_items",
        rows=[(1, 101, 2), (1, 102, 1)],
        target_fields=["order_id", "product_id", "quantity"],
        replace=True,
        replace_index=composite_pk  # Use composite key for conflict resolution
    )

Schema Validation Workflow

def validate_table_structure(table_name, expected_pk_columns, schema=None, db_hook=None):
    """Validate that a table's primary key matches the expected columns.

    Parameters:
    - table_name: table to inspect
    - expected_pk_columns: iterable of expected primary-key column names
    - schema: optional schema name; when omitted, the hook's own default
      ("public") applies, matching the original behavior
    - db_hook: hook to use; defaults to the module-level ``hook``

    Returns:
    True when the primary key matches (comparison is order-insensitive).

    Raises:
    ValueError: if the table has no primary key or the columns differ.
    """
    if db_hook is None:
        db_hook = hook

    # Only forward `schema` when the caller supplied one, so the hook's
    # "public" default is not clobbered by an explicit None.
    if schema is None:
        actual_pk = db_hook.get_table_primary_key(table_name)
    else:
        actual_pk = db_hook.get_table_primary_key(table_name, schema)

    if actual_pk is None:
        raise ValueError(f"Table {table_name} has no primary key")

    # Sets: primary-key column order is irrelevant for this validation.
    if set(actual_pk) != set(expected_pk_columns):
        raise ValueError(
            f"Primary key mismatch for {table_name}. "
            f"Expected: {expected_pk_columns}, Actual: {actual_pk}"
        )

    return True

# Validate table structure before operations
validate_table_structure("users", ["id"])
validate_table_structure("user_permissions", ["user_id", "permission_id"])

Dynamic Upsert Configuration

def smart_upsert(table_name, rows, target_fields, db_hook=None):
    """Insert rows, upserting on the table's actual primary key when it has one.

    Parameters:
    - table_name: target table
    - rows: sequence of row tuples to insert
    - target_fields: column names matching the row tuples
    - db_hook: hook to use; defaults to the module-level ``hook``

    Tables without a primary key (or with an empty key list) get a plain
    insert; otherwise the discovered key drives conflict resolution.
    """
    if db_hook is None:
        db_hook = hook

    # Discover the primary key dynamically.
    pk_columns = db_hook.get_table_primary_key(table_name)

    # Single call site: the two original branches differed only in the
    # replace kwargs, so build them conditionally instead of duplicating.
    kwargs = {"table": table_name, "rows": rows, "target_fields": target_fields}
    if pk_columns:  # truthiness also guards against an empty replace_index
        kwargs["replace"] = True
        kwargs["replace_index"] = pk_columns
    db_hook.insert_rows(**kwargs)

# Usage
smart_upsert("products", [(1, "Widget", 19.99)], ["id", "name", "price"])

ETL Pipeline Integration

def etl_with_schema_validation():
    """ETL pipeline with schema validation."""

    # Expected primary-key columns, keyed by table name.
    expected_schemas = {
        "users": ["user_id"],
        "orders": ["order_id"],
        "order_items": ["order_id", "item_id"]
    }

    # Guard clause per table: bail out before touching any data if a
    # single table's schema does not match expectations.
    for table, expected_pk in expected_schemas.items():
        try:
            validate_table_structure(table, expected_pk)
        except ValueError as e:
            print(f"✗ {table} schema validation failed: {e}")
            return False
        print(f"✓ {table} schema validation passed")

    # All tables validated — run the actual ETL work.
    process_etl_data()
    return True

Schema Information Queries

While the hook provides primary key introspection, you can also execute custom queries for additional schema information:

Table Information

# Get all tables in schema
tables = hook.get_records("""
    SELECT table_name 
    FROM information_schema.tables 
    WHERE table_schema = %s AND table_type = 'BASE TABLE'
""", parameters=["public"])

# Get column information
columns = hook.get_records("""
    SELECT column_name, data_type, is_nullable, column_default
    FROM information_schema.columns
    WHERE table_schema = %s AND table_name = %s
    ORDER BY ordinal_position
""", parameters=["public", "users"])

Constraint Information

# Get foreign key constraints
foreign_keys = hook.get_records("""
    SELECT
        kcu.column_name,
        ccu.table_name AS foreign_table_name,
        ccu.column_name AS foreign_column_name
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
        ON tc.constraint_name = kcu.constraint_name
    JOIN information_schema.constraint_column_usage ccu
        ON ccu.constraint_name = tc.constraint_name
    WHERE tc.constraint_type = 'FOREIGN KEY'
        AND tc.table_schema = %s
        AND tc.table_name = %s
""", parameters=["public", "orders"])

# Get unique constraints
unique_constraints = hook.get_records("""
    SELECT
        tc.constraint_name,
        array_agg(kcu.column_name ORDER BY kcu.ordinal_position) as columns
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
        ON tc.constraint_name = kcu.constraint_name
    WHERE tc.constraint_type = 'UNIQUE'
        AND tc.table_schema = %s
        AND tc.table_name = %s
    GROUP BY tc.constraint_name
""", parameters=["public", "users"])

Index Information

# Get table indexes
indexes = hook.get_records("""
    SELECT
        i.relname as index_name,
        array_agg(a.attname ORDER BY c.ordinality) as columns,
        ix.indisunique as is_unique,
        ix.indisprimary as is_primary
    FROM pg_class t
    JOIN pg_index ix ON t.oid = ix.indrelid
    JOIN pg_class i ON i.oid = ix.indexrelid
    JOIN unnest(ix.indkey) WITH ORDINALITY AS c(attnum, ordinality) ON true
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = c.attnum
    JOIN pg_namespace n ON t.relnamespace = n.oid
    WHERE t.relname = %s AND n.nspname = %s
    GROUP BY i.relname, ix.indisunique, ix.indisprimary
    ORDER BY i.relname
""", parameters=["users", "public"])

Error Handling

def safe_get_primary_key(table_name, schema=None, db_hook=None):
    """Safely get primary key with error handling.

    Parameters:
    - table_name: table to inspect
    - schema: schema name; None is forwarded as-is (matching the original,
      which overrides the hook's "public" default)
    - db_hook: hook to use; defaults to the module-level ``hook``

    Returns:
    list of primary-key column names, or None when the table has no primary
    key OR the lookup failed — callers cannot distinguish the two cases.
    """
    if db_hook is None:
        db_hook = hook
    try:
        return db_hook.get_table_primary_key(table_name, schema)
    except Exception as e:  # broad by design: best-effort helper, never raises
        print(f"Error getting primary key for {table_name}: {e}")
        return None

# Usage with error handling
# A None result means either "no primary key" or "lookup failed" —
# safe_get_primary_key does not distinguish the two.
pk = safe_get_primary_key("nonexistent_table")
if pk is not None:
    print(f"Primary key: {pk}")
else:
    print("Could not determine primary key")

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-postgres

docs

asset-management.md

aws-integration.md

bulk-operations.md

data-retrieval.md

database-connection.md

index.md

openlineage-integration.md

schema-operations.md

sql-dialect.md

tile.json