The GraphQL frontend to python dagster providing programmatic interaction with Dagster runs, repositories, and jobs through a comprehensive GraphQL API.
—
Direct GraphQL schema operations provide full access to Dagster's GraphQL API surface through schema creation, query execution, and workspace management. This enables custom GraphQL operations beyond what the high-level client provides.
Create and configure the complete Dagster GraphQL schema with queries, mutations, and subscriptions for direct GraphQL operations.
def create_schema() -> graphene.Schema:
"""
Create complete GraphQL schema with queries, mutations, and subscriptions.
Returns:
- graphene.Schema: Configured GraphQL schema for Dagster operations
"""Usage example:
from dagster_graphql.schema import create_schema
import asyncio
schema = create_schema()
# Execute a GraphQL query directly
query = """
query GetRuns($limit: Int) {
runsOrError(limit: $limit) {
... on PipelineRuns {
results {
runId
status
pipelineName
creationTime
}
}
}
}
"""
result = asyncio.run(
schema.execute_async(query, variable_values={"limit": 10})
)
print(result.data)Execute GraphQL queries and mutations within Dagster workspace contexts with comprehensive error handling and result processing.
def execute_query(
workspace_process_context: WorkspaceProcessContext,
query: str,
variables: Optional[Mapping[str, object]] = None,
):
"""
Execute GraphQL query against workspace context.
Parameters:
- workspace_process_context (WorkspaceProcessContext): Workspace context for execution
- query (str): GraphQL query string to execute
- variables (Optional[Mapping[str, object]]): Variables for GraphQL execution
Returns:
- dict: GraphQL execution result with data and potential errors
Note: Includes automatic error handling with stack traces for debugging
"""Usage example:
from dagster_graphql.cli import execute_query
from dagster._core.workspace.context import WorkspaceProcessContext
# Execute custom GraphQL query
query = """
mutation LaunchRun($executionParams: ExecutionParams!) {
launchPipelineExecution(executionParams: $executionParams) {
__typename
... on LaunchRunSuccess {
run {
runId
status
}
}
... on PipelineConfigValidationInvalid {
errors {
message
path
}
}
}
}
"""
variables = {
"executionParams": {
"selector": {
"repositoryLocationName": "my_location",
"repositoryName": "my_repo",
"pipelineName": "my_job"
},
"runConfigData": {},
"mode": "default"
}
}
result = execute_query(workspace_context, query, variables)Execute GraphQL operations from command-line interfaces and automation scripts with file output support and remote server connectivity.
def execute_query_from_cli(
workspace_process_context,
query,
variables=None,
output=None
):
"""
Execute GraphQL query from CLI with optional file output.
Parameters:
- workspace_process_context: Workspace context for execution
- query (str): GraphQL query string to execute
- variables (Optional[str]): JSON-encoded variables for execution
- output (Optional[str]): File path to store GraphQL response
Returns:
- str: JSON-encoded result string
"""
def execute_query_against_remote(host, query, variables):
"""
Execute GraphQL query against remote Dagster server.
Parameters:
- host (str): Host URL for remote Dagster server
- query (str): GraphQL query string to execute
- variables: Variables for GraphQL execution
Returns:
- dict: GraphQL execution result
Raises:
- click.UsageError: If host URL is invalid or server fails sanity check
"""Usage example:
# Execute query with file output
result = execute_query_from_cli(
workspace_context,
query,
variables='{"limit": 5}',
output="/tmp/query_result.json"
)
# Execute against remote server
remote_result = execute_query_against_remote(
"https://my-dagster-server.com",
query,
{"repositoryName": "analytics"}
)Access predefined GraphQL operations for common Dagster tasks including pipeline execution and run management.
PREDEFINED_QUERIES = {
"launchPipelineExecution": LAUNCH_PIPELINE_EXECUTION_MUTATION,
}Available predefined queries include:
Usage example:
from dagster_graphql.client.query import LAUNCH_PIPELINE_EXECUTION_MUTATION
from dagster_graphql.cli import execute_query
# Use predefined mutation
mutation = LAUNCH_PIPELINE_EXECUTION_MUTATION
variables = {
"executionParams": {
"selector": {
"repositoryLocationName": "my_location",
"repositoryName": "my_repo",
"pipelineName": "my_pipeline"
}
}
}
result = execute_query(workspace_context, mutation, variables)The schema includes comprehensive type definitions for:
Example comprehensive GraphQL operation:
complex_query = """
query ComplexDagsterQuery($repoSelector: RepositorySelector!) {
repositoryOrError(repositorySelector: $repoSelector) {
... on Repository {
name
location {
name
}
pipelines {
name
modes {
name
resources {
name
configField {
configType {
key
}
}
}
}
solids {
name
inputs {
definition {
name
type {
displayName
}
}
}
outputs {
definition {
name
type {
displayName
}
}
}
}
}
}
... on RepositoryNotFoundError {
message
}
}
}
"""
variables = {
"repoSelector": {
"repositoryLocationName": "my_location",
"repositoryName": "my_repo"
}
}
result = execute_query(workspace_context, complex_query, variables)