Web UI server for Dagster, providing GraphQL API, asset reporting, and browser-based interface for data orchestration platform.
—
Specialized debugging functionality for loading debug export files and running webserver instances with ephemeral data. Enables detailed troubleshooting, development workflows, and analysis of production issues in isolated environments.
Specialized workspace context that works with ephemeral instances for debugging scenarios.
class WebserverDebugWorkspaceProcessContext(IWorkspaceProcessContext):
    """
    IWorkspaceProcessContext that works with an ephemeral instance.

    Needed for dagster-webserver debug to work with preloaded debug data.
    Reload/refresh operations are stubs because a debug workspace has no
    live code locations to reload.
    """

    def __init__(self, instance: DagsterInstance):
        """
        Initialize debug workspace context.

        Args:
            instance: Ephemeral DagsterInstance with preloaded debug data
        """

    def create_request_context(self, source: Optional[Any] = None) -> BaseWorkspaceRequestContext:
        """
        Create request context for debug mode.

        Args:
            source: Optional source context

        Returns:
            BaseWorkspaceRequestContext: Context with ephemeral instance and empty workspace
        """

    def refresh_code_location(self, name: str) -> None:
        """Refresh code location (not implemented in debug mode)."""

    def reload_code_location(self, name: str) -> None:
        """Reload code location (not implemented in debug mode)."""

    def reload_workspace(self) -> None:
        """Reload workspace (no-op in debug mode)."""

    def refresh_workspace(self) -> None:
        """Refresh workspace (no-op in debug mode)."""

    @property
    def instance(self) -> DagsterInstance:
        """Get the ephemeral debug instance."""

    @property
    def version(self) -> str:
        """Get webserver version."""

Usage Examples:
from dagster import DagsterInstance
from dagster._core.debug import DebugRunPayload
from dagster_webserver.debug import WebserverDebugWorkspaceProcessContext
from dagster_webserver.app import create_app_from_workspace_process_context
# Load debug payloads from export files
debug_payloads = []
for debug_file in debug_files:
with gzip.open(debug_file, 'rb') as f:
payload = deserialize_value(f.read().decode('utf-8'), DebugRunPayload)
debug_payloads.append(payload)
# Create ephemeral instance with debug data
debug_instance = DagsterInstance.ephemeral(preload=debug_payloads)
# Create debug workspace context
with WebserverDebugWorkspaceProcessContext(debug_instance) as debug_context:
# Create webserver app
app = create_app_from_workspace_process_context(debug_context)
# Run webserver
uvicorn.run(app, host="127.0.0.1", port=3000)

Command-line interface for loading debug export files and starting a webserver with the debug data.
@click.command(name="debug")
@click.argument("input_files", nargs=-1, type=click.Path(exists=True))
@click.option("--port", "-p", type=click.INT, default=3000, help="Port to run server on")
def webserver_debug_command(input_files, port):
    """
    Load webserver with ephemeral instance from dagster debug export files.

    Each input file must exist (enforced by click.Path) and is expected to
    be a gzipped debug export produced by `dagster debug export`.

    Args:
        input_files: Paths to debug export files (gzipped)
        port: Port to run webserver on (default 3000)
    """

def main():
    """Entry point for dagster-webserver-debug CLI."""
def load_debug_files(file_paths: List[str]) -> List[DebugRunPayload]:
    """
    Load debug payloads from compressed export files.

    Args:
        file_paths: List of paths to .gz debug export files

    Returns:
        List[DebugRunPayload]: Loaded and deserialized debug payloads
    """

CLI Usage Examples:
# Load single debug export file
dagster-webserver-debug /path/to/debug_export.gz
# Load multiple debug files
dagster-webserver-debug debug1.gz debug2.gz debug3.gz
# Specify custom port
dagster-webserver-debug --port 8080 /path/to/debug_export.gz
# Load from current directory
dagster-webserver-debug *.gz

Process for loading and deserializing debug export files:
from gzip import GzipFile
from dagster._core.debug import DebugRunPayload
from dagster_shared.serdes import deserialize_value
def load_debug_files(file_paths):
    """Load and deserialize debug payloads from gzipped export files.

    Args:
        file_paths: Paths to debug export files produced by
            ``dagster debug export``.

    Returns:
        list[DebugRunPayload]: One deserialized payload per input file,
        in input order.
    """
    payloads = []
    for path in file_paths:
        print(f"Loading {path}...")
        # Exports are gzip-compressed UTF-8 text containing a serialized payload.
        with GzipFile(path, "rb") as handle:
            blob = handle.read().decode("utf-8")
        payload = deserialize_value(blob, DebugRunPayload)
        print(f" run_id: {payload.dagster_run.run_id}")
        print(f" dagster version: {payload.version}")
        payloads.append(payload)
    return payloads
# Usage
debug_payloads = load_debug_files(["debug1.gz", "debug2.gz"])

Create DagsterInstance with preloaded debug data:
from dagster import DagsterInstance
def create_debug_instance(debug_payloads):
    """Build an ephemeral DagsterInstance preloaded with debug run data.

    Args:
        debug_payloads: List of DebugRunPayload objects to preload.

    Returns:
        DagsterInstance: In-memory instance containing the debug runs.
    """
    instance = DagsterInstance.ephemeral(preload=debug_payloads)
    return instance
# Usage
debug_instance = create_debug_instance(debug_payloads)
# Instance contains all debug data
runs = debug_instance.get_runs()
print(f"Loaded {len(runs)} runs from debug files")

Debug export files contain serialized Dagster run data:
@dataclass
class DebugRunPayload:
    """Payload for debug export containing run and related data."""

    version: str  # Dagster version that produced the export
    dagster_run: DagsterRun  # Run configuration and metadata
    event_list: List[DagsterEvent]  # All events recorded for the run
    instance_settings: dict  # Instance configuration
    workspace_context: dict  # Workspace information

# Debug exports are typically created using dagster CLI
# dagster debug export <run_id> --output debug_export.gz
# Programmatic export (advanced usage)
from dagster._core.debug import DebugRunPayload
from dagster_shared.serdes import serialize_value
import gzip
def create_debug_export(instance, run_id, output_path):
    """Serialize a run's data into a gzipped debug export file.

    Args:
        instance: DagsterInstance holding the run to export.
        run_id: Identifier of the run to export.
        output_path: Destination path for the gzipped export file.
    """
    target_run = instance.get_run_by_id(run_id)
    run_events = instance.get_logs_for_run(run_id).all_events
    export_payload = DebugRunPayload(
        # NOTE(review): assumes __version__ is in scope via an import not
        # shown in this snippet — confirm before reusing.
        version=__version__,
        dagster_run=target_run,
        event_list=run_events,
        instance_settings=instance.get_settings(),
        workspace_context={},
    )
    # Exports are written as gzip-compressed UTF-8 text.
    encoded = serialize_value(export_payload).encode('utf-8')
    with gzip.open(output_path, 'wb') as out:
        out.write(encoded)
# Load production debug data locally
debug_files = ["prod_run_123.gz", "prod_run_124.gz"]
debug_payloads = load_debug_files(debug_files)
# Create local debug environment
debug_instance = DagsterInstance.ephemeral(preload=debug_payloads)
with WebserverDebugWorkspaceProcessContext(debug_instance) as context:
app = create_app_from_workspace_process_context(context)
# Access production data locally without connecting to prod
uvicorn.run(app, host="127.0.0.1", port=3000)

# 1. Export debug data from production
dagster debug export failing_run_id --output failing_run.gz
# 2. Load in local debug environment
dagster-webserver-debug failing_run.gz
# 3. Navigate to http://localhost:3000 to investigate
# - View run logs and events
# - Analyze execution timeline
# - Examine asset lineage
# - Debug configuration issues

# Create test debug data
test_payloads = create_test_debug_payloads()
test_instance = DagsterInstance.ephemeral(preload=test_payloads)
with WebserverDebugWorkspaceProcessContext(test_instance) as context:
# Test webserver functionality
app = create_app_from_workspace_process_context(context)
# Run integration tests
test_client = TestClient(app)
response = test_client.get("/graphql")
assert response.status_code == 200

# Load multiple runs for performance analysis
large_debug_files = glob.glob("debug_exports/*.gz")
debug_payloads = load_debug_files(large_debug_files)
print(f"Loaded {len(debug_payloads)} runs for analysis")
# Create instance with all data
analysis_instance = DagsterInstance.ephemeral(preload=debug_payloads)
# Analyze patterns across multiple runs
with WebserverDebugWorkspaceProcessContext(analysis_instance) as context:
app = create_app_from_workspace_process_context(context)
# Use webserver UI to analyze trends and patterns

class CustomDebugWorkspaceProcessContext(WebserverDebugWorkspaceProcessContext):
    """Debug workspace context extended with custom functionality.

    Carries a caller-supplied configuration dict and attaches it to each
    request context it creates as ``debug_config``.
    """

    def __init__(self, instance, custom_config):
        """Store the custom config alongside the base debug context state."""
        super().__init__(instance)
        self.custom_config = custom_config

    def create_request_context(self, source=None):
        """Create a request context annotated with the custom debug config."""
        request_context = super().create_request_context(source)
        # Expose the extra debugging configuration to downstream consumers.
        request_context.debug_config = self.custom_config
        return request_context
# Usage with custom context
custom_context = CustomDebugWorkspaceProcessContext(
    debug_instance,
    {"debug_mode": True, "verbose_logging": True},
)

def filter_debug_payloads(payloads, criteria):
    """Return only the payloads whose run matches every supplied criterion.

    Args:
        payloads: Iterable of DebugRunPayload objects to filter.
        criteria: Mapping of criterion name to required value. Supported
            keys are "status" and "job_name"; absent or falsy values are
            ignored (no filtering on that field).

    Returns:
        list: Payloads whose run satisfies all supplied criteria.
    """
    def _matches(payload):
        run = payload.dagster_run
        if criteria.get("status") and run.status != criteria["status"]:
            return False
        if criteria.get("job_name") and run.job_name != criteria["job_name"]:
            return False
        return True

    return [payload for payload in payloads if _matches(payload)]
# Load only failed runs
failed_runs = filter_debug_payloads(
all_payloads,
{"status": DagsterRunStatus.FAILURE}
)
debug_instance = DagsterInstance.ephemeral(preload=failed_runs)

# Load debug data from multiple environments
prod_payloads = load_debug_files(glob.glob("prod_exports/*.gz"))
staging_payloads = load_debug_files(glob.glob("staging_exports/*.gz"))
# Create separate debug instances
prod_instance = DagsterInstance.ephemeral(preload=prod_payloads)
staging_instance = DagsterInstance.ephemeral(preload=staging_payloads)
# Compare environments
print(f"Prod runs: {len(prod_instance.get_runs())}")
print(f"Staging runs: {len(staging_instance.get_runs())}")

Debug mode should only be used in secure environments:
# Ensure debug webserver is not exposed publicly
if os.getenv("ENVIRONMENT") == "production":
raise Exception("Debug mode not allowed in production")
# Bind to localhost only
host_dagster_ui_with_workspace_process_context(
debug_context,
host="127.0.0.1", # Never use 0.0.0.0 for debug
port=port,
path_prefix="",
log_level="debug"
)

Debug mode has several limitations:
These limitations ensure debug mode is safe for analyzing production data without affecting live systems.
Install with Tessl CLI
npx tessl i tessl/pypi-dagster-webserver