
tessl/pypi-dagster-graphql

The GraphQL frontend to Python Dagster, providing programmatic interaction with Dagster runs, repositories, and jobs through a comprehensive GraphQL API.


GraphQL Client

The DagsterGraphQLClient provides a high-level Python interface for interacting with Dagster's GraphQL API. It handles connection management, authentication, and error handling, and it provides convenient methods for the most common Dagster operations.

Capabilities

Client Initialization

Create a connection to a Dagster GraphQL server with various configuration options including authentication, timeouts, and custom transport protocols.

class DagsterGraphQLClient:
    def __init__(
        self,
        hostname: str,
        port_number: Optional[int] = None,
        transport: Optional[Transport] = None,
        use_https: bool = False,
        timeout: int = 300,
        headers: Optional[dict[str, str]] = None,
        auth: Optional[AuthBase] = None,
    ):
        """
        Initialize GraphQL client for Dagster API.

        Parameters:
        - hostname (str): Hostname for the Dagster GraphQL API
        - port_number (Optional[int]): Port number to connect to on the host
        - transport (Optional[Transport]): Custom transport for connection
        - use_https (bool): Whether to use https in the URL connection string
        - timeout (int): Number of seconds before requests should time out
        - headers (Optional[dict[str, str]]): Additional headers to include in requests
        - auth (Optional[AuthBase]): Authentication handler for requests

        Raises:
        - DagsterGraphQLClientError: If connection to the host fails
        """

Usage example:

from dagster_graphql import DagsterGraphQLClient

# Basic connection
client = DagsterGraphQLClient("localhost", port_number=3000)

# HTTPS connection with authentication
client = DagsterGraphQLClient(
    "my-dagster.example.com",
    use_https=True,
    headers={"Dagster-Cloud-Api-Token": "your-token-here"},
    timeout=60
)
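
Connection settings are often read from the environment rather than hard-coded. The following is a minimal sketch, not part of the library: the helper name and the environment variable names (DAGSTER_HOST, DAGSTER_PORT, DAGSTER_USE_HTTPS, DAGSTER_TIMEOUT, DAGSTER_API_TOKEN) are hypothetical conventions chosen for illustration. It only builds keyword arguments that match the constructor signature above.

```python
import os
from typing import Any, Mapping, Optional


def client_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict[str, Any]:
    """Build DagsterGraphQLClient keyword arguments from environment variables.

    The variable names are hypothetical conventions for this sketch,
    not names that Dagster itself reads.
    """
    env = os.environ if env is None else env
    kwargs: dict[str, Any] = {
        "hostname": env.get("DAGSTER_HOST", "localhost"),
        "use_https": env.get("DAGSTER_USE_HTTPS", "false").lower() in {"1", "true", "yes"},
        # 300 seconds matches the documented constructor default.
        "timeout": int(env.get("DAGSTER_TIMEOUT", "300")),
    }
    if "DAGSTER_PORT" in env:
        kwargs["port_number"] = int(env["DAGSTER_PORT"])
    if "DAGSTER_API_TOKEN" in env:
        # Same header name as in the HTTPS example above.
        kwargs["headers"] = {"Dagster-Cloud-Api-Token": env["DAGSTER_API_TOKEN"]}
    return kwargs
```

With the package installed, DagsterGraphQLClient(**client_kwargs_from_env()) would then create the client from those settings.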

Job Execution

Submit jobs for execution with comprehensive configuration options including run config, tags, op selection, and asset selection.

def submit_job_execution(
    self,
    job_name: str,
    repository_location_name: Optional[str] = None,
    repository_name: Optional[str] = None,
    run_config: Optional[Union[RunConfig, Mapping[str, Any]]] = None,
    tags: Optional[dict[str, Any]] = None,
    op_selection: Optional[Sequence[str]] = None,
    asset_selection: Optional[Sequence[CoercibleToAssetKey]] = None,
) -> str:
    """
    Submit a job for execution with configuration.

    Parameters:
    - job_name (str): The job's name
    - repository_location_name (Optional[str]): Repository location name
    - repository_name (Optional[str]): Repository name
    - run_config (Optional[Union[RunConfig, Mapping[str, Any]]]): Run configuration
    - tags (Optional[dict[str, Any]]): Tags to add to the job execution
    - op_selection (Optional[Sequence[str]]): List of ops to execute
    - asset_selection (Optional[Sequence[CoercibleToAssetKey]]): List of asset keys to execute

    Returns:
    - str: Run ID of the submitted job

    Raises:
    - DagsterGraphQLClientError: For various execution errors including invalid steps, 
      configuration validation errors, job not found, or internal framework errors
    """

Usage example:

# Basic job execution
run_id = client.submit_job_execution("my_job")

# Job execution with configuration and tags
run_id = client.submit_job_execution(
    job_name="data_pipeline",
    run_config={
        "ops": {
            "process_data": {
                "config": {
                    "input_path": "/data/input.csv",
                    "output_path": "/data/output.parquet"
                }
            }
        }
    },
    tags={
        "environment": "production",
        "team": "data-engineering"
    }
)

# Execute specific ops within a job
run_id = client.submit_job_execution(
    job_name="complex_pipeline",
    op_selection=["extract_data", "transform_data"]
)
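
The run_config in the example above nests each op's settings under "ops" and "config". As a hedged sketch, a small helper (the name build_ops_run_config is hypothetical, not part of dagster-graphql) can build that shape from a flat mapping of op names to settings:

```python
from typing import Any, Mapping


def build_ops_run_config(op_configs: Mapping[str, Mapping[str, Any]]) -> dict[str, Any]:
    """Wrap per-op settings in the {"ops": {name: {"config": ...}}} shape.

    This mirrors the structure of the run_config example above; it does not
    validate the settings against the job's config schema.
    """
    return {
        "ops": {
            op_name: {"config": dict(settings)}
            for op_name, settings in op_configs.items()
        }
    }
```

The result can be passed as run_config, for example client.submit_job_execution("data_pipeline", run_config=build_ops_run_config({"process_data": {"input_path": "/data/input.csv"}})).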

Run Status Management

Retrieve the status of a run to track its progress and outcome.

def get_run_status(self, run_id: str) -> DagsterRunStatus:
    """
    Get the status of a given Pipeline Run.

    Parameters:
    - run_id (str): Run ID of the requested pipeline run

    Returns:
    - DagsterRunStatus: Status enum describing the state of the pipeline run

    Raises:
    - DagsterGraphQLClientError: If the run ID is not found or internal framework errors occur
    """

Usage example:

from dagster import DagsterRunStatus

# Check run status
status = client.get_run_status(run_id)

if status == DagsterRunStatus.SUCCESS:
    print("Job completed successfully")
elif status == DagsterRunStatus.FAILURE:
    print("Job failed")
elif status == DagsterRunStatus.STARTED:
    print("Job is currently running")
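
Because get_run_status returns a single snapshot, waiting for a run to finish means polling. Below is a minimal sketch under stated assumptions: the helper wait_for_run is hypothetical, and treating SUCCESS, FAILURE, and CANCELED as the terminal statuses is a choice made for this example. The client is passed in, so the code itself needs no Dagster import.

```python
import time
from typing import Any, Callable

# Status names treated as terminal in this sketch (an assumption).
TERMINAL_STATUSES = frozenset({"SUCCESS", "FAILURE", "CANCELED"})


def wait_for_run(
    client: Any,
    run_id: str,
    poll_interval: float = 5.0,
    max_wait: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll client.get_run_status(run_id) until a terminal status or timeout."""
    deadline = time.monotonic() + max_wait
    while True:
        status = client.get_run_status(run_id)
        # Enum members expose .name; fall back to str() for plain strings.
        name = getattr(status, "name", str(status))
        if name in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run {run_id} still {name} after {max_wait} seconds")
        sleep(poll_interval)
```

The sleep parameter exists only so the loop can be exercised without real delays; in normal use the default time.sleep applies.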

Repository Location Management

Reload and manage repository locations to refresh metadata and apply configuration changes without restarting the Dagster server.

def reload_repository_location(
    self, repository_location_name: str
) -> ReloadRepositoryLocationInfo:
    """
    Reload a Dagster Repository Location and all its repositories.

    Parameters:
    - repository_location_name (str): The name of the repository location

    Returns:
    - ReloadRepositoryLocationInfo: Object with information about the reload result
    """

def shutdown_repository_location(
    self, repository_location_name: str
) -> ShutdownRepositoryLocationInfo:
    """
    Shut down the server serving metadata for the repository location.
    
    Note: This method is deprecated and will be removed in version 2.0.

    Parameters:
    - repository_location_name (str): The name of the repository location

    Returns:
    - ShutdownRepositoryLocationInfo: Object with information about the shutdown result
    """

Usage example:

from dagster_graphql import ReloadRepositoryLocationStatus

# Reload repository location
reload_info = client.reload_repository_location("my_code_location")

if reload_info.status == ReloadRepositoryLocationStatus.SUCCESS:
    print("Repository location reloaded successfully")
else:
    print(f"Reload failed: {reload_info.failure_type} - {reload_info.message}")
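
When several locations need refreshing, the same status check can be applied in a loop. This is a sketch only: reload_locations is a hypothetical helper, and it relies on the status, failure_type, and message attributes used in the example above. It compares the status by enum member name so the code carries no Dagster import.

```python
from typing import Any, Iterable


def reload_locations(client: Any, location_names: Iterable[str]) -> dict[str, str]:
    """Reload each location and return {location_name: failure description}.

    An empty result means every reload reported SUCCESS.
    """
    failures: dict[str, str] = {}
    for name in location_names:
        info = client.reload_repository_location(name)
        # Compare by member name instead of importing the status enum.
        if info.status.name != "SUCCESS":
            failures[name] = f"{info.failure_type}: {info.message}"
    return failures
```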

Run Termination

Stop in-progress runs programmatically, either one run at a time or in batches.

def terminate_run(self, run_id: str):
    """
    Terminate a pipeline run.

    Parameters:
    - run_id (str): The run ID of the pipeline run to terminate

    Raises:
    - DagsterGraphQLClientError: If the run ID is not found or termination fails
    """

def terminate_runs(self, run_ids: list[str]):
    """
    Terminate multiple pipeline runs.

    Parameters:
    - run_ids (list[str]): List of run IDs of the pipeline runs to terminate

    Raises:
    - DagsterGraphQLClientError: If some or all run terminations fail
    """

Usage example:

from dagster_graphql import DagsterGraphQLClientError

# Terminate a single run
client.terminate_run(run_id)

# Terminate multiple runs
run_ids_to_terminate = ["run_1", "run_2", "run_3"]
try:
    client.terminate_runs(run_ids_to_terminate)
    print("All runs terminated successfully")
except DagsterGraphQLClientError as e:
    print(f"Some terminations failed: {e}")
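
terminate_runs reports batch failures through a single exception. If a per-run record is needed, one option is to call terminate_run for each ID and collect the errors. The helper below is a hypothetical sketch; error_type defaults to Exception so the code runs without Dagster installed, and in real use DagsterGraphQLClientError would be the natural value to pass.

```python
from typing import Any, Iterable, Type


def terminate_each(
    client: Any,
    run_ids: Iterable[str],
    error_type: Type[BaseException] = Exception,
) -> dict[str, str]:
    """Terminate runs one by one and return {run_id: error message} for failures."""
    failures: dict[str, str] = {}
    for run_id in run_ids:
        try:
            client.terminate_run(run_id)
        except error_type as exc:
            # Record the failure and continue with the remaining runs.
            failures[run_id] = str(exc)
    return failures
```

This trades one request per run for finer-grained error reporting, so the batch method remains the better fit when only overall success matters.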

Error Handling

The client raises DagsterGraphQLClientError for various error conditions:

  • Connection errors: When unable to connect to the GraphQL server
  • Invalid step errors: When a job has invalid steps
  • Configuration validation errors: When run config doesn't match job requirements
  • Job not found errors: When the specified job doesn't exist
  • Run conflicts: When conflicting job runs exist in storage
  • Python errors: For internal framework errors

Example error handling:

from dagster_graphql import DagsterGraphQLClientError

try:
    run_id = client.submit_job_execution(
        job_name="nonexistent_job",
        run_config={"invalid": "config"}
    )
except DagsterGraphQLClientError as e:
    if "JobNotFoundError" in str(e):
        print("Job does not exist")
    elif "RunConfigValidationInvalid" in str(e):
        print("Invalid run configuration")
    else:
        print(f"Execution failed: {e}")
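
The string checks in the example above can be factored into a small function so the mapping lives in one place. This sketch covers only the two markers that appear in the example ("JobNotFoundError" and "RunConfigValidationInvalid"); the function name and the category labels are hypothetical, and matching on the error text is a convenience for illustration, not a documented contract of the client.

```python
# Substring markers taken from the example above, mapped to labels
# that are made up for this sketch.
ERROR_MARKERS = (
    ("JobNotFoundError", "job_not_found"),
    ("RunConfigValidationInvalid", "invalid_run_config"),
)


def classify_client_error(exc: Exception) -> str:
    """Return a coarse category for an error based on its string form."""
    text = str(exc)
    for marker, category in ERROR_MARKERS:
        if marker in text:
            return category
    return "other"
```

Inside an except DagsterGraphQLClientError as e block, classify_client_error(e) would then replace the chain of if/elif checks.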

Install with Tessl CLI

npx tessl i tessl/pypi-dagster-graphql
