dagster-graphql is the GraphQL frontend to Dagster, a Python library. It provides programmatic interaction with Dagster runs, repositories, and jobs through a GraphQL API.
The dagster-graphql command-line interface provides scripted access to Dagster's GraphQL API for automation, debugging, and operational tasks. It supports both local workspace operations and remote server connectivity.
Main command-line interface for executing GraphQL operations against Dagster workspaces and remote servers.
def main():
    """
    Main CLI entry point for the dagster-graphql command.

    Provides access to GraphQL query execution with various input methods
    and output options for automation and debugging workflows.
    """

Command-line usage:
# Install package with CLI
pip install dagster-graphql
# Basic GraphQL query execution
dagster-graphql -t "query { instance { info } }"
# Execute query from file
dagster-graphql -f query.graphql
# Use predefined query with variables
dagster-graphql -p launchPipelineExecution -v '{"executionParams": {...}}'
# Execute against remote server
dagster-graphql -r https://my-dagster.com -t "query { runs { runId } }"
# Save output to file
dagster-graphql -t "query { runs { runId status } }" -o results.json

Multiple methods for providing GraphQL queries including text input, file input, and predefined queries.
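To make the three input methods concrete, here is a minimal Python sketch that assembles a `dagster-graphql` argument list suitable for `subprocess`. The helper name `build_cli_args` is hypothetical, and the rule that exactly one of text, file, or predefined must be given is an assumption of this sketch, not something stated in this document. Only flags listed in this document are used.

```python
import json
from typing import Any, Dict, List, Optional


def build_cli_args(
    text: Optional[str] = None,
    file: Optional[str] = None,
    predefined: Optional[str] = None,
    variables: Optional[Dict[str, Any]] = None,
    remote: Optional[str] = None,
    output: Optional[str] = None,
) -> List[str]:
    """Assemble an argv list for dagster-graphql (hypothetical helper)."""
    sources = [("-t", text), ("-f", file), ("-p", predefined)]
    chosen = [(flag, value) for flag, value in sources if value is not None]
    # Assumption: the query must come from exactly one source.
    if len(chosen) != 1:
        raise ValueError("provide exactly one of text, file, or predefined")

    flag, value = chosen[0]
    args = ["dagster-graphql", flag, value]
    if variables is not None:
        # -v takes a JSON-encoded string; json.dumps sidesteps shell quoting.
        args += ["-v", json.dumps(variables)]
    if remote is not None:
        args += ["-r", remote]
    if output is not None:
        args += ["-o", output]
    return args
```

The resulting list could be passed to `subprocess.run(args, capture_output=True, text=True)`, which avoids the nested quoting that inline JSON variables require in a shell.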
@click.command(name="ui")
@click.option("--text", "-t", type=click.STRING, help="GraphQL document as string")
@click.option("--file", "-f", type=click.File(), help="GraphQL document from file")
@click.option("--predefined", "-p", type=click.Choice(list(PREDEFINED_QUERIES.keys())), help="Predefined GraphQL document")
@click.option("--variables", "-v", type=click.STRING, help="JSON encoded variables")
@click.option("--remote", "-r", type=click.STRING, help="Remote server URL")
@click.option("--output", "-o", type=click.STRING, help="Output file path")
@click.option("--ephemeral-instance", is_flag=True, help="Use ephemeral DagsterInstance")
def ui(text, file, predefined, variables, remote, output, ephemeral_instance, **other_opts):
    """
    Execute GraphQL queries against Dagster interface.

    Parameters:
    - text: GraphQL query as command-line string
    - file: GraphQL query from file
    - predefined: Use predefined query by name
    - variables: JSON-encoded GraphQL variables
    - remote: Remote Dagster server URL
    - output: File to save query results
    - ephemeral_instance: Use temporary instance instead of DAGSTER_HOME
    """

Usage examples:
# Text query with variables
dagster-graphql -t "query GetRuns(\$limit: Int) { runs(limit: \$limit) { runId } }" -v '{"limit": 5}'
# File-based query
echo 'query { instance { daemonHealth { id healthy } } }' > health.graphql
dagster-graphql -f health.graphql
# Predefined query for job execution
dagster-graphql -p launchPipelineExecution -v '{
  "executionParams": {
    "selector": {
      "repositoryLocationName": "analytics",
      "repositoryName": "data_repo",
      "pipelineName": "daily_etl"
    }
  }
}'

Execute GraphQL operations against remote Dagster servers with automatic server validation and error handling.
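The validation mentioned here can be pictured with a small sketch. It assumes the check has two parts, as the `execute_query_against_remote` notes in this section describe: the host must carry a scheme, and the `/server_info` response must mention `dagster_webserver`. The function names are hypothetical, the response body is passed in as a string so the sketch makes no network call, and it raises `ValueError` rather than `click.UsageError` to stay within the standard library. It is not the library's implementation.

```python
from urllib.parse import urlparse


def has_scheme_and_host(host: str) -> bool:
    """Return True if the URL has both a scheme (e.g. https) and a host part."""
    parsed = urlparse(host)
    return bool(parsed.scheme) and bool(parsed.netloc)


def looks_like_dagster_server(server_info_body: str) -> bool:
    """Assumed check: the /server_info body mentions 'dagster_webserver'."""
    return "dagster_webserver" in server_info_body


def validate_remote(host: str, server_info_body: str) -> None:
    """Raise ValueError if either assumed check fails."""
    if not has_scheme_and_host(host):
        raise ValueError(f"host must include a scheme such as https://: {host!r}")
    if not looks_like_dagster_server(server_info_body):
        raise ValueError(f"{host!r} does not look like a Dagster webserver")
```

Separating the URL check from the body check keeps each one testable without a live server.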
def execute_query_against_remote(host, query, variables):
    """
    Execute GraphQL query against remote Dagster server.

    Parameters:
    - host (str): Host URL for remote Dagster server (must include scheme)
    - query (str): GraphQL query string to execute
    - variables: Variables for GraphQL execution

    Returns:
    - dict: JSON response from remote server

    Raises:
    - click.UsageError: If host URL is invalid or server fails sanity check

    Note: Performs automatic sanity check by verifying /server_info endpoint
    contains 'dagster_webserver' to ensure target is a Dagster server.
    """

Usage examples:
# Query remote Dagster server
dagster-graphql -r https://dagster.mycompany.com -t "query { runs(limit: 10) { runId status } }"
# Remote query with authentication headers (use environment variables for tokens)
export DAGSTER_CLOUD_API_TOKEN="your-token-here"
dagster-graphql -r https://myorg.dagster.cloud -t "query { instance { info } }"
# Remote job execution
dagster-graphql -r https://dagster.mycompany.com -p launchPipelineExecution -v '{
  "executionParams": {
    "selector": {
      "repositoryLocationName": "production",
      "repositoryName": "main_repo",
      "pipelineName": "critical_pipeline"
    },
    "runConfigData": {"ops": {"extract": {"config": {"date": "2023-10-01"}}}}
  }
}'

Integration with Dagster workspace configuration for local development and testing environments.
# Use specific workspace file
dagster-graphql -y workspace.yaml -t "query { repositoriesOrError { ... } }"
# Python file with repository definition
dagster-graphql --python-file my_repo.py -a my_repository -t "query { runs { runId } }"
# Module-based repository
dagster-graphql -m my_package.repository -a get_repo -t "query { jobs { name } }"
# Ephemeral instance for testing
dagster-graphql --ephemeral-instance -t "query { instance { info } }"

Flexible output handling for automation workflows including file output and structured JSON responses.
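For automation written in Python rather than shell, the JSON output can be parsed directly. The sketch below assumes the CLI prints a standard GraphQL response envelope with a `data` key and an optional `errors` key; the `data.runs[].runId` path mirrors the one used with `jq` in this section. The helper names `parse_graphql_output` and `run_ids` are hypothetical.

```python
import json
from typing import Any, Dict, List


def parse_graphql_output(raw: str) -> Dict[str, Any]:
    """Parse CLI output and return the 'data' object, raising on GraphQL errors."""
    response = json.loads(raw)
    # Assumption: standard GraphQL envelope with optional 'errors' and a 'data' key.
    if response.get("errors"):
        raise RuntimeError(f"GraphQL errors: {response['errors']}")
    return response.get("data") or {}


def run_ids(data: Dict[str, Any]) -> List[str]:
    """Collect runId values from a 'runs' list, like jq '.data.runs[].runId'."""
    return [run["runId"] for run in data.get("runs", [])]
```

The same functions work on a file written with `-o`: read the file contents and pass them to `parse_graphql_output`.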
# Save query results to file (useful for large responses)
dagster-graphql -t "query { runs { runId status logs { level message } } }" -o run_details.json
# Pipe output to other tools
dagster-graphql -t "query { runs { runId } }" | jq '.data.runs[].runId'
# Combine with shell scripting
RUN_IDS=$(dagster-graphql -t "query { runs(filter: {status: STARTED}) { runId } }" | jq -r '.data.runs[].runId')
for run_id in $RUN_IDS; do
  echo "Processing run: $run_id"
done

The CLI reports connection failures, GraphQL syntax errors, and query execution errors, including stack traces where available:
# Server connectivity issues
dagster-graphql -r https://invalid-server.com -t "query { runs }"
# Output: Error when connecting to url... Did you specify hostname correctly?
# GraphQL syntax errors
dagster-graphql -t "query { invalid syntax }"
# Output: GraphQL syntax error with position information
# Query execution errors with stack traces
dagster-graphql -t "query { nonexistentField }"
# Output: Field resolution error with full stack trace if available

Available predefined queries for common operations:
# List available predefined queries
dagster-graphql --help # Shows available options for -p/--predefined
# Use predefined query
dagster-graphql -p launchPipelineExecution -v '{
  "executionParams": {
    "selector": {"repositoryLocationName": "main", "repositoryName": "repo", "pipelineName": "job"},
    "runConfigData": {}
  }
}'

#!/bin/bash
# Deploy and verify pipeline
echo "Executing data validation job..."
RESULT=$(dagster-graphql -r "$DAGSTER_SERVER" -p launchPipelineExecution -v '{
  "executionParams": {
    "selector": {
      "repositoryLocationName": "production",
      "repositoryName": "data_validation",
      "pipelineName": "daily_validation"
    }
  }
}')
# Quote "$RESULT" so the JSON reaches jq without word splitting
RUN_ID=$(echo "$RESULT" | jq -r '.data.launchPipelineExecution.run.runId')
echo "Started validation run: $RUN_ID"
# Monitor run status
while true; do
  STATUS=$(dagster-graphql -r "$DAGSTER_SERVER" -t "query { runOrError(runId: \"$RUN_ID\") { ... on Run { status } } }" | jq -r '.data.runOrError.status')
  if [[ "$STATUS" == "SUCCESS" ]]; then
    echo "Validation completed successfully"
    break
  # Treat CANCELED as terminal too, so a canceled run does not loop forever
  elif [[ "$STATUS" == "FAILURE" || "$STATUS" == "CANCELED" ]]; then
    echo "Validation did not succeed (status: $STATUS)"
    exit 1
  fi
  sleep 30
done

#!/bin/bash
# Check for failed runs in last hour
# The timestamp is built with GNU date (-d is not available in BSD/macOS date)
FAILED_RUNS=$(dagster-graphql -r "$DAGSTER_SERVER" -t '
query {
  runs(filter: {status: FAILURE, createdAfter: "'$(date -d '1 hour ago' -Iseconds)'"}) {
    runId
    pipelineName
    status
    startTime
  }
}
' | jq '.data.runs')
if [[ $(echo "$FAILED_RUNS" | jq 'length') -gt 0 ]]; then
  echo "Alert: Failed runs detected in last hour"
  echo "$FAILED_RUNS" | jq '.'
  # Send to alerting system
fi
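The polling loop in the first script above can also be written with an upper bound on the number of polls, so a stuck run cannot block a pipeline indefinitely. This is a minimal Python sketch under stated assumptions: `wait_for_run` and `fetch_status` are hypothetical names, `fetch_status` stands in for whatever issues the `runOrError` query and returns the status string, and treating `CANCELED` as terminal alongside `SUCCESS` and `FAILURE` is an assumption of the sketch.

```python
import time
from typing import Callable

# Assumption: a run in one of these statuses will not change again.
TERMINAL_STATUSES = frozenset({"SUCCESS", "FAILURE", "CANCELED"})


def wait_for_run(
    fetch_status: Callable[[], str],
    poll_seconds: float = 30.0,
    max_polls: int = 120,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch_status until a terminal status is seen or max_polls is reached."""
    for attempt in range(max_polls):
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        # Skip the final sleep: there is no further poll to wait for.
        if attempt < max_polls - 1:
            sleep(poll_seconds)
    raise TimeoutError(f"run did not finish after {max_polls} polls")
```

Injecting `sleep` and `fetch_status` keeps the loop testable without a Dagster server; in a real script `fetch_status` might shell out to `dagster-graphql` and parse the JSON response.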