Dagster framework components for Apache Spark.
```bash
npx @tessl/cli install tessl/pypi-dagster-spark@0.27.0
```

Integration library that enables Apache Spark job execution within the Dagster orchestration framework. It provides resources, operations, and configuration utilities for building data pipelines that leverage Spark's distributed computing capabilities while benefiting from Dagster's lineage tracking, scheduling, and monitoring features.
```bash
pip install dagster-spark
```

```python
from dagster_spark import (
    create_spark_op,
    spark_resource,
    define_spark_config,
    SparkOpError,
    construct_spark_shell_command,
    __version__,
)
```

```python
from dagster import job
from dagster_spark import create_spark_op, spark_resource

# Create a Spark operation
my_spark_op = create_spark_op(
    name="calculate_pi",
    main_class="org.apache.spark.examples.SparkPi",
)

# Define a job using the Spark resource
@job(resource_defs={"spark": spark_resource})
def spark_pipeline():
    my_spark_op()

# Configuration for the job
config = {
    "ops": {
        "calculate_pi": {
            "config": {
                "master_url": "local[2]",
                "deploy_mode": "client",
                "application_jar": "/path/to/spark-examples.jar",
                "application_arguments": "10",
                "spark_conf": {
                    "spark": {
                        "app": {
                            "name": "calculate_pi"
                        }
                    }
                },
            }
        }
    }
}

# Execute the job
result = spark_pipeline.execute_in_process(run_config=config)
```

`create_spark_op` creates parameterized Spark operations that execute a specific Spark application with configurable parameters.

```python
def create_spark_op(
    name: str,
    main_class: str,
    description: str = None,
    required_resource_keys: frozenset = frozenset(["spark"]),
):
    """
    Creates a Dagster op that executes a Spark job.

    Parameters:
    - name: Name of the operation
    - main_class: Java/Scala main class to execute
    - description: Optional description of the operation
    - required_resource_keys: Resources required by the operation

    Returns:
    Dagster op function configured for Spark execution
    """
```

`spark_resource` provides Spark job execution capabilities as a Dagster resource, handling spark-submit command construction and execution.

```python
@resource
def spark_resource(context):
    """
    Dagster resource providing Spark job execution capabilities.

    Returns:
    SparkResource instance with run_spark_job method
    """

class SparkResource:
    def __init__(self, logger): ...

    def run_spark_job(self, config: dict, main_class: str):
        """
        Executes a Spark job with the given configuration.

        Parameters:
        - config: Configuration dictionary with Spark parameters
        - main_class: Java/Scala main class to execute

        Raises:
        SparkOpError: If JAR file doesn't exist or job execution fails
        """
```

`define_spark_config` defines the configuration schema for Spark job parameters, including cluster settings, JAR files, and Spark properties.

```python
def define_spark_config():
    """
    Returns the Spark configuration schema.

    Returns:
    Dictionary containing Field definitions for:
    - master_url: Spark cluster master URL (required)
    - deploy_mode: String value ("client" or "cluster")
    - application_jar: Path to JAR file (required)
    - spark_conf: Nested Spark configuration properties
    - spark_home: Path to Spark installation
    - application_arguments: Arguments for main class
    """
```

`construct_spark_shell_command` constructs spark-submit commands with proper parameter formatting and validation.

```python
def construct_spark_shell_command(
    application_jar: str,
    main_class: str,
    master_url: str = None,
    spark_conf: dict = None,
    deploy_mode: str = None,
    application_arguments: str = None,
    spark_home: str = None,
):
    """
    Constructs the spark-submit command for Spark job execution.

    Parameters:
    - application_jar: Path to the JAR file containing the Spark application
    - main_class: Java/Scala main class to execute
    - master_url: Spark cluster master URL
    - spark_conf: Dictionary of Spark configuration properties
    - deploy_mode: Deployment mode ("client" or "cluster")
    - application_arguments: Arguments passed to the main class
    - spark_home: Path to the Spark installation directory

    Returns:
    List of command arguments for spark-submit execution

    Raises:
    SparkOpError: If SPARK_HOME is not set and spark_home is not provided
    """
```
"""
Exception raised when Spark operations fail.
Raised when:
- Application JAR file doesn't exist
- SPARK_HOME environment variable not set
- Spark job execution returns non-zero exit code
"""__version__: str = "0.27.9"Spark operations expect configuration in the following structure:

```python
{
    "master_url": "local[2]",               # Required: Spark master URL
    "deploy_mode": "client",                # Optional: "client" or "cluster"
    "application_jar": "/path/to/app.jar",  # Required: JAR file path
    "application_arguments": "arg1 arg2",   # Optional: arguments
    "spark_home": "/opt/spark",             # Optional: Spark installation path
    "spark_conf": {                         # Optional: Spark configuration properties
        "spark": {
            "app": {
                "name": "my_spark_app"
            },
            "driver": {
                "memory": "2g",
                "cores": 2
            },
            "executor": {
                "memory": "4g",
                "cores": 4,
                "instances": 2
            }
        }
    }
}
```
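
The nesting under `spark_conf` mirrors dotted Spark property names, so the block above corresponds to properties such as `spark.app.name`, `spark.driver.memory`, and `spark.executor.instances` (the kind of values typically passed to spark-submit as `--conf` flags). The helper below is only an illustration of that naming convention, not part of the library:

```python
def flatten_spark_conf(conf, prefix=""):
    """Flatten nested spark_conf keys into dotted --conf arguments (illustrative)."""
    flags = []
    for key, value in conf.items():
        dotted = f"{prefix}{key}"
        if isinstance(value, dict):
            flags.extend(flatten_spark_conf(value, f"{dotted}."))
        else:
            flags.extend(["--conf", f"{dotted}={value}"])
    return flags

print(flatten_spark_conf({"spark": {"driver": {"memory": "2g", "cores": 2}}}))
# ['--conf', 'spark.driver.memory=2g', '--conf', 'spark.driver.cores=2']
```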

The library raises SparkOpError when the application JAR file does not exist, when SPARK_HOME is not set and no `spark_home` value is provided, or when the Spark job exits with a non-zero status code. A common error handling pattern:

```python
from dagster_spark import SparkOpError

# Inside an op that has access to the "spark" resource:
try:
    context.resources.spark.run_spark_job(config, main_class)
except SparkOpError as e:
    context.log.error(f"Spark job failed: {e}")
    raise
```