# Asset and Dataset Management

PostgreSQL asset/dataset URI handling: validation, sanitization, and integration with Airflow's data lineage and data-aware scheduling. Both the legacy "dataset" terminology and the "asset" terminology adopted in Airflow 3 are supported.

## Capabilities

### URI Sanitization

Validate PostgreSQL asset/dataset URIs and normalize them to a consistent format.

```python { .api }
def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Sanitize PostgreSQL asset/dataset URIs.

    Parameters:
    - uri: SplitResult, parsed URI components from urllib.parse.urlsplit

    Returns:
    SplitResult: Sanitized URI components with validated structure

    URI Format:
    postgres://host:port/database/schema/table
    postgresql://host:port/database/schema/table

    Validation:
    - Ensures URI contains host, database, schema, and table components
    - Adds default port 5432 if not specified
    - Validates path structure matches /database/schema/table format

    Raises:
    ValueError: If URI structure is invalid or missing required components
    """
```
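
The sketch below makes the documented validation rules concrete by re-implementing them on a `SplitResult` using only the standard library. The function `sketch_sanitize_uri` is a hypothetical name and is not the provider's source. It follows only the rules listed in the docstring above: a required host, the default port 5432, and exactly `/database/schema/table`. The real handler may differ in edge cases.

```python
from urllib.parse import SplitResult, urlsplit, urlunsplit

DEFAULT_PORT = 5432  # Default port named in the docstring above


def sketch_sanitize_uri(uri: SplitResult) -> SplitResult:
    """Hypothetical re-implementation of the documented rules, for illustration only."""
    # Rule 1: a host is required ("postgres:///db/schema/table" has none).
    if not uri.hostname:
        raise ValueError("PostgreSQL asset URI must contain a host")

    # Rule 2: add the default port when none is given.
    # Note: reading uri.port raises ValueError if the port is not numeric.
    if uri.port is None:
        uri = uri._replace(netloc=f"{uri.netloc.rstrip(':')}:{DEFAULT_PORT}")

    # Rule 3: the path must be exactly /database/schema/table.
    # "/db/schema/table".split("/") == ["", "db", "schema", "table"]
    parts = uri.path.split("/")
    if len(parts) != 4 or not all(parts[1:]):
        raise ValueError("PostgreSQL asset URI must contain database, schema, and table names")

    return uri


print(urlunsplit(sketch_sanitize_uri(urlsplit("postgres://localhost/mydb/public/users"))))
# postgres://localhost:5432/mydb/public/users
```

`_replace` is used because `SplitResult` is an immutable named tuple. Every rule therefore returns a new value rather than mutating the input.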

## Usage Examples

### Basic URI Sanitization

```python
import urllib.parse

from airflow.providers.postgres.assets.postgres import sanitize_uri

# Parse and sanitize a PostgreSQL URI that omits the port
uri_str = "postgres://localhost/mydb/public/users"
parsed_uri = urllib.parse.urlsplit(uri_str)
sanitized_uri = sanitize_uri(parsed_uri)

print(f"Original: {uri_str}")
print(f"Sanitized: {urllib.parse.urlunsplit(sanitized_uri)}")
# Output:
# Original: postgres://localhost/mydb/public/users
# Sanitized: postgres://localhost:5432/mydb/public/users
```

### Asset Definition in DAGs

```python
import urllib.parse

from airflow import DAG
from airflow.datasets import Dataset  # Airflow 3: from airflow.sdk import Asset
from airflow.decorators import task
from airflow.providers.postgres.assets.postgres import sanitize_uri


def create_postgres_asset(host, database, schema, table, port=5432):
    """Build a PostgreSQL Dataset from its components, sanitizing the URI first."""
    uri_str = f"postgres://{host}:{port}/{database}/{schema}/{table}"
    sanitized_uri = sanitize_uri(urllib.parse.urlsplit(uri_str))
    return Dataset(urllib.parse.urlunsplit(sanitized_uri))


# Create assets for the data pipeline
users_table = create_postgres_asset("db.example.com", "app_db", "public", "users")
orders_table = create_postgres_asset("db.example.com", "app_db", "public", "orders")
analytics_table = create_postgres_asset("warehouse.example.com", "analytics", "marts", "user_orders")

# Declare data dependencies through task inlets and outlets
with DAG("data_pipeline", schedule=None) as dag:

    @task(outlets=[users_table])
    def extract_users():
        # Extract user data
        pass

    @task(inlets=[users_table], outlets=[orders_table])
    def extract_orders():
        # Extract order data
        pass

    @task(inlets=[users_table, orders_table], outlets=[analytics_table])
    def create_analytics():
        # Build the analytics table
        pass

    # Calling the decorated functions creates the tasks; >> sets execution order
    extract_users() >> extract_orders() >> create_analytics()
```

### Asset URI Validation

```python
import urllib.parse

from airflow.providers.postgres.assets.postgres import sanitize_uri


def validate_postgres_asset(uri_string):
    """Return True if the URI passes PostgreSQL asset sanitization."""
    try:
        sanitized_uri = sanitize_uri(urllib.parse.urlsplit(uri_string))
    except ValueError as e:  # sanitize_uri reports invalid URIs with ValueError
        print(f"✗ Invalid PostgreSQL asset URI: {e}")
        return False
    print(f"✓ Valid PostgreSQL asset URI: {urllib.parse.urlunsplit(sanitized_uri)}")
    return True


# Test various URI formats
test_uris = [
    "postgres://localhost/mydb/public/users",  # Valid
    "postgresql://host:5432/db/schema/table",  # Valid
    "postgres://host/database/schema",  # Invalid: missing table
    "postgres://host/database",  # Invalid: missing schema and table
    "mysql://host/database/table",  # Invalid: missing schema; Airflow also only routes postgres/postgresql URIs here
]

for uri in test_uris:
    validate_postgres_asset(uri)
```

### Cross-Database Asset Dependencies

```python
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Define assets across different databases (create_postgres_asset is defined above)
source_table = create_postgres_asset("source-db", "raw_data", "public", "events")
staging_table = create_postgres_asset("staging-db", "staging", "events", "processed_events")
warehouse_table = create_postgres_asset("warehouse-db", "analytics", "facts", "event_summary")

# Triggered when a task that lists source_table as an outlet completes successfully
with DAG("cross_db_pipeline", schedule=[source_table]) as dag:

    @task(inlets=[source_table], outlets=[staging_table])
    def stage_events():
        # Move data from source to staging
        source_hook = PostgresHook(postgres_conn_id="source_db")
        staging_hook = PostgresHook(postgres_conn_id="staging_db")

        # Extract unprocessed rows from the source
        data = source_hook.get_df("SELECT * FROM events WHERE processed = false")

        # Load into the staging table declared as this task's outlet (schema "events")
        staging_hook.insert_rows(
            "events.processed_events",
            data.values.tolist(),
            target_fields=list(data.columns),
        )

    @task(inlets=[staging_table], outlets=[warehouse_table])
    def aggregate_events():
        # Aggregate staging data into the warehouse
        pass

    stage_events() >> aggregate_events()
```

## Asset URI Formats

### Supported Schemes

- **postgres**: PostgreSQL URI scheme
- **postgresql**: Alternative PostgreSQL URI scheme

Both schemes are registered to the same `sanitize_uri` handler, so they are validated identically.
161
162
### URI Structure
163
164
```
165
postgres://[user[:password]@]host[:port]/database/schema/table
166
```
167
168
### Components
169
170
- **scheme**: `postgres` or `postgresql`
171
- **user**: Optional database username
172
- **password**: Optional database password
173
- **host**: Database hostname or IP address (required)
174
- **port**: Database port (defaults to 5432)
175
- **database**: Database name (required)
176
- **schema**: Schema name (required)
177
- **table**: Table name (required)
178

### Examples of Valid URIs

```python
valid_uris = [
    "postgres://localhost:5432/myapp/public/users",
    "postgresql://db.example.com/warehouse/sales/orders",
    "postgres://user:pass@host:5433/db/schema/table",
    "postgres://readonly@analytics-db/reports/monthly/revenue",
]
```

## Integration with Airflow Assets

### Asset-Aware Task Definition

```python
from airflow.datasets import Dataset
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Define PostgreSQL assets
customer_data = Dataset("postgres://db/crm/public/customers")
order_data = Dataset("postgres://db/sales/public/orders")
report_data = Dataset("postgres://warehouse/reports/public/daily_summary")


@task(outlets=[customer_data])
def sync_customers():
    """Sync customer data from an external source."""
    hook = PostgresHook()  # Uses the default connection, postgres_default
    # Sync logic here


@task(inlets=[customer_data, order_data], outlets=[report_data])
def generate_daily_report():
    """Generate the daily report from customer and order data."""
    hook = PostgresHook()
    # Report generation logic here
```

### Dataset-Triggered DAGs

```python
from airflow import DAG
from airflow.datasets import Dataset

# DAG that runs once both PostgreSQL assets have been updated
with DAG(
    "report_generator",
    schedule=[
        Dataset("postgres://db/sales/public/orders"),
        Dataset("postgres://db/crm/public/customers"),
    ],
) as report_dag:
    # generate_daily_report is the @task defined in the previous example
    generate_reports_task = generate_daily_report()
```
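
When `schedule` is a list of assets, Airflow triggers the DAG only after every listed asset has been updated at least once since the DAG's previous asset-triggered run. The toy model below illustrates that rule. `AssetListTrigger` is a hypothetical class, not part of Airflow, and real scheduling also involves the metadata database and queued asset events.

```python
from typing import Iterable, Set


class AssetListTrigger:
    """Hypothetical model of AND-scheduling on a list of asset URIs."""

    def __init__(self, schedule: Iterable[str]) -> None:
        self.schedule: Set[str] = set(schedule)
        self.updated: Set[str] = set()  # assets updated since the last run

    def record_update(self, uri: str) -> bool:
        """Record an asset update; return True if a DAG run should start."""
        if uri not in self.schedule:
            return False  # not an asset this DAG depends on
        self.updated.add(uri)
        if self.updated == self.schedule:
            self.updated.clear()  # start collecting updates for the next run
            return True
        return False


# URIs written in sanitized form (default port 5432 added)
trigger = AssetListTrigger([
    "postgres://db:5432/sales/public/orders",
    "postgres://db:5432/crm/public/customers",
])
print(trigger.record_update("postgres://db:5432/sales/public/orders"))   # False
print(trigger.record_update("postgres://db:5432/sales/public/orders"))   # False
print(trigger.record_update("postgres://db:5432/crm/public/customers"))  # True
```

The second update to `orders` does not start a run on its own. Repeated updates to one asset collapse into a single pending update until every other asset in the list has caught up.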

### Asset Lineage Tracking

```python
from airflow import DAG
from airflow.datasets import Dataset
from airflow.decorators import task

# Source data assets
raw_events = Dataset("postgres://source/raw/public/events")
raw_users = Dataset("postgres://source/raw/public/users")

# Intermediate processing assets
clean_events = Dataset("postgres://staging/clean/public/events")
enriched_events = Dataset("postgres://staging/enriched/public/events")

# Final output assets
user_metrics = Dataset("postgres://warehouse/metrics/public/user_activity")

# Define the DAG at module level so Airflow can discover it; inlets/outlets record lineage
with DAG("event_processing_pipeline", schedule=None) as dag:

    @task(inlets=[raw_events], outlets=[clean_events])
    def clean_events_task():
        # Data cleaning logic
        pass

    @task(inlets=[clean_events, raw_users], outlets=[enriched_events])
    def enrich_events_task():
        # Data enrichment logic
        pass

    @task(inlets=[enriched_events], outlets=[user_metrics])
    def calculate_metrics_task():
        # Metrics calculation logic
        pass

    clean_events_task() >> enrich_events_task() >> calculate_metrics_task()
```
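
Because each task declares its inlets and outlets, you can recover an asset's lineage by walking from outlets back to inlets. The sketch below does this with plain dictionaries, using the URIs from the pipeline above as strings. `upstream_assets` is a hypothetical helper and not an Airflow or OpenLineage API.

```python
from collections import deque
from typing import Dict, List, Set, Tuple

# task_id -> (inlets, outlets), mirroring the event_processing_pipeline example
TASKS: Dict[str, Tuple[List[str], List[str]]] = {
    "clean_events_task": (
        ["postgres://source/raw/public/events"],
        ["postgres://staging/clean/public/events"],
    ),
    "enrich_events_task": (
        ["postgres://staging/clean/public/events", "postgres://source/raw/public/users"],
        ["postgres://staging/enriched/public/events"],
    ),
    "calculate_metrics_task": (
        ["postgres://staging/enriched/public/events"],
        ["postgres://warehouse/metrics/public/user_activity"],
    ),
}


def upstream_assets(target: str, tasks: Dict[str, Tuple[List[str], List[str]]]) -> Set[str]:
    """Hypothetical helper: every asset that feeds `target`, directly or transitively."""
    # Map each asset to the inlets of the tasks that produce it
    produced_from: Dict[str, Set[str]] = {}
    for inlets, outlets in tasks.values():
        for out in outlets:
            produced_from.setdefault(out, set()).update(inlets)

    seen: Set[str] = set()
    queue = deque([target])
    while queue:  # breadth-first walk back through the producers
        for source in produced_from.get(queue.popleft(), set()):
            if source not in seen:
                seen.add(source)
                queue.append(source)
    return seen


for uri in sorted(upstream_assets("postgres://warehouse/metrics/public/user_activity", TASKS)):
    print(uri)
```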

## Provider Registration

The PostgreSQL provider registers its asset URI handler with Airflow automatically through its `provider.yaml`.

### Asset URI Registration

```yaml
# From provider.yaml - registered automatically
asset-uris:
  - schemes: [postgres, postgresql]
    handler: airflow.providers.postgres.assets.postgres.sanitize_uri

# Legacy dataset URI support (backward compatibility)
dataset-uris:
  - schemes: [postgres, postgresql]
    handler: airflow.providers.postgres.assets.postgres.sanitize_uri
```
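
Conceptually, each `asset-uris` entry maps a set of URI schemes to one handler, and Airflow passes each URI to the handler registered for its scheme. The sketch below models that lookup with a plain dictionary. `build_registry`, `normalize_uri`, and `add_default_port` are hypothetical stand-ins, not Airflow's provider-manager internals. The stand-in handler only adds the default port, and in this model URIs with unregistered schemes pass through unchanged.

```python
from typing import Callable, Dict, List
from urllib.parse import SplitResult, urlsplit, urlunsplit

Handler = Callable[[SplitResult], SplitResult]


def add_default_port(uri: SplitResult) -> SplitResult:
    """Hypothetical stand-in for sanitize_uri: only applies the 5432 default port."""
    if uri.port is None:
        uri = uri._replace(netloc=f"{uri.netloc}:5432")
    return uri


def build_registry(entries: List[dict]) -> Dict[str, Handler]:
    """Map every scheme in each entry to that entry's handler."""
    registry: Dict[str, Handler] = {}
    for entry in entries:
        for scheme in entry["schemes"]:
            registry[scheme] = entry["handler"]
    return registry


def normalize_uri(uri: str, registry: Dict[str, Handler]) -> str:
    """Run the handler registered for the URI's scheme; leave other schemes as-is."""
    parts = urlsplit(uri)
    handler = registry.get(parts.scheme)
    return urlunsplit(handler(parts)) if handler else uri


# Mirrors the provider.yaml entry, with the stand-in handler in place of sanitize_uri
registry = build_registry([{"schemes": ["postgres", "postgresql"], "handler": add_default_port}])

# Two spellings of the same table normalize to the same string, so they compare equal
a = normalize_uri("postgres://db/sales/public/orders", registry)
b = normalize_uri("postgres://db:5432/sales/public/orders", registry)
print(a == b)  # True
```

This equality after normalization is why a producer's outlet written without a port can still match a consumer's schedule written with `:5432`.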

### Handler Function

The `sanitize_uri` function is called automatically by Airflow when:

- Creating Dataset/Asset objects with `postgres://` or `postgresql://` URIs
- Validating asset dependencies in DAGs
- Processing asset lineage information
- Comparing asset URIs for dependency resolution

## Error Handling

### Common URI Validation Errors

```python
import urllib.parse

from airflow.providers.postgres.assets.postgres import sanitize_uri


def handle_uri_errors():
    """Report which problematic PostgreSQL asset URIs fail sanitization."""
    problematic_uris = [
        "postgres://host/db",  # Missing schema and table
        "postgres://host/db/schema",  # Missing table
        "postgres:///db/schema/table",  # Missing host
        # Wrong scheme: Airflow never routes this to the postgres handler;
        # a direct call checks only the URI structure described above
        "mysql://host/db/schema/table",
    ]

    for uri_str in problematic_uris:
        try:
            sanitize_uri(urllib.parse.urlsplit(uri_str))
            print(f"✓ {uri_str}")
        except ValueError as e:
            print(f"✗ {uri_str}: {e}")
```

### Best Practices

```python
import urllib.parse

from airflow.datasets import Dataset
from airflow.providers.postgres.assets.postgres import sanitize_uri


def create_safe_postgres_asset(host, database, schema, table, port=5432):
    """Create a PostgreSQL asset after validating its components."""
    # Reject missing components before building the URI
    if not all([host, database, schema, table]):
        raise ValueError("All components (host, database, schema, table) are required")

    uri_str = f"postgres://{host}:{port}/{database}/{schema}/{table}"

    try:
        sanitized_uri = sanitize_uri(urllib.parse.urlsplit(uri_str))
    except ValueError as e:
        # Keep the original error as the cause
        raise ValueError(f"Failed to create PostgreSQL asset: {e}") from e

    return Dataset(urllib.parse.urlunsplit(sanitized_uri))


# Safe usage
try:
    asset = create_safe_postgres_asset("db.example.com", "myapp", "public", "users")
    print(f"Created asset: {asset.uri}")
except ValueError as e:
    print(f"Error: {e}")
```
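
The `all([...])` check above only catches empty components. A name containing `/` adds a path segment, and a `?` or `#` moves the rest of the name out of the path into the query or fragment. In the latter case the path no longer names the intended table. The stricter pre-check below uses only the standard library. `build_postgres_asset_uri` is a hypothetical helper, and its extra rules are assumptions layered on top of the documented format.

```python
from urllib.parse import urlsplit


def build_postgres_asset_uri(host: str, database: str, schema: str, table: str, port: int = 5432) -> str:
    """Hypothetical pre-check to run before handing the URI to sanitize_uri."""
    names = {"host": host, "database": database, "schema": schema, "table": table}
    for label, value in names.items():
        if not value:
            raise ValueError(f"{label} is required")
        # "/" adds a path segment; "?" and "#" start a query or fragment
        if any(ch in value for ch in "/?#"):
            raise ValueError(f"{label} contains a reserved URI character: {value!r}")
    if not 0 < port <= 65535:
        raise ValueError(f"port out of range: {port}")
    return f"postgres://{host}:{port}/{database}/{schema}/{table}"


print(build_postgres_asset_uri("db.example.com", "myapp", "public", "users"))
# postgres://db.example.com:5432/myapp/public/users

# Why "?" matters: urlsplit treats everything after it as the query string
print(urlsplit("postgres://h:5432/db/public/a?b").path)  # /db/public/a
```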