A Dagster integration for dbt that enables orchestrating dbt models as Dagster assets with full lineage tracking and metadata propagation
—
dbt project handling, manifest parsing, and project preparation for integration with Dagster. This module provides utilities for managing dbt project structure, parsing manifests, and preparing projects for execution.
Represents a dbt project with paths and configuration for integration with Dagster.
@record_custom
class DbtProject(IHaveNew):
    """Represent a dbt project with its paths and configuration.

    Attributes:
        name: Name of the dbt project.
        project_dir: Path to the dbt project directory.
        target_path: Path to the dbt target directory (contains artifacts).
        profiles_dir: Path to the dbt profiles directory.
        profile: Profile name to use (optional).
        target: Target name to use (optional).
        manifest_path: Path to the manifest.json file.
        packaged_project_dir: Path to packaged project directory (optional).
        state_path: Path to state directory (optional).
        has_uninstalled_deps: Whether project has uninstalled dependencies.
        preparer: DbtProjectPreparer instance for project preparation.
    """

    name: str
    project_dir: Path
    target_path: Path
    profiles_dir: Path
    profile: Optional[str]
    target: Optional[str]
    manifest_path: Path
    packaged_project_dir: Optional[Path]
    state_path: Optional[Path]
    has_uninstalled_deps: bool
    preparer: "DbtProjectPreparer"

    def prepare_if_dev(self) -> None:
        """Prepare project if in development mode.

        Runs dbt parse if manifest.json doesn't exist or is outdated.
        """


# Abstract base class for project preparation strategies.
class DbtProjectPreparer:
    """Abstract base class for dbt project preparation.

    Project preparers handle the setup and validation of dbt projects
    before they can be used with Dagster integration.
    """

    def prepare(self) -> None:
        """Prepare the dbt project for use.

        This method should ensure the project is ready for execution,
        including generating necessary artifacts like manifest.json.
        """


# Default implementation of project preparer for Dagster integration.
class DagsterDbtProjectPreparer(DbtProjectPreparer):
    """Default dbt project preparer for Dagster integration.

    Handles standard project preparation including manifest generation
    and validation of project structure.

    Attributes:
        project_dir: Path to dbt project directory.
        profiles_dir: Path to dbt profiles directory.
        target: dbt target name.
    """

    project_dir: str
    profiles_dir: Optional[str] = None
    target: Optional[str] = None

    def prepare(self) -> None:
        """Prepare dbt project by running dbt parse if needed.

        Ensures manifest.json exists and is up-to-date by running
        dbt parse command when necessary.
        """

    def _should_prepare(self) -> bool:
        """Determine if project preparation is needed.

        Returns:
            True if preparation is needed, False otherwise.
        """

    def _run_dbt_parse(self) -> None:
        """Run dbt parse command to generate manifest.

        Raises:
            DagsterDbtCliRuntimeError: If dbt parse fails.
        """


def validate_manifest(manifest: DbtManifestParam) -> dict:
    """Validate and load a dbt manifest.

    Parameters:
        manifest: Path to manifest file, manifest dict, or manifest JSON string.

    Returns:
        Validated manifest dictionary.

    Raises:
        DagsterDbtError: If manifest is invalid or cannot be loaded.
    """
def read_manifest_path(manifest_path: str) -> dict:
    """Read and cache manifest from file path.

    This function caches manifest contents to avoid repeated file I/O
    for the same manifest file.

    Parameters:
        manifest_path: Path to manifest.json file.

    Returns:
        Parsed manifest dictionary.

    Raises:
        DagsterDbtManifestNotFoundError: If manifest file doesn't exist.
    """


from dagster_dbt.dbt_project import DbtProject, DagsterDbtProjectPreparer
# Create project representation
project = DbtProject.from_project_dir("./my_dbt_project")

# Ensure project is prepared
if not project.has_manifest():
    preparer = DagsterDbtProjectPreparer(
        # The preparer declares project_dir as str, while DbtProject exposes a Path.
        project_dir=str(project.project_dir),
        target="dev",
    )
    preparer.prepare()

# Access manifest
manifest = project.get_manifest_json()
# DbtProject documents the project name under the `name` attribute.
print(f"Project {project.name} has {len(manifest['nodes'])} nodes")

from dagster_dbt.dbt_project import DbtProjectPreparer
import subprocess
import os
class CustomDbtProjectPreparer(DbtProjectPreparer):
    """Preparer that runs ``dbt deps`` and ``dbt parse`` with a custom profiles directory.

    Attributes:
        project_dir: Directory passed to dbt via ``--project-dir``.
        custom_profile_path: Path to the profiles file; its parent directory
            is exported as ``DBT_PROFILES_DIR``.
        target: dbt target passed to ``dbt parse`` (defaults to ``"prod"``).
    """

    def __init__(self, project_dir: str, custom_profile_path: str, target: str = "prod"):
        self.project_dir = project_dir
        self.custom_profile_path = custom_profile_path
        self.target = target

    def prepare(self) -> None:
        """Custom preparation with environment setup.

        Raises:
            subprocess.CalledProcessError: If a dbt command exits with a
                non-zero status (both commands run with ``check=True``).
        """
        # Set custom profiles directory
        env = os.environ.copy()
        # Resolve to an absolute path first: dirname() of a bare file name
        # such as "profiles.yml" is "", which would export an empty
        # DBT_PROFILES_DIR.
        env["DBT_PROFILES_DIR"] = os.path.dirname(
            os.path.abspath(self.custom_profile_path)
        )

        # Run dbt deps to install packages
        self._run_dbt("deps", env=env)

        # Run dbt parse to generate manifest
        self._run_dbt("parse", "--target", self.target, env=env)

        print("Custom dbt project preparation completed")

    def _run_dbt(self, command: str, *extra_args: str, env: dict) -> None:
        """Run one dbt sub-command against this project, raising on failure."""
        subprocess.run(
            ["dbt", command, "--project-dir", self.project_dir, *extra_args],
            env=env,
            check=True,
        )
# Use custom preparer
preparer = CustomDbtProjectPreparer(
    project_dir="./my_dbt_project",
    custom_profile_path="./custom_profiles/profiles.yml",
)
preparer.prepare()

from dagster_dbt.dbt_project import DbtProject
from dagster_dbt.errors import DagsterDbtProjectNotFoundError, DagsterDbtManifestNotFoundError
def validate_dbt_project(project_dir: str) -> dict:
    """Validate dbt project and return project info.

    Parameters:
        project_dir: Directory of the dbt project to validate.

    Returns:
        Dictionary with the keys ``project_name``, ``manifest_path``,
        ``node_count``, ``source_count``, ``test_count`` and ``model_count``.

    Raises:
        DagsterDbtProjectNotFoundError: If the project directory or its
            dbt_project.yml does not exist.
        DagsterDbtManifestNotFoundError: Re-raised after being reported.
        Exception: Any other failure is reported and re-raised unchanged.
    """
    # Imported locally so the example does not rely on imports made elsewhere.
    import os
    from collections import Counter

    try:
        project = DbtProject.from_project_dir(project_dir)

        # Validate project structure
        if not os.path.exists(project.project_dir):
            raise DagsterDbtProjectNotFoundError(f"Project directory not found: {project_dir}")

        # Check for dbt_project.yml
        dbt_project_yml = os.path.join(project.project_dir, "dbt_project.yml")
        if not os.path.exists(dbt_project_yml):
            raise DagsterDbtProjectNotFoundError("dbt_project.yml not found")

        # Validate manifest
        if not project.has_manifest():
            print("Manifest not found, preparing project...")
            project.prepare_if_dev()

        manifest = project.get_manifest_json()
        nodes = manifest.get("nodes", {})
        # Tally every resource type in a single pass over the nodes.
        resource_counts = Counter(node.get("resource_type") for node in nodes.values())

        # Return project info
        return {
            # DbtProject documents the project name under the `name` attribute.
            "project_name": project.name,
            "manifest_path": project.manifest_path,
            "node_count": len(nodes),
            "source_count": len(manifest.get("sources", {})),
            "test_count": resource_counts["test"],
            "model_count": resource_counts["model"],
        }
    except DagsterDbtManifestNotFoundError as e:
        print(f"Manifest error: {e}")
        raise
    except Exception as e:
        print(f"Project validation failed: {e}")
        raise
# Validate project
project_info = validate_dbt_project("./my_dbt_project")
print(f"Validated project: {project_info}")

from dagster_dbt.dbt_project import DbtProject
from pathlib import Path
import yaml
class MultiProjectManager:
    """Discover, prepare, and load manifests for the dbt projects in a workspace.

    Attributes:
        workspace_dir: Root directory that is searched recursively.
        projects: Mapping of project name to a dict with the keys
            ``project``, ``config`` and ``path``.
    """

    # `dbt deps` installs packages into these directories, and every installed
    # package ships its own dbt_project.yml; they are dependencies, not
    # projects of the workspace.
    _PACKAGE_DIR_NAMES = frozenset({"dbt_packages", "dbt_modules"})

    def __init__(self, workspace_dir: str):
        self.workspace_dir = Path(workspace_dir)
        self.projects = {}

    def discover_projects(self) -> dict:
        """Discover all dbt projects in workspace.

        Projects that fail to load are reported with ``print`` and skipped.

        Returns:
            The ``projects`` mapping, including anything found by earlier calls.
        """
        for project_path in self.workspace_dir.rglob("dbt_project.yml"):
            project_dir = project_path.parent

            relative_parts = project_dir.relative_to(self.workspace_dir).parts
            if not self._PACKAGE_DIR_NAMES.isdisjoint(relative_parts):
                continue

            try:
                # Read project configuration
                with open(project_path, encoding="utf-8") as f:
                    # safe_load returns None for an empty file.
                    project_config = yaml.safe_load(f) or {}

                project_name = project_config.get("name")
                if project_name:
                    project = DbtProject.from_project_dir(str(project_dir))
                    self.projects[project_name] = {
                        "project": project,
                        "config": project_config,
                        "path": str(project_dir),
                    }
            except Exception as e:
                # Best effort: one broken project must not stop discovery.
                print(f"Failed to load project at {project_dir}: {e}")

        return self.projects

    def prepare_all_projects(self) -> None:
        """Prepare all discovered projects."""
        for project_name, project_info in self.projects.items():
            print(f"Preparing project: {project_name}")
            project_info["project"].prepare_if_dev()

    def get_project_manifests(self) -> dict:
        """Get manifests for all projects.

        Returns:
            Mapping of project name to manifest; projects whose manifest
            cannot be loaded are reported with ``print`` and left out.
        """
        manifests = {}

        for project_name, project_info in self.projects.items():
            try:
                manifests[project_name] = project_info["project"].get_manifest_json()
            except Exception as e:
                print(f"Failed to load manifest for {project_name}: {e}")

        return manifests
# Use multi-project manager
manager = MultiProjectManager("./workspace")
projects = manager.discover_projects()
print(f"Discovered {len(projects)} dbt projects")

manager.prepare_all_projects()
manifests = manager.get_project_manifests()

from dagster import Definitions
from dagster_dbt import dbt_assets, DbtCliResource
from dagster_dbt.dbt_project import DbtProject
def create_assets_from_project(project_dir: str, target: str = "dev"):
    """Create Dagster assets from dbt project.

    Parameters:
        project_dir: Directory of the dbt project.
        target: dbt target the CLI resource is configured with.

    Returns:
        Tuple ``(assets, resource)``: the ``@dbt_assets`` definition and the
        ``DbtCliResource`` it runs against.
    """
    # Set up project
    project = DbtProject.from_project_dir(project_dir)
    project.prepare_if_dev()

    # Create resource
    dbt_resource = DbtCliResource(
        project_dir=project.project_dir,
        target=target,
    )

    # Create assets
    # Give each project's assets a distinct name: with the default (the
    # function name) every call would produce a definition called
    # "project_assets", and they collide once combined in one Definitions.
    @dbt_assets(manifest=project.manifest_path, name=f"{project.name}_dbt_assets")
    def project_assets(context):
        # Use this project's own resource directly. A `dbt` parameter would
        # require a resource registered under the key "dbt", which cannot
        # point at a different project for each set of assets.
        yield from dbt_resource.cli(["build"], context=context).stream()

    return project_assets, dbt_resource
# Create assets for multiple projects
all_assets = []
all_resources = {}

for project_path in ["./project_a", "./project_b", "./project_c"]:
    assets, resource = create_assets_from_project(project_path)
    all_assets.append(assets)
    all_resources[f"dbt_{Path(project_path).name}"] = resource

defs = Definitions(
    assets=all_assets,
    resources=all_resources,
)

from typing import Optional, Union, Dict, Any
from pathlib import Path
# Type alias for manifest parameter types
DbtManifestParam = Union[Dict[str, Any], str, Path]

Install with Tessl CLI:

npx tessl i tessl/pypi-dagster-dbt