tessl/pypi-dagster-graphql

The GraphQL frontend to Dagster (Python). It provides programmatic interaction with Dagster runs, repositories, and jobs through a GraphQL API.

GraphQL Schema

Direct schema operations give access to Dagster's full GraphQL API surface: schema creation, query execution, and workspace management. Use them for custom GraphQL operations that the high-level client does not cover.

Capabilities

Schema Creation

Create and configure the complete Dagster GraphQL schema with queries, mutations, and subscriptions for direct GraphQL operations.

def create_schema() -> graphene.Schema:
    """
    Create complete GraphQL schema with queries, mutations, and subscriptions.

    Returns:
    - graphene.Schema: Configured GraphQL schema for Dagster operations
    """

Usage example:

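import asyncio

from dagster_graphql.schema import create_schema

schema = create_schema()

# Execute a GraphQL query directly
query = """
query GetRuns($limit: Int) {
  runsOrError(limit: $limit) {
    ... on PipelineRuns {
      results {
        runId
        status
        pipelineName
        creationTime
      }
    }
  }
}
"""

# Resolvers read workspace state from the GraphQL context, so a real call
# normally also needs context_value=<a workspace request context>.
result = asyncio.run(
    schema.execute_async(query, variable_values={"limit": 10})
)
# Without a usable context, expect result.errors to be populated instead of data.
if result.errors:
    print(result.errors)
else:
    print(result.data)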
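
Once a query succeeds, `result.data` is a plain dictionary that mirrors the shape of the query. As a minimal sketch of post-processing it, the snippet below tallies runs by status. `count_runs_by_status` is a hypothetical helper, not part of dagster-graphql, and the sample payload is invented purely for illustration.

```python
from collections import Counter
from typing import Any, Mapping, Optional


def count_runs_by_status(data: Optional[Mapping[str, Any]]) -> Counter:
    """Tally runs by status from a GetRuns-shaped payload (hypothetical helper)."""
    if not data:
        # Execution failed before producing data; nothing to count.
        return Counter()
    runs_or_error = data.get("runsOrError") or {}
    # "results" is absent when the union resolved to an error type.
    results = runs_or_error.get("results") or []
    return Counter(run["status"] for run in results)


# Illustrative payload only; the run IDs, statuses, and times are invented.
sample_data = {
    "runsOrError": {
        "results": [
            {"runId": "a1", "status": "SUCCESS", "pipelineName": "my_job", "creationTime": 1.0},
            {"runId": "b2", "status": "FAILURE", "pipelineName": "my_job", "creationTime": 2.0},
            {"runId": "c3", "status": "SUCCESS", "pipelineName": "my_job", "creationTime": 3.0},
        ]
    }
}

status_counts = count_runs_by_status(sample_data)
print(dict(status_counts))
```

Returning an empty `Counter` for missing data keeps the caller free of `None` checks; whether that is preferable to raising depends on how strictly failures should surface.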

Query Execution

Execute GraphQL queries and mutations against a Dagster workspace context. Errors are handled automatically and reported in the result, with stack traces for debugging.

def execute_query(
    workspace_process_context: WorkspaceProcessContext,
    query: str,
    variables: Optional[Mapping[str, object]] = None,
):
    """
    Execute GraphQL query against workspace context.

    Parameters:
    - workspace_process_context (WorkspaceProcessContext): Workspace context for execution
    - query (str): GraphQL query string to execute
    - variables (Optional[Mapping[str, object]]): Variables for GraphQL execution

    Returns:
    - dict: GraphQL execution result with data and potential errors

    Note: Includes automatic error handling with stack traces for debugging
    """

Usage example:

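from dagster_graphql.cli import execute_query
from dagster._core.workspace.context import WorkspaceProcessContext

# Assumes `workspace_context` is an existing WorkspaceProcessContext for your
# workspace; constructing one is outside the scope of this example.

# Execute custom GraphQL query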
query = """
mutation LaunchRun($executionParams: ExecutionParams!) {
  launchPipelineExecution(executionParams: $executionParams) {
    __typename
    ... on LaunchRunSuccess {
      run {
        runId
        status
      }
    }
    ... on PipelineConfigValidationInvalid {
      errors {
        message
        path
      }
    }
  }
}
"""

variables = {
    "executionParams": {
        "selector": {
            "repositoryLocationName": "my_location",
            "repositoryName": "my_repo",
            "pipelineName": "my_job"
        },
        "runConfigData": {},
        "mode": "default"
    }
}

result = execute_query(workspace_context, query, variables)
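
The mutation above returns a union, so the caller has to branch on what came back. The following is a minimal sketch of that branching, assuming the result is a dict with `data` and optional `errors` keys as described in the docstring. `summarize_launch_result` is a hypothetical helper, and the sample payloads are invented. The validation branch checks for the selected `errors` field rather than a type name, because `__typename` reports the concrete type, which may differ from the name used in the fragment.

```python
from typing import Any, Mapping


def summarize_launch_result(result: Mapping[str, Any]) -> str:
    """Return a one-line summary of a LaunchRun response (hypothetical helper)."""
    # A top-level "errors" list follows the standard GraphQL response format.
    if result.get("errors"):
        messages = [
            err["message"] if isinstance(err, dict) and "message" in err else str(err)
            for err in result["errors"]
        ]
        return "GraphQL errors: " + "; ".join(messages)

    payload = (result.get("data") or {}).get("launchPipelineExecution") or {}
    typename = payload.get("__typename")

    if typename == "LaunchRunSuccess":
        run = payload["run"]
        return f"Launched run {run['runId']} ({run['status']})"
    if "errors" in payload:
        # The config-validation fragment matched and returned its error list.
        details = "; ".join(err["message"] for err in payload["errors"])
        return f"Invalid run config: {details}"
    return f"Unhandled result type: {typename}"


# Invented sample payloads, shaped like the mutation's selection set.
success = {
    "data": {
        "launchPipelineExecution": {
            "__typename": "LaunchRunSuccess",
            "run": {"runId": "abc123", "status": "QUEUED"},
        }
    }
}
failed = {"errors": [{"message": "boom"}]}

print(summarize_launch_result(success))
print(summarize_launch_result(failed))
```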

CLI Query Execution

Execute GraphQL operations from command-line interfaces and automation scripts with file output support and remote server connectivity.

def execute_query_from_cli(
    workspace_process_context,
    query,
    variables=None,
    output=None
):
    """
    Execute GraphQL query from CLI with optional file output.

    Parameters:
    - workspace_process_context: Workspace context for execution
    - query (str): GraphQL query string to execute
    - variables (Optional[str]): JSON-encoded variables for execution
    - output (Optional[str]): File path to store GraphQL response

    Returns:
    - str: JSON-encoded result string
    """

def execute_query_against_remote(host, query, variables):
    """
    Execute GraphQL query against remote Dagster server.

    Parameters:
    - host (str): Host URL for remote Dagster server
    - query (str): GraphQL query string to execute
    - variables: Variables for GraphQL execution

    Returns:
    - dict: GraphQL execution result

    Raises:
    - click.UsageError: If host URL is invalid or server fails sanity check
    """
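
Because `execute_query_against_remote` raises `click.UsageError` for an invalid host URL, it can help to check the URL before calling it. The sketch below is one possible pre-check using only the standard library. `looks_like_valid_host` is a hypothetical helper, and its rule (an `http` or `https` scheme plus a network location) is an assumption, not the library's own validation logic.

```python
from urllib.parse import urlparse


def looks_like_valid_host(host: str) -> bool:
    """Return True if host has an http(s) scheme and a network location.

    Hypothetical pre-check; the library's real validation may differ.
    """
    parsed = urlparse(host)
    return parsed.scheme in ("http", "https") and bool(parsed.netloc)


for candidate in ("https://my-dagster-server.com", "my-dagster-server.com"):
    print(candidate, looks_like_valid_host(candidate))
```

A bare hostname without a scheme fails this check, which is a common cause of confusing connection errors.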

Usage example:

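from dagster_graphql.cli import execute_query_against_remote, execute_query_from_cli

# Assumes `workspace_context` (a WorkspaceProcessContext) and `query` (a GraphQL
# query string) are already defined, as in the earlier examples.

# Execute query with file output
result = execute_query_from_cli(
    workspace_context,
    query,
    variables='{"limit": 5}',
    output="/tmp/query_result.json"
)

# Execute against remote server; the variables must match those declared by `query`
remote_result = execute_query_against_remote(
    "https://my-dagster-server.com",
    query,
    {"repositoryName": "analytics"}
)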
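
`execute_query_from_cli` takes its variables as a JSON-encoded string and can store the response in a file. A minimal sketch of both sides of that exchange follows, using only the standard library. `encode_variables` and `load_response` are hypothetical helpers, and the file written here is a stand-in for the stored response, on the assumption that it contains the JSON result.

```python
import json
import os
import tempfile
from typing import Any, Dict, Mapping


def encode_variables(variables: Mapping[str, Any]) -> str:
    """Serialize variables to the JSON string form the CLI entry point expects."""
    return json.dumps(variables, sort_keys=True)


def load_response(path: str) -> Dict[str, Any]:
    """Read a stored GraphQL response back into a dict (hypothetical helper)."""
    with open(path, encoding="utf-8") as handle:
        return json.load(handle)


encoded = encode_variables({"limit": 5})
print(encoded)

# Stand-in for the stored response; the payload is invented for illustration.
with tempfile.TemporaryDirectory() as tmp_dir:
    response_path = os.path.join(tmp_dir, "query_result.json")
    with open(response_path, "w", encoding="utf-8") as handle:
        json.dump({"data": {"runsOrError": {"results": []}}}, handle)
    loaded = load_response(response_path)

print(loaded["data"])
```

Building the string with `json.dumps` avoids the quoting mistakes that hand-written JSON literals tend to introduce.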

Predefined Query Operations

Access predefined GraphQL operations for common Dagster tasks including pipeline execution and run management.

PREDEFINED_QUERIES = {
    "launchPipelineExecution": LAUNCH_PIPELINE_EXECUTION_MUTATION,
}

PREDEFINED_QUERIES itself maps only launchPipelineExecution. Related GraphQL operation constants include:

  • LAUNCH_PIPELINE_EXECUTION_MUTATION: Complete mutation for launching pipeline executions
  • SUBSCRIPTION_QUERY: Real-time subscription for run events
  • RUN_EVENTS_QUERY: Query for retrieving run events and logs
  • LAUNCH_MULTIPLE_RUNS_MUTATION: Batch execution mutation
  • LAUNCH_PIPELINE_REEXECUTION_MUTATION: Pipeline re-execution mutation
  • LAUNCH_PARTITION_BACKFILL_MUTATION: Partition backfill operations

Usage example:

from dagster_graphql.client.query import LAUNCH_PIPELINE_EXECUTION_MUTATION
from dagster_graphql.cli import execute_query

# Assumes `workspace_context` is an existing WorkspaceProcessContext.

# Use predefined mutation
mutation = LAUNCH_PIPELINE_EXECUTION_MUTATION
variables = {
    "executionParams": {
        "selector": {
            "repositoryLocationName": "my_location",
            "repositoryName": "my_repo",
            "pipelineName": "my_pipeline"
        }
    }
}

result = execute_query(workspace_context, mutation, variables)
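
When operations are selected by name from a mapping like `PREDEFINED_QUERIES`, an unknown name is easier to diagnose if the error lists the valid keys. The sketch below shows one way to do that. `get_predefined_query` is a hypothetical helper, and the mapping is a stand-in with placeholder query text so the example runs without Dagster installed.

```python
from typing import Mapping

# Stand-in for PREDEFINED_QUERIES; the query text is a placeholder, not the real mutation.
PREDEFINED_QUERIES_STANDIN = {
    "launchPipelineExecution": "mutation LaunchPlaceholder { __typename }",
}


def get_predefined_query(name: str, predefined: Mapping[str, str]) -> str:
    """Look up a predefined operation by name, listing valid names on failure."""
    try:
        return predefined[name]
    except KeyError:
        available = ", ".join(sorted(predefined)) or "(none)"
        raise ValueError(
            f"Unknown predefined query {name!r}; available: {available}"
        ) from None


query_text = get_predefined_query("launchPipelineExecution", PREDEFINED_QUERIES_STANDIN)
print(query_text)
```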

GraphQL Schema Components

The schema includes comprehensive type definitions for:

Query Types

  • Repository and job metadata queries
  • Run status and history queries
  • Asset and partition queries
  • Schedule and sensor queries
  • Configuration schema queries

Mutation Types

  • Job and pipeline execution mutations
  • Run termination mutations
  • Repository location management mutations
  • Backfill and re-execution mutations

Subscription Types

  • Real-time run event subscriptions
  • Log streaming subscriptions
  • Asset materialization subscriptions

Error Handling Types

  • Comprehensive error type definitions
  • Validation error structures
  • Python error serialization with stack traces

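Example of a larger GraphQL operation that traverses repository, pipeline, and solid metadata:

from dagster_graphql.cli import execute_query

# Assumes `workspace_context` is an existing WorkspaceProcessContext.
complex_query = """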
query ComplexDagsterQuery($repoSelector: RepositorySelector!) {
  repositoryOrError(repositorySelector: $repoSelector) {
    ... on Repository {
      name
      location {
        name
      }
      pipelines {
        name
        modes {
          name
          resources {
            name
            configField {
              configType {
                key
              }
            }
          }
        }
        solids {
          name
          inputs {
            definition {
              name
              type {
                displayName
              }
            }
          }
          outputs {
            definition {
              name
              type {
                displayName
              }
            }
          }
        }
      }
    }
    ... on RepositoryNotFoundError {
      message
    }
  }
}
"""

variables = {
    "repoSelector": {
        "repositoryLocationName": "my_location",
        "repositoryName": "my_repo"
    }
}

result = execute_query(workspace_context, complex_query, variables)
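
A nested result like this is usually flattened before use. As a minimal sketch, assuming the result is a dict with a `data` key shaped like the query above, the helper below maps each solid to its input and output names. `solid_signatures` is a hypothetical helper, and the sample payload is invented for illustration.

```python
from typing import Any, Dict, List, Mapping


def solid_signatures(result: Mapping[str, Any]) -> Dict[str, Dict[str, List[str]]]:
    """Map "pipeline.solid" to its input and output names (hypothetical helper)."""
    repo = (result.get("data") or {}).get("repositoryOrError") or {}
    if "pipelines" not in repo:
        # Either the RepositoryNotFoundError fragment matched or no data came back.
        raise LookupError(repo.get("message", "repository not returned"))

    signatures: Dict[str, Dict[str, List[str]]] = {}
    for pipeline in repo["pipelines"]:
        for solid in pipeline["solids"]:
            key = f"{pipeline['name']}.{solid['name']}"
            signatures[key] = {
                "inputs": [i["definition"]["name"] for i in solid["inputs"]],
                "outputs": [o["definition"]["name"] for o in solid["outputs"]],
            }
    return signatures


# Invented sample payload, shaped like the selection set of ComplexDagsterQuery.
any_type = {"displayName": "Any"}
sample_result = {
    "data": {
        "repositoryOrError": {
            "name": "my_repo",
            "location": {"name": "my_location"},
            "pipelines": [
                {
                    "name": "my_job",
                    "modes": [],
                    "solids": [
                        {
                            "name": "load",
                            "inputs": [],
                            "outputs": [{"definition": {"name": "result", "type": any_type}}],
                        },
                        {
                            "name": "transform",
                            "inputs": [{"definition": {"name": "rows", "type": any_type}}],
                            "outputs": [{"definition": {"name": "result", "type": any_type}}],
                        },
                    ],
                }
            ],
        }
    }
}

signatures = solid_signatures(sample_result)
print(signatures)
```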

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-graphql
