CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dagster-graphql

The GraphQL frontend to python dagster providing programmatic interaction with Dagster runs, repositories, and jobs through a comprehensive GraphQL API.

Pending
Overview
Eval results
Files

cli.mddocs/

Command Line Interface

The dagster-graphql command-line interface provides scripted access to Dagster's GraphQL API for automation, debugging, and operational tasks. It supports both local workspace operations and remote server connectivity.

Capabilities

CLI Entry Point

Main command-line interface for executing GraphQL operations against Dagster workspaces and remote servers.

def main():
    """
    Main CLI entry point for dagster-graphql command.
    
    Provides access to GraphQL query execution with various input methods
    and output options for automation and debugging workflows.
    """

Command-line usage:

# Install package with CLI
pip install dagster-graphql

# Basic GraphQL query execution
dagster-graphql -t "query { instance { info } }"

# Execute query from file
dagster-graphql -f query.graphql

# Use predefined query with variables
dagster-graphql -p launchPipelineExecution -v '{"executionParams": {...}}'

# Execute against remote server
dagster-graphql -r https://my-dagster.com -t "query { runs { runId } }"

# Save output to file
dagster-graphql -t "query { runs { runId status } }" -o results.json

Query Input Methods

Multiple methods for providing GraphQL queries including text input, file input, and predefined queries.

@click.command(name="ui")
@click.option("--text", "-t", type=click.STRING, help="GraphQL document as string")
@click.option("--file", "-f", type=click.File(), help="GraphQL document from file")  
@click.option("--predefined", "-p", type=click.Choice(list(PREDEFINED_QUERIES.keys())), help="Predefined GraphQL document")
@click.option("--variables", "-v", type=click.STRING, help="JSON encoded variables")
@click.option("--remote", "-r", type=click.STRING, help="Remote server URL")
@click.option("--output", "-o", type=click.STRING, help="Output file path")
@click.option("--ephemeral-instance", is_flag=True, help="Use ephemeral DagsterInstance")
def ui(text, file, predefined, variables, remote, output, ephemeral_instance, **other_opts):
    """
    Execute GraphQL queries against Dagster interface.
    
    Parameters:
    - text: GraphQL query as command-line string
    - file: GraphQL query from file
    - predefined: Use predefined query by name
    - variables: JSON-encoded GraphQL variables
    - remote: Remote Dagster server URL
    - output: File to save query results
    - ephemeral_instance: Use temporary instance instead of DAGSTER_HOME
    """

Usage examples:

# Text query with variables
dagster-graphql -t "query GetRuns(\$limit: Int) { runs(limit: \$limit) { runId } }" -v '{"limit": 5}'

# File-based query
echo 'query { instance { daemonHealth { id healthy } } }' > health.graphql
dagster-graphql -f health.graphql

# Predefined query for job execution
dagster-graphql -p launchPipelineExecution -v '{
  "executionParams": {
    "selector": {
      "repositoryLocationName": "analytics",
      "repositoryName": "data_repo", 
      "pipelineName": "daily_etl"
    }
  }
}'

Remote Server Operations

Execute GraphQL operations against remote Dagster servers with automatic server validation and error handling.

def execute_query_against_remote(host, query, variables):
    """
    Execute GraphQL query against remote Dagster server.

    Parameters:
    - host (str): Host URL for remote Dagster server (must include scheme)
    - query (str): GraphQL query string to execute  
    - variables: Variables for GraphQL execution

    Returns:
    - dict: JSON response from remote server

    Raises:
    - click.UsageError: If host URL is invalid or server fails sanity check
    
    Note: Performs automatic sanity check by verifying /server_info endpoint
    contains 'dagster_webserver' to ensure target is a Dagster server.
    """

Usage examples:

# Query remote Dagster server
dagster-graphql -r https://dagster.mycompany.com -t "query { runs(limit: 10) { runId status } }"

# Remote query with authentication headers (use environment variables for tokens)
export DAGSTER_CLOUD_API_TOKEN="your-token-here"
dagster-graphql -r https://myorg.dagster.cloud -t "query { instance { info } }"

# Remote job execution
dagster-graphql -r https://dagster.mycompany.com -p launchPipelineExecution -v '{
  "executionParams": {
    "selector": {
      "repositoryLocationName": "production",
      "repositoryName": "main_repo",
      "pipelineName": "critical_pipeline" 
    },
    "runConfigData": {"ops": {"extract": {"config": {"date": "2023-10-01"}}}}
  }
}'

Workspace Integration

Integration with Dagster workspace configuration for local development and testing environments.

# Use specific workspace file
dagster-graphql -y workspace.yaml -t "query { repositoriesOrError { ... } }"

# Python file with repository definition
dagster-graphql -f my_repo.py -a my_repository -t "query { runs { runId } }"

# Module-based repository
dagster-graphql -m my_package.repository -a get_repo -t "query { jobs { name } }"

# Ephemeral instance for testing
dagster-graphql --ephemeral-instance -t "query { instance { info } }"

Output Management

Flexible output handling for automation workflows including file output and structured JSON responses.

# Save query results to file (useful for large responses)
dagster-graphql -t "query { runs { runId status logs { level message } } }" -o run_details.json

# Pipe output to other tools
dagster-graphql -t "query { runs { runId } }" | jq '.data.runs[].runId'

# Combine with shell scripting
RUN_IDS=$(dagster-graphql -t "query { runs(filter: {status: STARTED}) { runId } }" | jq -r '.data.runs[].runId')
for run_id in $RUN_IDS; do
  echo "Processing run: $run_id"
done

Error Handling and Debugging

The CLI provides comprehensive error handling with detailed stack traces and debugging information:

# Server connectivity issues
dagster-graphql -r https://invalid-server.com -t "query { runs }"
# Output: Error when connecting to url... Did you specify hostname correctly?

# GraphQL syntax errors  
dagster-graphql -t "query { invalid syntax }"
# Output: GraphQL syntax error with position information

# Query execution errors with stack traces
dagster-graphql -t "query { nonexistentField }"
# Output: Field resolution error with full stack trace if available

Predefined Operations

Available predefined queries for common operations:

  • launchPipelineExecution: Execute jobs/pipelines with full configuration
  • Additional predefined queries can be added for common operational tasks
# List available predefined queries
dagster-graphql --help  # Shows available options for -p/--predefined

# Use predefined query
dagster-graphql -p launchPipelineExecution -v '{
  "executionParams": {
    "selector": {"repositoryLocationName": "main", "repositoryName": "repo", "pipelineName": "job"},
    "runConfigData": {}
  }
}'

Integration Examples

CI/CD Pipeline Integration

#!/bin/bash
# Deploy and verify pipeline
echo "Executing data validation job..."
RESULT=$(dagster-graphql -r $DAGSTER_SERVER -p launchPipelineExecution -v '{
  "executionParams": {
    "selector": {
      "repositoryLocationName": "production",
      "repositoryName": "data_validation", 
      "pipelineName": "daily_validation"
    }
  }
}')

RUN_ID=$(echo $RESULT | jq -r '.data.launchPipelineExecution.run.runId')
echo "Started validation run: $RUN_ID"

# Monitor run status
while true; do
  STATUS=$(dagster-graphql -r $DAGSTER_SERVER -t "query { runOrError(runId: \"$RUN_ID\") { ... on Run { status } } }" | jq -r '.data.runOrError.status')
  if [[ "$STATUS" == "SUCCESS" ]]; then
    echo "Validation completed successfully"
    break
  elif [[ "$STATUS" == "FAILURE" ]]; then
    echo "Validation failed"
    exit 1
  fi
  sleep 30
done

Monitoring and Alerting

#!/bin/bash
# Check for failed runs in last hour
FAILED_RUNS=$(dagster-graphql -r $DAGSTER_SERVER -t '
  query {
    runs(filter: {status: FAILURE, createdAfter: "'$(date -d '1 hour ago' -Iseconds)'"}) {
      runId
      pipelineName
      status
      startTime
    }
  }
' | jq '.data.runs')

if [[ $(echo $FAILED_RUNS | jq 'length') -gt 0 ]]; then
  echo "Alert: Failed runs detected in last hour"
  echo $FAILED_RUNS | jq '.'
  # Send to alerting system
fi

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-graphql

docs

cli.md

client.md

index.md

schema.md

test-utilities.md

tile.json