0
# Asset Reporting
1
2
HTTP endpoints that let external systems report asset events directly to a Dagster instance. They enable integration with external data pipelines, monitoring systems, and third-party tools that need to communicate asset state changes to Dagster.
3
4
## Capabilities
5
6
### Asset Materialization Reporting
7
8
Report when external systems have materialized (created or updated) assets, allowing Dagster to track data lineage and freshness across the entire data ecosystem.
9
10
```python { .api }
11
async def handle_report_asset_materialization_request(
12
context: BaseWorkspaceRequestContext,
13
request: Request
14
) -> JSONResponse:
15
"""
16
Handle asset materialization reporting via HTTP.
17
18
Args:
19
context: Workspace request context with instance access
20
request: HTTP request containing asset materialization data
21
22
Returns:
23
JSONResponse: Success/error response with event details
24
"""
25
```
26
27
**Endpoint**: `POST /report_asset_materialization/{asset_key:path}`
28
29
**Usage Examples:**
30
31
```python
32
import requests
33
import json
34
35
# Basic asset materialization report
36
response = requests.post(
37
"http://localhost:3000/report_asset_materialization/my_dataset",
38
json={
39
"metadata": {
40
"rows": 1000,
41
"columns": 10,
42
"file_size": "5.2MB"
43
},
44
"description": "Daily ETL run completed successfully"
45
}
46
)
47
48
# With data version and partition
49
response = requests.post(
50
"http://localhost:3000/report_asset_materialization/sales/daily_summary",
51
json={
52
"data_version": "v1.2.3",
53
"partition": "2024-01-15",
54
"metadata": {
55
"total_sales": 50000.00,
56
"transaction_count": 1250
57
}
58
}
59
)
60
61
# Query parameters instead of JSON body
62
response = requests.post(
63
"http://localhost:3000/report_asset_materialization/my_asset",
64
params={
65
"data_version": "v1.0.0",
66
"description": "Processed via external pipeline"
67
}
68
)
69
```
70
71
### Asset Check Reporting
72
73
Report the results of data quality checks or validation tests performed by external systems.
74
75
```python { .api }
76
async def handle_report_asset_check_request(
77
context: BaseWorkspaceRequestContext,
78
request: Request
79
) -> JSONResponse:
80
"""
81
Handle asset check evaluation reporting via HTTP.
82
83
Args:
84
context: Workspace request context with instance access
85
request: HTTP request containing asset check evaluation data
86
87
Returns:
88
JSONResponse: Success/error response with check details
89
"""
90
```
91
92
**Endpoint**: `POST /report_asset_check/{asset_key:path}`
93
94
**Usage Examples:**
95
96
```python
97
# Successful data quality check
98
response = requests.post(
99
"http://localhost:3000/report_asset_check/customer_data",
100
json={
101
"check_name": "null_values_check",
102
"passed": True,
103
"metadata": {
104
"null_count": 0,
105
"total_rows": 10000,
106
"check_duration": "0.5s"
107
}
108
}
109
)
110
111
# Failed data quality check
112
response = requests.post(
113
"http://localhost:3000/report_asset_check/product_catalog",
114
json={
115
"check_name": "price_validation",
116
"passed": False,
117
"severity": "ERROR",
118
"metadata": {
119
"invalid_prices": 15,
120
"error_details": "Negative prices found in products"
121
}
122
}
123
)
124
125
# Warning level check
126
response = requests.post(
127
"http://localhost:3000/report_asset_check/user_events",
128
json={
129
"check_name": "volume_check",
130
"passed": True,
131
"severity": "WARN",
132
"metadata": {
133
"daily_volume": 50000,
134
"expected_min": 75000,
135
"note": "Volume below expected threshold"
136
}
137
}
138
)
139
```
140
141
### Asset Observation Reporting
142
143
Report observations about an asset's state without recording a materialization. This is useful for monitoring and metadata collection.
144
145
```python { .api }
146
async def handle_report_asset_observation_request(
147
context: BaseWorkspaceRequestContext,
148
request: Request
149
) -> JSONResponse:
150
"""
151
Handle asset observation reporting via HTTP.
152
153
Args:
154
context: Workspace request context with instance access
155
request: HTTP request containing asset observation data
156
157
Returns:
158
JSONResponse: Success/error response with observation details
159
"""
160
```
161
162
**Endpoint**: `POST /report_asset_observation/{asset_key:path}`
163
164
**Usage Examples:**
165
166
```python
167
# Schema change observation
168
response = requests.post(
169
"http://localhost:3000/report_asset_observation/warehouse/customers",
170
json={
171
"metadata": {
172
"schema_version": "v2.1",
173
"columns_added": ["phone_verified", "last_login"],
174
"migration_applied": "2024-01-15T10:30:00Z"
175
},
176
"description": "Schema migration applied successfully"
177
}
178
)
179
180
# Performance metrics observation
181
response = requests.post(
182
"http://localhost:3000/report_asset_observation/api/user_service",
183
json={
184
"metadata": {
185
"avg_response_time": "45ms",
186
"error_rate": "0.1%",
187
"throughput": "1000 req/min",
188
"monitoring_window": "1h"
189
}
190
}
191
)
192
193
# Data freshness observation
194
response = requests.post(
195
"http://localhost:3000/report_asset_observation/reports/daily_kpis",
196
json={
197
"data_version": "2024-01-15",
198
"metadata": {
199
"last_updated": "2024-01-15T23:45:00Z",
200
"lag_minutes": 15,
201
"source_system": "analytics_pipeline"
202
}
203
}
204
)
205
```
206
207
## Parameter Classes
208
209
Utility classes defining parameter names for consistent API usage:
210
211
```python { .api }
212
class ReportAssetMatParam:
213
"""Parameter names for asset materialization reporting."""
214
asset_key = "asset_key"
215
data_version = "data_version"
216
metadata = "metadata"
217
description = "description"
218
partition = "partition"
219
220
class ReportAssetCheckEvalParam:
221
"""Parameter names for asset check evaluation reporting."""
222
asset_key = "asset_key"
223
check_name = "check_name"
224
metadata = "metadata"
225
severity = "severity"
226
passed = "passed"
227
228
class ReportAssetObsParam:
229
"""Parameter names for asset observation reporting."""
230
asset_key = "asset_key"
231
data_version = "data_version"
232
metadata = "metadata"
233
description = "description"
234
partition = "partition"
235
```
236
237
## Asset Key Formats
238
239
Asset keys can be specified in multiple ways:
240
241
### URL Path Format
242
```text
243
# Single part asset key
244
POST /report_asset_materialization/my_asset
245
246
# Multi-part asset key (separated by /)
247
POST /report_asset_materialization/warehouse/customers/daily
248
```
249
250
### JSON Body Format
251
```python
252
# Multi-part asset key as array
253
{
254
"asset_key": ["warehouse", "customers", "daily"],
255
"metadata": {...}
256
}
257
```
258
259
### Query Parameter Format
260
```text
261
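# Database string format (the serialized form of the asset key).
# NOTE(review): Dagster's AssetKey database string is presumably a JSON-encoded list of
# path parts, e.g. ["warehouse", "customers", "daily"] - confirm the exact encoding.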
262
POST /report_asset_materialization/my_asset?asset_key=warehouse.customers.daily
263
```
264
265
## Request Formats
266
267
All endpoints accept their parameters in a JSON request body, as query parameters, or as a combination of both:
268
269
### JSON Request Body
270
```python
271
requests.post(url, json={
272
"metadata": {"key": "value"},
273
"description": "Event description"
274
})
275
```
276
277
### Query Parameters
278
```python
279
requests.post(url, params={
280
"metadata": json.dumps({"key": "value"}),
281
"description": "Event description"
282
})
283
```
284
285
### Mixed Format
286
```python
287
# JSON body takes precedence over query parameters
288
requests.post(
289
url + "?description=fallback",
290
json={"description": "primary", "metadata": {...}}
291
)
292
```
293
294
## Metadata Handling
295
296
Metadata can be any JSON-serializable data:
297
298
```python
299
# Simple metadata
300
{
301
"metadata": {
302
"rows": 1000,
303
"size_mb": 15.5,
304
"success": True
305
}
306
}
307
308
# Complex nested metadata
309
{
310
"metadata": {
311
"statistics": {
312
"mean": 42.5,
313
"std_dev": 10.2,
314
"quartiles": [35.0, 42.0, 50.0]
315
},
316
"quality_checks": [
317
{"name": "null_check", "passed": True},
318
{"name": "range_check", "passed": False, "details": "5 values out of range"}
319
],
320
"processing_time": "00:05:23",
321
"source_files": ["data1.csv", "data2.csv"]
322
}
323
}
324
```
325
326
## Error Handling
327
328
The endpoints return standard HTTP status codes together with a JSON body that describes the outcome:
329
330
```python
331
# Success response (200)
332
{
333
"status": "success",
334
"event_id": "12345",
335
"message": "Asset materialization recorded"
336
}
337
338
# Bad request (400) - missing required fields
339
{
340
"error": "Missing required field: check_name",
341
"status": "error"
342
}
343
344
# Not found (404) - invalid asset key
345
{
346
"error": "Asset key 'invalid/asset' not found",
347
"status": "error"
348
}
349
350
# Server error (500) - internal processing error
351
{
352
"error": "Failed to process asset event",
353
"status": "error"
354
}
355
```
356
357
## Authentication and Security
358
359
When deployed in production, these endpoints should be secured, for example with middleware that requires an API key on every `/report_asset*` request:
360
361
```python
362
# Example middleware for API key authentication
363
class AssetReportingAuthMiddleware(BaseHTTPMiddleware):
364
async def dispatch(self, request, call_next):
365
if request.url.path.startswith("/report_asset"):
366
api_key = request.headers.get("X-API-Key")
367
if not api_key or not validate_api_key(api_key):
368
return JSONResponse(
369
{"error": "Invalid API key"},
370
status_code=401
371
)
372
return await call_next(request)
373
374
# Deploy with authentication
375
webserver = DagsterWebserver(workspace_context)
376
app = webserver.create_asgi_app(
377
middleware=[Middleware(AssetReportingAuthMiddleware)]
378
)
379
```
380
381
## Integration Patterns
382
383
### ETL Pipeline Integration
384
```python
385
# At end of ETL job
386
def report_completion(asset_name, stats):
387
requests.post(
388
f"http://dagster-webserver/report_asset_materialization/{asset_name}",
389
json={
390
"metadata": stats,
391
"description": f"ETL completed for {asset_name}"
392
},
393
headers={"X-API-Key": os.getenv("DAGSTER_API_KEY")}
394
)
395
```
396
397
### Data Quality Monitoring
398
```python
399
# After running data quality checks
400
def report_quality_check(dataset, check_name, results):
401
requests.post(
402
f"http://dagster-webserver/report_asset_check/{dataset}",
403
json={
404
"check_name": check_name,
405
"passed": results["passed"],
406
"severity": "ERROR" if not results["passed"] else "INFO",
407
"metadata": results["details"]
408
}
409
)
410
```
411
412
### External System Monitoring
413
```python
414
# Periodic monitoring of external systems
415
def monitor_system_health(system_name):
416
health_stats = get_system_health()
417
requests.post(
418
f"http://dagster-webserver/report_asset_observation/{system_name}",
419
json={
420
"metadata": health_stats,
421
"description": "System health check"
422
}
423
)
424
```