# Asset Management

URI validation and management for Trino data assets. Provides standardized handling of Trino URIs with proper format validation, default port configuration, and integration with Airflow's asset and dataset management systems.

## Capabilities

### URI Sanitization

Validates and sanitizes Trino URI format to ensure proper addressing of Trino resources.

```python { .api }
def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Validate and sanitize Trino URI format.

    Ensures the URI follows the proper trino:// format with required components:
    - Host must be present
    - Port defaults to 8080 if not specified
    - Path must contain catalog, schema, and table names

    Parameters:
    - uri: SplitResult object from urllib.parse.urlsplit()

    Returns:
    SplitResult with validated and sanitized URI components

    Raises:
    ValueError: If URI format is invalid or missing required components
    """
    pass
```
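
The rules listed in the docstring can be sketched with the standard library alone. The function below is an illustrative stand-in, not the provider's source: the name `sanitize_trino_uri_sketch` and the constant `DEFAULT_TRINO_PORT` are hypothetical, and the exact checks are assumed from the behavior described in this document.

```python
from urllib.parse import SplitResult, urlsplit

DEFAULT_TRINO_PORT = 8080  # default port stated in this document


def sanitize_trino_uri_sketch(uri: SplitResult) -> SplitResult:
    """Hypothetical stand-in that mirrors the documented rules."""
    # Rule 1: a host must be present.
    if not uri.hostname:
        raise ValueError("URI format trino:// must contain a host")
    # Rule 2: fill in the default port when none is given.
    if uri.port is None:
        host = uri.netloc.rstrip(":")
        uri = uri._replace(netloc=f"{host}:{DEFAULT_TRINO_PORT}")
    # Rule 3: the path must be /catalog/schema/table (three non-empty segments).
    segments = uri.path.strip("/").split("/")
    if len(segments) != 3 or not all(segments):
        raise ValueError("URI format trino:// must contain catalog, schema, and table names")
    return uri


result = sanitize_trino_uri_sketch(urlsplit("trino://host/cat/sch/tbl"))
print(result.geturl())  # prints trino://host:8080/cat/sch/tbl
```

Because `SplitResult` is a named tuple, `_replace` returns a new object and leaves the caller's parsed URI unchanged.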

## URI Format Requirements

Trino URIs must follow the standardized format for proper asset identification:

### Standard Format

```
trino://host:port/catalog/schema/table
```

### Components

- **Scheme**: Must be `trino://`
- **Host**: Trino coordinator hostname (required)
- **Port**: Trino coordinator port (defaults to 8080 if not specified)
- **Path**: Must contain exactly three path segments:
  1. Catalog name
  2. Schema name
  3. Table name
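
To see how these components map onto a parsed URI, the standard library's `urlsplit` can be applied directly. This is only an illustration of the format; the host, port, and names in the example URI are placeholders.

```python
from urllib.parse import urlsplit

parts = urlsplit("trino://trino.example.com:8443/analytics/sales/orders")

scheme = parts.scheme    # "trino"
host = parts.hostname    # "trino.example.com"
port = parts.port        # 8443 (None when the URI omits the port)
# The path starts with "/", so strip it before splitting into segments.
catalog, schema, table = parts.path.strip("/").split("/")

print(scheme, host, port, catalog, schema, table)
```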

## Usage Examples

### Basic URI Sanitization

```python
from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

# Complete URI with port
uri_string = "trino://trino-cluster.example.com:8080/analytics/sales/daily_transactions"
uri = urlsplit(uri_string)
sanitized_uri = sanitize_uri(uri)
print(f"Sanitized URI: {sanitized_uri.geturl()}")
```

### URI with Default Port

```python
from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

# URI without port - will default to 8080
uri_string = "trino://trino-cluster.example.com/warehouse/customers/profiles"
uri = urlsplit(uri_string)
sanitized_uri = sanitize_uri(uri)
print(f"URI with default port: {sanitized_uri.geturl()}")
# Output: URI with default port: trino://trino-cluster.example.com:8080/warehouse/customers/profiles
```

### URI Validation Error Handling

```python
from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

try:
    # Invalid URI - missing host
    invalid_uri = urlsplit("trino:///catalog/schema/table")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Validation error: {e}")
    # Output: Validation error: URI format trino:// must contain a host

try:
    # Invalid URI - incomplete path
    invalid_uri = urlsplit("trino://host:8080/catalog/schema")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Validation error: {e}")
    # Output: Validation error: URI format trino:// must contain catalog, schema, and table names
```

## Integration with Airflow Assets

The sanitized URIs integrate with Airflow's asset and dataset management for data lineage tracking:

### Asset Definition

```python
from urllib.parse import urlsplit

from airflow import Dataset
from airflow.providers.trino.assets.trino import sanitize_uri

# Define Trino dataset
trino_uri = "trino://production-cluster:8080/analytics/sales/daily_revenue"
parsed_uri = urlsplit(trino_uri)
sanitized_uri = sanitize_uri(parsed_uri)

# Create Airflow dataset
sales_dataset = Dataset(sanitized_uri.geturl())
```

### Asset-Aware DAG

```python
from airflow import DAG, Dataset
from airflow.operators.python import PythonOperator
from airflow.providers.trino.hooks.trino import TrinoHook
from datetime import datetime

# Define datasets
input_dataset = Dataset("trino://cluster:8080/raw/transactions/daily")
output_dataset = Dataset("trino://cluster:8080/analytics/sales/summary")

def process_sales_data():
    hook = TrinoHook(trino_conn_id='trino_default')

    # Transform data
    sql = """
    INSERT INTO analytics.sales.summary
    SELECT
        date_trunc('day', transaction_time) as date,
        sum(amount) as total_revenue,
        count(*) as transaction_count
    FROM raw.transactions.daily
    WHERE transaction_time >= current_date
    GROUP BY 1
    """

    hook.run(sql)

dag = DAG(
    'sales_processing',
    start_date=datetime(2023, 1, 1),
    schedule=[input_dataset],  # Triggered by input dataset updates
    catchup=False
)

process_task = PythonOperator(
    task_id='process_sales',
    python_callable=process_sales_data,
    outlets=[output_dataset],  # Produces output dataset
    dag=dag
)
```

## Provider Registration

The asset URI handling is automatically registered through the provider configuration:

```python
# From get_provider_info()
{
    "asset-uris": [
        {"schemes": ["trino"], "handler": "airflow.providers.trino.assets.trino.sanitize_uri"}
    ],
    "dataset-uris": [
        {"schemes": ["trino"], "handler": "airflow.providers.trino.assets.trino.sanitize_uri"}
    ]
}
```

This registration enables Airflow to automatically use the sanitization function for any `trino://` URIs in asset and dataset definitions.
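
Conceptually, a registration of this kind amounts to a lookup from URI scheme to handler. The sketch below illustrates that idea with a plain dictionary. The names `URI_HANDLERS`, `normalize_uri`, and `add_default_trino_port` are hypothetical, and Airflow's actual lookup mechanism is not shown in this document and may differ.

```python
from typing import Callable, Dict
from urllib.parse import SplitResult, urlsplit

Handler = Callable[[SplitResult], SplitResult]


def add_default_trino_port(uri: SplitResult) -> SplitResult:
    """Hypothetical handler: only fills in port 8080 when none is given."""
    if uri.port is None:
        uri = uri._replace(netloc=f"{uri.netloc}:8080")
    return uri


# Hypothetical registry keyed by scheme, mirroring the "schemes"/"handler" pairs.
URI_HANDLERS: Dict[str, Handler] = {"trino": add_default_trino_port}


def normalize_uri(uri_string: str) -> str:
    """Apply the handler registered for the URI's scheme, if there is one."""
    parts = urlsplit(uri_string)
    handler = URI_HANDLERS.get(parts.scheme)
    return handler(parts).geturl() if handler else uri_string


print(normalize_uri("trino://cluster/analytics/sales/daily"))
print(normalize_uri("s3://bucket/key"))
```

URIs whose scheme has no registered handler pass through unchanged in this sketch, which is a design choice of the example rather than a statement about Airflow.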

## Error Scenarios

The sanitization function raises `ValueError` for the following malformed URIs:

### Missing Host

```python
from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

# This will raise ValueError
uri = urlsplit("trino:///catalog/schema/table")
sanitize_uri(uri)  # ValueError: URI format trino:// must contain a host
```

### Incomplete Path

```python
from urllib.parse import urlsplit
from airflow.providers.trino.assets.trino import sanitize_uri

# This will raise ValueError - missing table name
uri = urlsplit("trino://host:8080/catalog/schema")
sanitize_uri(uri)  # ValueError: URI format trino:// must contain catalog, schema, and table names

# This will raise ValueError - missing schema and table
uri = urlsplit("trino://host:8080/catalog")
sanitize_uri(uri)  # ValueError: URI format trino:// must contain catalog, schema, and table names
```

### Valid URI Examples

```python
# All of these are valid and will be properly sanitized:

# With explicit port
"trino://cluster.example.com:8080/catalog/schema/table"

# Without port (defaults to 8080)
"trino://cluster.example.com/catalog/schema/table"

# With complex names
"trino://trino-prod.company.com:8443/data_warehouse/customer_analytics/daily_active_users"

# With numeric components
"trino://trino01.internal:9000/db2023/schema_v2/table_001"
```

## Best Practices

### URI Construction

1. **Always include scheme**: Use `trino://` prefix
2. **Specify host clearly**: Use fully qualified domain names when possible
3. **Use default port**: Omit port if using standard 8080
4. **Follow naming conventions**: Use consistent catalog/schema/table naming
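
These guidelines can be folded into a small helper that assembles the URI from its parts. The function name `build_trino_uri` and its parameters are hypothetical, introduced only for illustration; the helper is not part of the provider.

```python
from typing import Optional
from urllib.parse import SplitResult


def build_trino_uri(host: str, catalog: str, schema: str, table: str,
                    port: Optional[int] = None) -> str:
    """Assemble trino://host[:port]/catalog/schema/table (hypothetical helper)."""
    for name, value in (("host", host), ("catalog", catalog),
                        ("schema", schema), ("table", table)):
        # Reject empty parts and parts that would add extra path segments.
        if not value or "/" in value:
            raise ValueError(f"Invalid {name}: {value!r}")
    # Leave the port out when it is not given, per the guideline above.
    netloc = host if port is None else f"{host}:{port}"
    path = f"/{catalog}/{schema}/{table}"
    return SplitResult("trino", netloc, path, "", "").geturl()


print(build_trino_uri("trino.example.com", "analytics", "sales", "daily"))
print(build_trino_uri("trino.example.com", "analytics", "sales", "daily", port=8443))
```

Building the string in one place keeps the scheme and the three-segment path consistent across dataset definitions.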

### Error Handling

```python
from urllib.parse import urlsplit

from airflow import Dataset
from airflow.providers.trino.assets.trino import sanitize_uri

def create_trino_dataset(uri_string: str):
    """Create a validated Trino dataset."""
    try:
        uri = urlsplit(uri_string)
        sanitized_uri = sanitize_uri(uri)
        return Dataset(sanitized_uri.geturl())
    except ValueError as e:
        # Chain the original error so the underlying cause stays visible
        raise ValueError(f"Invalid Trino URI '{uri_string}': {e}") from e

# Usage
try:
    dataset = create_trino_dataset("trino://cluster/analytics/sales/daily")
    print(f"Created dataset: {dataset.uri}")
except ValueError as e:
    print(f"Dataset creation failed: {e}")
```

### Integration Testing

```python
def test_trino_uri_sanitization():
    """Test URI sanitization behavior."""
    from urllib.parse import urlsplit
    from airflow.providers.trino.assets.trino import sanitize_uri

    # Test valid URI
    uri = urlsplit("trino://host/cat/sch/tbl")
    result = sanitize_uri(uri)
    assert result.port == 8080
    assert result.netloc == "host:8080"

    # Test error cases
    try:
        sanitize_uri(urlsplit("trino:///cat/sch/tbl"))
        assert False, "Should have raised ValueError"
    except ValueError:
        pass  # Expected
```