PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
PostgreSQL-specific SQL dialect implementation providing optimized operations like UPSERT statements, primary key introspection, schema-aware queries, and database-specific query generation for enhanced PostgreSQL integration.
Database dialect implementation for PostgreSQL-specific operations and query generation.
class PostgresDialect(Dialect):
"""
PostgreSQL-specific SQL dialect implementation.
Provides database-specific operations and query generation.
"""
@property
def name(self) -> str:
"""
Dialect name identifier.
Returns:
str: "postgresql"
"""

Retrieve primary key information using PostgreSQL system catalogs with caching for performance.
@lru_cache(maxsize=None)
def get_primary_keys(
self,
table: str,
schema: str | None = None
) -> list[str] | None:
"""
Get primary key columns using information_schema queries.
Uses LRU cache for performance optimization.
Parameters:
- table: str, table name (may include schema as "schema.table")
- schema: str or None, schema name (extracted from table if None)
Returns:
list[str] or None: Primary key column names, None if no primary key exists
Implementation:
- Extracts schema from table name if schema parameter is None
- Queries information_schema.table_constraints and key_column_usage
- Caches results to avoid repeated database queries
"""

Generate PostgreSQL-specific UPSERT statements using ON CONFLICT clause for efficient conflict resolution.
def generate_replace_sql(
self,
table,
values,
target_fields,
**kwargs
) -> str:
"""
Generate PostgreSQL UPSERT statement using ON CONFLICT clause.
Parameters:
- table: str, target table name
- values: list, row values for insertion
- target_fields: list, column names for insertion
- **kwargs: additional parameters including replace_index
Kwargs:
- replace_index: str or list, column(s) to use for conflict detection
Returns:
str: Generated UPSERT SQL with ON CONFLICT DO UPDATE clause
Example Output:
INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email
"""

from airflow.providers.postgres.dialects.postgres import PostgresDialect
from airflow.providers.postgres.hooks.postgres import PostgresHook
# Get dialect instance
hook = PostgresHook(postgres_conn_id="postgres_default")
dialect = hook.dialect # Returns PostgresDialect instance
# Or create directly
dialect = PostgresDialect()

# Get primary keys for table
pk_columns = dialect.get_primary_keys("users", "public")
print(f"Primary key columns: {pk_columns}") # ["id"]
# Handle table with schema in name
pk_columns = dialect.get_primary_keys("sales.orders")
print(f"Primary key columns: {pk_columns}") # ["order_id"]
# Composite primary key
pk_columns = dialect.get_primary_keys("order_items")
print(f"Composite key: {pk_columns}")  # ["order_id", "product_id"]

# Generate UPSERT for single-column primary key
upsert_sql = dialect.generate_replace_sql(
table="users",
values=[(1, "John", "john@example.com")],
target_fields=["id", "name", "email"],
replace_index="id"
)
print(upsert_sql)
# INSERT INTO users (id, name, email) VALUES (%s, %s, %s)
# ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email
# Generate UPSERT for composite key
upsert_sql = dialect.generate_replace_sql(
table="user_preferences",
values=[(1, "theme", "dark")],
target_fields=["user_id", "setting_key", "setting_value"],
replace_index=["user_id", "setting_key"]
)
print(upsert_sql)
# INSERT INTO user_preferences (user_id, setting_key, setting_value) VALUES (%s, %s, %s)
# ON CONFLICT (user_id, setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value

The dialect is automatically used by PostgresHook for database-specific operations:
hook = PostgresHook(postgres_conn_id="postgres_default")
# insert_rows automatically uses dialect for UPSERT generation
hook.insert_rows(
table="products",
rows=[
(1, "Widget", 19.99),
(2, "Gadget", 29.99)
],
target_fields=["id", "name", "price"],
replace=True, # Triggers UPSERT generation
replace_index="id"
)def analyze_table_structure(table_name, schema_name=None):
"""Analyze table structure using dialect capabilities."""
dialect = PostgresDialect()
# Get primary key information
pk_columns = dialect.get_primary_keys(table_name, schema_name)
if pk_columns:
print(f"Table {table_name} has primary key: {pk_columns}")
# Generate sample UPSERT
sample_values = [tuple(f"value_{i}" for i in range(len(["col1", "col2", "col3"])))]
upsert_sql = dialect.generate_replace_sql(
table=table_name,
values=sample_values,
target_fields=["col1", "col2", "col3"],
replace_index=pk_columns
)
print(f"Sample UPSERT SQL:\n{upsert_sql}")
else:
print(f"Table {table_name} has no primary key")
# Analyze different tables
analyze_table_structure("users", "public")
analyze_table_structure("sales.orders")


def bulk_upsert_with_dialect():
    """Bulk-upsert 10k demo rows into ``users`` via PostgresHook.

    Uses the dialect's cached primary-key lookup to decide between an
    UPSERT (``replace=True``) and a plain INSERT.
    """
    hook = PostgresHook()
    dialect = hook.dialect
    # Large dataset for upsert
    data_rows = [
        (i, f"user_{i}", f"user_{i}@example.com")
        for i in range(1, 10001)
    ]
    # Get primary key for target table
    pk_columns = dialect.get_primary_keys("users", "public")
    if pk_columns:
        # Use hook's built-in upsert (uses dialect internally)
        hook.insert_rows(
            table="users",
            rows=data_rows,
            target_fields=["id", "name", "email"],
            replace=True,
            replace_index=pk_columns,
            commit_every=1000,
        )
    else:
        # Fallback to regular insert
        hook.insert_rows(
            table="users",
            rows=data_rows,
            target_fields=["id", "name", "email"],
            commit_every=1000,
        )


def dynamic_upsert_handler(table_name, data_dict_list, schema="public"):
    """
    Handle upsert operations dynamically based on table structure.

    Parameters:
    - table_name: str, target table
    - data_dict_list: list[dict], rows keyed by column name
    - schema: str, schema holding the table (default "public")

    Raises:
    - ValueError: when the table has a primary key but some key column is
      missing from the supplied rows.
    """
    hook = PostgresHook()
    dialect = hook.dialect
    # Get table primary key
    pk_columns = dialect.get_primary_keys(table_name, schema)
    if not data_dict_list:
        return
    # Extract fields from data (field order taken from the first row)
    target_fields = list(data_dict_list[0].keys())
    # Index each row by field name so rows whose dicts happen to use a
    # different key order still line up with target_fields; relying on
    # dict iteration order would silently misalign columns.
    rows = [[row[field] for field in target_fields] for row in data_dict_list]
    if pk_columns:
        # Check if all primary key columns are present
        missing_pk_cols = set(pk_columns) - set(target_fields)
        if missing_pk_cols:
            raise ValueError(f"Missing primary key columns: {missing_pk_cols}")
        # Perform upsert
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields,
            replace=True,
            replace_index=pk_columns,
        )
        print(f"Upserted {len(rows)} rows into {table_name}")
    else:
        # Regular insert for tables without primary key
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields,
        )
        print(f"Inserted {len(rows)} rows into {table_name}")


# Usage
user_data = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
]
dynamic_upsert_handler("users", user_data)

The dialect uses LRU caching to optimize primary key lookups:
# First call queries database
pk1 = dialect.get_primary_keys("users") # Database query
# Subsequent calls use cache
pk2 = dialect.get_primary_keys("users") # From cache
pk3 = dialect.get_primary_keys("users") # From cache
# Clear cache if needed (rare)
dialect.get_primary_keys.cache_clear()


def performance_comparison():
    """Compare wall-clock time of a plain INSERT vs an UPSERT on test_users."""
    # perf_counter is the right clock for interval timing: it is monotonic
    # and high-resolution, whereas time.time() can jump if the system
    # clock is adjusted mid-benchmark.
    from time import perf_counter

    hook = PostgresHook()
    test_data = [(i, f"user_{i}") for i in range(1000)]

    # Regular INSERT
    start_time = perf_counter()
    hook.insert_rows("test_users", test_data, ["id", "name"])
    insert_time = perf_counter() - start_time

    # UPSERT (with conflict resolution)
    start_time = perf_counter()
    hook.insert_rows(
        "test_users",
        test_data,
        ["id", "name"],
        replace=True,
        replace_index="id",
    )
    upsert_time = perf_counter() - start_time

    print(f"INSERT time: {insert_time:.2f}s")
print(f"UPSERT time: {upsert_time:.2f}s")

The dialect generates UPSERT statements with the following structure:
INSERT INTO table_name (column1, column2, ...)
VALUES (%s, %s, ...)
ON CONFLICT (conflict_columns)
DO UPDATE SET
column1 = EXCLUDED.column1,
column2 = EXCLUDED.column2,
...

# Single column conflict
upsert_sql = dialect.generate_replace_sql(
table="users",
values=data,
target_fields=["id", "name", "email"],
replace_index="id" # Single column
)
# Multi-column conflict (composite key)
upsert_sql = dialect.generate_replace_sql(
table="user_settings",
values=data,
target_fields=["user_id", "setting_name", "value"],
replace_index=["user_id", "setting_name"] # Multiple columns
)

The dialect leverages PostgreSQL-specific features:
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-postgres