tessl/pypi-dagster-spark

Package for Spark Dagster framework components.

Workspace: tessl
Visibility: Public
Describes: pypi/dagster-spark@0.27.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-spark@0.27.0

Dagster Spark

Integration library that enables Apache Spark job execution within the Dagster orchestration framework. It provides resources, operations, and configuration utilities for building data pipelines that leverage Spark's distributed computing capabilities while benefiting from Dagster's lineage tracking, scheduling, and monitoring features.

Package Information

  • Package Name: dagster-spark
  • Language: Python
  • Installation: pip install dagster-spark
  • Dependencies: dagster==1.11.9

Core Imports

from dagster_spark import (
    create_spark_op,
    spark_resource,
    define_spark_config,
    SparkOpError,
    construct_spark_shell_command,
    __version__
)

Basic Usage

from dagster import job
from dagster_spark import create_spark_op, spark_resource

# Create a Spark operation
my_spark_op = create_spark_op(
    name="calculate_pi",
    main_class="org.apache.spark.examples.SparkPi"
)

# Define a job using the Spark resource
@job(resource_defs={"spark": spark_resource})
def spark_pipeline():
    my_spark_op()

# Configuration for the job
config = {
    "ops": {
        "calculate_pi": {
            "config": {
                "master_url": "local[2]",
                "deploy_mode": "client", 
                "application_jar": "/path/to/spark-examples.jar",
                "application_arguments": "10",
                "spark_conf": {
                    "spark": {
                        "app": {
                            "name": "calculate_pi"
                        }
                    }
                }
            }
        }
    }
}

# Execute the job
result = spark_pipeline.execute_in_process(run_config=config)

Capabilities

Spark Operation Factory

Creates Dagster ops that execute a specific Spark application main class, with run-time parameters supplied through the op's configuration.

def create_spark_op(
    name: str,
    main_class: str, 
    description: str = None,
    required_resource_keys: frozenset = frozenset(["spark"])
):
    """
    Creates a Dagster op that executes a Spark job.
    
    Parameters:
    - name: Name of the operation
    - main_class: Java/Scala main class to execute
    - description: Optional description of the operation
    - required_resource_keys: Resources required by the operation
    
    Returns:
    Dagster op function configured for Spark execution
    """

Spark Resource

Provides Spark job execution capabilities as a Dagster resource, handling spark-submit command construction and execution.

@resource
def spark_resource(context):
    """
    Dagster resource providing Spark job execution capabilities.
    
    Returns:
    SparkResource instance with run_spark_job method
    """

class SparkResource:
    def __init__(self, logger): ...
    
    def run_spark_job(self, config: dict, main_class: str):
        """
        Executes a Spark job with the given configuration.
        
        Parameters:
        - config: Configuration dictionary with Spark parameters
        - main_class: Java/Scala main class to execute
        
        Raises:
        SparkOpError: If JAR file doesn't exist or job execution fails
        """

Configuration Schema

Defines the configuration schema for Spark job parameters including cluster settings, JAR files, and Spark properties.

def define_spark_config():
    """
    Returns Spark configuration schema with the following fields:
    
    Returns:
    Dictionary containing Field definitions for:
    - master_url: Spark cluster master URL (required)
    - deploy_mode: String value ("client" or "cluster")
    - application_jar: Path to JAR file (required)
    - spark_conf: Nested Spark configuration properties
    - spark_home: Path to Spark installation
    - application_arguments: Arguments for main class
    """

Command Construction Utility

Constructs spark-submit commands with proper parameter formatting and validation.

def construct_spark_shell_command(
    application_jar: str,
    main_class: str,
    master_url: str = None,
    spark_conf: dict = None,
    deploy_mode: str = None, 
    application_arguments: str = None,
    spark_home: str = None
):
    """
    Constructs spark-submit command for Spark job execution.
    
    Parameters:
    - application_jar: Path to JAR file containing Spark application
    - main_class: Java/Scala main class to execute
    - master_url: Spark cluster master URL
    - spark_conf: Dictionary of Spark configuration properties
    - deploy_mode: Deployment mode ("client" or "cluster")
    - application_arguments: Arguments passed to main class
    - spark_home: Path to Spark installation directory
    
    Returns:
    List of command arguments for spark-submit execution
    
    Raises:
    SparkOpError: If SPARK_HOME is not set and spark_home not provided
    """

Types

Exception Types

class SparkOpError(Exception):
    """
    Exception raised when Spark operations fail.
    
    Raised when:
    - Application JAR file doesn't exist
    - SPARK_HOME environment variable not set
    - Spark job execution returns non-zero exit code
    """

Package Version

__version__: str = "0.27.9"

Configuration Structure

Spark operations expect configuration in the following structure:

{
    "master_url": "local[2]",  # Required: Spark master URL
    "deploy_mode": "client",   # Optional: "client" or "cluster"
    "application_jar": "/path/to/app.jar",  # Required: JAR file path
    "application_arguments": "arg1 arg2",   # Optional: arguments
    "spark_home": "/opt/spark",  # Optional: Spark installation path
    "spark_conf": {  # Optional: Spark configuration properties
        "spark": {
            "app": {
                "name": "my_spark_app"
            },
            "driver": {
                "memory": "2g",
                "cores": 2
            },
            "executor": {
                "memory": "4g", 
                "cores": 4,
                "instances": 2
            }
        }
    }
}
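
When the spark-submit command is built, the nested spark_conf mapping is flattened into dotted Spark property names (spark.driver.memory=2g, and so on). The helper below illustrates that correspondence; it is not part of dagster-spark's public API:

def flatten_spark_conf(conf: dict, prefix: str = "") -> dict:
    """Illustrative only: show how nested keys map to dotted Spark properties."""
    flat = {}
    for key, value in conf.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_spark_conf(value, dotted))
        else:
            flat[dotted] = value
    return flat

print(flatten_spark_conf({"spark": {"driver": {"memory": "2g", "cores": 2}}}))
# {'spark.driver.memory': '2g', 'spark.driver.cores': 2}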

Environment Requirements

  • SPARK_HOME: Environment variable pointing to Spark installation directory (or provide spark_home in config)
  • Java Runtime: Java 8+ runtime environment
  • Spark Installation: Apache Spark installation accessible from execution environment
  • Application JAR: Valid JAR file containing Spark application and dependencies
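
A small pre-flight check along these lines can catch missing requirements before a job is launched; it is a sketch, not something dagster-spark provides:

import os
import shutil

spark_home = os.environ.get("SPARK_HOME")
if not spark_home:
    raise RuntimeError("Set SPARK_HOME or supply spark_home in the op config")
if not os.path.exists(os.path.join(spark_home, "bin", "spark-submit")):
    raise RuntimeError(f"spark-submit not found under {spark_home}")
if shutil.which("java") is None:
    raise RuntimeError("A Java runtime (Java 8+) must be on the PATH")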

Error Handling

The library raises SparkOpError exceptions in the following cases:

  • Application JAR file specified in configuration doesn't exist
  • SPARK_HOME environment variable not set and spark_home not provided in config
  • Spark job execution fails (non-zero exit code from spark-submit)

Common error handling pattern:

from dagster_spark import SparkOpError

# Inside an op body, where `context` provides access to the spark resource and a logger:
try:
    context.resources.spark.run_spark_job(config, main_class)
except SparkOpError as e:
    context.log.error(f"Spark job failed: {e}")
    raise