The GraphQL frontend to Python Dagster, providing programmatic interaction with Dagster runs, repositories, and jobs through a comprehensive GraphQL API.
—
The DagsterGraphQLClient provides a high-level Python interface for interacting with Dagster's GraphQL API. It handles connection management, authentication, error handling, and provides convenient methods for the most common Dagster operations.
Create a connection to a Dagster GraphQL server with various configuration options including authentication, timeouts, and custom transport protocols.
class DagsterGraphQLClient:
def __init__(
self,
hostname: str,
port_number: Optional[int] = None,
transport: Optional[Transport] = None,
use_https: bool = False,
timeout: int = 300,
headers: Optional[dict[str, str]] = None,
auth: Optional[AuthBase] = None,
):
"""
Initialize GraphQL client for Dagster API.
Parameters:
- hostname (str): Hostname for the Dagster GraphQL API
- port_number (Optional[int]): Port number to connect to on the host
- transport (Optional[Transport]): Custom transport for connection
- use_https (bool): Whether to use https in the URL connection string
- timeout (int): Number of seconds before requests should time out
- headers (Optional[dict[str, str]]): Additional headers to include in requests
- auth (Optional[AuthBase]): Authentication handler for requests
Raises:
- DagsterGraphQLClientError: If connection to the host fails
"""
Usage example:
# Basic connection
client = DagsterGraphQLClient("localhost", port_number=3000)
# HTTPS connection with authentication
client = DagsterGraphQLClient(
"my-dagster.example.com",
use_https=True,
headers={"Dagster-Cloud-Api-Token": "your-token-here"},
timeout=60
)
Submit jobs for execution with comprehensive configuration options, including run config, tags, op selection, and asset selection.
def submit_job_execution(
self,
job_name: str,
repository_location_name: Optional[str] = None,
repository_name: Optional[str] = None,
run_config: Optional[Union[RunConfig, Mapping[str, Any]]] = None,
tags: Optional[dict[str, Any]] = None,
op_selection: Optional[Sequence[str]] = None,
asset_selection: Optional[Sequence[CoercibleToAssetKey]] = None,
) -> str:
"""
Submit a job for execution with configuration.
Parameters:
- job_name (str): The job's name
- repository_location_name (Optional[str]): Repository location name
- repository_name (Optional[str]): Repository name
- run_config (Optional[Union[RunConfig, Mapping[str, Any]]]): Run configuration
- tags (Optional[dict[str, Any]]): Tags to add to the job execution
- op_selection (Optional[Sequence[str]]): List of ops to execute
- asset_selection (Optional[Sequence[CoercibleToAssetKey]]): List of asset keys to execute
Returns:
- str: Run ID of the submitted job
Raises:
- DagsterGraphQLClientError: For various execution errors including invalid steps,
configuration validation errors, job not found, or internal framework errors
"""
Usage example:
# Basic job execution
run_id = client.submit_job_execution("my_job")
# Job execution with configuration and tags
run_id = client.submit_job_execution(
job_name="data_pipeline",
run_config={
"ops": {
"process_data": {
"config": {
"input_path": "/data/input.csv",
"output_path": "/data/output.parquet"
}
}
}
},
tags={
"environment": "production",
"team": "data-engineering"
}
)
# Execute specific ops within a job
run_id = client.submit_job_execution(
job_name="complex_pipeline",
op_selection=["extract_data", "transform_data"]
)
Monitor and retrieve the status of pipeline runs to track execution progress and outcomes.
def get_run_status(self, run_id: str) -> DagsterRunStatus:
"""
Get the status of a given Pipeline Run.
Parameters:
- run_id (str): Run ID of the requested pipeline run
Returns:
- DagsterRunStatus: Status enum describing the state of the pipeline run
Raises:
- DagsterGraphQLClientError: If the run ID is not found or internal framework errors occur
"""
Usage example:
from dagster import DagsterRunStatus
# Check run status
status = client.get_run_status(run_id)
if status == DagsterRunStatus.SUCCESS:
print("Job completed successfully")
elif status == DagsterRunStatus.FAILURE:
print("Job failed")
elif status == DagsterRunStatus.STARTED:
print("Job is currently running")
Reload and manage repository locations to refresh metadata and apply configuration changes without restarting the Dagster server.
def reload_repository_location(
self, repository_location_name: str
) -> ReloadRepositoryLocationInfo:
"""
Reload a Dagster Repository Location and all its repositories.
Parameters:
- repository_location_name (str): The name of the repository location
Returns:
- ReloadRepositoryLocationInfo: Object with information about the reload result
"""
def shutdown_repository_location(
self, repository_location_name: str
) -> ShutdownRepositoryLocationInfo:
"""
Shut down the server serving metadata for the repository location.
Note: This method is deprecated and will be removed in version 2.0.
Parameters:
- repository_location_name (str): The name of the repository location
Returns:
- ShutdownRepositoryLocationInfo: Object with information about the shutdown result
"""
Usage example:
# Reload repository location
reload_info = client.reload_repository_location("my_code_location")
if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
print("Repository location reloaded successfully")
else:
print(f"Reload failed: {reload_info.failure_type} - {reload_info.message}")
Terminate in-progress pipeline runs programmatically, either individually or in batches.
def terminate_run(self, run_id: str):
"""
Terminate a pipeline run.
Parameters:
- run_id (str): The run ID of the pipeline run to terminate
Raises:
- DagsterGraphQLClientError: If the run ID is not found or termination fails
"""
def terminate_runs(self, run_ids: list[str]):
"""
Terminate multiple pipeline runs.
Parameters:
- run_ids (list[str]): List of run IDs of the pipeline runs to terminate
Raises:
- DagsterGraphQLClientError: If some or all run terminations fail
"""
Usage example:
# Terminate a single run
client.terminate_run(run_id)
# Terminate multiple runs
failed_run_ids = ["run_1", "run_2", "run_3"]
try:
client.terminate_runs(failed_run_ids)
print("All runs terminated successfully")
except DagsterGraphQLClientError as e:
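print(f"Some terminations failed: {e}")
The client raises DagsterGraphQLClientError for various error conditions, including invalid steps, run configuration validation failures, jobs or runs that cannot be found, failed terminations, and internal framework errors.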
Example error handling:
from dagster_graphql import DagsterGraphQLClientError
try:
run_id = client.submit_job_execution(
job_name="nonexistent_job",
run_config={"invalid": "config"}
)
except DagsterGraphQLClientError as e:
if "JobNotFoundError" in str(e):
print("Job does not exist")
elif "RunConfigValidationInvalid" in str(e):
print("Invalid run configuration")
else:
print(f"Execution failed: {e}")