Package providing Apache Spark integration components for the Dagster framework.
npx @tessl/cli install tessl/pypi-dagster-spark@0.27.00
# Dagster Spark

Integration library that enables Apache Spark job execution within the Dagster orchestration framework. It provides resources, operations, and configuration utilities for building data pipelines that leverage Spark's distributed computing capabilities while benefiting from Dagster's lineage tracking, scheduling, and monitoring features.

## Package Information

- **Package Name**: dagster-spark
- **Language**: Python
- **Installation**: `pip install dagster-spark`
- **Dependencies**: dagster==1.11.9

## Core Imports

```python
from dagster_spark import (
    create_spark_op,
    spark_resource,
    define_spark_config,
    SparkOpError,
    construct_spark_shell_command,
    __version__,
)
```

## Basic Usage

```python
from dagster import job
from dagster_spark import create_spark_op, spark_resource

# Create a Spark operation
my_spark_op = create_spark_op(
    name="calculate_pi",
    main_class="org.apache.spark.examples.SparkPi",
)

# Define a job using the Spark resource
@job(resource_defs={"spark": spark_resource})
def spark_pipeline():
    my_spark_op()

# Configuration for the job
config = {
    "ops": {
        "calculate_pi": {
            "config": {
                "master_url": "local[2]",
                "deploy_mode": "client",
                "application_jar": "/path/to/spark-examples.jar",
                "application_arguments": "10",
                "spark_conf": {
                    "spark": {
                        "app": {
                            "name": "calculate_pi"
                        }
                    }
                },
            }
        }
    }
}

# Execute the job
result = spark_pipeline.execute_in_process(run_config=config)
```

## Capabilities

### Spark Operation Factory

Creates parameterized Spark operations that execute specific Spark applications with configurable parameters.

```python { .api }
def create_spark_op(
    name: str,
    main_class: str,
    description: str = None,
    required_resource_keys: frozenset = frozenset(["spark"])
):
    """
    Creates a Dagster op that executes a Spark job.

    Parameters:
    - name: Name of the operation
    - main_class: Java/Scala main class to execute
    - description: Optional description of the operation
    - required_resource_keys: Resources required by the operation

    Returns:
    Dagster op function configured for Spark execution
    """
```

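A quick sketch of the factory in use; the application name, main class, and JAR path below are illustrative placeholders, not part of the package:

```python
from dagster import job
from dagster_spark import create_spark_op, spark_resource

# Hypothetical Spark application packaged in its own JAR
word_count_op = create_spark_op(
    name="word_count",
    main_class="com.example.WordCount",
    description="Runs the WordCount Spark application",
)

@job(resource_defs={"spark": spark_resource})
def word_count_job():
    word_count_op()
```
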
### Spark Resource

Provides Spark job execution capabilities as a Dagster resource, handling spark-submit command construction and execution.

```python { .api }
@resource
def spark_resource(context):
    """
    Dagster resource providing Spark job execution capabilities.

    Returns:
    SparkResource instance with run_spark_job method
    """

class SparkResource:
    def __init__(self, logger): ...

    def run_spark_job(self, config: dict, main_class: str):
        """
        Executes a Spark job with the given configuration.

        Parameters:
        - config: Configuration dictionary with Spark parameters
        - main_class: Java/Scala main class to execute

        Raises:
        SparkOpError: If JAR file doesn't exist or job execution fails
        """
```

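When the factory op is too rigid, the resource can also be invoked directly from a custom op. A minimal sketch, assuming the `run_spark_job(config, main_class)` signature documented above; the JAR path and class name are placeholders:

```python
from dagster import job, op
from dagster_spark import spark_resource

@op(required_resource_keys={"spark"})
def submit_wordcount(context):
    # Configuration follows the structure described under "Configuration Structure"
    spark_config = {
        "master_url": "local[2]",
        "deploy_mode": "client",
        "application_jar": "/path/to/wordcount.jar",
        "spark_conf": {"spark": {"app": {"name": "wordcount"}}},
    }
    context.resources.spark.run_spark_job(spark_config, "com.example.WordCount")

@job(resource_defs={"spark": spark_resource})
def wordcount_job():
    submit_wordcount()
```
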
### Configuration Schema

Defines the configuration schema for Spark job parameters, including cluster settings, JAR files, and Spark properties.

```python { .api }
def define_spark_config():
    """
    Returns the Spark configuration schema.

    Returns:
    Dictionary of Field definitions for:
    - master_url: Spark cluster master URL (required)
    - deploy_mode: String value ("client" or "cluster")
    - application_jar: Path to JAR file (required)
    - spark_conf: Nested Spark configuration properties
    - spark_home: Path to Spark installation
    - application_arguments: Arguments for main class
    """
```

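Because the schema is a plain dictionary of Field definitions, it can be reused on a custom op. A sketch, assuming it is passed as the op's `config_schema`; the op body and main class are illustrative:

```python
from dagster import op
from dagster_spark import define_spark_config

@op(config_schema=define_spark_config(), required_resource_keys={"spark"})
def run_my_spark_app(context):
    # context.op_config has been validated against the fields listed above
    context.resources.spark.run_spark_job(context.op_config, "com.example.Main")
```
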
### Command Construction Utility
144
145
Constructs spark-submit commands with proper parameter formatting and validation.
146
147
```python { .api }
148
def construct_spark_shell_command(
149
application_jar: str,
150
main_class: str,
151
master_url: str = None,
152
spark_conf: dict = None,
153
deploy_mode: str = None,
154
application_arguments: str = None,
155
spark_home: str = None
156
):
157
"""
158
Constructs spark-submit command for Spark job execution.
159
160
Parameters:
161
- application_jar: Path to JAR file containing Spark application
162
- main_class: Java/Scala main class to execute
163
- master_url: Spark cluster master URL
164
- spark_conf: Dictionary of Spark configuration properties
165
- deploy_mode: Deployment mode ("client" or "cluster")
166
- application_arguments: Arguments passed to main class
167
- spark_home: Path to Spark installation directory
168
169
Returns:
170
List of command arguments for spark-submit execution
171
172
Raises:
173
SparkOpError: If SPARK_HOME is not set and spark_home not provided
174
"""
175
```
176
177
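A standalone sketch reusing the SparkPi values from the earlier example; the command shape shown in the comment is illustrative of the spark-submit form, not an exact contract:

```python
from dagster_spark import construct_spark_shell_command

command = construct_spark_shell_command(
    application_jar="/path/to/spark-examples.jar",
    main_class="org.apache.spark.examples.SparkPi",
    master_url="local[2]",
    deploy_mode="client",
    spark_conf={"spark": {"app": {"name": "calculate_pi"}}},
    application_arguments="10",
    spark_home="/opt/spark",  # otherwise SPARK_HOME must be set
)

# Roughly: /opt/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi
#          --master local[2] --deploy-mode client --conf spark.app.name=calculate_pi
#          /path/to/spark-examples.jar 10
print(" ".join(command))
```
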
## Types

### Exception Types

```python { .api }
class SparkOpError(Exception):
    """
    Exception raised when Spark operations fail.

    Raised when:
    - Application JAR file doesn't exist
    - SPARK_HOME environment variable not set
    - Spark job execution returns non-zero exit code
    """
```

### Package Version

```python { .api }
__version__: str = "0.27.9"
```

## Configuration Structure

Spark operations expect configuration in the following structure:

```python
{
    "master_url": "local[2]",               # Required: Spark master URL
    "deploy_mode": "client",                # Optional: "client" or "cluster"
    "application_jar": "/path/to/app.jar",  # Required: JAR file path
    "application_arguments": "arg1 arg2",   # Optional: arguments
    "spark_home": "/opt/spark",             # Optional: Spark installation path
    "spark_conf": {                         # Optional: Spark configuration properties
        "spark": {
            "app": {
                "name": "my_spark_app"
            },
            "driver": {
                "memory": "2g",
                "cores": 2
            },
            "executor": {
                "memory": "4g",
                "cores": 4,
                "instances": 2
            }
        }
    }
}
```

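The nested `spark_conf` mapping corresponds to dotted Spark property names (for example `spark.driver.memory`) when the spark-submit command is built. A sketch of that flattening, using a helper that is illustrative and not part of the package API:

```python
def flatten_spark_conf(conf: dict, prefix: str = "") -> dict:
    """Illustrative: turn nested keys into dotted Spark property names."""
    flat = {}
    for key, value in conf.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_spark_conf(value, dotted))
        else:
            flat[dotted] = value
    return flat

spark_conf = {"spark": {"app": {"name": "my_spark_app"}, "driver": {"memory": "2g", "cores": 2}}}
print(flatten_spark_conf(spark_conf))
# {'spark.app.name': 'my_spark_app', 'spark.driver.memory': '2g', 'spark.driver.cores': 2}
```
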
## Environment Requirements

- **SPARK_HOME**: Environment variable pointing to Spark installation directory (or provide spark_home in config)
- **Java Runtime**: Java 8+ runtime environment
- **Spark Installation**: Apache Spark installation accessible from execution environment
- **Application JAR**: Valid JAR file containing Spark application and dependencies

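A small pre-flight sketch that fails fast when the environment is incomplete; the check itself is illustrative, and the package performs its own validation at execution time:

```python
import os
import shutil

spark_home = os.environ.get("SPARK_HOME")
if not spark_home:
    raise RuntimeError("SPARK_HOME is not set; set it or pass spark_home in the op config")
if not os.path.exists(os.path.join(spark_home, "bin", "spark-submit")):
    raise RuntimeError(f"spark-submit not found under {spark_home}")
if shutil.which("java") is None:
    raise RuntimeError("No Java runtime found on PATH (Java 8+ is required)")
```
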
## Error Handling

The library raises `SparkOpError` exceptions in the following cases:

- Application JAR file specified in configuration doesn't exist
- SPARK_HOME environment variable not set and spark_home not provided in config
- Spark job execution fails (non-zero exit code from spark-submit)

Common error handling pattern:

```python
from dagster_spark import SparkOpError

try:
    context.resources.spark.run_spark_job(config, main_class)
except SparkOpError as e:
    context.log.error(f"Spark job failed: {e}")
    raise
```