or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application.md, asset-reporting.md, cli.md, debug.md, graphql.md, index.md

docs/asset-reporting.md

0

# Asset Reporting

1

2

HTTP endpoints for external systems to report asset events directly to Dagster instances. Enables integration with external data pipelines, monitoring systems, and third-party tools that need to communicate asset state changes to Dagster.

3

4

## Capabilities

5

6

### Asset Materialization Reporting

7

8

Report when external systems have materialized (created or updated) assets, allowing Dagster to track data lineage and freshness across the entire data ecosystem.

9

10

```python { .api }

11

async def handle_report_asset_materialization_request(
    context: BaseWorkspaceRequestContext,
    request: Request,
) -> JSONResponse:
    """
    Record an asset materialization reported by an external system.

    Serves ``POST /report_asset_materialization/{asset_key:path}``.

    Args:
        context: Workspace request context that gives access to the Dagster instance.
        request: Incoming HTTP request whose JSON body and/or query parameters carry
            the materialization fields named in ``ReportAssetMatParam``.

    Returns:
        JSONResponse: A success or error response with details of the reported event.
    """

25

```

26

27

**Endpoint**: `POST /report_asset_materialization/{asset_key:path}`

28

29

**Usage Examples:**

30

31

```python

32

import requests
import json

# Basic asset materialization report
basic_report = {
    "metadata": {"rows": 1000, "columns": 10, "file_size": "5.2MB"},
    "description": "Daily ETL run completed successfully",
}
response = requests.post(
    "http://localhost:3000/report_asset_materialization/my_dataset",
    json=basic_report,
)

# With data version and partition
versioned_report = {
    "data_version": "v1.2.3",
    "partition": "2024-01-15",
    "metadata": {"total_sales": 50000.00, "transaction_count": 1250},
}
response = requests.post(
    "http://localhost:3000/report_asset_materialization/sales/daily_summary",
    json=versioned_report,
)

# Query parameters instead of JSON body
query_fields = {
    "data_version": "v1.0.0",
    "description": "Processed via external pipeline",
}
response = requests.post(
    "http://localhost:3000/report_asset_materialization/my_asset",
    params=query_fields,
)

69

```

70

71

### Asset Check Reporting

72

73

Report the results of data quality checks or validation tests performed by external systems.

74

75

```python { .api }

76

async def handle_report_asset_check_request(
    context: BaseWorkspaceRequestContext,
    request: Request,
) -> JSONResponse:
    """
    Record an asset check evaluation reported by an external system.

    Serves ``POST /report_asset_check/{asset_key:path}``.

    Args:
        context: Workspace request context that gives access to the Dagster instance.
        request: Incoming HTTP request whose JSON body and/or query parameters carry
            the check fields named in ``ReportAssetCheckEvalParam``.

    Returns:
        JSONResponse: A success or error response with details of the reported check.
    """

90

```

91

92

**Endpoint**: `POST /report_asset_check/{asset_key:path}`

93

94

**Usage Examples:**

95

96

```python

97

# Successful data quality check
response = requests.post(
    "http://localhost:3000/report_asset_check/customer_data",
    json={
        "check_name": "null_values_check",
        "passed": True,
        "metadata": {
            "null_count": 0,
            "total_rows": 10000,
            "check_duration": "0.5s",
        },
    },
)

# Failed data quality check
response = requests.post(
    "http://localhost:3000/report_asset_check/product_catalog",
    json={
        "check_name": "price_validation",
        "passed": False,
        "severity": "ERROR",
        "metadata": {
            "invalid_prices": 15,
            "error_details": "Negative prices found in products",
        },
    },
)

# Warning-level failure: daily_volume is below expected_min, so the check did
# not pass; severity "WARN" (instead of "ERROR") marks the failure as a warning
response = requests.post(
    "http://localhost:3000/report_asset_check/user_events",
    json={
        "check_name": "volume_check",
        "passed": False,
        "severity": "WARN",
        "metadata": {
            "daily_volume": 50000,
            "expected_min": 75000,
            "note": "Volume below expected threshold",
        },
    },
)

139

```

140

141

### Asset Observation Reporting

142

143

Report observations about asset state without indicating materialization, useful for monitoring and metadata collection.

144

145

```python { .api }

146

async def handle_report_asset_observation_request(
    context: BaseWorkspaceRequestContext,
    request: Request,
) -> JSONResponse:
    """
    Record an asset observation reported by an external system.

    Serves ``POST /report_asset_observation/{asset_key:path}``.

    Args:
        context: Workspace request context that gives access to the Dagster instance.
        request: Incoming HTTP request whose JSON body and/or query parameters carry
            the observation fields named in ``ReportAssetObsParam``.

    Returns:
        JSONResponse: A success or error response with details of the observation.
    """

160

```

161

162

**Endpoint**: `POST /report_asset_observation/{asset_key:path}`

163

164

**Usage Examples:**

165

166

```python

167

# Schema change observation
schema_observation = {
    "metadata": {
        "schema_version": "v2.1",
        "columns_added": ["phone_verified", "last_login"],
        "migration_applied": "2024-01-15T10:30:00Z",
    },
    "description": "Schema migration applied successfully",
}
response = requests.post(
    "http://localhost:3000/report_asset_observation/warehouse/customers",
    json=schema_observation,
)

# Performance metrics observation
performance_observation = {
    "metadata": {
        "avg_response_time": "45ms",
        "error_rate": "0.1%",
        "throughput": "1000 req/min",
        "monitoring_window": "1h",
    },
}
response = requests.post(
    "http://localhost:3000/report_asset_observation/api/user_service",
    json=performance_observation,
)

# Data freshness observation
freshness_observation = {
    "data_version": "2024-01-15",
    "metadata": {
        "last_updated": "2024-01-15T23:45:00Z",
        "lag_minutes": 15,
        "source_system": "analytics_pipeline",
    },
}
response = requests.post(
    "http://localhost:3000/report_asset_observation/reports/daily_kpis",
    json=freshness_observation,
)

205

```

206

207

## Parameter Classes

208

209

Utility classes defining parameter names for consistent API usage:

210

211

```python { .api }

212

class ReportAssetMatParam:
    """Field names accepted when reporting an asset materialization.

    Each attribute's value is identical to its name, so it can be used directly
    as a JSON body key or a query-parameter name.
    """

    # Identifies the asset being reported.
    asset_key = "asset_key"
    # Optional event details.
    data_version = "data_version"
    description = "description"
    metadata = "metadata"
    partition = "partition"

219

220

class ReportAssetCheckEvalParam:
    """Field names accepted when reporting an asset check evaluation.

    Each attribute's value is identical to its name, so it can be used directly
    as a JSON body key or a query-parameter name.
    """

    # Identify the asset and the check being evaluated.
    asset_key = "asset_key"
    check_name = "check_name"
    # Outcome of the evaluation.
    passed = "passed"
    severity = "severity"
    metadata = "metadata"

227

228

class ReportAssetObsParam:
    """Field names accepted when reporting an asset observation.

    Each attribute's value is identical to its name, so it can be used directly
    as a JSON body key or a query-parameter name.
    """

    # Identifies the asset being observed.
    asset_key = "asset_key"
    # Optional event details.
    data_version = "data_version"
    description = "description"
    metadata = "metadata"
    partition = "partition"

235

```

236

237

## Asset Key Formats

238

239

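Asset keys can be specified in three ways. The URL path is checked first. The `asset_key` field of the JSON body, and then the `asset_key` query parameter, are used only when nothing follows the endpoint prefix in the path (for example `POST /report_asset_materialization/`):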

240

241

### URL Path Format

242

```python

243

# Single part asset key

244

POST /report_asset_materialization/my_asset

245

246

# Multi-part asset key (separated by /)

247

POST /report_asset_materialization/warehouse/customers/daily

248

```

249

250

### JSON Body Format

251

```python

252

# Multi-part asset key as array (POST to /report_asset_materialization/ with an empty path)

253

{

254

"asset_key": ["warehouse", "customers", "daily"],

255

"metadata": {...}

256

}

257

```

258

259

### Query Parameter Format

260

```python

261

# Database string format (dot-separated); leave the URL path empty so the query parameter is used
POST /report_asset_materialization/?asset_key=warehouse.customers.daily

263

```

264

265

## Request Formats

266

267

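All endpoints accept fields as a JSON body, as query parameters, or both. Send a JSON body with `Content-Type: application/json`; `requests` sets this header automatically when you pass `json=`: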

268

269

### JSON Request Body

270

```python

271

payload = {
    "metadata": {"key": "value"},
    "description": "Event description",
}
requests.post(url, json=payload)

275

```

276

277

### Query Parameters

278

```python

279

# Query parameters are flat strings, so structured metadata is JSON-encoded first
encoded_metadata = json.dumps({"key": "value"})
requests.post(
    url,
    params={"metadata": encoded_metadata, "description": "Event description"},
)

283

```

284

285

### Mixed Format

286

```python

287

# JSON body takes precedence over query parameters: "primary" is used as the
# description and the ?description=fallback query value is ignored.
# (Metadata must be a real mapping; a `{...}` placeholder is a Python set and
# cannot be JSON-encoded by requests.)
requests.post(
    url + "?description=fallback",
    json={"description": "primary", "metadata": {"key": "value"}},
)

292

```

293

294

## Metadata Handling

295

296

Metadata is a JSON object whose values can be any JSON-serializable data, including nested objects and lists:

297

298

```python

299

# Simple metadata

300

{

301

"metadata": {

302

"rows": 1000,

303

"size_mb": 15.5,

304

"success": True

305

}

306

}

307

308

# Complex nested metadata

309

{

310

"metadata": {

311

"statistics": {

312

"mean": 42.5,

313

"std_dev": 10.2,

314

"quartiles": [35.0, 42.0, 50.0]

315

},

316

"quality_checks": [

317

{"name": "null_check", "passed": True},

318

{"name": "range_check", "passed": False, "details": "5 values out of range"}

319

],

320

"processing_time": "00:05:23",

321

"source_files": ["data1.csv", "data2.csv"]

322

}

323

}

324

```

325

326

## Error Handling

327

328

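The endpoints signal failure with an HTTP error status and a JSON body containing an `error` message. The payloads below are illustrative; confirm the exact response shape for your Dagster version: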

329

330

```python

331

# Success response (200)

332

{

333

"status": "success",

334

"event_id": "12345",

335

"message": "Asset materialization recorded"

336

}

337

338

# Bad request (400) - missing required fields

339

{

340

"error": "Missing required field: check_name",

341

"status": "error"

342

}

343

344

# Not found (404) - invalid asset key

345

{

346

"error": "Asset key 'invalid/asset' not found",

347

"status": "error"

348

}

349

350

# Server error (500) - internal processing error

351

{

352

"error": "Failed to process asset event",

353

"status": "error"

354

}

355

```

356

357

## Authentication and Security

358

359

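These endpoints record events in the Dagster instance, so in production they should sit behind authentication. For example, use middleware that requires an API key: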

360

361

```python

362

# Example middleware for API key authentication
class AssetReportingAuthMiddleware(BaseHTTPMiddleware):
    """Reject asset-reporting requests that lack a valid ``X-API-Key`` header.

    Only paths starting with ``/report_asset`` are checked; every other request
    is passed straight through to the application.
    """

    async def dispatch(self, request, call_next):
        is_reporting_call = request.url.path.startswith("/report_asset")
        if is_reporting_call:
            supplied_key = request.headers.get("X-API-Key")
            # validate_api_key is only consulted when a key was actually sent.
            if not (supplied_key and validate_api_key(supplied_key)):
                return JSONResponse({"error": "Invalid API key"}, status_code=401)
        return await call_next(request)

373

374

# Deploy with authentication
# NOTE(review): confirm that create_asgi_app accepts a ``middleware`` argument in
# your Dagster version before relying on this wiring.
webserver = DagsterWebserver(workspace_context)
auth_middleware = [Middleware(AssetReportingAuthMiddleware)]
app = webserver.create_asgi_app(middleware=auth_middleware)

379

```

380

381

## Integration Patterns

382

383

### ETL Pipeline Integration

384

```python

385

# At end of ETL job
def report_completion(asset_name, stats, timeout=10):
    """Report a finished ETL run for ``asset_name`` to Dagster as a materialization.

    Args:
        asset_name: Asset key as a URL path; separate multi-part keys with ``/``.
        stats: JSON-serializable mapping stored as the materialization's metadata.
        timeout: Seconds to wait for the webserver before giving up.

    Returns:
        The ``requests.Response`` returned by the webserver.

    Raises:
        requests.HTTPError: If the webserver answers with a 4xx/5xx status.
    """
    import os

    response = requests.post(
        f"http://dagster-webserver/report_asset_materialization/{asset_name}",
        json={
            "metadata": stats,
            "description": f"ETL completed for {asset_name}",
        },
        headers={"X-API-Key": os.getenv("DAGSTER_API_KEY")},
        # Without a timeout an unresponsive webserver would block the ETL job forever.
        timeout=timeout,
    )
    # Surface rejected reports (e.g. a bad API key) instead of silently ignoring them.
    response.raise_for_status()
    return response

395

```

396

397

### Data Quality Monitoring

398

```python

399

# After running data quality checks
def report_quality_check(dataset, check_name, results, severity="ERROR", timeout=10):
    """Report the outcome of an externally run data quality check to Dagster.

    Args:
        dataset: Asset key as a URL path; separate multi-part keys with ``/``.
        check_name: Name of the asset check being reported.
        results: Mapping with a boolean ``"passed"`` entry and a ``"details"``
            entry that is stored as the check's metadata.
        severity: Severity attached when the check fails (``"WARN"`` or ``"ERROR"``).
        timeout: Seconds to wait for the webserver before giving up.

    Returns:
        The ``requests.Response`` returned by the webserver.

    Raises:
        requests.HTTPError: If the webserver answers with a 4xx/5xx status.
    """
    import os

    passed = results["passed"]
    payload = {
        "check_name": check_name,
        "passed": passed,
        "metadata": results["details"],
    }
    if not passed:
        # Check severities are "WARN" and "ERROR" only (there is no "INFO"), and
        # they describe a failure, so severity is omitted for passing checks.
        payload["severity"] = severity

    response = requests.post(
        f"http://dagster-webserver/report_asset_check/{dataset}",
        json=payload,
        headers={"X-API-Key": os.getenv("DAGSTER_API_KEY")},
        timeout=timeout,
    )
    # Surface rejected reports instead of silently ignoring them.
    response.raise_for_status()
    return response

410

```

411

412

### External System Monitoring

413

```python

414

# Periodic monitoring of external systems
def monitor_system_health(system_name, timeout=10):
    """Record the current health of ``system_name`` in Dagster as an asset observation.

    Args:
        system_name: Asset key as a URL path; separate multi-part keys with ``/``.
        timeout: Seconds to wait for the webserver before giving up.

    Returns:
        The ``requests.Response`` returned by the webserver.

    Raises:
        requests.HTTPError: If the webserver answers with a 4xx/5xx status.
    """
    import os

    # NOTE(review): system_name is not passed to get_system_health(), so these
    # stats may not be specific to this system — confirm against its signature.
    health_stats = get_system_health()
    response = requests.post(
        f"http://dagster-webserver/report_asset_observation/{system_name}",
        json={
            "metadata": health_stats,
            "description": "System health check",
        },
        headers={"X-API-Key": os.getenv("DAGSTER_API_KEY")},
        # A periodic monitor must not hang indefinitely on an unresponsive webserver.
        timeout=timeout,
    )
    # Surface rejected reports instead of silently ignoring them.
    response.raise_for_status()
    return response

424

```