
tessl/pypi-dagster-webserver

Web UI server for Dagster, providing GraphQL API, asset reporting, and browser-based interface for data orchestration platform.


Debug Tools

Tools for loading debug export files and running a webserver against an ephemeral, in-memory instance. They let you troubleshoot and analyze production runs in an isolated local environment.

Capabilities

Debug Workspace Context

Specialized workspace context that works with ephemeral instances for debugging scenarios.

class WebserverDebugWorkspaceProcessContext(IWorkspaceProcessContext):
    """
    IWorkspaceProcessContext that works with an ephemeral instance.
    Needed for dagster-webserver debug to work with preloaded debug data.
    """
    
    def __init__(self, instance: DagsterInstance):
        """
        Initialize debug workspace context.
        
        Args:
            instance: Ephemeral DagsterInstance with preloaded debug data
        """

    def create_request_context(self, source: Optional[Any] = None) -> BaseWorkspaceRequestContext:
        """
        Create request context for debug mode.
        
        Args:
            source: Optional source context
            
        Returns:
            BaseWorkspaceRequestContext: Context with ephemeral instance and empty workspace
        """

    def refresh_code_location(self, name: str) -> None:
        """Refresh code location (not implemented in debug mode)."""

    def reload_code_location(self, name: str) -> None:
        """Reload code location (not implemented in debug mode)."""

    def reload_workspace(self) -> None:
        """Reload workspace (no-op in debug mode)."""

    def refresh_workspace(self) -> None:
        """Refresh workspace (no-op in debug mode)."""

    @property
    def instance(self) -> DagsterInstance:
        """Get the ephemeral debug instance."""

    @property
    def version(self) -> str:
        """Get webserver version."""

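The contract above can be explored without Dagster installed. The class below is a hypothetical stand-in (`ToyDebugContext` and `safe_refresh` are illustration names, not part of dagster-webserver). It only mirrors the documented behaviour: workspace reloads are no-ops, while code-location refreshes are not implemented. Modelling "not implemented" as a raised `NotImplementedError` is an assumption of this sketch.

```python
class ToyDebugContext:
    """Hypothetical stand-in mirroring the documented debug-mode contract."""

    def __init__(self, instance):
        self._instance = instance

    @property
    def instance(self):
        # The ephemeral instance handed in at construction time.
        return self._instance

    def reload_workspace(self) -> None:
        # Documented as a no-op in debug mode.
        return None

    def refresh_code_location(self, name: str) -> None:
        # Documented as "not implemented"; raising is an assumption of this sketch.
        raise NotImplementedError(f"cannot refresh {name!r} in debug mode")


def safe_refresh(context, name: str) -> bool:
    """Try to refresh a code location; return False instead of crashing."""
    try:
        context.refresh_code_location(name)
    except NotImplementedError:
        return False
    return True


ctx = ToyDebugContext(instance={"runs": []})
ctx.reload_workspace()                     # silently does nothing
print(safe_refresh(ctx, "my_location"))
```

Callers that share code between a regular webserver and debug mode may want a guard like `safe_refresh` so that refresh requests degrade gracefully.
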
Usage Examples:

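import gzip

import uvicorn
from dagster import DagsterInstance
from dagster._core.debug import DebugRunPayload
from dagster_shared.serdes import deserialize_value
from dagster_webserver.debug import WebserverDebugWorkspaceProcessContext
from dagster_webserver.app import create_app_from_workspace_process_context

# Paths to gzipped debug export files (example names)
debug_files = ["debug1.gz", "debug2.gz"]

# Load debug payloads from export files
debug_payloads = []
for debug_file in debug_files: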
    with gzip.open(debug_file, 'rb') as f:
        payload = deserialize_value(f.read().decode('utf-8'), DebugRunPayload)
        debug_payloads.append(payload)

# Create ephemeral instance with debug data
debug_instance = DagsterInstance.ephemeral(preload=debug_payloads)

# Create debug workspace context
with WebserverDebugWorkspaceProcessContext(debug_instance) as debug_context:
    # Create webserver app
    app = create_app_from_workspace_process_context(debug_context)
    
    # Run webserver
    uvicorn.run(app, host="127.0.0.1", port=3000)

Debug CLI Command

Command-line interface for loading debug export files and starting a webserver with the debug data.

@click.command(name="debug")
@click.argument("input_files", nargs=-1, type=click.Path(exists=True))
@click.option("--port", "-p", type=click.INT, default=3000, help="Port to run server on")
def webserver_debug_command(input_files, port):
    """
    Load webserver with ephemeral instance from dagster debug export files.
    
    Args:
        input_files: Paths to debug export files (gzipped)
        port: Port to run webserver on
    """

def main():
    """Entry point for dagster-webserver-debug CLI."""

def load_debug_files(file_paths: List[str]) -> List[DebugRunPayload]:
    """
    Load debug payloads from compressed export files.
    
    Args:
        file_paths: List of paths to .gz debug export files
        
    Returns:
        List[DebugRunPayload]: Loaded and deserialized debug payloads
    """

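Because the command expects gzipped export files, it can help to screen inputs before handing them over. The sketch below is not part of dagster-webserver: `looks_like_gzip` and `split_valid_inputs` are hypothetical helpers that check only the two-byte gzip magic number, so a file that passes may still fail to deserialize.

```python
import gzip
import os
import tempfile

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream


def looks_like_gzip(path: str) -> bool:
    """Return True if the file starts with the gzip magic number."""
    with open(path, "rb") as handle:
        return handle.read(2) == GZIP_MAGIC


def split_valid_inputs(paths):
    """Partition paths into (gzip files, everything else)."""
    valid, rejected = [], []
    for path in paths:
        if looks_like_gzip(path):
            valid.append(path)
        else:
            rejected.append(path)
    return valid, rejected


# Demonstration with throwaway files.
with tempfile.TemporaryDirectory() as workdir:
    good = os.path.join(workdir, "export.gz")
    bad = os.path.join(workdir, "notes.txt")
    with gzip.open(good, "wb") as handle:
        handle.write(b"{}")
    with open(bad, "w") as handle:
        handle.write("plain text")
    valid, rejected = split_valid_inputs([good, bad])
    print(len(valid), "valid,", len(rejected), "rejected")
```
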
CLI Usage Examples:

# Load single debug export file
dagster-webserver-debug /path/to/debug_export.gz

# Load multiple debug files
dagster-webserver-debug debug1.gz debug2.gz debug3.gz

# Specify custom port
dagster-webserver-debug --port 8080 /path/to/debug_export.gz

# Load every .gz file in the current directory
dagster-webserver-debug *.gz
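
When the shell glob matches nothing, many shells pass the literal `*.gz` through, and the command then receives a path that does not exist. A minimal sketch of expanding the patterns in Python first is shown here. `build_debug_command` is a hypothetical helper, and it uses only the command name and `--port` option listed above.

```python
import glob
import os
import tempfile


def build_debug_command(patterns, port=None):
    """Expand glob patterns and return an argv list for dagster-webserver-debug."""
    files = sorted({path for pattern in patterns for path in glob.glob(pattern)})
    if not files:
        raise FileNotFoundError(f"no debug export files match {list(patterns)!r}")
    command = ["dagster-webserver-debug"]
    if port is not None:
        command += ["--port", str(port)]
    return command + files


# Demonstration with throwaway files standing in for real exports.
with tempfile.TemporaryDirectory() as workdir:
    for name in ("run_b.gz", "run_a.gz"):
        open(os.path.join(workdir, name), "wb").close()
    argv = build_debug_command([os.path.join(workdir, "*.gz")], port=8080)
    print(argv[:3], [os.path.basename(path) for path in argv[3:]])
```

The resulting list can be passed to `subprocess.run` as-is, which avoids any further shell quoting.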

Debug File Loading

Process for loading and deserializing debug export files:

from gzip import GzipFile
from dagster._core.debug import DebugRunPayload
from dagster_shared.serdes import deserialize_value

def load_debug_files(file_paths):
    """
    Load debug payloads from export files.
    
    Args:
        file_paths: List of paths to debug export files
        
    Returns:
        list[DebugRunPayload]: Loaded debug payloads
    """
    debug_payloads = []
    
    for file_path in file_paths:
        print(f"Loading {file_path}...")
        
        with GzipFile(file_path, "rb") as file:
            blob = file.read().decode("utf-8")
            debug_payload = deserialize_value(blob, DebugRunPayload)
            
            print(f"  run_id: {debug_payload.dagster_run.run_id}")
            print(f"  dagster version: {debug_payload.version}")
            
            debug_payloads.append(debug_payload)
    
    return debug_payloads

# Usage
debug_payloads = load_debug_files(["debug1.gz", "debug2.gz"])
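
The loader above relies on two layers around the serialized payload: gzip compression and UTF-8 text encoding. The sketch below exercises just those layers with a JSON dictionary standing in for the payload, so it runs without Dagster. `write_export` and `read_export` are hypothetical names, and JSON is an assumption used only for illustration, not the actual serialization format.

```python
import gzip
import json
import os
import tempfile


def write_export(path, record):
    """Serialize to text, encode as UTF-8, and gzip to disk."""
    text = json.dumps(record)
    with gzip.open(path, "wb") as handle:
        handle.write(text.encode("utf-8"))


def read_export(path):
    """Reverse the layers: gunzip, decode UTF-8, deserialize."""
    with gzip.GzipFile(path, "rb") as handle:
        blob = handle.read().decode("utf-8")
    return json.loads(blob)


# Round trip through a temporary file.
with tempfile.TemporaryDirectory() as workdir:
    path = os.path.join(workdir, "debug_export.gz")
    original = {"run_id": "example-run", "version": "0.0.0"}
    write_export(path, original)
    print(read_export(path) == original)
```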

Ephemeral Instance Creation

Create DagsterInstance with preloaded debug data:

from dagster import DagsterInstance

def create_debug_instance(debug_payloads):
    """
    Create ephemeral instance with debug data.
    
    Args:
        debug_payloads: List of DebugRunPayload objects
        
    Returns:
        DagsterInstance: Ephemeral instance with preloaded data
    """
    return DagsterInstance.ephemeral(preload=debug_payloads)

# Usage
debug_instance = create_debug_instance(debug_payloads)

# Instance contains all debug data
runs = debug_instance.get_runs()
print(f"Loaded {len(runs)} runs from debug files")

Debug Export File Format

Debug export files contain serialized Dagster run data:

DebugRunPayload Structure

@dataclass
class DebugRunPayload:
    """Payload for debug export containing run and related data."""
    version: str                    # Dagster version
    dagster_run: DagsterRun        # Run configuration and metadata
    event_list: List[DagsterEvent] # All events for the run
    instance_settings: dict        # Instance configuration
    workspace_context: dict        # Workspace information
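
A small sketch of reading these fields: `describe_payload` is a hypothetical helper that touches only `version`, `dagster_run.run_id`, and `event_list`, so it works with any object exposing those attributes. The example uses `SimpleNamespace` stand-ins rather than real payloads.

```python
from types import SimpleNamespace


def describe_payload(payload) -> str:
    """Build a one-line summary from the fields listed in the structure above."""
    run = payload.dagster_run
    return (
        f"run {run.run_id} "
        f"(dagster {payload.version}, {len(payload.event_list)} events)"
    )


# Stand-in object with the same attribute names as a debug payload.
fake = SimpleNamespace(
    version="1.2.3",
    dagster_run=SimpleNamespace(run_id="abc123"),
    event_list=["step_start", "step_success"],
)
print(describe_payload(fake))
```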

Creating Debug Exports

# Debug exports are typically created using dagster CLI
# dagster debug export <run_id> debug_export.gz

# Programmatic export (advanced usage)
from dagster import __version__
from dagster._core.debug import DebugRunPayload
from dagster_shared.serdes import serialize_value
import gzip

def create_debug_export(instance, run_id, output_path):
    """Create debug export file for a run."""
    run = instance.get_run_by_id(run_id)
    if run is None:
        raise ValueError(f"Run {run_id} not found")
    # all_logs returns every event log entry recorded for the run
    events = instance.all_logs(run_id)
    
    payload = DebugRunPayload(
        version=__version__,
        dagster_run=run,
        event_list=events,
        instance_settings=instance.get_settings(),
        workspace_context={}
    )
    
    serialized = serialize_value(payload)
    
    with gzip.open(output_path, 'wb') as f:
        f.write(serialized.encode('utf-8'))

Development Workflows

Local Debugging

# Load production debug data locally
debug_files = ["prod_run_123.gz", "prod_run_124.gz"]
debug_payloads = load_debug_files(debug_files)

# Create local debug environment
debug_instance = DagsterInstance.ephemeral(preload=debug_payloads)

with WebserverDebugWorkspaceProcessContext(debug_instance) as context:
    app = create_app_from_workspace_process_context(context)
    
    # Access production data locally without connecting to prod
    uvicorn.run(app, host="127.0.0.1", port=3000)

Issue Investigation

# 1. Export debug data from production
dagster debug export failing_run_id failing_run.gz

# 2. Load in local debug environment  
dagster-webserver-debug failing_run.gz

# 3. Navigate to http://localhost:3000 to investigate
# - View run logs and events
# - Analyze execution timeline
# - Examine asset lineage
# - Debug configuration issues

Testing and Development

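from starlette.testclient import TestClient

# Create test debug data (create_test_debug_payloads is a placeholder for your own fixture helper)
test_payloads = create_test_debug_payloads()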
test_instance = DagsterInstance.ephemeral(preload=test_payloads)

with WebserverDebugWorkspaceProcessContext(test_instance) as context:
    # Test webserver functionality
    app = create_app_from_workspace_process_context(context)
    
    # Run integration tests
    test_client = TestClient(app)
    response = test_client.get("/graphql")
    assert response.status_code == 200

Performance Analysis

import glob

# Load multiple runs for performance analysis
large_debug_files = glob.glob("debug_exports/*.gz")
debug_payloads = load_debug_files(large_debug_files)

print(f"Loaded {len(debug_payloads)} runs for analysis")

# Create instance with all data
analysis_instance = DagsterInstance.ephemeral(preload=debug_payloads)

# Analyze patterns across multiple runs
with WebserverDebugWorkspaceProcessContext(analysis_instance) as context:
    app = create_app_from_workspace_process_context(context)
    # Use webserver UI to analyze trends and patterns
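
Before opening the UI, a quick tally can show which jobs and statuses dominate the loaded exports. The sketch below is illustrative only: `count_by_job_and_status` is a hypothetical helper that reads `dagster_run.job_name` and `dagster_run.status`, and the demo uses `SimpleNamespace` stand-ins with plain strings for statuses instead of real payloads.

```python
from collections import Counter
from types import SimpleNamespace


def count_by_job_and_status(payloads):
    """Count payloads per (job name, status) pair."""
    return Counter(
        (payload.dagster_run.job_name, str(payload.dagster_run.status))
        for payload in payloads
    )


def fake_payload(job_name, status):
    # Stand-in exposing only the attributes the helper reads.
    return SimpleNamespace(dagster_run=SimpleNamespace(job_name=job_name, status=status))


payloads = [
    fake_payload("etl", "FAILURE"),
    fake_payload("etl", "FAILURE"),
    fake_payload("etl", "SUCCESS"),
    fake_payload("report", "SUCCESS"),
]
for (job, status), total in count_by_job_and_status(payloads).most_common():
    print(job, status, total)
```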

Advanced Debug Scenarios

Custom Debug Context

class CustomDebugWorkspaceProcessContext(WebserverDebugWorkspaceProcessContext):
    """Extended debug context with custom functionality."""
    
    def __init__(self, instance, custom_config):
        super().__init__(instance)
        self.custom_config = custom_config
    
    def create_request_context(self, source=None):
        context = super().create_request_context(source)
        # Add custom debugging information
        context.debug_config = self.custom_config
        return context

# Usage with custom context
custom_context = CustomDebugWorkspaceProcessContext(
    debug_instance, 
    {"debug_mode": True, "verbose_logging": True}
)

Debug Data Filtering

def filter_debug_payloads(payloads, criteria):
    """Filter debug payloads based on criteria."""
    filtered = []
    
    for payload in payloads:
        if criteria.get("status") and payload.dagster_run.status != criteria["status"]:
            continue
        if criteria.get("job_name") and payload.dagster_run.job_name != criteria["job_name"]:
            continue
        filtered.append(payload)
    
    return filtered

from dagster import DagsterRunStatus

# Load only failed runs (all_payloads is the list returned by load_debug_files)
failed_runs = filter_debug_payloads(
    all_payloads, 
    {"status": DagsterRunStatus.FAILURE}
)

debug_instance = DagsterInstance.ephemeral(preload=failed_runs)

Multi-Environment Debug

import glob

# Load debug data from multiple environments
prod_payloads = load_debug_files(glob.glob("prod_exports/*.gz"))
staging_payloads = load_debug_files(glob.glob("staging_exports/*.gz"))

# Create separate debug instances
prod_instance = DagsterInstance.ephemeral(preload=prod_payloads)
staging_instance = DagsterInstance.ephemeral(preload=staging_payloads)

# Compare environments
print(f"Prod runs: {len(prod_instance.get_runs())}")
print(f"Staging runs: {len(staging_instance.get_runs())}")
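
Run counts alone say little about how two environments differ. One possible next step, sketched here with stand-in objects, is to compare which jobs appear in each set of payloads. `compare_job_names` is a hypothetical helper that reads only `dagster_run.job_name`.

```python
from types import SimpleNamespace


def compare_job_names(left_payloads, right_payloads):
    """Report job names unique to each side and those present in both."""
    left = {payload.dagster_run.job_name for payload in left_payloads}
    right = {payload.dagster_run.job_name for payload in right_payloads}
    return {
        "only_left": sorted(left - right),
        "only_right": sorted(right - left),
        "shared": sorted(left & right),
    }


def fake_payload(job_name):
    # Stand-in exposing only the attribute the helper reads.
    return SimpleNamespace(dagster_run=SimpleNamespace(job_name=job_name))


prod = [fake_payload("etl"), fake_payload("report")]
staging = [fake_payload("etl"), fake_payload("experiment")]
print(compare_job_names(prod, staging))
```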

Security Considerations

Debug exports can contain production run data, so run the debug webserver only in trusted environments:

import os

from dagster_webserver.cli import host_dagster_ui_with_workspace_process_context

# Ensure debug webserver is not exposed publicly
if os.getenv("ENVIRONMENT") == "production":
    raise RuntimeError("Debug mode not allowed in production")

# Bind to localhost only
host_dagster_ui_with_workspace_process_context(
    debug_context,
    host="127.0.0.1",  # Never use 0.0.0.0 for debug
    port=port,
    path_prefix="",
    log_level="debug"
)
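
The localhost-only rule can also be enforced in code rather than by convention. The sketch below uses the standard-library `ipaddress` module. `require_loopback` is a hypothetical guard, and accepting the literal name `localhost` without resolving it is an assumption of this example.

```python
import ipaddress


def require_loopback(host: str) -> str:
    """Return host unchanged if it is a loopback address, else raise ValueError."""
    if host == "localhost":
        return host
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        raise ValueError(f"{host!r} is not an IP address or 'localhost'") from None
    if not address.is_loopback:
        raise ValueError(f"refusing to bind debug webserver to {host!r}")
    return host


print(require_loopback("127.0.0.1"))
try:
    require_loopback("0.0.0.0")
except ValueError as error:
    print(f"rejected: {error}")
```

Passing `host=require_loopback(host)` to the hosting call would make a misconfigured bind address fail fast, before the server starts.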

Limitations

Debug mode has several limitations:

  • No code locations: Workspace contains no active code locations
  • No mutations: Cannot execute runs or modify instance state
  • Ephemeral data: All data is in-memory and lost when process ends
  • Limited workspace operations: Most workspace operations are no-ops
  • Read-only access: UI provides read-only view of historical data

These limitations ensure debug mode is safe for analyzing production data without affecting live systems.

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-webserver
