or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asset-creation.md cli-resource.md component-system.md dbt-cloud-legacy.md dbt-cloud-v2.md error-handling.md freshness-checks.md index.md project-management.md translation-system.md utilities.md

docs/freshness-checks.md

0

# Freshness Checks

1

2

Build asset checks for dbt source freshness validation. This module provides functionality to automatically generate Dagster asset checks from dbt source freshness configurations, enabling data quality monitoring within Dagster pipelines.

3

4

## Capabilities

5

6

### Freshness Check Builder

7

8

#### build_freshness_checks_from_dbt_assets

9

10

Creates asset checks from dbt source freshness configurations defined in dbt assets.

11

12

```python { .api }

13

def build_freshness_checks_from_dbt_assets(

14

dbt_assets: Sequence[AssetsDefinition]

15

) -> Sequence[AssetChecksDefinition]:

16

"""

17

Build freshness checks from dbt assets with source freshness configurations.

18

19

This function analyzes dbt assets to identify sources with freshness policies

20

and creates corresponding Dagster asset checks that validate data freshness

21

according to dbt source configurations.

22

23

Parameters:

24

- dbt_assets: Sequence of AssetsDefinition objects created from dbt models

25

26

Returns:

27

Sequence of AssetChecksDefinition objects that validate source freshness

28

29

Raises:

30

DagsterDbtError: If dbt assets don't contain required metadata

31

"""

32

```

33

34

## Usage Examples

35

36

### Basic Freshness Check Creation

37

38

```python

39

from dagster import Definitions, AssetExecutionContext

40

from dagster_dbt import DbtCliResource, dbt_assets, build_freshness_checks_from_dbt_assets

41

42

# Create dbt assets with CLI resource

43

dbt_cli_resource = DbtCliResource(project_dir="./my_dbt_project")

44

45

@dbt_assets(manifest="./my_dbt_project/target/manifest.json")

46

def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):

47

yield from dbt.cli(["build"], context=context).stream()

48

49

# Build freshness checks from the dbt assets

50

freshness_checks = build_freshness_checks_from_dbt_assets([my_dbt_assets])

51

52

# Include in definitions

53

defs = Definitions(

54

assets=[my_dbt_assets],

55

asset_checks=freshness_checks,

56

resources={"dbt": dbt_cli_resource}

57

)

58

```

59

60

### dbt Source Configuration

61

62

Configure freshness policies in your dbt `schema.yml` files:

63

64

```yaml

65

version: 2

66

67

sources:

68

- name: raw_data

69

description: Raw data from external systems

70

freshness:

71

warn_after: {count: 12, period: hour}

72

error_after: {count: 24, period: hour}

73

tables:

74

- name: users

75

description: User data

76

freshness:

77

warn_after: {count: 2, period: hour}

78

error_after: {count: 6, period: hour}

79

- name: orders

80

description: Order data

81

freshness:

82

warn_after: {count: 1, period: hour}

83

error_after: {count: 3, period: hour}

84

```

85

86

### Custom Freshness Check Configuration

87

88

```python

89

from dagster import Definitions, AssetExecutionContext

90

from dagster_dbt import DbtCliResource, dbt_assets, build_freshness_checks_from_dbt_assets, DagsterDbtTranslator

91

from dagster import AssetKey

92

93

class CustomFreshnessTranslator(DagsterDbtTranslator):

94

def get_freshness_policy(self, dbt_resource_props):

95

"""Custom freshness policy based on resource configuration."""

96

config = dbt_resource_props.get("config", {})

97

98

# Set custom freshness based on model tags

99

tags = dbt_resource_props.get("tags", [])

100

if "critical" in tags:

101

return {"warn_after": {"count": 30, "period": "minute"},

102

"error_after": {"count": 60, "period": "minute"}}

103

elif "daily" in tags:

104

return {"warn_after": {"count": 25, "period": "hour"},

105

"error_after": {"count": 30, "period": "hour"}}

106

107

return None

108

109

@dbt_assets(

110

manifest="./my_dbt_project/target/manifest.json",

111

dagster_dbt_translator=CustomFreshnessTranslator()

112

)

113

def my_custom_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):

114

yield from dbt.cli(["build"], context=context).stream()

115

116

# Build freshness checks with custom configuration

117

freshness_checks = build_freshness_checks_from_dbt_assets([my_custom_dbt_assets])

118

119

defs = Definitions(

120

assets=[my_custom_dbt_assets],

121

asset_checks=freshness_checks,

122

resources={"dbt": dbt_cli_resource}

123

)

124

```

125

126

### Running Freshness Checks

127

128

```python

129

from dagster import job, op, Definitions

130

from dagster_dbt import DbtCliResource, build_freshness_checks_from_dbt_assets

131

132

@op

133

def run_freshness_checks(context, dbt: DbtCliResource):

134

"""Execute dbt source freshness command."""

135

result = dbt.cli(["source", "freshness"], context=context)

136

context.log.info(f"Freshness check completed: {result.is_successful()}")

137

return result

138

139

@job

140

def daily_freshness_job():

141

"""Job to run daily freshness checks."""

142

run_freshness_checks()

143

144

# Combine with scheduled execution

145

from dagster import ScheduleDefinition

146

147

freshness_schedule = ScheduleDefinition(

148

job=daily_freshness_job,

149

cron_schedule="0 6 * * *", # Run at 6 AM daily

150

name="daily_freshness_schedule"

151

)

152

153

defs = Definitions(

154

jobs=[daily_freshness_job],

155

schedules=[freshness_schedule],

156

resources={"dbt": dbt_cli_resource}

157

)

158

```

159

160

### Monitoring Freshness Results

161

162

```python

163

from dagster import sensor, RunRequest, SkipReason, SensorDefinition

164

from dagster_dbt import DbtCliResource

165

166

@sensor(asset_selection="*")

167

def freshness_alert_sensor(context, dbt: DbtCliResource):

168

"""Sensor to monitor freshness check failures."""

169

170

# Check most recent freshness results

171

try:

172

result = dbt.cli(["source", "freshness", "--output", "json"])

173

freshness_data = result.get_artifact("sources.json")

174

175

failed_sources = []

176

for source_name, source_data in freshness_data.get("sources", {}).items():

177

for table_name, table_data in source_data.get("tables", {}).items():

178

freshness_status = table_data.get("freshness", {}).get("status")

179

if freshness_status == "error":

180

failed_sources.append(f"{source_name}.{table_name}")

181

182

if failed_sources:

183

context.log.warning(f"Freshness check failures: {failed_sources}")

184

# Could trigger alerts, create incidents, etc.

185

return RunRequest(run_key=f"freshness_alert_{context.cursor}")

186

else:

187

return SkipReason("All freshness checks passed")

188

189

except Exception as e:

190

context.log.error(f"Error checking freshness: {e}")

191

return SkipReason(f"Error checking freshness: {e}")

192

193

defs = Definitions(

194

sensors=[freshness_alert_sensor],

195

resources={"dbt": dbt_cli_resource}

196

)

197

```

198

199

## Integration with dbt Commands

200

201

### Source Freshness Command

202

203

The freshness checks integrate with dbt's built-in source freshness functionality:

204

205

```bash

206

# Run freshness checks directly with dbt CLI

207

dbt source freshness

208

209

# Run specific source freshness

210

dbt source freshness --select source:raw_data

211

212

# Output freshness results to JSON

213

dbt source freshness --output json --output-path target/

214

```

215

216

### Integration with dbt Tests

217

218

Combine freshness checks with other dbt data quality tests:

219

220

```python

221

@dbt_assets(manifest="./my_dbt_project/target/manifest.json")

222

def comprehensive_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):

223

# Run models and tests

224

yield from dbt.cli(["build"], context=context).stream()

225

226

# Run source freshness checks

227

yield from dbt.cli(["source", "freshness"], context=context).stream()

228

229

# Build both model assets and freshness checks

230

model_assets = [comprehensive_dbt_assets]

231

freshness_checks = build_freshness_checks_from_dbt_assets(model_assets)

232

233

defs = Definitions(

234

assets=model_assets,

235

asset_checks=freshness_checks,

236

resources={"dbt": dbt_cli_resource}

237

)

238

```

239

240

## Configuration Options

241

242

### Freshness Policy Configuration

243

244

Control how freshness policies are interpreted and applied:

245

246

```python

247

from dagster_dbt import DagsterDbtTranslator

248

249

class FreshnessPolicyTranslator(DagsterDbtTranslator):

250

def get_freshness_policy(self, dbt_resource_props):

251

"""Convert dbt freshness config to Dagster freshness policy."""

252

freshness_config = dbt_resource_props.get("freshness")

253

if not freshness_config:

254

return None

255

256

# Convert dbt time periods to minutes

257

def parse_time_period(config):

258

count = config.get("count", 0)

259

period = config.get("period", "hour")

260

261

period_multipliers = {

262

"minute": 1,

263

"hour": 60,

264

"day": 60 * 24

265

}

266

267

return count * period_multipliers.get(period, 60)

268

269

warn_after = freshness_config.get("warn_after")

270

error_after = freshness_config.get("error_after")

271

272

if error_after:

273

maximum_lag_minutes = parse_time_period(error_after)

274

return {"maximum_lag_minutes": maximum_lag_minutes}

275

276

return None

277

```

278

279

## Error Handling

280

281

Handle common freshness check errors:

282

283

```python

284

from dagster_dbt.errors import DagsterDbtError, DagsterDbtCliRuntimeError

285

286

@op

287

def robust_freshness_check(context, dbt: DbtCliResource):

288

"""Freshness check with error handling."""

289

try:

290

result = dbt.cli(["source", "freshness"], context=context)

291

292

if not result.is_successful():

293

# Log details but don't fail the op

294

context.log.warning("Some freshness checks failed, but continuing pipeline")

295

296

return result

297

298

except DagsterDbtCliRuntimeError as e:

299

context.log.error(f"dbt CLI error during freshness check: {e}")

300

# Could choose to fail or continue based on requirements

301

raise

302

303

except DagsterDbtError as e:

304

context.log.error(f"dbt integration error: {e}")

305

raise

306

```