# Project Management

dbt project handling, manifest parsing, and project preparation for integration with Dagster. This module provides utilities for managing dbt project structure, parsing manifests, and preparing projects for execution.

## Capabilities

### Project Representation

#### DbtProject

Represents a dbt project with paths and configuration for integration with Dagster.

```python { .api }
@record_custom
class DbtProject(IHaveNew):
    """
    Represents a dbt project with paths and configuration.

    Attributes:
    - name: Name of the dbt project
    - project_dir: Path to the dbt project directory
    - target_path: Path to the dbt target directory (contains artifacts)
    - profiles_dir: Path to the dbt profiles directory
    - profile: Profile name to use (optional)
    - target: Target name to use (optional)
    - manifest_path: Path to the manifest.json file
    - packaged_project_dir: Path to packaged project directory (optional)
    - state_path: Path to state directory (optional)
    - has_uninstalled_deps: Whether project has uninstalled dependencies
    - preparer: DbtProjectPreparer instance for project preparation
    """

    name: str
    project_dir: Path
    target_path: Path
    profiles_dir: Path
    profile: Optional[str]
    target: Optional[str]
    manifest_path: Path
    packaged_project_dir: Optional[Path]
    state_path: Optional[Path]
    has_uninstalled_deps: bool
    preparer: "DbtProjectPreparer"

    def prepare_if_dev(self) -> None:
        """
        Prepare project if in development mode.

        Runs dbt parse if manifest.json doesn't exist or is outdated.
        """
```
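
As a rough illustration of how the path attributes relate to one another, the sketch below derives a manifest location from a project directory. It uses only the standard library. `ProjectPaths` and the default `target` directory name are assumptions made for this example, not part of dagster-dbt.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class ProjectPaths:
    """Hypothetical stand-in for the path attributes of DbtProject."""

    project_dir: Path
    target_path: Path = Path("target")  # assumed default, relative to project_dir

    @property
    def manifest_path(self) -> Path:
        # Artifacts such as manifest.json live in the target directory.
        return self.project_dir / self.target_path / "manifest.json"


paths = ProjectPaths(project_dir=Path("my_dbt_project"))
print(paths.manifest_path.as_posix())  # my_dbt_project/target/manifest.json
```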

### Project Preparation

#### DbtProjectPreparer

Abstract base class for project preparation strategies.

```python { .api }
class DbtProjectPreparer:
    """
    Abstract base class for dbt project preparation.

    Project preparers handle the setup and validation of dbt projects
    before they can be used with Dagster integration.
    """

    def prepare(self) -> None:
        """
        Prepare the dbt project for use.

        This method should ensure the project is ready for execution,
        including generating necessary artifacts like manifest.json.
        """
```
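
One way to picture the preparer as a strategy object is with a small stand-alone hierarchy. The classes below (`Preparer`, `NoOpPreparer`, `RecordingPreparer`) are hypothetical and use only the standard library. They illustrate the pattern and make no claim about how dagster-dbt implements it.

```python
from abc import ABC, abstractmethod
from typing import List


class Preparer(ABC):
    """Hypothetical analogue of DbtProjectPreparer."""

    @abstractmethod
    def prepare(self) -> None:
        """Make the project ready for use."""


class NoOpPreparer(Preparer):
    """For deployments where artifacts are built ahead of time."""

    def prepare(self) -> None:
        pass


class RecordingPreparer(Preparer):
    """Records the steps it would run instead of invoking dbt."""

    def __init__(self) -> None:
        self.steps: List[str] = []

    def prepare(self) -> None:
        self.steps.extend(["deps", "parse"])


def prepare_all(preparers: List[Preparer]) -> None:
    # Callers depend only on the prepare() contract.
    for preparer in preparers:
        preparer.prepare()


recorder = RecordingPreparer()
prepare_all([NoOpPreparer(), recorder])
print(recorder.steps)  # ['deps', 'parse']
```

Because callers only rely on `prepare()`, a different strategy can be swapped in without touching the code that triggers preparation.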

#### DagsterDbtProjectPreparer

Default implementation of project preparer for Dagster integration.

```python { .api }
class DagsterDbtProjectPreparer(DbtProjectPreparer):
    """
    Default dbt project preparer for Dagster integration.

    Handles standard project preparation including manifest generation
    and validation of project structure.

    Attributes:
    - project_dir: Path to dbt project directory
    - profiles_dir: Path to dbt profiles directory
    - target: dbt target name
    """

    project_dir: str
    profiles_dir: Optional[str] = None
    target: Optional[str] = None

    def prepare(self) -> None:
        """
        Prepare dbt project by running dbt parse if needed.

        Ensures manifest.json exists and is up-to-date by running
        dbt parse command when necessary.
        """

    def _should_prepare(self) -> bool:
        """
        Determine if project preparation is needed.

        Returns:
            True if preparation is needed, False otherwise
        """

    def _run_dbt_parse(self) -> None:
        """
        Run dbt parse command to generate manifest.

        Raises:
            DagsterDbtCliRuntimeError: If dbt parse fails
        """
```
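
The reference above does not spell out how "up-to-date" is decided. Here is a minimal sketch of one plausible check, assuming a manifest counts as stale when it is missing or older than any `.sql` or `.yml` file in the project. The function name `manifest_is_stale` and the modification-time heuristic are assumptions for illustration only.

```python
import os
import tempfile
from pathlib import Path


def manifest_is_stale(project_dir: Path, manifest_path: Path) -> bool:
    """Return True if the manifest is missing or older than a project file."""
    if not manifest_path.exists():
        return True
    manifest_mtime = manifest_path.stat().st_mtime
    for pattern in ("*.sql", "*.yml"):
        for source in project_dir.rglob(pattern):
            if source.stat().st_mtime > manifest_mtime:
                return True
    return False


with tempfile.TemporaryDirectory() as tmp:
    project_dir = Path(tmp)
    model = project_dir / "models" / "orders.sql"
    model.parent.mkdir()
    model.write_text("select 1")
    manifest = project_dir / "target" / "manifest.json"

    missing = manifest_is_stale(project_dir, manifest)

    manifest.parent.mkdir()
    manifest.write_text("{}")
    # Set explicit timestamps so the comparison does not depend on clock resolution.
    os.utime(model, (1_000, 1_000))
    os.utime(manifest, (2_000, 2_000))
    fresh = manifest_is_stale(project_dir, manifest)

    os.utime(model, (3_000, 3_000))
    edited = manifest_is_stale(project_dir, manifest)

print(missing, fresh, edited)  # True False True
```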

### Manifest Operations

#### Manifest Validation and Loading

```python { .api }
def validate_manifest(manifest: DbtManifestParam) -> dict:
    """
    Validate and load a dbt manifest.

    Parameters:
    - manifest: Path to manifest file, manifest dict, or manifest JSON string

    Returns:
        Validated manifest dictionary

    Raises:
        DagsterDbtError: If manifest is invalid or cannot be loaded
    """

def read_manifest_path(manifest_path: str) -> dict:
    """
    Read and cache manifest from file path.

    This function caches manifest contents to avoid repeated file I/O
    for the same manifest file.

    Parameters:
    - manifest_path: Path to manifest.json file

    Returns:
        Parsed manifest dictionary

    Raises:
        DagsterDbtManifestNotFoundError: If manifest file doesn't exist
    """
```

## Usage Examples

### Basic Project Setup

```python
import json

from dagster_dbt.dbt_project import DbtProject, DagsterDbtProjectPreparer

# Create project representation
project = DbtProject(project_dir="./my_dbt_project")

# Ensure project is prepared: generate the manifest if it is missing
if not project.manifest_path.exists():
    preparer = DagsterDbtProjectPreparer(
        project_dir=str(project.project_dir),
        target="dev",
    )
    preparer.prepare()

# Access manifest via the documented manifest_path attribute
manifest = json.loads(project.manifest_path.read_text())
print(f"Project {project.name} has {len(manifest['nodes'])} nodes")
```

### Custom Project Preparation

```python
import os
import subprocess

from dagster_dbt.dbt_project import DbtProjectPreparer


class CustomDbtProjectPreparer(DbtProjectPreparer):
    def __init__(self, project_dir: str, custom_profile_path: str):
        self.project_dir = project_dir
        self.custom_profile_path = custom_profile_path

    def prepare(self) -> None:
        """Custom preparation with environment setup."""
        # Set custom profiles directory
        env = os.environ.copy()
        env["DBT_PROFILES_DIR"] = os.path.dirname(self.custom_profile_path)

        # Run dbt deps to install packages
        subprocess.run([
            "dbt", "deps",
            "--project-dir", self.project_dir
        ], env=env, check=True)

        # Run dbt parse to generate manifest
        subprocess.run([
            "dbt", "parse",
            "--project-dir", self.project_dir,
            "--target", "prod"
        ], env=env, check=True)

        print("Custom dbt project preparation completed")


# Use custom preparer
preparer = CustomDbtProjectPreparer(
    project_dir="./my_dbt_project",
    custom_profile_path="./custom_profiles/profiles.yml"
)
preparer.prepare()
```

### Project Validation

```python
import json
import os

from dagster_dbt.dbt_project import DbtProject
from dagster_dbt.errors import DagsterDbtProjectNotFoundError, DagsterDbtManifestNotFoundError


def validate_dbt_project(project_dir: str) -> dict:
    """Validate dbt project and return project info."""
    try:
        project = DbtProject(project_dir=project_dir)

        # Validate project structure
        if not os.path.exists(project.project_dir):
            raise DagsterDbtProjectNotFoundError(f"Project directory not found: {project_dir}")

        # Check for dbt_project.yml
        dbt_project_yml = os.path.join(project.project_dir, "dbt_project.yml")
        if not os.path.exists(dbt_project_yml):
            raise DagsterDbtProjectNotFoundError("dbt_project.yml not found")

        # Validate manifest
        if not project.manifest_path.exists():
            print("Manifest not found, preparing project...")
            project.prepare_if_dev()

        # Preparation only runs in development, so the manifest may still be missing
        if not project.manifest_path.exists():
            raise DagsterDbtManifestNotFoundError(f"Manifest not found: {project.manifest_path}")

        manifest = json.loads(project.manifest_path.read_text())
        nodes = manifest.get("nodes", {}).values()

        # Return project info
        return {
            "project_name": project.name,
            "manifest_path": str(project.manifest_path),
            "node_count": len(manifest.get("nodes", {})),
            "source_count": len(manifest.get("sources", {})),
            "test_count": sum(1 for node in nodes if node.get("resource_type") == "test"),
            "model_count": sum(1 for node in nodes if node.get("resource_type") == "model"),
        }

    except DagsterDbtManifestNotFoundError as e:
        print(f"Manifest error: {e}")
        raise
    except Exception as e:
        print(f"Project validation failed: {e}")
        raise


# Validate project
project_info = validate_dbt_project("./my_dbt_project")
print(f"Validated project: {project_info}")
```
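
The per-type counts in the validation example can also be computed in a single pass. Here is a minimal sketch using `collections.Counter` on a hand-written manifest fragment. The sample node IDs are made up for illustration, and `count_resource_types` is a hypothetical helper rather than a dagster-dbt function.

```python
from collections import Counter
from typing import Any, Dict


def count_resource_types(manifest: Dict[str, Any]) -> Counter:
    """Tally manifest nodes by their resource_type field."""
    return Counter(
        node.get("resource_type", "unknown")
        for node in manifest.get("nodes", {}).values()
    )


sample_manifest = {
    "nodes": {
        "model.demo.orders": {"resource_type": "model"},
        "model.demo.customers": {"resource_type": "model"},
        "test.demo.not_null_orders_id": {"resource_type": "test"},
    }
}

counts = count_resource_types(sample_manifest)
print(counts["model"], counts["test"], counts["seed"])  # 2 1 0
```

A `Counter` returns zero for types that never appear, so callers do not need to guard against missing keys.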

### Multi-Project Management

```python
import json
from pathlib import Path

import yaml
from dagster_dbt.dbt_project import DbtProject


class MultiProjectManager:
    def __init__(self, workspace_dir: str):
        self.workspace_dir = Path(workspace_dir)
        self.projects = {}

    def discover_projects(self) -> dict:
        """Discover all dbt projects in workspace."""
        for project_path in self.workspace_dir.rglob("dbt_project.yml"):
            # Skip packages installed by `dbt deps`; they ship their own dbt_project.yml
            if "dbt_packages" in project_path.parts:
                continue
            project_dir = project_path.parent

            try:
                # Read project configuration
                with open(project_path) as f:
                    project_config = yaml.safe_load(f)

                project_name = project_config.get("name")
                if project_name:
                    project = DbtProject(project_dir=str(project_dir))
                    self.projects[project_name] = {
                        "project": project,
                        "config": project_config,
                        "path": str(project_dir)
                    }

            except Exception as e:
                print(f"Failed to load project at {project_dir}: {e}")

        return self.projects

    def prepare_all_projects(self) -> None:
        """Prepare all discovered projects."""
        for project_name, project_info in self.projects.items():
            print(f"Preparing project: {project_name}")
            project_info["project"].prepare_if_dev()

    def get_project_manifests(self) -> dict:
        """Get manifests for all projects."""
        manifests = {}
        for project_name, project_info in self.projects.items():
            try:
                manifest_path = project_info["project"].manifest_path
                manifests[project_name] = json.loads(manifest_path.read_text())
            except Exception as e:
                print(f"Failed to load manifest for {project_name}: {e}")

        return manifests


# Use multi-project manager
manager = MultiProjectManager("./workspace")
projects = manager.discover_projects()
print(f"Discovered {len(projects)} dbt projects")

manager.prepare_all_projects()
manifests = manager.get_project_manifests()
```

### Integration with Asset Creation

```python
from pathlib import Path

from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, dbt_assets
from dagster_dbt.dbt_project import DbtProject


def create_assets_from_project(project_dir: str, target: str = "dev"):
    """Create Dagster assets from dbt project."""
    # Set up project
    project = DbtProject(project_dir=project_dir)
    project.prepare_if_dev()

    # Create resource
    dbt_resource = DbtCliResource(
        project_dir=str(project.project_dir),
        target=target,
    )

    # Create assets. Each definition gets a unique name, and the resource is
    # captured by the closure so every project runs against its own directory.
    @dbt_assets(
        manifest=project.manifest_path,
        name=f"{Path(project_dir).name}_dbt_assets",
    )
    def project_assets(context: AssetExecutionContext):
        yield from dbt_resource.cli(["build"], context=context).stream()

    return project_assets


# Create assets for multiple projects
all_assets = [
    create_assets_from_project(project_path)
    for project_path in ["./project_a", "./project_b", "./project_c"]
]

defs = Definitions(assets=all_assets)
```

## Type Definitions

```python { .api }
from typing import Optional, Union, Dict, Any
from pathlib import Path

# Type alias for manifest parameter types
DbtManifestParam = Union[Dict[str, Any], str, Path]
```