# Asset URI Handling

URI sanitization and validation for MySQL and MariaDB assets in Airflow's dataset and asset tracking system. The module checks that a URI has the required components, fills in the default port, and normalizes the scheme for MySQL/MariaDB asset URIs.

## Capabilities

### URI Sanitization

Sanitize and validate MySQL/MariaDB URIs for use in Airflow's asset and dataset tracking.

```python { .api }
def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Sanitize MySQL/MariaDB URI for asset handling.

    Validates URI format and applies default port configuration.
    Ensures URI contains required components for MySQL asset tracking.

    Parameters:
    - uri: SplitResult object representing the URI to sanitize

    Returns:
    SplitResult object with sanitized URI components

    Raises:
    ValueError: If URI format is invalid (missing host, database, or table)

    URI Format Requirements:
    - Must contain a host (netloc)
    - Must contain database and table names in path
    - Port defaults to 3306 if not specified
    - Scheme is normalized to "mysql"
    """
```
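
The documented rules can be sketched in plain Python. The function below is a hypothetical stand-in (`sanitize_uri_sketch` and `DEFAULT_PORT` are illustrative names, not part of the provider) that follows the docstring's requirements and reuses the error messages shown in the usage examples. The provider's actual implementation may differ.

```python
from urllib.parse import SplitResult, urlsplit

DEFAULT_PORT = 3306  # default port named in the docstring


def sanitize_uri_sketch(uri: SplitResult) -> SplitResult:
    """Hypothetical stand-in that mirrors the documented rules."""
    if not uri.netloc:
        raise ValueError("URI format mysql:// must contain a host")
    if uri.port is None:
        # Drop a dangling ":" (as in "host:") before appending the default port.
        host = uri.netloc.rstrip(":")
        uri = uri._replace(netloc=f"{host}:{DEFAULT_PORT}")
    # "/database/table".split("/") gives ["", "database", "table"].
    if len(uri.path.split("/")) != 3:
        raise ValueError("URI format mysql:// must contain database and table names")
    # Assumption: any accepted input scheme is rewritten to "mysql".
    return uri._replace(scheme="mysql")


result = sanitize_uri_sketch(urlsplit("mariadb://user:pass@db-host/analytics/metrics"))
print(result.geturl())  # mysql://user:pass@db-host:3306/analytics/metrics
```

Because `SplitResult` is a named tuple, `_replace` returns a new object each time and the caller's original value is left untouched.
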

## Usage Examples

### Basic URI Sanitization

```python
from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# Sanitize a MySQL URI
raw_uri = "mysql://user:pass@localhost/mydb/users"
uri_parts = urlsplit(raw_uri)
sanitized_uri = sanitize_uri(uri_parts)

print(sanitized_uri.geturl())  # mysql://user:pass@localhost:3306/mydb/users
```

### URI Validation and Error Handling

```python
from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# Handle invalid URIs
try:
    # Missing host
    invalid_uri = urlsplit("mysql:///database/table")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Invalid URI: {e}")  # "URI format mysql:// must contain a host"

try:
    # Missing table name
    invalid_uri = urlsplit("mysql://host/database")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Invalid URI: {e}")  # "URI format mysql:// must contain database and table names"
```

### Asset Registration in DAGs

```python
from airflow import DAG, Dataset
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from datetime import datetime

# Define MySQL dataset URIs (automatically sanitized by provider)
user_dataset = Dataset("mysql://localhost/mydb/users")
orders_dataset = Dataset("mysql://localhost/mydb/orders")

dag = DAG(
    'mysql_data_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule=[user_dataset]  # Schedule based on dataset updates
)

# Operator that produces the users dataset
load_users = S3ToMySqlOperator(
    task_id='load_users',
    s3_source_key='data/users.csv',
    mysql_table='mydb.users',
    outlets=[user_dataset],  # Mark as producing this dataset
    dag=dag
)

# Operator that produces the orders dataset (it declares an outlet, not an inlet)
process_orders = S3ToMySqlOperator(
    task_id='process_orders',
    s3_source_key='data/orders.csv',
    mysql_table='mydb.orders',
    outlets=[orders_dataset],
    dag=dag
)

load_users >> process_orders
```

### MariaDB URI Handling

```python
from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# MariaDB URIs are handled identically to MySQL
mariadb_uri = urlsplit("mariadb://user:pass@mariadb-server/analytics/metrics")
sanitized_uri = sanitize_uri(mariadb_uri)

# Scheme is normalized to "mysql" for consistency
print(sanitized_uri.scheme)  # "mysql"
print(sanitized_uri.netloc)  # "user:pass@mariadb-server:3306"
```

### Custom Port Configuration

```python
from urllib.parse import urlsplit
from airflow.providers.mysql.assets.mysql import sanitize_uri

# URI with custom port (preserved)
custom_port_uri = urlsplit("mysql://localhost:3307/mydb/table")
sanitized_uri = sanitize_uri(custom_port_uri)
print(sanitized_uri.netloc)  # "localhost:3307"

# URI without port (default 3306 added)
no_port_uri = urlsplit("mysql://localhost/mydb/table")
sanitized_uri = sanitize_uri(no_port_uri)
print(sanitized_uri.netloc)  # "localhost:3306"
```

## Asset URI Format Specification

### Valid URI Components

MySQL/MariaDB asset URIs must follow this format:

```
mysql://[user[:password]@]host[:port]/database/table
```

Components:
- **Scheme**: `mysql` or `mariadb` (normalized to `mysql`)
- **User**: Optional database username
- **Password**: Optional database password
- **Host**: Required MySQL/MariaDB server hostname or IP
- **Port**: Optional port number (defaults to 3306)
- **Database**: Required database name
- **Table**: Required table name
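
To see how these components map onto a parsed URI, here is a minimal sketch that uses only `urllib.parse`. The host name, credentials, and the `components` dictionary are illustrative assumptions, not part of the provider's API.

```python
from urllib.parse import urlsplit

# Example URI with every optional component present (illustrative values).
parts = urlsplit("mysql://user:secret@db.example.com:3307/warehouse/facts")

# The path "/warehouse/facts" splits into "", "warehouse", "facts".
_, database, table = parts.path.split("/")

components = {
    "scheme": parts.scheme,
    "user": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,  # parsed as an int, or None when absent
    "database": database,
    "table": table,
}
print(components)
```

When the URI omits the user, password, or port, the corresponding attributes are `None`, which is the case the default port of 3306 covers.
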

### URI Validation Rules

```python { .api }
# URI validation requirements
URIValidationRules = {
    "host_required": True,  # URI must contain netloc (host)
    "default_port": 3306,  # Default port if not specified
    "path_components": 3,  # Path split on "/" must yield exactly 3 parts: "", database, table
    "path_format": "/database/table",  # Required path structure
    "supported_schemes": ["mysql", "mariadb"],  # Supported input schemes
    "normalized_scheme": "mysql"  # Output scheme normalization
}
```
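
One reading of `"path_components": 3` is that the path is split on `/`, so the leading slash contributes an empty first part. The sketch below assumes that counting rule. `path_part_count` is a hypothetical helper written for this illustration, not a provider function.

```python
from urllib.parse import urlsplit


def path_part_count(uri: str) -> int:
    """Count the parts produced by splitting the URI path on "/"."""
    return len(urlsplit(uri).path.split("/"))


candidates = [
    "mysql://localhost/mydb/users",    # "", "mydb", "users"
    "mysql://localhost/database",      # "", "database"
    "mysql://localhost/db/table/col",  # "", "db", "table", "col"
    "mysql://localhost",               # empty path gives a single "" part
]

for candidate in candidates:
    count = path_part_count(candidate)
    verdict = "ok" if count == 3 else "rejected"
    print(f"{candidate}: {count} parts, {verdict}")
```

Under this counting rule a trailing slash, as in `mysql://localhost/db/`, also yields three parts, so a check for an empty table name would have to be separate.
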

### Common URI Patterns

```python
# Valid URI examples
valid_uris = [
    "mysql://user:pass@localhost:3306/mydb/users",
    "mysql://localhost/analytics/daily_metrics",
    "mariadb://user@mariadb-server:3307/warehouse/facts",
    "mysql://10.0.1.100/inventory/products"
]

# Invalid URI examples (will raise ValueError)
invalid_uris = [
    "mysql:///database/table",  # Missing host
    "mysql://localhost/database",  # Missing table name
    "mysql://localhost/db/table/col",  # Too many path components
    "mysql://localhost",  # Missing database and table
]
```

## Integration with Airflow Assets

### Asset Definition

```python
from airflow import Dataset

# Assets are automatically sanitized when defined
mysql_asset = Dataset("mysql://localhost/mydb/users")

# Equivalent MariaDB asset (normalized to mysql://)
mariadb_asset = Dataset("mariadb://localhost/mydb/users")
```

### Asset-Aware DAG Scheduling

```python
from airflow import DAG, Dataset
from datetime import datetime

# Define datasets
source_data = Dataset("mysql://localhost/raw/events")
processed_data = Dataset("mysql://localhost/analytics/event_summary")

# DAG scheduled on dataset updates
processing_dag = DAG(
    'data_processing',
    start_date=datetime(2024, 1, 1),
    schedule=[source_data],  # Triggered when source_data is updated
    catchup=False
)
```

## Type Definitions

```python { .api }
from urllib.parse import SplitResult
from typing import Literal

# URI parsing result type
URISplit = SplitResult

# Supported URI schemes for MySQL assets
# (Literal, not Union: Union would treat the strings as forward references)
MySQLSchemes = Literal["mysql", "mariadb"]

# URI validation error types
URIValidationError = ValueError

# Asset URI components
AssetURIComponents = {
    "scheme": str,  # URI scheme (mysql/mariadb)
    "netloc": str,  # Network location (user:pass@host:port)
    "path": str,  # Path (/database/table)
    "query": str,  # Query parameters (optional)
    "fragment": str  # Fragment identifier (optional)
}
```