The GraphQL frontend to python dagster providing programmatic interaction with Dagster runs, repositories, and jobs through a comprehensive GraphQL API.
npx @tessl/cli install tessl/pypi-dagster-graphql@1.11.0A comprehensive GraphQL API interface for the Dagster data orchestration platform. This package enables programmatic interaction with Dagster runs, repositories, and jobs through both a Python client and direct GraphQL operations, providing full access to Dagster's data orchestration capabilities including job execution, run management, repository operations, and real-time monitoring.
pip install dagster-graphqlfrom dagster_graphql import (
DagsterGraphQLClient,
DagsterGraphQLClientError,
ReloadRepositoryLocationInfo,
ReloadRepositoryLocationStatus,
ShutdownRepositoryLocationInfo,
ShutdownRepositoryLocationStatus,
InvalidOutputErrorInfo
)For GraphQL schema operations:
from dagster_graphql.schema import create_schemaFor test utilities:
from dagster_graphql.test.utils import (
execute_dagster_graphql,
define_out_of_process_context
)from dagster_graphql import DagsterGraphQLClient, ReloadRepositoryLocationStatus
# Connect to Dagster GraphQL server
client = DagsterGraphQLClient("localhost", port_number=3000)
# Submit a job for execution
run_id = client.submit_job_execution(
job_name="my_job",
run_config={"ops": {"my_op": {"config": {"value": 42}}}},
tags={"env": "dev"}
)
# Check run status
status = client.get_run_status(run_id)
print(f"Run {run_id} status: {status}")
# Reload repository location
reload_info = client.reload_repository_location("my_location")
if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
print("Repository location reloaded successfully")The dagster-graphql package provides multiple interaction layers:
This layered architecture enables both simple programmatic access and full GraphQL flexibility, supporting everything from basic job execution to complex data pipeline orchestration workflows.
Core Python client functionality for job execution, run management, and repository operations. The DagsterGraphQLClient provides high-level methods for the most common Dagster operations.
class DagsterGraphQLClient:
def __init__(
self,
hostname: str,
port_number: Optional[int] = None,
transport: Optional[Transport] = None,
use_https: bool = False,
timeout: int = 300,
headers: Optional[dict[str, str]] = None,
auth: Optional[AuthBase] = None,
): ...
def submit_job_execution(
self,
job_name: str,
repository_location_name: Optional[str] = None,
repository_name: Optional[str] = None,
run_config: Optional[Union[RunConfig, Mapping[str, Any]]] = None,
tags: Optional[dict[str, Any]] = None,
op_selection: Optional[Sequence[str]] = None,
asset_selection: Optional[Sequence[CoercibleToAssetKey]] = None,
) -> str: ...
def get_run_status(self, run_id: str) -> DagsterRunStatus: ...
def reload_repository_location(self, repository_location_name: str) -> ReloadRepositoryLocationInfo: ...
def terminate_run(self, run_id: str): ...
def terminate_runs(self, run_ids: list[str]): ...Direct GraphQL schema creation and execution for custom queries, mutations, and subscriptions. Enables full access to Dagster's GraphQL API surface.
def create_schema() -> graphene.Schema: ...
def execute_query(
workspace_process_context: WorkspaceProcessContext,
query: str,
variables: Optional[Mapping[str, object]] = None,
): ...Command-line interface for executing GraphQL operations from scripts and automation workflows.
def main(): ...
def execute_query_from_cli(
workspace_process_context,
query,
variables=None,
output=None
): ...
def execute_query_against_remote(host, query, variables): ...Comprehensive testing framework for GraphQL-based testing with workspace management, query execution, and result processing utilities.
def execute_dagster_graphql(
context: WorkspaceRequestContext,
query: str,
variables: Optional[GqlVariables] = None,
schema: graphene.Schema = SCHEMA,
) -> GqlResult: ...
def define_out_of_process_context(
python_or_workspace_file: str,
fn_name: Optional[str],
instance: DagsterInstance,
read_only: bool = False,
read_only_locations: Optional[Mapping[str, bool]] = None,
) -> Iterator[WorkspaceRequestContext]: ...class DagsterGraphQLClientError(Exception):
def __init__(self, *args, body=None): ...class ReloadRepositoryLocationStatus(Enum):
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
class ShutdownRepositoryLocationStatus(Enum):
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
class ReloadRepositoryLocationInfo(NamedTuple):
status: ReloadRepositoryLocationStatus
failure_type: Optional[str] = None
message: Optional[str] = None
class ShutdownRepositoryLocationInfo(NamedTuple):
status: ShutdownRepositoryLocationStatus
message: Optional[str] = None
class InvalidOutputErrorInfo(NamedTuple):
step_key: str
invalid_output_name: str