# Dagster PySpark

Dagster-PySpark is a library that integrates Apache PySpark with Dagster's data orchestration framework. It provides resources for configuring and managing Spark sessions within Dagster ops and jobs, supporting both the modern resource-based approach and legacy configurations.

## Package Information

- **Package Name**: dagster-pyspark
- **Language**: Python
- **Installation**: `pip install dagster-pyspark`

## Core Imports

```python
from dagster_pyspark import (
    PySparkResource,
    LazyPySparkResource,
    pyspark_resource,
    lazy_pyspark_resource,
    DataFrame
)
from dagster import InitResourceContext
```

## Basic Usage

### Modern Resource Approach (Recommended)

```python
from dagster import op, job
from dagster_pyspark import PySparkResource

@op
def my_spark_op(pyspark: PySparkResource):
    spark_session = pyspark.spark_session
    df = spark_session.read.json("path/to/data.json")
    df.show()
    return df

@job(
    resource_defs={
        "pyspark": PySparkResource(
            spark_config={
                "spark.executor.memory": "2g",
                "spark.executor.cores": "2"
            }
        )
    }
)
def my_spark_job():
    my_spark_op()
```

### Legacy Resource Approach

```python
from dagster import op, job
from dagster_pyspark import pyspark_resource

@op(required_resource_keys={"pyspark"})
def my_spark_op(context):
    spark_session = context.resources.pyspark.spark_session
    df = spark_session.read.json("path/to/data.json")
    df.show()
    return df

my_pyspark_resource = pyspark_resource.configured({
    "spark_conf": {"spark.executor.memory": "2g"}
})

@job(resource_defs={"pyspark": my_pyspark_resource})
def my_spark_job():
    my_spark_op()
```

## Architecture

Dagster-PySpark follows a resource-based architecture:

- **Resource Classes**: Modern `PySparkResource` and `LazyPySparkResource` classes that provide direct Spark session access
- **Legacy Functions**: `pyspark_resource` and `lazy_pyspark_resource` factories for backwards compatibility
- **Type System**: A `DataFrame` Dagster type with loading support for multiple data formats
- **Session Management**: Automatic Spark session lifecycle management within the Dagster execution context

The lazy variants defer Spark session creation until the session is first accessed, avoiding startup overhead in jobs that do not always need Spark, as sketched below.
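
A minimal sketch of the lazy behavior (the op and job names here are hypothetical): only the op that actually touches `spark_session` triggers session creation.

```python
from dagster import op, job
from dagster_pyspark import LazyPySparkResource

@op
def light_op() -> int:
    # Never touches Spark, so the lazy resource builds no session for it.
    return 100

@op
def heavy_op(n: int, pyspark: LazyPySparkResource):
    # First access of .spark_session is what creates the Spark session.
    df = pyspark.spark_session.range(n)
    df.show()

@job(
    resource_defs={
        "pyspark": LazyPySparkResource(
            spark_config={"spark.executor.memory": "2g"}
        )
    }
)
def lazy_spark_job():
    heavy_op(light_op())
```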

## Capabilities

### PySpark Resource Management

Modern `ConfigurableResource` classes for managing PySpark sessions with flexible configuration and lifecycle management.

```python { .api }
class PySparkResource(ConfigurableResource):
    spark_config: dict[str, Any]

    def setup_for_execution(self, context: InitResourceContext) -> None: ...

    @property
    def spark_session(self) -> Any: ...

    @property
    def spark_context(self) -> Any: ...

class LazyPySparkResource(ConfigurableResource):
    spark_config: dict[str, Any]

    @property
    def spark_session(self) -> Any: ...

    @property
    def spark_context(self) -> Any: ...
```
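
A short usage sketch (the op name is hypothetical) showing both accessors; `spark_session` returns the configured `pyspark.sql.SparkSession` and `spark_context` exposes the underlying `SparkContext`.

```python
from dagster import op
from dagster_pyspark import PySparkResource

@op
def inspect_spark(pyspark: PySparkResource):
    # The session reflects the spark_config passed to the resource.
    session = pyspark.spark_session
    print(session.conf.get("spark.executor.memory"))
    # The SparkContext that backs the session.
    sc = pyspark.spark_context
    print(sc.applicationId)
```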

### Legacy Resource Functions

Legacy resource factory functions for backwards compatibility with existing Dagster codebases.

```python { .api }
from dagster import resource
from dagster_spark.configs_spark import spark_config

@resource({"spark_conf": spark_config()})
def pyspark_resource(init_context) -> PySparkResource: ...

@resource({"spark_conf": spark_config()})
def lazy_pyspark_resource(init_context: InitResourceContext) -> LazyPySparkResource: ...
```
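
Both factories take the same `spark_conf` config, so the lazy variant can be wired in exactly like the earlier legacy example (a sketch with hypothetical op and job names):

```python
from dagster import op, job
from dagster_pyspark import lazy_pyspark_resource

# Same "spark_conf" schema as pyspark_resource; session creation is deferred.
my_lazy_resource = lazy_pyspark_resource.configured({
    "spark_conf": {"spark.executor.memory": "2g"}
})

@op(required_resource_keys={"pyspark"})
def lazy_legacy_op(context):
    # The session is created here, on first access.
    context.resources.pyspark.spark_session.range(5).show()

@job(resource_defs={"pyspark": my_lazy_resource})
def lazy_legacy_job():
    lazy_legacy_op()
```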

### DataFrame Type and Data Loading

Comprehensive data loading capabilities with support for multiple file formats, database connections, and extensive configuration options.

```python { .api }
DataFrame = PythonObjectDagsterType(
    python_type=pyspark.sql.DataFrame,
    name="PySparkDataFrame",
    description="A PySpark data frame.",
    loader=dataframe_loader
)
```
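
A sketch (op and job names are hypothetical) of using the `DataFrame` Dagster type to type-check an op's output:

```python
from dagster import Out, op, job
from dagster_pyspark import DataFrame, PySparkResource

@op(out=Out(dagster_type=DataFrame))
def build_frame(pyspark: PySparkResource):
    # The returned object is checked against the PySparkDataFrame type.
    return pyspark.spark_session.range(10)

@job(resource_defs={"pyspark": PySparkResource(spark_config={})})
def typed_job():
    build_frame()
```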

[Data Loading and DataFrame Operations](./data-loading.md)

## Types

### Core Types

```python { .api }
class PySparkResource(ConfigurableResource):
    """Resource providing access to a PySpark Session for executing PySpark code within Dagster."""
    spark_config: dict[str, Any]

    def setup_for_execution(self, context: InitResourceContext) -> None: ...

class LazyPySparkResource(ConfigurableResource):
    """Lazily-created PySpark resource that avoids session creation until accessed."""
    spark_config: dict[str, Any]
```