tessl/pypi-dagster-spark

Spark components for the Dagster orchestration framework.

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: pypipkg:pypi/dagster-spark@0.27.x

To install, run:

`npx @tessl/cli install tessl/pypi-dagster-spark@0.27.0`

# Dagster Spark

dagster-spark is an integration library that enables Apache Spark job execution within the Dagster orchestration framework. It provides resources, operations, and configuration utilities for building data pipelines that leverage Spark's distributed computing capabilities while benefiting from Dagster's lineage tracking, scheduling, and monitoring features.

## Package Information

- **Package Name**: dagster-spark
- **Language**: Python
- **Installation**: `pip install dagster-spark`
- **Dependencies**: dagster==1.11.9

## Core Imports

```python
from dagster_spark import (
    create_spark_op,
    spark_resource,
    define_spark_config,
    SparkOpError,
    construct_spark_shell_command,
    __version__
)
```

## Basic Usage

```python
from dagster import job
from dagster_spark import create_spark_op, spark_resource

# Create a Spark operation
my_spark_op = create_spark_op(
    name="calculate_pi",
    main_class="org.apache.spark.examples.SparkPi"
)

# Define a job using the Spark resource
@job(resource_defs={"spark": spark_resource})
def spark_pipeline():
    my_spark_op()

# Configuration for the job
config = {
    "ops": {
        "calculate_pi": {
            "config": {
                "master_url": "local[2]",
                "deploy_mode": "client",
                "application_jar": "/path/to/spark-examples.jar",
                "application_arguments": "10",
                "spark_conf": {
                    "spark": {
                        "app": {
                            "name": "calculate_pi"
                        }
                    }
                }
            }
        }
    }
}

# Execute the job
result = spark_pipeline.execute_in_process(run_config=config)
```

## Capabilities

### Spark Operation Factory

Creates parameterized Spark operations that execute specific Spark applications with configurable parameters.

```python { .api }
def create_spark_op(
    name: str,
    main_class: str,
    description: str = None,
    required_resource_keys: frozenset = frozenset(["spark"])
):
    """
    Creates a Dagster op that executes a Spark job.

    Parameters:
    - name: Name of the operation
    - main_class: Java/Scala main class to execute
    - description: Optional description of the operation
    - required_resource_keys: Resources required by the operation

    Returns:
    Dagster op function configured for Spark execution
    """
```
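The optional parameters can be supplied as well. A brief sketch (the op name and main class here are illustrative):

```python
from dagster_spark import create_spark_op

wordcount_op = create_spark_op(
    name="wordcount",
    main_class="org.example.WordCount",
    description="Runs the WordCount Spark application",
    required_resource_keys=frozenset(["spark"]),
)
```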

### Spark Resource

Provides Spark job execution capabilities as a Dagster resource, handling spark-submit command construction and execution.

```python { .api }
@resource
def spark_resource(context):
    """
    Dagster resource providing Spark job execution capabilities.

    Returns:
    SparkResource instance with run_spark_job method
    """

class SparkResource:
    def __init__(self, logger): ...

    def run_spark_job(self, config: dict, main_class: str):
        """
        Executes a Spark job with the given configuration.

        Parameters:
        - config: Configuration dictionary with Spark parameters
        - main_class: Java/Scala main class to execute

        Raises:
        SparkOpError: If JAR file doesn't exist or job execution fails
        """
```
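The resource can also be used directly from a hand-written op. A minimal sketch, assuming the op config follows the schema returned by `define_spark_config()` (the op and job names here are illustrative):

```python
from dagster import job, op
from dagster_spark import define_spark_config, spark_resource

@op(required_resource_keys={"spark"}, config_schema=define_spark_config())
def submit_pi(context):
    # Hand the op's config straight to the resource
    context.resources.spark.run_spark_job(
        context.op_config, "org.apache.spark.examples.SparkPi"
    )

@job(resource_defs={"spark": spark_resource})
def pi_job():
    submit_pi()
```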

### Configuration Schema

Defines the configuration schema for Spark job parameters including cluster settings, JAR files, and Spark properties.

```python { .api }
def define_spark_config():
    """
    Returns the Spark configuration schema.

    Returns:
    Dictionary containing Field definitions for:
    - master_url: Spark cluster master URL (required)
    - deploy_mode: String value ("client" or "cluster")
    - application_jar: Path to JAR file (required)
    - spark_conf: Nested Spark configuration properties
    - spark_home: Path to Spark installation
    - application_arguments: Arguments for main class
    """
```
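A quick way to see the available fields, assuming the schema is returned as a plain dictionary keyed by field name (as described above):

```python
from dagster_spark import define_spark_config

schema = define_spark_config()
print(sorted(schema))
# Expected, per the schema above: ['application_arguments', 'application_jar',
#  'deploy_mode', 'master_url', 'spark_conf', 'spark_home']
```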

### Command Construction Utility

Constructs spark-submit commands with proper parameter formatting and validation.

```python { .api }
def construct_spark_shell_command(
    application_jar: str,
    main_class: str,
    master_url: str = None,
    spark_conf: dict = None,
    deploy_mode: str = None,
    application_arguments: str = None,
    spark_home: str = None
):
    """
    Constructs spark-submit command for Spark job execution.

    Parameters:
    - application_jar: Path to JAR file containing Spark application
    - main_class: Java/Scala main class to execute
    - master_url: Spark cluster master URL
    - spark_conf: Dictionary of Spark configuration properties
    - deploy_mode: Deployment mode ("client" or "cluster")
    - application_arguments: Arguments passed to main class
    - spark_home: Path to Spark installation directory

    Returns:
    List of command arguments for spark-submit execution

    Raises:
    SparkOpError: If SPARK_HOME is not set and spark_home not provided
    """
```
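The utility can be called directly to inspect the command it would produce. A sketch, assuming nested `spark_conf` keys are rendered as dotted `--conf` properties (paths and values are illustrative):

```python
from dagster_spark import construct_spark_shell_command

cmd = construct_spark_shell_command(
    application_jar="/path/to/spark-examples.jar",
    main_class="org.apache.spark.examples.SparkPi",
    master_url="local[2]",
    spark_conf={"spark": {"app": {"name": "calculate_pi"}}},
    deploy_mode="client",
    application_arguments="10",
    spark_home="/opt/spark",
)
print(" ".join(cmd))
# Roughly: /opt/spark/bin/spark-submit --class org.apache.spark.examples.SparkPi
#   --master local[2] --deploy-mode client --conf spark.app.name=calculate_pi
#   /path/to/spark-examples.jar 10
```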

## Types

### Exception Types

```python { .api }
class SparkOpError(Exception):
    """
    Exception raised when Spark operations fail.

    Raised when:
    - Application JAR file doesn't exist
    - SPARK_HOME environment variable not set
    - Spark job execution returns non-zero exit code
    """
```

### Package Version

```python { .api }
__version__: str = "0.27.9"
```
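If needed, the version string can be read at runtime:

```python
import dagster_spark

print(dagster_spark.__version__)  # e.g. "0.27.9"
```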

## Configuration Structure

Spark operations expect configuration in the following structure:

```python
{
    "master_url": "local[2]",               # Required: Spark master URL
    "deploy_mode": "client",                # Optional: "client" or "cluster"
    "application_jar": "/path/to/app.jar",  # Required: JAR file path
    "application_arguments": "arg1 arg2",   # Optional: arguments
    "spark_home": "/opt/spark",             # Optional: Spark installation path
    "spark_conf": {                         # Optional: Spark configuration properties
        "spark": {
            "app": {
                "name": "my_spark_app"
            },
            "driver": {
                "memory": "2g",
                "cores": 2
            },
            "executor": {
                "memory": "4g",
                "cores": 4,
                "instances": 2
            }
        }
    }
}
```
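The nested `spark_conf` mapping mirrors dotted Spark property names. An illustrative sketch of that convention (the helper below is hypothetical, not part of the library's public API):

```python
def flatten_spark_conf(conf, prefix=""):
    """Flatten nested spark_conf keys into dotted Spark property names."""
    flat = {}
    for key, value in conf.items():
        dotted = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten_spark_conf(value, dotted))
        else:
            flat[dotted] = value
    return flat

print(flatten_spark_conf({"spark": {"driver": {"memory": "2g", "cores": 2}}}))
# {'spark.driver.memory': '2g', 'spark.driver.cores': 2}
# i.e. what spark-submit receives as --conf spark.driver.memory=2g --conf spark.driver.cores=2
```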

## Environment Requirements

- **SPARK_HOME**: Environment variable pointing to Spark installation directory (or provide spark_home in config)
- **Java Runtime**: Java 8+ runtime environment
- **Spark Installation**: Apache Spark installation accessible from execution environment
- **Application JAR**: Valid JAR file containing Spark application and dependencies

## Error Handling

The library raises `SparkOpError` exceptions in the following cases:

- Application JAR file specified in configuration doesn't exist
- SPARK_HOME environment variable not set and spark_home not provided in config
- Spark job execution fails (non-zero exit code from spark-submit)

Common error handling pattern:

```python
from dagster_spark import SparkOpError

try:
    context.resources.spark.run_spark_job(config, main_class)
except SparkOpError as e:
    context.log.error(f"Spark job failed: {e}")
    raise
```