or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asset-management.md · aws-integration.md · bulk-operations.md · data-retrieval.md · database-connection.md · index.md · openlineage-integration.md · schema-operations.md · sql-dialect.md

docs/asset-management.md

0

# Asset and Dataset Management

1

2

PostgreSQL asset/dataset URI handling with validation, sanitization, and integration with Airflow's data lineage and dependency management systems. Supports both legacy dataset terminology and modern asset terminology.

3

4

## Capabilities

5

6

### URI Sanitization

7

8

Sanitize and validate PostgreSQL asset/dataset URIs for consistency and proper format.

9

10

```python { .api }

def sanitize_uri(uri: SplitResult) -> SplitResult:
    """
    Sanitize PostgreSQL asset/dataset URIs.

    Parameters:
    - uri: SplitResult, parsed URI components from urllib.parse.urlsplit

    Returns:
    SplitResult: Sanitized URI components with validated structure

    URI Format:
    postgres://host:port/database/schema/table
    postgresql://host:port/database/schema/table

    Validation:
    - Ensures URI contains host, database, schema, and table components
    - Adds default port 5432 if not specified
    - Validates path structure matches /database/schema/table format

    Raises:
    ValueError: If URI structure is invalid or missing required components

    Note:
    The checks listed above cover host, port and path structure; none of them
    inspects the scheme. Airflow only dispatches postgres:// and postgresql://
    URIs to this handler via the provider registration.
    NOTE(review): confirm whether a non-postgres scheme passed in directly is
    rejected or simply sanitized.
    """

```

34

35

## Usage Examples

36

37

### Basic URI Sanitization

38

39

```python

import urllib.parse

from airflow.providers.postgres.assets.postgres import sanitize_uri

# Split the raw URI into components, then let the provider normalise them.
uri_str = "postgres://localhost/mydb/public/users"
parsed_uri = urllib.parse.urlsplit(uri_str)
sanitized_uri = sanitize_uri(parsed_uri)

# Reassemble the sanitized components into a string for display.
sanitized_str = urllib.parse.urlunsplit(sanitized_uri)
print(f"Original: {uri_str}")
print(f"Sanitized: {sanitized_str}")
# Output:
# Original: postgres://localhost/mydb/public/users
# Sanitized: postgres://localhost:5432/mydb/public/users

```

52

53

### Asset Definition in DAGs

54

55

```python

from airflow import DAG
from airflow.datasets import Dataset
from airflow.decorators import task
from airflow.providers.postgres.assets.postgres import sanitize_uri
import urllib.parse


# Define PostgreSQL dataset/asset
def create_postgres_asset(host, database, schema, table, port=5432):
    """Build a Dataset for a PostgreSQL table, normalised via ``sanitize_uri``.

    The URI is assembled as ``postgres://host:port/database/schema/table``,
    split, passed through ``sanitize_uri`` and reassembled before being
    wrapped in a ``Dataset``. Any ``ValueError`` from ``sanitize_uri``
    propagates to the caller.
    """
    uri_str = f"postgres://{host}:{port}/{database}/{schema}/{table}"
    parsed_uri = urllib.parse.urlsplit(uri_str)
    sanitized_uri = sanitize_uri(parsed_uri)
    return Dataset(urllib.parse.urlunsplit(sanitized_uri))


# Create assets for data pipeline
users_table = create_postgres_asset("db.example.com", "app_db", "public", "users")
orders_table = create_postgres_asset("db.example.com", "app_db", "public", "orders")
analytics_table = create_postgres_asset("warehouse.example.com", "analytics", "marts", "user_orders")

# Use in DAG with data dependencies
with DAG("data_pipeline", schedule=None) as dag:

    @task(outlets=[users_table])
    def extract_users():
        # Extract user data
        pass

    @task(inlets=[users_table], outlets=[orders_table])
    def extract_orders():
        # Extract order data
        pass

    @task(inlets=[users_table, orders_table], outlets=[analytics_table])
    def create_analytics():
        # Create analytics table
        pass

    # Calling the decorated functions inside the DAG context is what adds the
    # tasks to the DAG; ``>>`` sets their run order to follow the data flow.
    extract_users() >> extract_orders() >> create_analytics()

```

91

92

### Asset URI Validation

93

94

```python

def validate_postgres_asset(uri_string):
    """Validate PostgreSQL asset URI format.

    Prints the sanitized URI and returns True when ``sanitize_uri`` accepts
    it; prints the reason and returns False when parsing or sanitizing raises
    ``ValueError``. Uses ``urllib.parse`` and ``sanitize_uri`` imported in the
    earlier examples.
    """
    try:
        sanitized_uri = sanitize_uri(urllib.parse.urlsplit(uri_string))
    except ValueError as e:
        # sanitize_uri (and urlsplit, e.g. for a bad port) report malformed URIs with ValueError
        print(f"✗ Invalid PostgreSQL asset URI: {e}")
        return False

    print(f"✓ Valid PostgreSQL asset URI: {urllib.parse.urlunsplit(sanitized_uri)}")
    return True


# Test various URI formats
test_uris = [
    "postgres://localhost/mydb/public/users",  # Valid
    "postgresql://host:5432/db/schema/table",  # Valid
    "postgres://host/database/schema",  # Invalid - missing table
    "postgres://host/database",  # Invalid - missing schema/table
    "mysql://host/database/table",  # Invalid - path has database/table only (no schema)
]

for uri in test_uris:
    validate_postgres_asset(uri)

```

120

121

### Cross-Database Asset Dependencies

122

123

```python

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Define assets across different databases
# (create_postgres_asset is the helper from "Asset Definition in DAGs" above)
source_table = create_postgres_asset("source-db", "raw_data", "public", "events")
staging_table = create_postgres_asset("staging-db", "staging", "events", "processed_events")
warehouse_table = create_postgres_asset("warehouse-db", "analytics", "facts", "event_summary")

with DAG("cross_db_pipeline", schedule=[source_table]) as dag:

    @task(inlets=[source_table], outlets=[staging_table])
    def stage_events():
        # Move data from source to staging
        source_hook = PostgresHook(postgres_conn_id="source_db")
        staging_hook = PostgresHook(postgres_conn_id="staging_db")

        # Extract from source
        data = source_hook.get_df("SELECT * FROM events WHERE processed = false")

        # Load to staging. Qualify the table with the "events" schema so the
        # write lands in the table the staging_table outlet declares
        # (staging/events/processed_events), not whatever the search_path picks.
        staging_hook.insert_rows(
            "events.processed_events",
            data.values.tolist(),
            target_fields=list(data.columns),
        )

    @task(inlets=[staging_table], outlets=[warehouse_table])
    def aggregate_events():
        # Aggregate staging data to warehouse
        pass

    # Instantiate the tasks inside the DAG and run staging before aggregation.
    stage_events() >> aggregate_events()

```

152

153

## Asset URI Formats

154

155

### Supported Schemes

156

157

- **postgres**: PostgreSQL URI scheme

158

- **postgresql**: Alternative PostgreSQL URI scheme

159

160

Both schemes are handled identically by the sanitize_uri function.

161

162

### URI Structure

163

164

```

postgres://[user[:password]@]host[:port]/database/schema/table

```

167

168

### Components

169

170

- **scheme**: `postgres` or `postgresql`

171

- **user**: Optional database username

172

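- **password**: Optional database password (avoid embedding credentials in asset URIs — they are identifiers, not connection strings; keep secrets in an Airflow connection)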

173

- **host**: Database hostname or IP address (required)

174

- **port**: Database port (defaults to 5432)

175

- **database**: Database name (required)

176

- **schema**: Schema name (required)

177

- **table**: Table name (required)

178

179

### Examples of Valid URIs

180

181

```python

# Representative URIs that follow the documented
# scheme://[user[:password]@]host[:port]/database/schema/table layout.
valid_uris = [
    "postgres://localhost:5432/myapp/public/users",  # explicit default port
    "postgresql://db.example.com/warehouse/sales/orders",  # alternate scheme, port omitted
    "postgres://user:pass@host:5433/db/schema/table",  # user info + non-default port
    "postgres://readonly@analytics-db/reports/monthly/revenue",  # user without password
]

```

189

190

## Integration with Airflow Assets

191

192

### Asset-Aware Task Definition

193

194

```python

from airflow.datasets import Dataset
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Define PostgreSQL assets
customer_data = Dataset("postgres://db/crm/public/customers")
order_data = Dataset("postgres://db/sales/public/orders")
report_data = Dataset("postgres://warehouse/reports/public/daily_summary")


@task(outlets=[customer_data])
def sync_customers():
    """Sync customer data from external source."""
    # No postgres_conn_id given, so the hook's default connection is used.
    hook = PostgresHook()
    # Sync logic here
    pass


@task(inlets=[customer_data, order_data], outlets=[report_data])
def generate_daily_report():
    """Generate daily report from customer and order data."""
    # No postgres_conn_id given, so the hook's default connection is used.
    hook = PostgresHook()
    # Report generation logic here
    pass

```

217

218

### Dataset-Triggered DAGs

219

220

```python

# DAG scheduled on PostgreSQL assets: it is triggered by updates to the
# assets listed below (with a list schedule, Airflow waits until every listed
# asset has been updated since the last run).
# generate_daily_report is the @task defined in the previous example.
trigger_assets = [
    Dataset("postgres://db/sales/public/orders"),
    Dataset("postgres://db/crm/public/customers"),
]

with DAG("report_generator", schedule=trigger_assets) as report_dag:
    generate_reports_task = generate_daily_report()

```

232

233

### Asset Lineage Tracking

234

235

```python

def track_data_lineage():
    """Example of tracking data lineage with PostgreSQL assets.

    Builds the ``event_processing_pipeline`` DAG, whose three tasks declare
    raw -> clean -> enriched -> metrics lineage through their inlets and
    outlets, and returns it so the caller can keep a reference
    (e.g. ``dag = track_data_lineage()`` at module level).
    """
    # Source data assets
    raw_events = Dataset("postgres://source/raw/public/events")
    raw_users = Dataset("postgres://source/raw/public/users")

    # Intermediate processing assets
    clean_events = Dataset("postgres://staging/clean/public/events")
    enriched_events = Dataset("postgres://staging/enriched/public/events")

    # Final output assets
    user_metrics = Dataset("postgres://warehouse/metrics/public/user_activity")

    # Define processing pipeline with clear lineage
    with DAG("event_processing_pipeline", schedule=None) as dag:

        @task(inlets=[raw_events], outlets=[clean_events])
        def clean_events_task():
            # Data cleaning logic
            pass

        @task(inlets=[clean_events, raw_users], outlets=[enriched_events])
        def enrich_events_task():
            # Data enrichment logic
            pass

        @task(inlets=[enriched_events], outlets=[user_metrics])
        def calculate_metrics_task():
            # Metrics calculation logic
            pass

        # Calling the decorated functions adds the tasks to the DAG;
        # chain them in the same order the data flows.
        clean_events_task() >> enrich_events_task() >> calculate_metrics_task()

    return dag

```

268

269

## Provider Registration

270

271

The PostgreSQL provider automatically registers asset URI handlers with Airflow:

272

273

### Asset URI Registration

274

275

```yaml
# From provider.yaml - automatically registered
"asset-uris":
  - schemes: [postgres, postgresql]
    handler: airflow.providers.postgres.assets.postgres.sanitize_uri

# Legacy dataset URI support (backward compatibility)
"dataset-uris":
  - schemes: [postgres, postgresql]
    handler: airflow.providers.postgres.assets.postgres.sanitize_uri

```

286

287

### Handler Function

288

289

The `sanitize_uri` function is automatically called by Airflow when:

290

291

- Creating Dataset/Asset objects with postgres:// or postgresql:// URIs

292

- Validating asset dependencies in DAGs

293

- Processing asset lineage information

294

- Comparing asset URIs for dependency resolution

295

296

## Error Handling

297

298

### Common URI Validation Errors

299

300

```python

def handle_uri_errors():
    """Handle common PostgreSQL asset URI errors.

    Runs each sample URI through ``sanitize_uri`` and prints ``✓`` with the
    sanitized form, or ``✗`` with the ``ValueError`` message. Uses
    ``urllib.parse`` and ``sanitize_uri`` imported in the earlier examples.
    """
    problematic_uris = [
        "postgres://host/db",  # Missing schema/table
        "postgres://host/db/schema",  # Missing table
        "postgres:///db/schema/table",  # Missing host
        # Wrong scheme. NOTE: the path is well-formed, and the documented checks
        # of sanitize_uri cover host, port and path only; Airflow routes just
        # postgres:// and postgresql:// URIs to it, so a direct call here may
        # not raise.
        "mysql://host/db/schema/table",
    ]

    for uri_str in problematic_uris:
        try:
            sanitized = sanitize_uri(urllib.parse.urlsplit(uri_str))
        except ValueError as e:
            print(f"✗ {uri_str}: {e}")
        else:
            print(f"✓ {uri_str} -> {urllib.parse.urlunsplit(sanitized)}")

```

319

320

### Best Practices

321

322

```python

def create_safe_postgres_asset(host, database, schema, table, port=5432):
    """Safely create PostgreSQL asset with validation.

    Builds ``postgres://host:port/database/schema/table``, sanitizes it with
    ``sanitize_uri`` and wraps it in a ``Dataset``.

    Raises:
        ValueError: if any of host/database/schema/table is empty; if one of
            them contains '/', '?' or '#' (characters that would change how
            the URI is split); or, chained from the original error, if
            parsing/sanitizing/creating the asset fails.
    """
    # Validate inputs
    if not all([host, database, schema, table]):
        raise ValueError("All components (host, database, schema, table) are required")

    # '/' would add or shift path segments, and '?' / '#' would start a query or
    # fragment and silently truncate the path, so e.g. table "t#x" would
    # otherwise produce an asset for table "t".
    for label, value in (("host", host), ("database", database), ("schema", schema), ("table", table)):
        reserved = "".join(ch for ch in "/?#" if ch in str(value))
        if reserved:
            raise ValueError(f"{label} name {value!r} contains reserved URI character(s) {reserved!r}")

    # Construct URI
    uri_str = f"postgres://{host}:{port}/{database}/{schema}/{table}"

    try:
        # Parse and sanitize
        sanitized_uri = sanitize_uri(urllib.parse.urlsplit(uri_str))
        # Create Dataset
        return Dataset(urllib.parse.urlunsplit(sanitized_uri))
    except Exception as e:
        # Normalise every failure to ValueError for callers, but chain the
        # original exception so its traceback is not lost.
        raise ValueError(f"Failed to create PostgreSQL asset: {e}") from e


# Safe usage
try:
    asset = create_safe_postgres_asset("db.example.com", "myapp", "public", "users")
    print(f"Created asset: {asset.uri}")
except ValueError as e:
    print(f"Error: {e}")

```