0
# Freshness Checks
1
2
Build Dagster asset checks that validate dbt source freshness. This module automatically generates asset checks from the freshness configuration declared in a dbt project, enabling data quality monitoring within Dagster pipelines.
3
4
## Capabilities
5
6
### Freshness Check Builder
7
8
#### build_freshness_checks_from_dbt_assets
9
10
Creates asset checks from dbt source freshness configurations defined in dbt assets.
11
12
```python { .api }
13
def build_freshness_checks_from_dbt_assets(
14
dbt_assets: Sequence[AssetsDefinition]
15
) -> Sequence[AssetChecksDefinition]:
16
"""
17
Build freshness checks from dbt assets with source freshness configurations.
18
19
This function analyzes dbt assets to identify sources with freshness policies
20
and creates corresponding Dagster asset checks that validate data freshness
21
according to dbt source configurations.
22
23
Parameters:
24
- dbt_assets: Sequence of AssetsDefinition objects created from dbt models
25
26
Returns:
27
Sequence of AssetChecksDefinition objects that validate source freshness
28
29
Raises:
30
DagsterDbtError: If dbt assets don't contain required metadata
31
"""
32
```
33
34
## Usage Examples
35
36
### Basic Freshness Check Creation
37
38
```python
39
from dagster import Definitions, AssetExecutionContext
40
from dagster_dbt import DbtCliResource, dbt_assets, build_freshness_checks_from_dbt_assets
41
42
# Create dbt assets with CLI resource
43
dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")
44
45
@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
46
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
47
yield from dbt.cli(["build"], context=context).stream()
48
49
# Build freshness checks from the dbt assets
50
freshness_checks = build_freshness_checks_from_dbt_assets(dbt_assets=[my_dbt_assets])
51
52
# Include in definitions
53
defs = Definitions(
54
assets=[my_dbt_assets],
55
asset_checks=freshness_checks,
56
resources={"dbt": dbt_cli_resource}
57
)
58
```
59
60
### dbt Source Configuration
61
62
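Configure freshness policies in your dbt `schema.yml` files. Thresholds set on a source apply to all of its tables unless a table overrides them. Note that dbt skips freshness evaluation for tables without a `loaded_at_field` (unless the adapter supports metadata-based freshness), so set one alongside these thresholds in a real project: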
63
64
```yaml
65
version: 2
66
67
sources:
68
- name: raw_data
69
description: Raw data from external systems
70
freshness:
71
warn_after: {count: 12, period: hour}
72
error_after: {count: 24, period: hour}
73
tables:
74
- name: users
75
description: User data
76
freshness:
77
warn_after: {count: 2, period: hour}
78
error_after: {count: 6, period: hour}
79
- name: orders
80
description: Order data
81
freshness:
82
warn_after: {count: 1, period: hour}
83
error_after: {count: 3, period: hour}
84
```
85
86
### Custom Freshness Check Configuration
87
88
```python
89
from dagster import Definitions, AssetExecutionContext
90
from dagster_dbt import DbtCliResource, dbt_assets, build_freshness_checks_from_dbt_assets, DagsterDbtTranslator
91
from dagster import AssetKey
92
93
class CustomFreshnessTranslator(DagsterDbtTranslator):
94
def get_freshness_policy(self, dbt_resource_props):
95
"""Custom freshness policy based on resource configuration."""
96
config = dbt_resource_props.get("config", {})
97
98
# Set custom freshness based on model tags
99
tags = dbt_resource_props.get("tags", [])
100
if "critical" in tags:
101
return {"warn_after": {"count": 30, "period": "minute"},
102
"error_after": {"count": 60, "period": "minute"}}
103
elif "daily" in tags:
104
return {"warn_after": {"count": 25, "period": "hour"},
105
"error_after": {"count": 30, "period": "hour"}}
106
107
return None
108
109
@dbt_assets(
110
manifest="./my_dbt_project/target/manifest.json",
111
dagster_dbt_translator=CustomFreshnessTranslator()
112
)
113
def my_custom_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
114
yield from dbt.cli(["build"], context=context).stream()
115
116
# Build freshness checks with custom configuration
117
freshness_checks = build_freshness_checks_from_dbt_assets(dbt_assets=[my_custom_dbt_assets])
118
119
defs = Definitions(
120
assets=[my_custom_dbt_assets],
121
asset_checks=freshness_checks,
122
resources={"dbt": dbt_cli_resource}
123
)
124
```
125
126
### Running Freshness Checks
127
128
```python
129
from dagster import job, op, Definitions
130
from dagster_dbt import DbtCliResource, build_freshness_checks_from_dbt_assets
131
132
@op
133
def run_freshness_checks(context, dbt: DbtCliResource):
134
"""Execute dbt source freshness command."""
135
result = dbt.cli(["source", "freshness"], context=context)
136
context.log.info(f"Freshness check completed: {result.is_successful()}")
137
return result
138
139
@job
140
def daily_freshness_job():
141
"""Job to run daily freshness checks."""
142
run_freshness_checks()
143
144
# Combine with scheduled execution
145
from dagster import ScheduleDefinition
146
147
freshness_schedule = ScheduleDefinition(
148
job=daily_freshness_job,
149
cron_schedule="0 6 * * *", # Run at 6 AM daily
150
name="daily_freshness_schedule"
151
)
152
153
defs = Definitions(
154
jobs=[daily_freshness_job],
155
schedules=[freshness_schedule],
156
resources={"dbt": dbt_cli_resource}
157
)
158
```
159
160
### Monitoring Freshness Results
161
162
```python
163
from dagster import sensor, RunRequest, SkipReason, SensorDefinition
164
from dagster_dbt import DbtCliResource
165
166
@sensor(asset_selection="*")
167
def freshness_alert_sensor(context, dbt: DbtCliResource):
168
"""Sensor to monitor freshness check failures."""
169
170
# Check most recent freshness results
171
try:
172
result = dbt.cli(["source", "freshness"])
173
freshness_data = result.get_artifact("sources.json")
174
175
failed_sources = []
176
for source_name, source_data in freshness_data.get("sources", {}).items():
177
for table_name, table_data in source_data.get("tables", {}).items():
178
freshness_status = table_data.get("freshness", {}).get("status")
179
if freshness_status == "error":
180
failed_sources.append(f"{source_name}.{table_name}")
181
182
if failed_sources:
183
context.log.warning(f"Freshness check failures: {failed_sources}")
184
# Could trigger alerts, create incidents, etc.
185
return RunRequest(run_key=f"freshness_alert_{context.cursor}")
186
else:
187
return SkipReason("All freshness checks passed")
188
189
except Exception as e:
190
context.log.error(f"Error checking freshness: {e}")
191
return SkipReason(f"Error checking freshness: {e}")
192
193
defs = Definitions(
194
sensors=[freshness_alert_sensor],
195
resources={"dbt": dbt_cli_resource}
196
)
197
```
198
199
## Integration with dbt Commands
200
201
### Source Freshness Command
202
203
The freshness checks integrate with dbt's built-in source freshness functionality:
204
205
```bash
206
# Run freshness checks directly with dbt CLI
207
dbt source freshness
208
209
# Run specific source freshness
210
dbt source freshness --select source:raw_data
211
212
# Output freshness results to JSON
213
dbt source freshness --output target/source_freshness.json
214
```
215
216
### Integration with dbt Tests
217
218
Combine freshness checks with other dbt data quality tests:
219
220
```python
221
@dbt_assets(manifest="./my_dbt_project/target/manifest.json")
222
def comprehensive_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
223
# Run models and tests
224
yield from dbt.cli(["build"], context=context).stream()
225
226
# Run source freshness checks
227
yield from dbt.cli(["source", "freshness"], context=context).stream()
228
229
# Build both model assets and freshness checks
230
model_assets = [comprehensive_dbt_assets]
231
freshness_checks = build_freshness_checks_from_dbt_assets(model_assets)
232
233
defs = Definitions(
234
assets=model_assets,
235
asset_checks=freshness_checks,
236
resources={"dbt": dbt_cli_resource}
237
)
238
```
239
240
## Configuration Options
241
242
### Freshness Policy Configuration
243
244
Control how freshness policies are interpreted and applied:
245
246
```python
247
from dagster_dbt import DagsterDbtTranslator
248
249
class FreshnessPolicyTranslator(DagsterDbtTranslator):
250
def get_freshness_policy(self, dbt_resource_props):
251
"""Convert dbt freshness config to Dagster freshness policy."""
252
freshness_config = dbt_resource_props.get("freshness")
253
if not freshness_config:
254
return None
255
256
# Convert dbt time periods to minutes
257
def parse_time_period(config):
258
count = config.get("count", 0)
259
period = config.get("period", "hour")
260
261
period_multipliers = {
262
"minute": 1,
263
"hour": 60,
264
"day": 60 * 24
265
}
266
267
return count * period_multipliers.get(period, 60)
268
269
warn_after = freshness_config.get("warn_after")
270
error_after = freshness_config.get("error_after")
271
272
if error_after:
273
maximum_lag_minutes = parse_time_period(error_after)
274
return {"maximum_lag_minutes": maximum_lag_minutes}
275
276
return None
277
```
278
279
## Error Handling
280
281
Handle common freshness check errors:
282
283
```python
284
from dagster_dbt.errors import DagsterDbtError, DagsterDbtCliRuntimeError
285
286
@op
287
def robust_freshness_check(context, dbt: DbtCliResource):
288
"""Freshness check with error handling."""
289
try:
290
result = dbt.cli(["source", "freshness"], context=context)
291
292
if not result.is_successful():
293
# Log details but don't fail the op
294
context.log.warning("Some freshness checks failed, but continuing pipeline")
295
296
return result
297
298
except DagsterDbtCliRuntimeError as e:
299
context.log.error(f"dbt CLI error during freshness check: {e}")
300
# Could choose to fail or continue based on requirements
301
raise
302
303
except DagsterDbtError as e:
304
context.log.error(f"dbt integration error: {e}")
305
raise
306
```