PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
---
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Database schema introspection and metadata operations for analyzing table structures, primary keys, constraints, and database organization. Provides programmatic access to PostgreSQL system catalogs and information schema.
Retrieve primary key information for tables using PostgreSQL system catalogs.
def get_table_primary_key(
    self,
    table: str,
    schema: str | None = "public"
) -> list[str] | None:
    """
    Get primary key columns for specified table.

    Parameters:
    - table: str, table name to inspect
    - schema: str or None, schema name (defaults to "public")

    Returns:
    list[str] or None: List of primary key column names, None if no primary key

    Example:
    pk_cols = hook.get_table_primary_key("users", "public")  # ["id"]
    composite_pk = hook.get_table_primary_key("user_roles")  # ["user_id", "role_id"]
    """


# The examples below exercise this introspection API through a hook instance.
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="postgres_default")

# Get primary key for table in default schema
pk_columns = hook.get_table_primary_key("users")
print(f"Primary key columns: {pk_columns}")  # ["id"]

# Get primary key for table in specific schema
pk_columns = hook.get_table_primary_key("customers", "sales")
print(f"Primary key columns: {pk_columns}")  # ["customer_id"]

# Tables with composite primary keys return every key column.
composite_pk = hook.get_table_primary_key("order_items")
if composite_pk:
    print(f"Composite key: {composite_pk}")  # ["order_id", "product_id"]

# Use in upsert operations
hook.insert_rows(
    table="order_items",
    rows=[(1, 101, 2), (1, 102, 1)],
    target_fields=["order_id", "product_id", "quantity"],
    replace=True,
    replace_index=composite_pk,  # Use composite key for conflict resolution
)


def validate_table_structure(table_name, expected_pk_columns):
    """Validate table has expected primary key structure.

    Raises ValueError when the table has no primary key or the key
    columns differ (order-insensitive comparison) from the expectation.
    Returns True on success.
    """
    actual_pk = hook.get_table_primary_key(table_name)
    if actual_pk is None:
        raise ValueError(f"Table {table_name} has no primary key")
    # Compare as sets: column ORDER within the key is intentionally ignored.
    if set(actual_pk) != set(expected_pk_columns):
        raise ValueError(
            f"Primary key mismatch for {table_name}. "
            f"Expected: {expected_pk_columns}, Actual: {actual_pk}"
        )
    return True
# Validate table structure before operations
validate_table_structure("users", ["id"])
validate_table_structure("user_permissions", ["user_id", "permission_id"])


def smart_upsert(table_name, rows, target_fields):
    """Perform upsert using table's actual primary key.

    Falls back to a plain insert when the table has no primary key;
    otherwise uses the discovered key columns for conflict resolution.
    """
    # Discover primary key dynamically
    pk_columns = hook.get_table_primary_key(table_name)
    if pk_columns is None:
        # No primary key - use regular insert
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields,
        )
    else:
        # Use discovered primary key for upsert
        hook.insert_rows(
            table=table_name,
            rows=rows,
            target_fields=target_fields,
            replace=True,
            replace_index=pk_columns,
        )
# Usage
smart_upsert("products", [(1, "Widget", 19.99)], ["id", "name", "price"])


def etl_with_schema_validation():
    """ETL pipeline with schema validation.

    Validates every expected table's primary key before processing;
    returns False on the first validation failure, True after the ETL
    step completes.
    """
    # Define expected schemas
    expected_schemas = {
        "users": ["user_id"],
        "orders": ["order_id"],
        "order_items": ["order_id", "item_id"],
    }
    # Validate all tables before processing
    for table, expected_pk in expected_schemas.items():
        try:
            validate_table_structure(table, expected_pk)
            print(f"✓ {table} schema validation passed")
        except ValueError as e:
            print(f"✗ {table} schema validation failed: {e}")
            return False
    # Proceed with ETL operations
    process_etl_data()
    return True


# While the hook provides primary key introspection, you can also execute
# custom queries for additional schema information:
# Get all tables in schema
tables = hook.get_records("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = %s AND table_type = 'BASE TABLE'
""", parameters=["public"])

# Get column information
columns = hook.get_records("""
    SELECT column_name, data_type, is_nullable, column_default
    FROM information_schema.columns
    WHERE table_schema = %s AND table_name = %s
    ORDER BY ordinal_position
""", parameters=["public", "users"])

# Get foreign key constraints
foreign_keys = hook.get_records("""
    SELECT
        kcu.column_name,
        ccu.table_name AS foreign_table_name,
        ccu.column_name AS foreign_column_name
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
        ON tc.constraint_name = kcu.constraint_name
    JOIN information_schema.constraint_column_usage ccu
        ON ccu.constraint_name = tc.constraint_name
    WHERE tc.constraint_type = 'FOREIGN KEY'
        AND tc.table_schema = %s
        AND tc.table_name = %s
""", parameters=["public", "orders"])

# Get unique constraints
unique_constraints = hook.get_records("""
    SELECT
        tc.constraint_name,
        array_agg(kcu.column_name ORDER BY kcu.ordinal_position) as columns
    FROM information_schema.table_constraints tc
    JOIN information_schema.key_column_usage kcu
        ON tc.constraint_name = kcu.constraint_name
    WHERE tc.constraint_type = 'UNIQUE'
        AND tc.table_schema = %s
        AND tc.table_name = %s
    GROUP BY tc.constraint_name
""", parameters=["public", "users"])

# Get table indexes (pg_catalog rather than information_schema: exposes
# uniqueness/primary flags and index column order via pg_index.indkey)
indexes = hook.get_records("""
    SELECT
        i.relname as index_name,
        array_agg(a.attname ORDER BY c.ordinality) as columns,
        ix.indisunique as is_unique,
        ix.indisprimary as is_primary
    FROM pg_class t
    JOIN pg_index ix ON t.oid = ix.indrelid
    JOIN pg_class i ON i.oid = ix.indexrelid
    JOIN unnest(ix.indkey) WITH ORDINALITY AS c(attnum, ordinality) ON true
    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = c.attnum
    JOIN pg_namespace n ON t.relnamespace = n.oid
    WHERE t.relname = %s AND n.nspname = %s
    GROUP BY i.relname, ix.indisunique, ix.indisprimary
    ORDER BY i.relname
""", parameters=["users", "public"])
"""Safely get primary key with error handling."""
try:
pk_columns = hook.get_table_primary_key(table_name, schema)
return pk_columns
except Exception as e:
print(f"Error getting primary key for {table_name}: {e}")
return None
# Usage with error handling
pk = safe_get_primary_key("nonexistent_table")
if pk is not None:
print(f"Primary key: {pk}")
else:
print("Could not determine primary key")Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-postgres