tessl/pypi-apache-airflow-providers-postgres

PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.


SQL Dialect and Database-Specific Operations

PostgreSQL-specific SQL dialect implementation. It generates UPSERT statements with ON CONFLICT, looks up primary keys with schema-aware queries, and supplies the PostgreSQL-specific SQL that PostgresHook relies on.

Capabilities

PostgresDialect Class

Database dialect implementation for PostgreSQL-specific operations and query generation.

class PostgresDialect(Dialect):
    """
    PostgreSQL-specific SQL dialect implementation.
    Provides database-specific operations and query generation.
    """
    
    @property
    def name(self) -> str:
        """
        Dialect name identifier.
        
        Returns:
        str: "postgresql"
        """

Primary Key Introspection

Retrieve a table's primary key columns by querying information_schema views. Results are cached, so repeated lookups for the same table do not hit the database again.

@lru_cache(maxsize=None)
def get_primary_keys(
    self, 
    table: str, 
    schema: str | None = None
) -> list[str] | None:
    """
    Get primary key columns using information_schema queries.
    Uses LRU cache for performance optimization.
    
    Parameters:
    - table: str, table name (may include schema as "schema.table")
    - schema: str or None, schema name (extracted from table if None)
    
    Returns:
    list[str] or None: Primary key column names, None if no primary key exists
    
    Implementation:
    - Extracts schema from table name if schema parameter is None
    - Queries information_schema.table_constraints and key_column_usage
    - Caches results to avoid repeated database queries
    """
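The implementation notes above can be pictured with a small sketch. It is not the provider's code: `split_schema` is a hypothetical helper, the `"public"` fallback is an assumption, and the query is only one plausible way to join the two information_schema views named in the docstring.

```python
# Hypothetical helper: the provider's own parsing may differ, for example
# in how it treats quoted identifiers that contain dots.
def split_schema(table: str, default_schema: str = "public") -> tuple[str, str]:
    """Split "schema.table" into (schema, table); fall back to a default schema."""
    schema, sep, name = table.partition(".")
    if sep:
        return schema, name
    return default_schema, table


# Assumed shape of the lookup, built from the two views named above.
PRIMARY_KEY_SQL = """
SELECT kcu.column_name
FROM information_schema.table_constraints AS tco
JOIN information_schema.key_column_usage AS kcu
  ON kcu.constraint_name = tco.constraint_name
 AND kcu.constraint_schema = tco.constraint_schema
WHERE tco.constraint_type = 'PRIMARY KEY'
  AND kcu.table_schema = %s
  AND kcu.table_name = %s
ORDER BY kcu.ordinal_position
"""

# The schema and table name become the two query parameters.
params = split_schema("sales.orders")
print(params)
```

Passing the schema and table as parameters, rather than formatting them into the SQL string, leaves quoting to the database driver.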

UPSERT SQL Generation

Generate PostgreSQL-specific UPSERT statements that use the ON CONFLICT clause to update rows whose key already exists.

def generate_replace_sql(
    self, 
    table, 
    values, 
    target_fields, 
    **kwargs
) -> str:
    """
    Generate PostgreSQL UPSERT statement using ON CONFLICT clause.
    
    Parameters:
    - table: str, target table name
    - values: list, row values for insertion
    - target_fields: list, column names for insertion
    - **kwargs: additional parameters including replace_index
    
    Kwargs:
    - replace_index: str or list, column(s) to use for conflict detection
    
    Returns:
    str: Generated UPSERT SQL with ON CONFLICT DO UPDATE clause
    
    Example Output:
    INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
    ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email
    """
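To make the shape of the generated statement concrete, here is a minimal sketch of how such a string could be assembled. `build_upsert_sql` is a hypothetical function written for illustration. It does not quote identifiers, and the `DO NOTHING` branch is an assumption about what is sensible when every column belongs to the conflict target. The provider's real output may differ in quoting and casing.

```python
def build_upsert_sql(table, target_fields, replace_index):
    """Assemble an INSERT ... ON CONFLICT statement with %s placeholders."""
    if isinstance(replace_index, str):
        replace_index = [replace_index]
    if not target_fields or not replace_index:
        raise ValueError("target_fields and replace_index are both required")

    columns = ", ".join(target_fields)
    placeholders = ", ".join(["%s"] * len(target_fields))
    sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

    conflict = ", ".join(replace_index)
    # Only columns outside the conflict target are overwritten.
    updates = [field for field in target_fields if field not in replace_index]
    if updates:
        assignments = ", ".join(f"{col} = EXCLUDED.{col}" for col in updates)
        return f"{sql} ON CONFLICT ({conflict}) DO UPDATE SET {assignments}"
    # Every column is part of the key, so there is nothing left to update.
    return f"{sql} ON CONFLICT ({conflict}) DO NOTHING"


print(build_upsert_sql("users", ["id", "name", "email"], "id"))
```

Note that the row values never appear in the SQL text. Only their count matters for the placeholders, and the driver binds the actual values at execution time.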

Usage Examples

Direct Dialect Usage

from airflow.providers.postgres.dialects.postgres import PostgresDialect
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Get dialect instance
hook = PostgresHook(postgres_conn_id="postgres_default")
dialect = hook.dialect  # Returns PostgresDialect instance

# Or create directly; the dialect runs its queries through the hook it is given
dialect = PostgresDialect(hook)

Primary Key Introspection

# Get primary keys for table
pk_columns = dialect.get_primary_keys("users", "public")
print(f"Primary key columns: {pk_columns}")  # ["id"]

# Handle table with schema in name
pk_columns = dialect.get_primary_keys("sales.orders")  
print(f"Primary key columns: {pk_columns}")  # ["order_id"]

# Composite primary key
pk_columns = dialect.get_primary_keys("order_items")
print(f"Composite key: {pk_columns}")  # ["order_id", "product_id"]

UPSERT SQL Generation

# Generate UPSERT for single-column primary key
upsert_sql = dialect.generate_replace_sql(
    table="users",
    values=[(1, "John", "john@example.com")],
    target_fields=["id", "name", "email"],
    replace_index="id"
)
print(upsert_sql)
# INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
# ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email

# Generate UPSERT for composite key
upsert_sql = dialect.generate_replace_sql(
    table="user_preferences",
    values=[(1, "theme", "dark")],
    target_fields=["user_id", "setting_key", "setting_value"],
    replace_index=["user_id", "setting_key"]
)
print(upsert_sql)
# INSERT INTO user_preferences (user_id, setting_key, setting_value) VALUES (%s, %s, %s)
# ON CONFLICT (user_id, setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value

Integration with PostgresHook

The dialect is automatically used by PostgresHook for database-specific operations:

hook = PostgresHook(postgres_conn_id="postgres_default")

# insert_rows automatically uses dialect for UPSERT generation
hook.insert_rows(
    table="products",
    rows=[
        (1, "Widget", 19.99),
        (2, "Gadget", 29.99)
    ],
    target_fields=["id", "name", "price"],
    replace=True,  # Triggers UPSERT generation
    replace_index="id"
)

Advanced Usage

Custom Schema Handling

def analyze_table_structure(table_name, schema_name=None):
    """Analyze table structure using dialect capabilities."""
    
    # The dialect runs its queries through a hook, so obtain it from one
    hook = PostgresHook(postgres_conn_id="postgres_default")
    dialect = hook.dialect
    
    # Get primary key information
    pk_columns = dialect.get_primary_keys(table_name, schema_name)
    
    if pk_columns:
        print(f"Table {table_name} has primary key: {pk_columns}")
        
        # Generate sample UPSERT (col1..col3 are placeholder column names)
        sample_fields = ["col1", "col2", "col3"]
        sample_values = [tuple(f"value_{i}" for i in range(len(sample_fields)))]
        upsert_sql = dialect.generate_replace_sql(
            table=table_name,
            values=sample_values,
            target_fields=sample_fields,
            replace_index=pk_columns
        )
        print(f"Sample UPSERT SQL:\n{upsert_sql}")
    else:
        print(f"Table {table_name} has no primary key")

# Analyze different tables
analyze_table_structure("users", "public")
analyze_table_structure("sales.orders")

Bulk UPSERT Operations

def bulk_upsert_with_dialect():
    """Perform bulk upsert using dialect-generated SQL."""
    
    hook = PostgresHook()
    dialect = hook.dialect
    
    # Large dataset for upsert
    data_rows = [
        (i, f"user_{i}", f"user_{i}@example.com") 
        for i in range(1, 10001)
    ]
    
    # Get primary key for target table
    pk_columns = dialect.get_primary_keys("users", "public")
    
    if pk_columns:
        # Use hook's built-in upsert (uses dialect internally)
        hook.insert_rows(
            table="users",
            rows=data_rows,
            target_fields=["id", "name", "email"],
            replace=True,
            replace_index=pk_columns,
            commit_every=1000
        )
    else:
        # Fallback to regular insert
        hook.insert_rows(
            table="users",
            rows=data_rows,
            target_fields=["id", "name", "email"],
            commit_every=1000
        )
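The `commit_every=1000` argument above limits how many rows go into one transaction. As a rough mental model only, and not the hook's actual implementation, the batching can be sketched with a hypothetical `chunked` helper:

```python
from itertools import islice


def chunked(rows, size):
    """Yield successive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    # islice pulls the next `size` items; an empty list ends the loop.
    while batch := list(islice(iterator, size)):
        yield batch


data_rows = [(i, f"user_{i}", f"user_{i}@example.com") for i in range(1, 10001)]
batches = list(chunked(data_rows, 1000))
print(len(batches), len(batches[0]))  # 10 1000
```

Smaller batches keep each transaction short at the cost of more commits. The best value depends on row size and on how much work can acceptably be lost if a batch fails.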

Dynamic UPSERT Generation

def dynamic_upsert_handler(table_name, data_dict_list, schema="public"):
    """
    Handle upsert operations dynamically based on table structure.
    """
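    # Nothing to write, so skip the database round trip entirely
    if not data_dict_list:
        return
    
    hook = PostgresHook()
    dialect = hook.dialect
    
    # Get table primary key
    pk_columns = dialect.get_primary_keys(table_name, schema)
    
    # Take the field order from the first record, then read every row in
    # that same order (a KeyError here means a record is missing a field)
    target_fields = list(data_dict_list[0].keys())
    rows = [[row[field] for field in target_fields] for row in data_dict_list]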
    
    if pk_columns:
        # Check if all primary key columns are present
        missing_pk_cols = set(pk_columns) - set(target_fields)
        if missing_pk_cols:
            raise ValueError(f"Missing primary key columns: {missing_pk_cols}")
        
        # Perform upsert
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields,
            replace=True,
            replace_index=pk_columns
        )
        print(f"Upserted {len(rows)} rows into {table_name}")
    else:
        # Regular insert for tables without primary key
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields
        )
        print(f"Inserted {len(rows)} rows into {table_name}")

# Usage
user_data = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"}
]

dynamic_upsert_handler("users", user_data)
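The handler above depends on every dictionary carrying the same fields. A stricter conversion step could check that before any SQL runs. `records_to_rows` below is a hypothetical helper and not part of the provider:

```python
def records_to_rows(records):
    """Return (target_fields, rows) with every row in the same column order."""
    if not records:
        return [], []
    target_fields = list(records[0])
    expected = set(target_fields)
    rows = []
    for position, record in enumerate(records):
        if set(record) != expected:
            raise ValueError(
                f"Record {position} has fields {sorted(record)}, "
                f"expected {sorted(expected)}"
            )
        # Index by field name so dictionary ordering cannot shift columns.
        rows.append(tuple(record[field] for field in target_fields))
    return target_fields, rows


fields, rows = records_to_rows([
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"email": "bob@example.com", "id": 2, "name": "Bob"},  # different key order
])
print(fields)
print(rows)
```

Reading values by field name means a record with differently ordered keys still lands in the right columns, and a record with missing or extra keys fails early with a clear message.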

Performance Optimization

Primary Key Caching

The dialect uses LRU caching to optimize primary key lookups:

# First call queries database
pk1 = dialect.get_primary_keys("users")  # Database query

# Subsequent calls use cache
pk2 = dialect.get_primary_keys("users")  # From cache
pk3 = dialect.get_primary_keys("users")  # From cache

# Clear the cache after a schema change, e.g. a primary key was added or altered (rare)
dialect.get_primary_keys.cache_clear()
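The caching behavior can be observed without a database. In the sketch below, `FakeDialect` is a stand-in class invented for illustration. It counts how often the decorated method body runs, which corresponds to how often a real dialect would query the database.

```python
from functools import lru_cache


class FakeDialect:
    """Stand-in for a dialect; counts simulated database queries."""

    def __init__(self):
        self.queries = 0

    @lru_cache(maxsize=None)
    def get_primary_keys(self, table, schema=None):
        self.queries += 1  # runs only on a cache miss
        return ["id"]


dialect = FakeDialect()
dialect.get_primary_keys("users")        # miss: runs the method body
dialect.get_primary_keys("users")        # hit: served from cache
dialect.get_primary_keys("users", None)  # miss: different argument tuple
print(dialect.queries)  # 2

dialect.get_primary_keys.cache_clear()
dialect.get_primary_keys("users")        # miss again after clearing
print(dialect.queries)  # 3
```

Two details follow from how `functools.lru_cache` builds its keys. First, calls that spell the same arguments differently, such as `("users")` and `("users", None)`, are cached separately. Second, when the decorator is applied to a method, the cache belongs to the function and not to the instance, so `cache_clear()` discards entries for every instance at once.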

UPSERT vs INSERT Performance

def performance_comparison():
    """Compare UPSERT vs INSERT performance."""
    
    import time
    
    hook = PostgresHook()
    test_data = [(i, f"user_{i}") for i in range(1000)]
    
    # Regular INSERT (assumes test_users starts empty, so no key collides)
    start_time = time.perf_counter()
    hook.insert_rows("test_users", test_data, ["id", "name"])
    insert_time = time.perf_counter() - start_time
    
    # UPSERT of the same rows, so every row takes the conflict path
    start_time = time.perf_counter()
    hook.insert_rows(
        "test_users", 
        test_data, 
        ["id", "name"], 
        replace=True, 
        replace_index="id"
    )
    upsert_time = time.perf_counter() - start_time
    
    print(f"INSERT time: {insert_time:.2f}s")
    print(f"UPSERT time: {upsert_time:.2f}s")
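A single timed run is easily skewed by connection setup or a cold cache. One common remedy is to repeat the measurement and keep the fastest run. The sketch below uses a hypothetical `best_of` helper, with a pure-Python workload standing in for the database call.

```python
import time


def best_of(func, repeats=3):
    """Return the fastest wall-clock time, in seconds, of `repeats` calls to func()."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        timings.append(time.perf_counter() - start)
    return min(timings)


# Stand-in workload; in practice this would wrap a hook.insert_rows call.
elapsed = best_of(lambda: sum(range(100_000)))
print(f"best of 3: {elapsed:.4f}s")
```

When the workload is an INSERT, each repetition needs a clean starting state, for example by truncating the test table between runs. Otherwise later runs fail on duplicate keys or measure a different code path.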

SQL Generation Details

ON CONFLICT Clause Structure

The dialect generates UPSERT statements with the following structure:

INSERT INTO table_name (column1, column2, ...)
VALUES (%s, %s, ...)
ON CONFLICT (conflict_columns)
DO UPDATE SET 
    column1 = EXCLUDED.column1,
    column2 = EXCLUDED.column2,
    ...

Conflict Resolution Options

# Single column conflict
upsert_sql = dialect.generate_replace_sql(
    table="users",
    values=data,
    target_fields=["id", "name", "email"],
    replace_index="id"  # Single column
)

# Multi-column conflict (composite key)
upsert_sql = dialect.generate_replace_sql(
    table="user_settings",
    values=data,
    target_fields=["user_id", "setting_name", "value"],
    replace_index=["user_id", "setting_name"]  # Multiple columns
)

Integration with PostgreSQL Features

The dialect leverages PostgreSQL-specific features:

  • ON CONFLICT: Native upsert support (PostgreSQL 9.5+)
  • EXCLUDED: Pseudo-table holding the row proposed for insertion, referenced in DO UPDATE SET
  • information_schema: Standard metadata queries
  • System catalogs: pg_* tables for advanced introspection
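The ON CONFLICT and EXCLUDED semantics listed above can be tried without a PostgreSQL server. The sketch below swaps in SQLite through Python's built-in `sqlite3` module, which accepts the same upsert syntax from SQLite 3.24 onward. This is a stand-in for illustration only: SQLite uses `?` placeholders where the PostgreSQL driver uses `%s`, and other behavior may differ between the two databases.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'John', 'john@example.com')")

# `excluded` names the row that was proposed for insertion.
upsert = (
    "INSERT INTO users (id, name, email) VALUES (?, ?, ?) "
    "ON CONFLICT (id) DO UPDATE SET name = excluded.name, email = excluded.email"
)
conn.execute(upsert, (1, "Johnny", "johnny@example.com"))  # id 1 exists: updated
conn.execute(upsert, (2, "Jane", "jane@example.com"))      # id 2 is new: inserted

rows = conn.execute("SELECT id, name, email FROM users ORDER BY id").fetchall()
print(rows)
conn.close()
```

After both statements the table holds two rows. The existing row for id 1 carries the new name and email, while id 2 was inserted as a fresh row.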

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-postgres
