# Asset URI Handling

URI sanitization and validation for MySQL and MariaDB assets in Airflow's dataset and asset tracking system. The module checks that a URI has the required structure and fills in the default MySQL/MariaDB port (3306) when none is given.

## Capabilities

### URI Sanitization

Sanitize and validate MySQL/MariaDB URIs for use in Airflow's asset and dataset tracking systems.

```python { .api }
from urllib.parse import SplitResult


def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Sanitize a MySQL/MariaDB URI for asset handling.

    Validates the URI format and applies the default port.
    Ensures the URI contains the components required for MySQL asset tracking.

    Parameters:
    - uri: SplitResult object representing the URI to sanitize

    Returns:
    SplitResult object with sanitized URI components

    Raises:
    ValueError: If the URI format is invalid (missing host, database, or table)

    URI Format Requirements:
    - Must contain a host (netloc)
    - Path must contain exactly a database name and a table name
    - Port defaults to 3306 if not specified
    - Scheme is normalized to "mysql"
    """
```
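
The rules in the docstring can be traced with a minimal, standard-library-only sketch. It is not the provider's source code: `sanitize_mysql_uri` is a hypothetical name, and the extra rejection of empty path segments such as `/db/` is an assumption about the intended behavior. In real DAG code, call the provider's `sanitize_uri` instead.

```python
from urllib.parse import SplitResult, urlsplit

DEFAULT_MYSQL_PORT = 3306


def sanitize_mysql_uri(uri: SplitResult) -> SplitResult:
    """Hypothetical sketch of the documented sanitize_uri rules (not provider code)."""
    # Rule 1: a host (netloc) is required.
    if not uri.netloc:
        raise ValueError("URI format mysql:// must contain a host")

    # Rule 2: append the default port when none is given.
    # rstrip(":") avoids producing "host::3306" from a netloc such as "host:".
    if uri.port is None:
        uri = uri._replace(netloc=f"{uri.netloc.rstrip(':')}:{DEFAULT_MYSQL_PORT}")

    # Rule 3: "/database/table" splits into ["", "database", "table"].
    # Rejecting empty names (e.g. "/db/") is an assumption of this sketch.
    parts = uri.path.split("/")
    if len(parts) != 3 or not all(parts[1:]):
        raise ValueError("URI format mysql:// must contain database and table names")

    # Rule 4: normalize mysql:// and mariadb:// input to mysql://.
    return uri._replace(scheme="mysql")


print(sanitize_mysql_uri(urlsplit("mariadb://user:pass@db-host/shop/orders")).geturl())
# mysql://user:pass@db-host:3306/shop/orders
```

Because `SplitResult` is an immutable named tuple, each step returns a new object via `_replace` rather than modifying the input.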

## Usage Examples

### Basic URI Sanitization

```python
from urllib.parse import urlsplit

from airflow.providers.mysql.assets.mysql import sanitize_uri

# Sanitize a MySQL URI
raw_uri = "mysql://user:pass@localhost/mydb/users"
uri_parts = urlsplit(raw_uri)
sanitized_uri = sanitize_uri(uri_parts)

print(sanitized_uri.geturl())  # mysql://user:pass@localhost:3306/mydb/users
```

### URI Validation and Error Handling

```python
from urllib.parse import urlsplit

from airflow.providers.mysql.assets.mysql import sanitize_uri

# Handle invalid URIs
try:
    # Missing host
    invalid_uri = urlsplit("mysql:///database/table")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Invalid URI: {e}")  # "URI format mysql:// must contain a host"

try:
    # Missing table name
    invalid_uri = urlsplit("mysql://host/database")
    sanitize_uri(invalid_uri)
except ValueError as e:
    print(f"Invalid URI: {e}")  # "URI format mysql:// must contain database and table names"
```

### Asset Registration in DAGs

```python
from airflow import DAG, Dataset
from airflow.operators.empty import EmptyOperator
from airflow.providers.mysql.transfers.s3_to_mysql import S3ToMySqlOperator
from datetime import datetime

# Define MySQL dataset URIs (automatically sanitized by the provider)
user_dataset = Dataset("mysql://localhost/mydb/users")
orders_dataset = Dataset("mysql://localhost/mydb/orders")

# Producer DAG on a time-based schedule. Scheduling it on user_dataset while
# its own task updates user_dataset would make every run trigger the next one.
dag = DAG(
    'mysql_data_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
)

# Task that produces the users dataset
load_users = S3ToMySqlOperator(
    task_id='load_users',
    s3_source_key='data/users.csv',
    mysql_table='mydb.users',
    outlets=[user_dataset],  # Mark as producing this dataset
    dag=dag,
)

# Task that produces the orders dataset
process_orders = S3ToMySqlOperator(
    task_id='process_orders',
    s3_source_key='data/orders.csv',
    mysql_table='mydb.orders',
    outlets=[orders_dataset],
    dag=dag,
)

load_users >> process_orders

# Consumer DAG: triggered whenever load_users updates user_dataset
consumer_dag = DAG(
    'mysql_users_consumer',
    start_date=datetime(2024, 1, 1),
    schedule=[user_dataset],  # Schedule based on dataset updates
    catchup=False,
)
process_users = EmptyOperator(task_id='process_users', dag=consumer_dag)
```

### MariaDB URI Handling

```python
from urllib.parse import urlsplit

from airflow.providers.mysql.assets.mysql import sanitize_uri

# MariaDB URIs are handled identically to MySQL URIs
mariadb_uri = urlsplit("mariadb://user:pass@mariadb-server/analytics/metrics")
sanitized_uri = sanitize_uri(mariadb_uri)

# Scheme is normalized to "mysql" for consistency
print(sanitized_uri.scheme)  # "mysql"
print(sanitized_uri.netloc)  # "user:pass@mariadb-server:3306"
```

125

126

### Custom Port Configuration

127

128

```python

129

from urllib.parse import urlsplit

130

from airflow.providers.mysql.assets.mysql import sanitize_uri

131

132

# URI with custom port (preserved)

133

custom_port_uri = urlsplit("mysql://localhost:3307/mydb/table")

134

sanitized_uri = sanitize_uri(custom_port_uri)

135

print(sanitized_uri.netloc) # "localhost:3307"

136

137

# URI without port (default 3306 added)

138

no_port_uri = urlsplit("mysql://localhost/mydb/table")

139

sanitized_uri = sanitize_uri(no_port_uri)

140

print(sanitized_uri.netloc) # "localhost:3306"

141

```

## Asset URI Format Specification

### Valid URI Components

MySQL/MariaDB asset URIs must follow this format:

```
mysql://[user[:password]@]host[:port]/database/table
```

Components:

- **Scheme**: `mysql` or `mariadb` (normalized to `mysql`)
- **User**: Optional database username
- **Password**: Optional database password
- **Host**: Required MySQL/MariaDB server hostname or IP address
- **Port**: Optional port number (defaults to 3306)
- **Database**: Required database name
- **Table**: Required table name
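
Going the other way, the components above map directly onto `urllib.parse.urlunsplit`. The helper below is hypothetical and not part of the provider. It always writes an explicit port, so its output already has the shape that sanitization produces, and it percent-encodes the user and password, which is standard URI practice rather than something this document specifies.

```python
from typing import Optional
from urllib.parse import quote, urlunsplit


def build_mysql_asset_uri(
    host: str,
    database: str,
    table: str,
    port: int = 3306,
    user: Optional[str] = None,
    password: Optional[str] = None,
) -> str:
    """Hypothetical helper: mysql://[user[:password]@]host:port/database/table."""
    if not host or not database or not table:
        raise ValueError("host, database and table are required")
    if password is not None and user is None:
        raise ValueError("a password requires a user")

    userinfo = ""
    if user is not None:
        # Percent-encode credentials so "@", ":" or "/" cannot break the netloc.
        userinfo = quote(user, safe="")
        if password is not None:
            userinfo += ":" + quote(password, safe="")
        userinfo += "@"

    netloc = f"{userinfo}{host}:{port}"
    return urlunsplit(("mysql", netloc, f"/{database}/{table}", "", ""))


print(build_mysql_asset_uri("localhost", "mydb", "users", user="user", password="pass"))
# mysql://user:pass@localhost:3306/mydb/users
```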

### URI Validation Rules

```python { .api }
# URI validation requirements
URIValidationRules = {
    "host_required": True,  # URI must contain a netloc (host)
    "default_port": 3306,  # Port applied when none is specified
    "path_components": 3,  # path.split("/") must yield exactly 3 parts: "", database, table
    "path_format": "/database/table",  # Required path structure
    "supported_schemes": ["mysql", "mariadb"],  # Accepted input schemes
    "normalized_scheme": "mysql",  # Scheme of the sanitized output
}
```
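
The `path_components` rule counts the pieces produced by splitting the path on `/`, and the leading slash contributes an empty first piece. The short sketch below uses only the standard library to show that split for the paths used in this document; the helper name `path_parts` is hypothetical.

```python
from urllib.parse import urlsplit


def path_parts(uri: str) -> list:
    """Hypothetical helper: split the URI path the way the path_components rule counts it."""
    return urlsplit(uri).path.split("/")


for uri in [
    "mysql://localhost/mydb/users",    # ['', 'mydb', 'users'] -> 3 parts, valid
    "mysql://localhost/database",      # ['', 'database'] -> 2 parts, no table
    "mysql://localhost/db/table/col",  # ['', 'db', 'table', 'col'] -> 4 parts
    "mysql://localhost",               # [''] -> 1 part, no database or table
]:
    parts = path_parts(uri)
    print(f"{uri!r}: {parts} ({len(parts)} parts)")
```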

### Common URI Patterns

```python
# Valid URI examples
valid_uris = [
    "mysql://user:pass@localhost:3306/mydb/users",
    "mysql://localhost/analytics/daily_metrics",
    "mariadb://user@mariadb-server:3307/warehouse/facts",
    "mysql://10.0.1.100/inventory/products",
]

# Invalid URI examples (will raise ValueError)
invalid_uris = [
    "mysql:///database/table",  # Missing host
    "mysql://localhost/database",  # Missing table name
    "mysql://localhost/db/table/col",  # Too many path components
    "mysql://localhost",  # Missing database and table
]
```
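
To read the host, port, database, and table back out of one of the valid URIs above, `urllib.parse` is enough. The value object and parser below are hypothetical, not part of the provider; they apply the same host, path, and default-port rules described in this section but do not call `sanitize_uri`. Note that `hostname` is returned lowercased by `urllib.parse`.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlsplit


@dataclass(frozen=True)
class MySQLAssetRef:
    """Hypothetical value object for the parts of a MySQL/MariaDB asset URI."""
    host: str
    port: int
    database: str
    table: str
    user: Optional[str] = None


def parse_mysql_asset_uri(uri: str) -> MySQLAssetRef:
    """Hypothetical parser for mysql:// or mariadb:// asset URIs (no Airflow needed)."""
    parts = urlsplit(uri)
    if parts.scheme not in ("mysql", "mariadb"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError("URI must contain a host")
    segments = parts.path.split("/")  # "/db/table" -> ["", "db", "table"]
    if len(segments) != 3 or not segments[1] or not segments[2]:
        raise ValueError("URI must contain database and table names")
    return MySQLAssetRef(
        host=parts.hostname,
        port=parts.port if parts.port is not None else 3306,  # documented default
        database=segments[1],
        table=segments[2],
        user=parts.username,
    )


print(parse_mysql_asset_uri("mariadb://user@mariadb-server:3307/warehouse/facts"))
# MySQLAssetRef(host='mariadb-server', port=3307, database='warehouse', table='facts', user='user')
```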

## Integration with Airflow Assets

### Asset Definition

```python
from airflow import Dataset

# Assets are automatically sanitized when defined
mysql_asset = Dataset("mysql://localhost/mydb/users")

# Equivalent MariaDB asset (normalized to mysql://)
mariadb_asset = Dataset("mariadb://localhost/mydb/users")
```
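
The comments above imply that `mysql_asset` and `mariadb_asset` end up with the same URI. Conceptually, Airflow picks the URI handler that the installed provider registered for the URI's scheme and applies it when the asset is created. The sketch below imitates that dispatch with a plain dictionary so the effect is visible without Airflow; `URI_HANDLERS`, `normalize_asset_uri`, and the simplified handler are hypothetical and do not reflect Airflow's internal API.

```python
from typing import Callable, Dict
from urllib.parse import SplitResult, urlsplit

Handler = Callable[[SplitResult], SplitResult]


def simplified_mysql_handler(uri: SplitResult) -> SplitResult:
    """Hypothetical stand-in for sanitize_uri: default port and scheme only, no validation."""
    if uri.port is None:
        uri = uri._replace(netloc=f"{uri.netloc}:3306")
    return uri._replace(scheme="mysql")


# Hypothetical registry: one handler shared by both schemes.
URI_HANDLERS: Dict[str, Handler] = {
    "mysql": simplified_mysql_handler,
    "mariadb": simplified_mysql_handler,
}


def normalize_asset_uri(uri: str) -> str:
    """Apply the handler registered for the URI's scheme; pass other URIs through unchanged."""
    parts = urlsplit(uri)
    handler = URI_HANDLERS.get(parts.scheme)
    return handler(parts).geturl() if handler else uri


mysql_uri = normalize_asset_uri("mysql://localhost/mydb/users")
mariadb_uri = normalize_asset_uri("mariadb://localhost/mydb/users")
print(mysql_uri == mariadb_uri, mysql_uri)
# True mysql://localhost:3306/mydb/users
```

Keying the registry by scheme is what lets a single handler serve both `mysql://` and `mariadb://` without either scheme knowing about the other.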

### Asset-Aware DAG Scheduling

```python
from airflow import DAG, Dataset
from datetime import datetime

# Define datasets
source_data = Dataset("mysql://localhost/raw/events")
processed_data = Dataset("mysql://localhost/analytics/event_summary")

# DAG scheduled on dataset updates
processing_dag = DAG(
    'data_processing',
    start_date=datetime(2024, 1, 1),
    schedule=[source_data],  # Triggered when source_data is updated
    catchup=False,
)
```

## Type Definitions

```python { .api }
from typing import Literal, TypedDict
from urllib.parse import SplitResult

# URI parsing result type
URISplit = SplitResult

# Supported URI schemes for MySQL assets
MySQLSchemes = Literal["mysql", "mariadb"]

# URI validation error type
URIValidationError = ValueError


# Asset URI components
class AssetURIComponents(TypedDict):
    scheme: str  # URI scheme (mysql/mariadb)
    netloc: str  # Network location (user:pass@host:port)
    path: str  # Path (/database/table)
    query: str  # Query parameters (optional)
    fragment: str  # Fragment identifier (optional)
```
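
Because `SplitResult` is a named tuple whose field names match the keys of `AssetURIComponents`, one can be converted into the other directly. The helper `to_components` below is hypothetical; it also shows how the `Literal` scheme alias can be checked at runtime with `typing.get_args`. The types are redefined so the block runs on its own.

```python
from typing import Literal, TypedDict, get_args
from urllib.parse import SplitResult, urlsplit

MySQLSchemes = Literal["mysql", "mariadb"]


class AssetURIComponents(TypedDict):
    scheme: str
    netloc: str
    path: str
    query: str
    fragment: str


def to_components(uri: SplitResult) -> AssetURIComponents:
    """Hypothetical helper: view a SplitResult as AssetURIComponents."""
    # get_args(MySQLSchemes) returns ("mysql", "mariadb").
    if uri.scheme not in get_args(MySQLSchemes):
        raise ValueError(f"unsupported scheme: {uri.scheme!r}")
    # SplitResult's fields are scheme, netloc, path, query, fragment.
    return AssetURIComponents(**uri._asdict())


print(to_components(urlsplit("mariadb://user@db:3307/warehouse/facts")))
# {'scheme': 'mariadb', 'netloc': 'user@db:3307', 'path': '/warehouse/facts', 'query': '', 'fragment': ''}
```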