or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asset-creation.md, cli-resource.md, component-system.md, dbt-cloud-legacy.md, dbt-cloud-v2.md, error-handling.md, freshness-checks.md, index.md, project-management.md, translation-system.md, utilities.md

docs/cli-resource.md

0

# CLI Resource and Execution

1

2

Local dbt execution through the CLI resource, including command invocation, event streaming, and artifact management. This module provides the core interface for running dbt commands within Dagster ops and assets.

3

4

## Capabilities

5

6

### CLI Resource

7

8

#### DbtCliResource

9

10

The main resource for executing dbt CLI commands within Dagster.

11

12

```python { .api }

13

class DbtCliResource(ConfigurableResource):

14

"""

15

Resource for executing dbt CLI commands.

16

17

Attributes:

18

- project_dir: Path to the dbt project directory

19

- profiles_dir: Path to the dbt profiles directory (optional)

20

- profile: Name of the dbt profile to use (optional)

21

- target: Name of the dbt target to use (optional)

22

- global_config_flags: List of global dbt flags to apply

23

"""

24

25

project_dir: str

26

profiles_dir: Optional[str] = None

27

profile: Optional[str] = None

28

target: Optional[str] = None

29

global_config_flags: List[str] = []

30

31

def cli(

32

self,

33

args: List[str],

34

context: Optional[AssetExecutionContext | OpExecutionContext] = None,

35

**kwargs

36

) -> DbtCliInvocation:

37

"""

38

Execute dbt CLI command.

39

40

Parameters:

41

- args: dbt command arguments (e.g., ["build", "--select", "my_model"])

42

- context: Dagster execution context for logging and metadata

43

- **kwargs: Additional arguments passed to subprocess

44

45

Returns:

46

DbtCliInvocation object for streaming results and accessing artifacts

47

"""

48

```

49

50

#### Usage Example

51

52

```python

53

from dagster import asset, AssetExecutionContext

54

from dagster_dbt import DbtCliResource

55

56

dbt_resource = DbtCliResource(

57

project_dir="./my_dbt_project",

58

profiles_dir="~/.dbt",

59

target="dev"

60

)

61

62

@asset

63

def run_dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):

64

# Run specific models

65

dbt_run = dbt.cli([

66

"run",

67

"--select", "tag:daily",

68

"--exclude", "tag:slow"

69

], context=context)

70

71

# Stream events and get results

72

for event in dbt_run.stream():

73

context.log.info(f"dbt: {event}")

74

75

# Access artifacts

76

run_results = dbt_run.get_artifact("run_results.json")

77

return {"models_run": len(run_results.get("results", []))}

78

```

79

80

### CLI Invocation

81

82

#### DbtCliInvocation

83

84

Represents a dbt CLI command invocation with methods for streaming events and accessing artifacts.

85

86

```python { .api }

87

class DbtCliInvocation:

88

"""

89

Represents a dbt CLI command invocation.

90

91

Provides methods for streaming execution events and accessing

92

generated artifacts like run_results.json and manifest.json.

93

"""

94

95

def stream(self) -> Iterator[DbtCliEventMessage]:

96

"""

97

Stream dbt CLI events as they occur.

98

99

Yields:

100

DbtCliEventMessage objects containing event data

101

"""

102

103

def stream_raw_events(self) -> Iterator[dict]:

104

"""

105

Stream raw dbt event dictionaries.

106

107

Yields:

108

Raw dbt event dictionaries without parsing

109

"""

110

111

def get_artifact(self, artifact_name: str) -> Optional[dict]:

112

"""

113

Get a dbt artifact by name.

114

115

Parameters:

116

- artifact_name: Name of artifact (e.g., "run_results.json", "manifest.json")

117

118

Returns:

119

Parsed artifact dictionary or None if not found

120

"""

121

122

def wait(self) -> CompletedProcess:

123

"""

124

Wait for the dbt command to complete.

125

126

Returns:

127

CompletedProcess with return code and outputs

128

"""

129

130

@property

131

def is_successful(self) -> bool:

132

"""

133

Check if the dbt command completed successfully.

134

135

Returns:

136

True if command succeeded, False otherwise

137

"""

138

```

139

140

### CLI Event Messages

141

142

#### DbtCliEventMessage

143

144

Base class for dbt CLI event messages with common event handling.

145

146

```python { .api }

147

class DbtCliEventMessage:

148

"""

149

Base class for dbt CLI event messages.

150

151

Attributes:

152

- raw_event: Raw event dictionary from dbt

153

- event_type: Type of dbt event

154

- log_level: Logging level of the event

155

"""

156

157

raw_event: dict

158

159

@property

160

def event_type(self) -> str:

161

"""Get the dbt event type."""

162

163

@property

164

def log_level(self) -> str:

165

"""Get the event log level."""

166

167

def to_default_asset_events(

168

self,

169

context: AssetExecutionContext,

170

manifest: dict,

171

**kwargs

172

) -> Iterator[AssetMaterialization | AssetObservation]:

173

"""

174

Convert to Dagster asset events.

175

176

Parameters:

177

- context: Asset execution context

178

- manifest: dbt manifest dictionary

179

- **kwargs: Additional conversion parameters

180

181

Yields:

182

AssetMaterialization or AssetObservation events

183

"""

184

```

185

186

#### DbtCoreCliEventMessage

187

188

Event message implementation for dbt Core CLI commands.

189

190

```python { .api }

191

class DbtCoreCliEventMessage(DbtCliEventMessage):

192

"""

193

dbt Core CLI event message implementation.

194

195

Handles events from dbt Core CLI execution with Core-specific

196

event parsing and asset event generation.

197

"""

198

```

199

200

#### DbtFusionCliEventMessage

201

202

Event message implementation for dbt Fusion CLI commands.

203

204

```python { .api }

205

class DbtFusionCliEventMessage(DbtCliEventMessage):

206

"""

207

dbt Fusion CLI event message implementation.

208

209

Handles events from dbt Fusion CLI execution with Fusion-specific

210

event parsing and optimized performance characteristics.

211

"""

212

```

213

214

## Advanced Usage Examples

215

216

### Custom Event Handling

217

218

```python

219

from dagster import asset, AssetExecutionContext

220

from dagster_dbt import DbtCliResource

221

222

@asset

223

def process_dbt_with_custom_events(

224

context: AssetExecutionContext,

225

dbt: DbtCliResource

226

):

227

dbt_run = dbt.cli(["test"], context=context)

228

229

test_results = []

230

for event in dbt_run.stream():

231

if event.event_type == "test_result":

232

test_results.append({

233

"test_name": event.raw_event.get("data", {}).get("node_name"),

234

"status": event.raw_event.get("data", {}).get("status"),

235

"execution_time": event.raw_event.get("data", {}).get("execution_time")

236

})

237

238

# Log important events

239

if event.log_level in ["error", "warn"]:

240

context.log.warning(f"dbt {event.log_level}: {event.raw_event}")

241

242

return {"test_results": test_results}

243

```

244

245

### Artifact Access

246

247

```python

248

from dagster import asset, AssetExecutionContext

249

from dagster_dbt import DbtCliResource

250

251

@asset

252

def analyze_dbt_run_results(

253

context: AssetExecutionContext,

254

dbt: DbtCliResource

255

):

256

# Run dbt and get artifacts

257

invocation = dbt.cli(["run", "--select", "tag:important"], context=context)

258

259

# Process events

260

for event in invocation.stream():

261

pass # Process events as needed

262

263

# Access run results

264

run_results = invocation.get_artifact("run_results.json")

265

manifest = invocation.get_artifact("manifest.json")

266

267

if run_results and manifest:

268

successful_models = [

269

result["unique_id"]

270

for result in run_results.get("results", [])

271

if result["status"] == "success"

272

]

273

274

return {

275

"successful_models": successful_models,

276

"total_execution_time": run_results.get("elapsed_time", 0)

277

}

278

279

return {"error": "Could not access dbt artifacts"}

280

```

281

282

## Type Definitions

283

284

```python { .api }

285

# Import types for type hints

286

from dagster import AssetExecutionContext, OpExecutionContext

287

from subprocess import CompletedProcess

288

from typing import Dict, Iterator, List, Optional, Union

289

```