# Asset Management

URI validation for Trino data assets. The provider checks that `trino://` URIs are well formed, fills in a default port when none is given, and connects this handling to Airflow's asset and dataset systems.

## Capabilities

### URI Sanitization

Validates a Trino URI and returns a sanitized copy that addresses a specific table on a specific Trino coordinator.

```python { .api }
from urllib.parse import SplitResult


def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Validate and sanitize a Trino URI.

    Ensures the URI follows the trino:// format with the required components:
    - Host must be present
    - Port defaults to 8080 if not specified
    - Path must contain catalog, schema, and table names

    Parameters:
    - uri: SplitResult object from urllib.parse.urlsplit()

    Returns:
        SplitResult with validated and sanitized URI components

    Raises:
        ValueError: If the URI format is invalid or missing required components
    """
    pass
```
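
The rules in the docstring map onto a few lines of standard-library code. The function below is a hypothetical stand-in named `sanitize_trino_uri_sketch`. It is written from the documented behavior rather than copied from the provider source. Its segment-count check assumes the path must be exactly `/catalog/schema/table`.

```python
from urllib.parse import SplitResult, urlsplit

# Default coordinator port documented above.
DEFAULT_TRINO_PORT = 8080


def sanitize_trino_uri_sketch(uri: SplitResult) -> SplitResult:
    """Hypothetical stand-in that applies the documented trino:// rules."""
    # urlsplit leaves netloc empty for URIs such as "trino:///catalog/schema/table".
    if not uri.netloc:
        raise ValueError("URI format trino:// must contain a host")
    # Fill in the default port only when none was given.
    if uri.port is None:
        uri = uri._replace(netloc=f"{uri.netloc}:{DEFAULT_TRINO_PORT}")
    # "/catalog/schema/table".split("/") gives ["", "catalog", "schema", "table"].
    if len(uri.path.split("/")) != 4:
        raise ValueError("URI format trino:// must contain catalog, schema, and table names")
    return uri


result = sanitize_trino_uri_sketch(urlsplit("trino://example.com/cat/sch/tbl"))
print(result.geturl())  # trino://example.com:8080/cat/sch/tbl
```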

## URI Format Requirements

Trino URIs must follow the standardized format for proper asset identification:

### Standard Format

```
trino://host:port/catalog/schema/table
```

### Components

- **Scheme**: Must be `trino://`
- **Host**: Trino coordinator hostname (required)
- **Port**: Trino coordinator port (defaults to 8080 if not specified)
- **Path**: Must contain exactly three path segments:
  1. Catalog name
  2. Schema name
  3. Table name
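
The hypothetical helper `split_trino_table` below shows how the three path segments correspond to Trino's names. It is not part of the provider, and it uses only `urllib.parse` to extract the catalog, schema, and table from a URI. In one respect it is stricter than the format rule: it also rejects empty segments such as `/catalog//table`.

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_trino_table(uri_string: str) -> Tuple[str, str, str]:
    """Hypothetical helper: return (catalog, schema, table) from a trino:// URI."""
    parts = urlsplit(uri_string)
    if parts.scheme != "trino":
        raise ValueError(f"expected scheme 'trino', got {parts.scheme!r}")
    # Drop the leading "/" and split the rest of the path into segments.
    segments = parts.path[1:].split("/")
    if len(segments) != 3 or not all(segments):
        raise ValueError("path must be /catalog/schema/table")
    catalog, schema, table = segments
    return catalog, schema, table


print(split_trino_table("trino://trino-cluster.example.com:8080/analytics/sales/daily_transactions"))
# ('analytics', 'sales', 'daily_transactions')
```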

## Usage Examples

### Basic URI Sanitization

```python
from urllib.parse import urlsplit

from airflow.providers.trino.assets.trino import sanitize_uri

# Complete URI with port
uri_string = "trino://trino-cluster.example.com:8080/analytics/sales/daily_transactions"
uri = urlsplit(uri_string)
sanitized_uri = sanitize_uri(uri)
print(f"Sanitized URI: {sanitized_uri.geturl()}")
```

### URI with Default Port

```python
from urllib.parse import urlsplit

from airflow.providers.trino.assets.trino import sanitize_uri

# URI without a port: sanitize_uri fills in 8080
uri_string = "trino://trino-cluster.example.com/warehouse/customers/profiles"
uri = urlsplit(uri_string)
sanitized_uri = sanitize_uri(uri)
print(f"URI with default port: {sanitized_uri.geturl()}")
# Output: URI with default port: trino://trino-cluster.example.com:8080/warehouse/customers/profiles
```

### URI Validation Error Handling

```python
from urllib.parse import urlsplit

from airflow.providers.trino.assets.trino import sanitize_uri

try:
    # Invalid URI: missing host
    invalid_uri = urlsplit("trino:///catalog/schema/table")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Validation error: {e}")
    # Output: Validation error: URI format trino:// must contain a host

try:
    # Invalid URI: incomplete path
    invalid_uri = urlsplit("trino://host:8080/catalog/schema")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Validation error: {e}")
    # Output: Validation error: URI format trino:// must contain catalog, schema, and table names
```

## Integration with Airflow Assets

Sanitized URIs can be used directly as Airflow dataset (asset) URIs. Airflow uses these URIs for data-aware scheduling and lineage tracking:

### Asset Definition

```python
from urllib.parse import urlsplit

from airflow.datasets import Dataset  # Airflow 2.x; Airflow 3 provides airflow.sdk.Asset
from airflow.providers.trino.assets.trino import sanitize_uri

# Define the Trino table URI
trino_uri = "trino://production-cluster:8080/analytics/sales/daily_revenue"
parsed_uri = urlsplit(trino_uri)
sanitized_uri = sanitize_uri(parsed_uri)

# Create an Airflow dataset from the sanitized URI
sales_dataset = Dataset(sanitized_uri.geturl())
```

### Asset-Aware DAG

```python
from datetime import datetime

from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.providers.trino.hooks.trino import TrinoHook

# Define datasets
input_dataset = Dataset("trino://cluster:8080/raw/transactions/daily")
output_dataset = Dataset("trino://cluster:8080/analytics/sales/summary")


def process_sales_data():
    hook = TrinoHook(trino_conn_id="trino_default")

    # Transform data
    sql = """
        INSERT INTO analytics.sales.summary
        SELECT
            date_trunc('day', transaction_time) as date,
            sum(amount) as total_revenue,
            count(*) as transaction_count
        FROM raw.transactions.daily
        WHERE transaction_time >= current_date
        GROUP BY 1
    """

    hook.run(sql)


dag = DAG(
    "sales_processing",
    start_date=datetime(2023, 1, 1),
    schedule=[input_dataset],  # Triggered by input dataset updates
    catchup=False,
)

process_task = PythonOperator(
    task_id="process_sales",
    python_callable=process_sales_data,
    outlets=[output_dataset],  # Produces output dataset
    dag=dag,
)
```

## Provider Registration

The asset URI handling is automatically registered through the provider configuration:

```python
# From get_provider_info()
{
    "asset-uris": [
        {"schemes": ["trino"], "handler": "airflow.providers.trino.assets.trino.sanitize_uri"}
    ],
    "dataset-uris": [
        {"schemes": ["trino"], "handler": "airflow.providers.trino.assets.trino.sanitize_uri"}
    ],
}
```

This registration enables Airflow to automatically use the sanitization function for any `trino://` URIs in asset and dataset definitions.
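
At its core, the registration maps a URI scheme to a handler. The sketch below illustrates that idea with a hypothetical in-process dictionary (`URI_HANDLERS`), a hypothetical dispatch function (`normalize_uri`), and a simplified stand-in handler. It does not reproduce how Airflow discovers providers or how it treats schemes that have no registered handler.

```python
from typing import Callable, Dict
from urllib.parse import SplitResult, urlsplit


def _add_default_port(uri: SplitResult) -> SplitResult:
    # Simplified stand-in handler: only appends port 8080 when none is given.
    if uri.port is None:
        return uri._replace(netloc=f"{uri.netloc}:8080")
    return uri


# Hypothetical registry keyed by URI scheme, mirroring the "schemes"/"handler" pairs.
URI_HANDLERS: Dict[str, Callable[[SplitResult], SplitResult]] = {
    "trino": _add_default_port,
}


def normalize_uri(uri_string: str) -> str:
    parts = urlsplit(uri_string)
    handler = URI_HANDLERS.get(parts.scheme)
    # In this sketch, URIs whose scheme has no handler pass through unchanged.
    if handler is None:
        return uri_string
    return handler(parts).geturl()


print(normalize_uri("trino://cluster/analytics/sales/daily"))  # trino://cluster:8080/analytics/sales/daily
print(normalize_uri("s3://bucket/key"))  # s3://bucket/key
```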

## Error Scenarios

The sanitization function raises `ValueError` in the following situations:

### Missing Host

```python
from urllib.parse import urlsplit

from airflow.providers.trino.assets.trino import sanitize_uri

# This will raise ValueError
uri = urlsplit("trino:///catalog/schema/table")
sanitize_uri(uri)  # ValueError: URI format trino:// must contain a host
```
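
The host check operates on the parsed `netloc`, so it is useful to see what `urllib.parse.urlsplit` returns for URIs without a host. The following standard-library snippet also shows a related pitfall. With only two slashes, the catalog name is parsed as the host. Under the exactly-three-segments rule, such a URI would therefore fail the path check rather than the host check.

```python
from urllib.parse import urlsplit

# Three slashes: the authority (host) part is empty.
no_host = urlsplit("trino:///catalog/schema/table")
print(repr(no_host.netloc), no_host.path)  # '' /catalog/schema/table

# Host omitted with only two slashes: "catalog" is parsed as the host.
shifted = urlsplit("trino://catalog/schema/table")
print(repr(shifted.netloc), shifted.path)  # 'catalog' /schema/table
```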

### Incomplete Path

```python
from urllib.parse import urlsplit

from airflow.providers.trino.assets.trino import sanitize_uri

# This will raise ValueError: missing table name
uri = urlsplit("trino://host:8080/catalog/schema")
sanitize_uri(uri)  # ValueError: URI format trino:// must contain catalog, schema, and table names

# This will raise ValueError: missing schema and table names
uri = urlsplit("trino://host:8080/catalog")
sanitize_uri(uri)  # ValueError: URI format trino:// must contain catalog, schema, and table names
```
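
Because the path rule amounts to a segment count, URIs that are close to valid also fail. The sketch below assumes the check is "splitting the path on `/` yields exactly four entries": an empty string before the leading slash, followed by catalog, schema, and table. This is one way to express the exactly-three-segments requirement, and `path_split_count` is a hypothetical helper. Under that assumption, a trailing slash or a fourth segment is rejected, just like a missing table.

```python
from urllib.parse import urlsplit


def path_split_count(uri_string: str) -> int:
    """Hypothetical helper: number of entries from splitting the URI path on "/"."""
    return len(urlsplit(uri_string).path.split("/"))


# "/catalog/schema/table" splits into 4 entries: "", catalog, schema, table.
for candidate in [
    "trino://host:8080/catalog/schema/table",   # 4: accepted
    "trino://host:8080/catalog/schema",         # 3: missing table
    "trino://host:8080/catalog/schema/table/",  # 5: trailing slash adds an empty entry
    "trino://host:8080/a/b/c/d",                # 5: one segment too many
]:
    print(path_split_count(candidate), candidate)
```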

### Valid URI Examples

```python
# All of these are valid and will be properly sanitized:
valid_uris = [
    # With explicit port
    "trino://cluster.example.com:8080/catalog/schema/table",
    # Without port (defaults to 8080)
    "trino://cluster.example.com/catalog/schema/table",
    # With complex names
    "trino://trino-prod.company.com:8443/data_warehouse/customer_analytics/daily_active_users",
    # With numeric components
    "trino://trino01.internal:9000/db2023/schema_v2/table_001",
]
```

## Best Practices

### URI Construction

1. **Always include the scheme**: Use the `trino://` prefix
2. **Specify the host clearly**: Use fully qualified domain names when possible
3. **Rely on the default port**: Omit the port when the coordinator uses the standard 8080
4. **Follow naming conventions**: Use consistent catalog/schema/table naming
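
To apply these guidelines consistently, you can build URIs from their parts rather than concatenating strings by hand. `build_trino_uri` below is a hypothetical helper, not a provider API. It percent-encodes each name with `urllib.parse.quote`. This is a design choice of the sketch that prevents a stray `/` in a name from creating an extra path segment.

```python
from typing import Optional
from urllib.parse import quote, urlunsplit


def build_trino_uri(host: str, catalog: str, schema: str, table: str, port: Optional[int] = None) -> str:
    """Hypothetical helper that assembles trino://host[:port]/catalog/schema/table."""
    if not host:
        raise ValueError("host is required")
    # Leave the port out when none is given, so sanitization applies its default.
    netloc = host if port is None else f"{host}:{port}"
    # Percent-encode each name so that a "/" inside it cannot add a path segment.
    path = "/" + "/".join(quote(name, safe="") for name in (catalog, schema, table))
    return urlunsplit(("trino", netloc, path, "", ""))


print(build_trino_uri("trino-cluster.example.com", "warehouse", "customers", "profiles"))
# trino://trino-cluster.example.com/warehouse/customers/profiles
```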

### Error Handling

```python
from urllib.parse import urlsplit

from airflow.datasets import Dataset
from airflow.providers.trino.assets.trino import sanitize_uri


def create_trino_dataset(uri_string: str) -> Dataset:
    """Create a validated Trino dataset."""
    try:
        uri = urlsplit(uri_string)
        sanitized_uri = sanitize_uri(uri)
        return Dataset(sanitized_uri.geturl())
    except ValueError as e:
        raise ValueError(f"Invalid Trino URI '{uri_string}': {e}") from e


# Usage
try:
    dataset = create_trino_dataset("trino://cluster/analytics/sales/daily")
    print(f"Created dataset: {dataset.uri}")
except ValueError as e:
    print(f"Dataset creation failed: {e}")
```

### Integration Testing

```python
from urllib.parse import urlsplit

import pytest

from airflow.providers.trino.assets.trino import sanitize_uri


def test_trino_uri_sanitization():
    """Test URI sanitization behavior."""
    # Test valid URI: the default port is filled in
    result = sanitize_uri(urlsplit("trino://host/cat/sch/tbl"))
    assert result.port == 8080
    assert result.netloc == "host:8080"

    # Test error case: a URI without a host is rejected
    with pytest.raises(ValueError):
        sanitize_uri(urlsplit("trino:///cat/sch/tbl"))
```