PostgreSQL integration provider for Apache Airflow with database hooks, assets, and dialect support.
—
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
PostgreSQL asset/dataset URI handling with validation, sanitization, and integration with Airflow's data lineage and dependency management systems. Supports both legacy dataset terminology and modern asset terminology.
Sanitize and validate PostgreSQL asset/dataset URIs for consistency and proper format.
from urllib.parse import SplitResult


def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Sanitize PostgreSQL asset/dataset URIs.

    Parameters:
    - uri: SplitResult, parsed URI components from urllib.parse.urlsplit

    Returns:
    SplitResult: Sanitized URI components with validated structure

    URI Format:
    postgres://host:port/database/schema/table
    postgresql://host:port/database/schema/table

    Validation:
    - Ensures URI contains host, database, schema, and table components
    - Adds default port 5432 if not specified
    - Validates path structure matches /database/schema/table format

    Raises:
    ValueError: If URI structure is invalid or missing required components
    """
    # Only the two PostgreSQL schemes are valid for this handler.
    if uri.scheme not in ("postgres", "postgresql"):
        raise ValueError("URI scheme must be postgres or postgresql")
    if not uri.netloc:
        raise ValueError("URI format postgres:// must contain a host")
    if uri.port is None:
        # Append the default PostgreSQL port when none is given; rstrip
        # handles a netloc that ends with a bare trailing colon.
        host = uri.netloc.rstrip(":")
        uri = uri._replace(netloc=f"{host}:5432")
    # Path "/database/schema/table" splits into ["", db, schema, table];
    # every component after the leading empty string must be non-empty.
    segments = uri.path.split("/")
    if len(segments) != 4 or not all(segments[1:]):
        raise ValueError(
            "URI format postgres:// must contain database, schema, and table names"
        )
    return uri


import urllib.parse
from airflow.providers.postgres.assets.postgres import sanitize_uri

# Parse and sanitize PostgreSQL URI
uri_str = "postgres://localhost/mydb/public/users"
parsed_uri = urllib.parse.urlsplit(uri_str)
sanitized_uri = sanitize_uri(parsed_uri)
print(f"Original: {uri_str}")
print(f"Sanitized: {urllib.parse.urlunsplit(sanitized_uri)}")
# Output: postgres://localhost:5432/mydb/public/users

from airflow import DAG
from airflow import DAG
from airflow.datasets import Dataset
from airflow.decorators import task
from airflow.providers.postgres.assets.postgres import sanitize_uri
import urllib.parse


# Define PostgreSQL dataset/asset
def create_postgres_asset(host, database, schema, table, port=5432):
    """Build a sanitized Dataset for one PostgreSQL table.

    Parameters: host/database/schema/table strings plus an optional port
    (default 5432). Returns an airflow Dataset whose URI has been run
    through sanitize_uri, so malformed component combinations raise
    ValueError here rather than at scheduling time.
    """
    uri_str = f"postgres://{host}:{port}/{database}/{schema}/{table}"
    parsed_uri = urllib.parse.urlsplit(uri_str)
    sanitized_uri = sanitize_uri(parsed_uri)
    return Dataset(urllib.parse.urlunsplit(sanitized_uri))


# Create assets for data pipeline
users_table = create_postgres_asset("db.example.com", "app_db", "public", "users")
orders_table = create_postgres_asset("db.example.com", "app_db", "public", "orders")
analytics_table = create_postgres_asset("warehouse.example.com", "analytics", "marts", "user_orders")

# Use in DAG with data dependencies
with DAG("data_pipeline", schedule=None) as dag:

    @task(outlets=[users_table])
    def extract_users():
        # Extract user data
        pass

    @task(inlets=[users_table], outlets=[orders_table])
    def extract_orders():
        # Extract order data
        pass

    @task(inlets=[users_table, orders_table], outlets=[analytics_table])
    def create_analytics():
        # Create analytics table
        pass


def validate_postgres_asset(uri_string):
    """Validate PostgreSQL asset URI format.

    Returns True and prints the sanitized URI when valid; returns False
    and prints the error when sanitize_uri rejects the input.
    """
    try:
        parsed_uri = urllib.parse.urlsplit(uri_string)
        sanitized_uri = sanitize_uri(parsed_uri)
        print(f"✓ Valid PostgreSQL asset URI: {urllib.parse.urlunsplit(sanitized_uri)}")
        return True
    except Exception as e:
        print(f"✗ Invalid PostgreSQL asset URI: {e}")
        return False


# Test various URI formats
test_uris = [
    "postgres://localhost/mydb/public/users",  # Valid
    "postgresql://host:5432/db/schema/table",  # Valid
    "postgres://host/database/schema",  # Invalid - missing table
    "postgres://host/database",  # Invalid - missing schema/table
    "mysql://host/database/table",  # Invalid - wrong scheme
]
for uri in test_uris:
    validate_postgres_asset(uri)

# Define assets across different databases
from airflow.providers.postgres.hooks.postgres import PostgresHook

source_table = create_postgres_asset("source-db", "raw_data", "public", "events")
staging_table = create_postgres_asset("staging-db", "staging", "events", "processed_events")
warehouse_table = create_postgres_asset("warehouse-db", "analytics", "facts", "event_summary")

# The DAG is triggered by updates to the source asset (asset-driven schedule).
with DAG("cross_db_pipeline", schedule=[source_table]) as dag:

    @task(inlets=[source_table], outlets=[staging_table])
    def stage_events():
        # Move data from source to staging
        source_hook = PostgresHook(postgres_conn_id="source_db")
        staging_hook = PostgresHook(postgres_conn_id="staging_db")
        # Extract from source
        data = source_hook.get_df("SELECT * FROM events WHERE processed = false")
        # Load to staging
        staging_hook.insert_rows(
            "processed_events",
            data.values.tolist(),
            target_fields=list(data.columns),
        )

    @task(inlets=[staging_table], outlets=[warehouse_table])
    def aggregate_events():
        # Aggregate staging data to warehouse
        pass

# Both schemes are handled identically by the sanitize_uri function.
# URI format: postgres://[user[:password]@]host[:port]/database/schema/table
# Accepted schemes: postgres or postgresql
valid_uris = [
    "postgres://localhost:5432/myapp/public/users",
    "postgresql://db.example.com/warehouse/sales/orders",
    "postgres://user:pass@host:5433/db/schema/table",
    "postgres://readonly@analytics-db/reports/monthly/revenue",
]

from airflow.decorators import task
from airflow.datasets import Dataset
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Define PostgreSQL assets
customer_data = Dataset("postgres://db/crm/public/customers")
order_data = Dataset("postgres://db/sales/public/orders")
report_data = Dataset("postgres://warehouse/reports/public/daily_summary")


@task(outlets=[customer_data])
def sync_customers():
    """Sync customer data from external source."""
    hook = PostgresHook()
    # Sync logic here
    pass


@task(inlets=[customer_data, order_data], outlets=[report_data])
def generate_daily_report():
    """Generate daily report from customer and order data."""
    hook = PostgresHook()
    # Report generation logic here
    pass


# DAG that runs when PostgreSQL assets are updated
with DAG(
    "report_generator",
    schedule=[
        Dataset("postgres://db/sales/public/orders"),
        Dataset("postgres://db/crm/public/customers"),
    ],
) as report_dag:
    generate_reports_task = generate_daily_report()


def track_data_lineage():
    """Example of tracking data lineage with PostgreSQL assets."""
    # Source data assets
    raw_events = Dataset("postgres://source/raw/public/events")
    raw_users = Dataset("postgres://source/raw/public/users")
    # Intermediate processing assets
    clean_events = Dataset("postgres://staging/clean/public/events")
    enriched_events = Dataset("postgres://staging/enriched/public/events")
    # Final output assets
    user_metrics = Dataset("postgres://warehouse/metrics/public/user_activity")

    # Define processing pipeline with clear lineage
    with DAG("event_processing_pipeline") as dag:

        @task(inlets=[raw_events], outlets=[clean_events])
        def clean_events_task():
            # Data cleaning logic
            pass

        @task(inlets=[clean_events, raw_users], outlets=[enriched_events])
        def enrich_events_task():
            # Data enrichment logic
            pass

        @task(inlets=[enriched_events], outlets=[user_metrics])
        def calculate_metrics_task():
            # Metrics calculation logic
            pass

# The PostgreSQL provider automatically registers asset URI handlers with Airflow:
# From provider.yaml - automatically registered
"asset-uris":
  - schemes: [postgres, postgresql]
    handler: airflow.providers.postgres.assets.postgres.sanitize_uri

# Legacy dataset URI support (backward compatibility)
"dataset-uris":
  - schemes: [postgres, postgresql]
    handler: airflow.providers.postgres.assets.postgres.sanitize_uri

# The sanitize_uri function is automatically called by Airflow when:
def handle_uri_errors():
    """Demonstrate common PostgreSQL asset URI errors and their messages."""
    problematic_uris = [
        "postgres://host/db",  # Missing schema/table
        "postgres://host/db/schema",  # Missing table
        "postgres:///db/schema/table",  # Missing host
        "mysql://host/db/schema/table",  # Wrong scheme
    ]
    for uri_str in problematic_uris:
        try:
            parsed = urllib.parse.urlsplit(uri_str)
            sanitized = sanitize_uri(parsed)
            print(f"✓ {uri_str}")
        except Exception as e:
            print(f"✗ {uri_str}: {e}")


def create_safe_postgres_asset(host, database, schema, table, port=5432):
    """Safely create PostgreSQL asset with validation.

    Raises ValueError when any component is empty/missing or when
    sanitize_uri rejects the constructed URI; otherwise returns a Dataset.
    """
    # Validate inputs
    if not all([host, database, schema, table]):
        raise ValueError("All components (host, database, schema, table) are required")
    # Construct URI
    uri_str = f"postgres://{host}:{port}/{database}/{schema}/{table}"
    try:
        # Parse and sanitize
        parsed_uri = urllib.parse.urlsplit(uri_str)
        sanitized_uri = sanitize_uri(parsed_uri)
        # Create Dataset
        return Dataset(urllib.parse.urlunsplit(sanitized_uri))
    except Exception as e:
        # Chain the original cause so callers can see the underlying error.
        raise ValueError(f"Failed to create PostgreSQL asset: {e}") from e


# Safe usage
try:
    asset = create_safe_postgres_asset("db.example.com", "myapp", "public", "users")
    print(f"Created asset: {asset.uri}")
except ValueError as e:
    print(f"Error: {e}")

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-postgres