CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dagster-graphql

The GraphQL frontend to the Python Dagster framework, providing programmatic interaction with Dagster runs, repositories, and jobs through a comprehensive GraphQL API.

Pending
Overview
Eval results
Files

test-utilities.mddocs/

Test Utilities

Comprehensive testing framework for GraphQL-based testing with workspace management, query execution, and result processing utilities. These utilities enable robust testing of Dagster GraphQL operations in various contexts.

Capabilities

GraphQL Query Execution

Execute GraphQL queries in test contexts with comprehensive error handling and result processing.

def execute_dagster_graphql(
    context: WorkspaceRequestContext,
    query: str,
    variables: Optional[GqlVariables] = None,
    schema: graphene.Schema = SCHEMA,
) -> GqlResult:
    """
    Execute GraphQL query in test context with error handling.

    Parameters:
    - context (WorkspaceRequestContext): Workspace context for execution
    - query (str): GraphQL query string to execute
    - variables (Optional[GqlVariables]): Variables for GraphQL execution
    - schema (graphene.Schema): GraphQL schema to use (defaults to SCHEMA)

    Returns:
    - GqlResult: Result object with data and errors properties

    Note: Automatically clears loaders between requests and handles GraphQL errors
    by raising the original exception for easier debugging.
    """

async def async_execute_dagster_graphql(
    context: WorkspaceRequestContext,
    query: str,
    variables: Optional[GqlVariables] = None,
    schema: graphene.Schema = SCHEMA,
) -> GqlResult:
    """
    Asynchronous version of GraphQL query execution for async tests.

    Parameters: Same as execute_dagster_graphql
    Returns: Same as execute_dagster_graphql
    """

Usage example:

from dagster_graphql.test.utils import execute_dagster_graphql

def test_job_execution(graphql_context):
    query = """
    mutation LaunchJob($selector: PipelineSelector!) {
      launchPipelineExecution(executionParams: {selector: $selector}) {
        __typename
        ... on LaunchRunSuccess {
          run {
            runId
            status
          }
        }
      }
    }
    """
    
    variables = {
        "selector": {
            "repositoryLocationName": "test_location",
            "repositoryName": "test_repo",
            "pipelineName": "my_job"
        }
    }
    
    result = execute_dagster_graphql(graphql_context, query, variables)
    assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"
    run_id = result.data["launchPipelineExecution"]["run"]["runId"]
    assert run_id is not None

Subscription Testing

Execute GraphQL subscriptions for testing real-time event processing and streaming operations.

def execute_dagster_graphql_subscription(
    context: WorkspaceRequestContext,
    query: str,
    variables: Optional[GqlVariables] = None,
    schema: graphene.Schema = SCHEMA,
) -> Sequence[GqlResult]:
    """
    Execute GraphQL subscription query and return results.

    Parameters:
    - context (WorkspaceRequestContext): Workspace context for execution
    - query (str): GraphQL subscription query string
    - variables (Optional[GqlVariables]): Variables for subscription
    - schema (graphene.Schema): GraphQL schema to use

    Returns:
    - Sequence[GqlResult]: Sequence of subscription results

    Note: Captures only the first payload emitted by the subscription and
    returns it wrapped in a sequence, which is sufficient for testing purposes.
    """

Usage example:

def test_run_events_subscription(graphql_context):
    subscription = """
    subscription RunEvents($runId: ID!) {
      pipelineRunLogs(runId: $runId) {
        __typename
        ... on PipelineRunLogsSubscriptionSuccess {
          messages {
            __typename
            ... on RunStartEvent {
              runId
            }
          }
        }
      }
    }
    """
    
    results = execute_dagster_graphql_subscription(
        graphql_context, 
        subscription, 
        {"runId": "test-run-id"}
    )
    
    assert len(results) > 0
    assert results[0].data is not None

Test Context Management

Create and manage workspace contexts for testing with various configuration options and repository setups.

def define_out_of_process_context(
    python_or_workspace_file: str,
    fn_name: Optional[str],
    instance: DagsterInstance,
    read_only: bool = False,
    read_only_locations: Optional[Mapping[str, bool]] = None,
) -> Iterator[WorkspaceRequestContext]:
    """
    Create out-of-process workspace context for testing.

    Parameters:
    - python_or_workspace_file (str): Python file or workspace YAML file path
    - fn_name (Optional[str]): Function name for repository definition
    - instance (DagsterInstance): Dagster instance for testing
    - read_only (bool): Whether workspace should be read-only
    - read_only_locations (Optional[Mapping[str, bool]]): Per-location read-only settings

    Yields:
    - WorkspaceRequestContext: Configured workspace context for testing
    """

def temp_workspace_file(python_fns: list[tuple[str, str, Optional[str]]]) -> Iterator[str]:
    """
    Create temporary workspace configuration file for testing.

    Parameters:
    - python_fns (list[tuple[str, str, Optional[str]]]): List of tuples containing
      (location_name, python_file_path, function_name)

    Yields:
    - str: Path to temporary workspace.yaml file
    """

Usage example:

from dagster import DagsterInstance
from dagster_graphql.test.utils import define_out_of_process_context, temp_workspace_file

def test_multi_location_workspace():
    with temp_workspace_file([
        ("location_a", "repos/repo_a.py", "get_repo_a"),
        ("location_b", "repos/repo_b.py", "get_repo_b")
    ]) as workspace_file:
        with DagsterInstance.ephemeral() as instance:
            with define_out_of_process_context(
                workspace_file, None, instance
            ) as context:
                # Test multi-location operations
                locations = context.code_locations
                assert len(locations) == 2
                assert any(loc.name == "location_a" for loc in locations)
                assert any(loc.name == "location_b" for loc in locations)

Test Helper Functions

Utility functions for inferring repository information and creating selectors for GraphQL operations.

def infer_repository(graphql_context: WorkspaceRequestContext) -> RemoteRepository:
    """
    Automatically infer repository from GraphQL context.

    Parameters:
    - graphql_context (WorkspaceRequestContext): Workspace context

    Returns:
    - RemoteRepository: Inferred repository object
    """

def infer_repository_selector(
    graphql_context: WorkspaceRequestContext, 
    location_name: Optional[str] = None
) -> Selector:
    """
    Create repository selector dictionary from context.

    Parameters:
    - graphql_context (WorkspaceRequestContext): Workspace context
    - location_name (Optional[str]): Specific location name to use

    Returns:
    - Selector: Dictionary with repositoryLocationName and repositoryName
    """

def infer_job_selector(
    graphql_context: WorkspaceRequestContext,
    job_name: str,
    op_selection: Optional[Sequence[str]] = None,
    asset_selection: Optional[Sequence[GqlAssetKey]] = None,
    asset_check_selection: Optional[Sequence[GqlAssetCheckHandle]] = None,
    location_name: Optional[str] = None,
) -> Selector:
    """
    Create job selector dictionary with optional op/asset selection.

    Parameters:
    - graphql_context (WorkspaceRequestContext): Workspace context
    - job_name (str): Name of the job
    - op_selection (Optional[Sequence[str]]): Specific ops to select
    - asset_selection (Optional[Sequence[GqlAssetKey]]): Specific assets to select
    - asset_check_selection (Optional[Sequence[GqlAssetCheckHandle]]): Asset checks to select
    - location_name (Optional[str]): Specific location name

    Returns:
    - Selector: Dictionary with job selection parameters
    """

Usage example:

def test_job_with_op_selection(graphql_context):
    # Automatically infer repository and create job selector
    job_selector = infer_job_selector(
        graphql_context,
        "complex_pipeline",
        op_selection=["extract_data", "transform_data"]
    )
    
    query = """
    query JobDetails($selector: PipelineSelector!) {
      pipelineOrError(params: $selector) {
        ... on Pipeline {
          name
          solids {
            name
          }
        }
      }
    }
    """
    
    result = execute_dagster_graphql(
        graphql_context, 
        query, 
        {"selector": job_selector}
    )
    
    assert result.data["pipelineOrError"]["name"] == "complex_pipeline"

Asset Testing Utilities

Specialized utilities for testing asset materialization and asset-based workflows.

def materialize_assets(
    context: WorkspaceRequestContext,
    asset_selection: Optional[Sequence[AssetKey]] = None,
    partition_keys: Optional[Sequence[str]] = None,
    run_config_data: Optional[Mapping[str, Any]] = None,
    location_name: Optional[str] = None,
) -> Union[GqlResult, Sequence[GqlResult]]:
    """
    Execute asset materialization through GraphQL API.

    Parameters:
    - context (WorkspaceRequestContext): Workspace context
    - asset_selection (Optional[Sequence[AssetKey]]): Specific assets to materialize
    - partition_keys (Optional[Sequence[str]]): Partition keys for partitioned assets
    - run_config_data (Optional[Mapping[str, Any]]): Run configuration
    - location_name (Optional[str]): Specific location name

    Returns:
    - Union[GqlResult, Sequence[GqlResult]]: Single result or sequence for partitioned runs
    """

Usage example:

from dagster import AssetKey
from dagster_graphql.test.utils import materialize_assets

def test_asset_materialization(graphql_context):
    # Materialize specific assets
    result = materialize_assets(
        graphql_context,
        asset_selection=[AssetKey("users"), AssetKey("orders")],
        run_config_data={
            "ops": {
                "extract_users": {
                    "config": {"source": "production_db"}
                }
            }
        }
    )
    
    assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"
    
    # Test partitioned asset materialization
    partitioned_results = materialize_assets(
        graphql_context,
        asset_selection=[AssetKey("daily_metrics")],
        partition_keys=["2023-10-01", "2023-10-02", "2023-10-03"]
    )
    
    assert len(partitioned_results) == 3
    for result in partitioned_results:
        assert result.data["launchPipelineExecution"]["__typename"] == "LaunchRunSuccess"

Advanced Testing Patterns

Complete testing patterns for complex GraphQL operations:

def test_complete_workflow(graphql_context):
    """Test complete workflow from job execution to completion."""
    
    # 1. Execute job
    job_result = execute_dagster_graphql(graphql_context, """
        mutation {
          launchPipelineExecution(executionParams: {
            selector: {
              repositoryLocationName: "test_location"
              repositoryName: "test_repo"
              pipelineName: "etl_pipeline"
            }
          }) {
            ... on LaunchRunSuccess {
              run { runId }
            }
          }
        }
    """)
    
    run_id = job_result.data["launchPipelineExecution"]["run"]["runId"]
    
    # 2. Wait for completion with periodic status checks
    execute_dagster_graphql_and_finish_runs(graphql_context, f"""
        query {{
          runOrError(runId: "{run_id}") {{
            ... on Run {{
              status
              stats {{
                stepsSucceededCount
                stepsFailed
              }}
            }}
          }}
        }}
    """)
    
    # 3. Verify final results
    final_result = execute_dagster_graphql(graphql_context, f"""
        query {{
          runOrError(runId: "{run_id}") {{
            ... on Run {{
              status
              assets {{
                key {{
                  path
                }}
                materialization {{
                  timestamp
                }}
              }}
            }}
          }}
        }}
    """)
    
    run_data = final_result.data["runOrError"]
    assert run_data["status"] == "SUCCESS"
    assert len(run_data["assets"]) > 0

Type Definitions for Testing

class GqlResult(Protocol):
    @property
    def data(self) -> Mapping[str, Any]: ...
    
    @property  
    def errors(self) -> Optional[Sequence[str]]: ...

class GqlTag(TypedDict):
    key: str
    value: str

class GqlAssetKey(TypedDict):
    path: Sequence[str]

class GqlAssetCheckHandle(TypedDict):
    assetKey: GqlAssetKey
    name: str

Selector = dict[str, Any]
GqlVariables = Mapping[str, Any]

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-graphql

docs

cli.md

client.md

index.md

schema.md

test-utilities.md

tile.json