tessl/pypi-dagster-graphql

The GraphQL frontend to Dagster. It provides programmatic interaction with Dagster runs, repositories, and jobs through a GraphQL API.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/dagster-graphql@1.11.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-graphql@1.11.0

Dagster GraphQL

A GraphQL API for the Dagster data orchestration platform. The package exposes Dagster runs, repositories, and jobs through a Python client and through direct GraphQL operations. It covers job execution, run management, repository operations, and real-time monitoring.

Package Information

  • Package Name: dagster-graphql
  • Language: Python
  • Installation: pip install dagster-graphql
  • License: Apache-2.0

Core Imports

from dagster_graphql import (
    DagsterGraphQLClient,
    DagsterGraphQLClientError,
    ReloadRepositoryLocationInfo,
    ReloadRepositoryLocationStatus,
    ShutdownRepositoryLocationInfo,
    ShutdownRepositoryLocationStatus,
    InvalidOutputErrorInfo
)

For GraphQL schema operations:

from dagster_graphql.schema import create_schema

For test utilities:

from dagster_graphql.test.utils import (
    execute_dagster_graphql,
    define_out_of_process_context
)

Basic Usage

from dagster_graphql import DagsterGraphQLClient, ReloadRepositoryLocationStatus

# Connect to Dagster GraphQL server
client = DagsterGraphQLClient("localhost", port_number=3000)

# Submit a job for execution
run_id = client.submit_job_execution(
    job_name="my_job",
    run_config={"ops": {"my_op": {"config": {"value": 42}}}},
    tags={"env": "dev"}
)

# Check run status
status = client.get_run_status(run_id)
print(f"Run {run_id} status: {status}")

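# Reload a repository location and report the outcome
reload_info = client.reload_repository_location("my_location")
if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
    print("Repository location reloaded successfully")
else:
    print(f"Reload failed ({reload_info.failure_type}): {reload_info.message}")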

Architecture

The dagster-graphql package provides multiple interaction layers:

  • Python Client: High-level Python client (DagsterGraphQLClient) for common operations
  • GraphQL Schema: Complete GraphQL schema for direct GraphQL operations
  • CLI Interface: Command-line tool for scripted GraphQL operations
  • Test Utilities: Helpers for writing GraphQL-based tests

The layers trade convenience for flexibility: the Python client covers common tasks such as submitting jobs and checking run status, while the schema and CLI accept any GraphQL query or mutation.

Capabilities

GraphQL Client Operations

The Python client for job execution, run management, and repository operations. DagsterGraphQLClient wraps the most common Dagster operations in high-level methods.

class DagsterGraphQLClient:
    def __init__(
        self,
        hostname: str,
        port_number: Optional[int] = None,
        transport: Optional[Transport] = None,
        use_https: bool = False,
        timeout: int = 300,
        headers: Optional[dict[str, str]] = None,
        auth: Optional[AuthBase] = None,
    ): ...
    
    def submit_job_execution(
        self,
        job_name: str,
        repository_location_name: Optional[str] = None,
        repository_name: Optional[str] = None,
        run_config: Optional[Union[RunConfig, Mapping[str, Any]]] = None,
        tags: Optional[dict[str, Any]] = None,
        op_selection: Optional[Sequence[str]] = None,
        asset_selection: Optional[Sequence[CoercibleToAssetKey]] = None,
    ) -> str: ...
    
    def get_run_status(self, run_id: str) -> DagsterRunStatus: ...
    def reload_repository_location(self, repository_location_name: str) -> ReloadRepositoryLocationInfo: ...
    def terminate_run(self, run_id: str): ...
    def terminate_runs(self, run_ids: list[str]): ...

GraphQL Schema Operations

Create the GraphQL schema and execute custom queries, mutations, and subscriptions against it. This route exposes the full Dagster GraphQL API rather than only the operations the Python client wraps.

def create_schema() -> graphene.Schema: ...

def execute_query(
    workspace_process_context: WorkspaceProcessContext,
    query: str,
    variables: Optional[Mapping[str, object]] = None,
): ...
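
For a sense of what a custom query looks like, here is a sketch that sends a GraphQL document over HTTP using only the standard library, rather than through the in-process functions listed above. It assumes a Dagster webserver serving GraphQL at /graphql on localhost:3000, and it assumes the runsOrError, Runs, results, runId, and status names; confirm them against the schema of your Dagster version. build_request and run_query are hypothetical helpers.

```python
import json
import urllib.request

# Assumed query shape; check field names against your server's schema.
RUNS_QUERY = """
query RecentRuns($limit: Int) {
  runsOrError(limit: $limit) {
    __typename
    ... on Runs {
      results {
        runId
        status
      }
    }
  }
}
"""


def build_request(url, query, variables=None):
    """Build a JSON POST request carrying a GraphQL query and its variables."""
    body = json.dumps({"query": query, "variables": variables or {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_query(url, query, variables=None, timeout=30):
    """Send the query and return the decoded JSON response (needs a live server)."""
    request = build_request(url, query, variables)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Building the request does not contact the server.
request = build_request("http://localhost:3000/graphql", RUNS_QUERY, {"limit": 5})
print(request.get_method(), request.full_url)
```

Calling run_query with the same arguments would perform the request. The __typename field is included so a caller can tell a Runs result apart from an error type in the union.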

Command Line Interface

Executes GraphQL operations from scripts and automation workflows.

def main(): ...

def execute_query_from_cli(
    workspace_process_context,
    query,
    variables=None,
    output=None
): ...

def execute_query_against_remote(host, query, variables): ...
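
The functions above back a dagster-graphql console command. The sketch below only assembles an argument list for that command from Python. The flags -t (query text), -v (JSON variables), -r (remote webserver URL), and -o (output file) are stated here as assumptions about the CLI, so confirm them with dagster-graphql --help before relying on them. build_cli_command is a hypothetical helper.

```python
import json
import shlex


def build_cli_command(query, variables=None, remote=None, output=None):
    """Assemble an argv list for the dagster-graphql command.

    Flag names are assumptions; verify them with `dagster-graphql --help`.
    """
    args = ["dagster-graphql", "-t", query]
    if variables is not None:
        args += ["-v", json.dumps(variables)]
    if remote is not None:
        args += ["-r", remote]
    if output is not None:
        args += ["-o", output]
    return args


command = build_cli_command("{ version }", remote="http://localhost:3000")
# shlex.join quotes each argument for display in a POSIX shell.
print(shlex.join(command))
```

Passing the list to subprocess.run(command, check=True) would execute it without going through a shell, which avoids quoting problems with braces in the query text.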

Test Utilities

Utilities for GraphQL-based tests, covering workspace management, query execution, and result processing.

def execute_dagster_graphql(
    context: WorkspaceRequestContext,
    query: str,
    variables: Optional[GqlVariables] = None,
    schema: graphene.Schema = SCHEMA,
) -> GqlResult: ...

def define_out_of_process_context(
    python_or_workspace_file: str,
    fn_name: Optional[str],
    instance: DagsterInstance,
    read_only: bool = False,
    read_only_locations: Optional[Mapping[str, bool]] = None,
) -> Iterator[WorkspaceRequestContext]: ...
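
Tests built on these utilities usually start by checking that a query returned data without errors. The helper below is a small sketch of that check. It assumes the result object exposes data and errors attributes, which is how GqlResult is understood here. unwrap_result is a hypothetical name, and a SimpleNamespace stands in for a real result so the sketch runs on its own.

```python
from types import SimpleNamespace


def unwrap_result(result):
    """Return result.data, raising AssertionError if errors are present."""
    errors = getattr(result, "errors", None)
    if errors:
        raise AssertionError(f"GraphQL errors: {errors}")
    if result.data is None:
        raise AssertionError("GraphQL result has no data")
    return result.data


# Stand-in for a result from execute_dagster_graphql. In a real test this
# would look roughly like:
#     with define_out_of_process_context(path, fn_name, instance) as context:
#         data = unwrap_result(execute_dagster_graphql(context, "{ version }"))
ok = SimpleNamespace(data={"version": "hypothetical"}, errors=None)
print(unwrap_result(ok))
```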

Exception Classes

class DagsterGraphQLClientError(Exception):
    def __init__(self, *args, body=None): ...
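
A sketch of catching this exception around a client call follows. The fallback class exists only so the example runs without the package installed and mirrors the constructor shown above. It assumes the exception keeps the body keyword as a body attribute. get_status_or_none and the failing stand-in client are hypothetical.

```python
try:
    from dagster_graphql import DagsterGraphQLClientError
except ImportError:
    # Fallback for running the sketch without the package installed.
    class DagsterGraphQLClientError(Exception):
        def __init__(self, *args, body=None):
            super().__init__(*args)
            self.body = body


def get_status_or_none(client, run_id):
    """Return the run status, or None if the client reports an error."""
    try:
        return client.get_run_status(run_id)
    except DagsterGraphQLClientError as exc:
        # args carry the error details; body (if set) may hold extra context.
        body = getattr(exc, "body", None)
        print(f"Could not fetch status for {run_id}: {exc.args} (body={body!r})")
        return None


class _FailingClient:
    """Hypothetical stand-in whose status lookup always fails."""

    def get_run_status(self, run_id):
        raise DagsterGraphQLClientError("RunNotFoundError", "hypothetical message")


status = get_status_or_none(_FailingClient(), "hypothetical-run-id")
print(status)
```

Returning None suits best-effort monitoring. A script that must not continue after a failed lookup would re-raise instead.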

Status and Information Types

class ReloadRepositoryLocationStatus(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"

class ShutdownRepositoryLocationStatus(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"

class ReloadRepositoryLocationInfo(NamedTuple):
    status: ReloadRepositoryLocationStatus
    failure_type: Optional[str] = None
    message: Optional[str] = None

class ShutdownRepositoryLocationInfo(NamedTuple):
    status: ShutdownRepositoryLocationStatus
    message: Optional[str] = None

class InvalidOutputErrorInfo(NamedTuple):
    step_key: str
    invalid_output_name: str
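
To show how these types are meant to be consumed, the sketch below turns a ReloadRepositoryLocationInfo into a one-line summary. The enum and named tuple are local copies of the definitions listed above so the example runs without the package. In real code they would be imported from dagster_graphql. summarize_reload and the failure values are hypothetical.

```python
from enum import Enum
from typing import NamedTuple, Optional


# Local copies of the documented types, for a self-contained example.
class ReloadRepositoryLocationStatus(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


class ReloadRepositoryLocationInfo(NamedTuple):
    status: ReloadRepositoryLocationStatus
    failure_type: Optional[str] = None
    message: Optional[str] = None


def summarize_reload(info: ReloadRepositoryLocationInfo) -> str:
    """Describe a reload result, including failure details when present."""
    if info.status == ReloadRepositoryLocationStatus.SUCCESS:
        return "reload succeeded"
    details = ", ".join(part for part in (info.failure_type, info.message) if part)
    return f"reload failed: {details or 'no details provided'}"


ok = ReloadRepositoryLocationInfo(ReloadRepositoryLocationStatus.SUCCESS)
failed = ReloadRepositoryLocationInfo(
    ReloadRepositoryLocationStatus.FAILURE,
    failure_type="PythonError",  # illustrative value
    message="hypothetical import error",
)
print(summarize_reload(ok))
print(summarize_reload(failed))
```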