
# Dagster PySpark


Dagster-PySpark integrates Apache PySpark with Dagster's data orchestration framework. It provides resources for configuring and managing Spark sessions within Dagster ops and jobs, supporting both the modern `ConfigurableResource` classes and the legacy resource functions.


## Package Information


- **Package Name**: dagster-pyspark
- **Language**: Python
- **Installation**: `pip install dagster-pyspark`


## Core Imports


```python
from dagster_pyspark import (
    PySparkResource,
    LazyPySparkResource,
    pyspark_resource,
    lazy_pyspark_resource,
    DataFrame,
)
from dagster import InitResourceContext
```


## Basic Usage


### Modern Resource Approach (Recommended)


```python
from dagster import op, job
from dagster_pyspark import PySparkResource


@op
def my_spark_op(pyspark: PySparkResource):
    spark_session = pyspark.spark_session
    df = spark_session.read.json("path/to/data.json")
    df.show()
    return df


@job(
    resource_defs={
        "pyspark": PySparkResource(
            spark_config={
                "spark.executor.memory": "2g",
                "spark.executor.cores": "2"
            }
        )
    }
)
def my_spark_job():
    my_spark_op()
```


### Legacy Resource Approach


```python
from dagster import op, job
from dagster_pyspark import pyspark_resource


@op(required_resource_keys={"pyspark"})
def my_spark_op(context):
    spark_session = context.resources.pyspark.spark_session
    df = spark_session.read.json("path/to/data.json")
    df.show()
    return df


my_pyspark_resource = pyspark_resource.configured({
    "spark_conf": {"spark.executor.memory": "2g"}
})


@job(resource_defs={"pyspark": my_pyspark_resource})
def my_spark_job():
    my_spark_op()
```


## Architecture


Dagster-PySpark follows a resource-based architecture:


- **Resource Classes**: Modern `PySparkResource` and `LazyPySparkResource` providing direct Spark session access
- **Legacy Functions**: `pyspark_resource` and `lazy_pyspark_resource` for backwards compatibility
- **Type System**: `DataFrame` type with comprehensive loading capabilities for multiple data formats
- **Session Management**: Automatic Spark session lifecycle management within Dagster execution context


The lazy variants avoid Spark session creation overhead until the session is actually accessed, improving performance for jobs that may not always need Spark resources.
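For example, a job can be wired with `LazyPySparkResource` so that the session is only started on first access; a minimal sketch, assuming a Dagster version with Pythonic resources (the op, resource key, and memory setting are illustrative):

```python
from dagster import job, op
from dagster_pyspark import LazyPySparkResource


@op
def maybe_use_spark(lazy_pyspark: LazyPySparkResource):
    # No SparkSession exists yet; it is created on this first access.
    spark = lazy_pyspark.spark_session
    return spark.range(10).count()


@job(
    resource_defs={
        "lazy_pyspark": LazyPySparkResource(
            spark_config={"spark.executor.memory": "1g"}
        )
    }
)
def lazy_spark_job():
    maybe_use_spark()
```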


## Capabilities


### PySpark Resource Management


Modern ConfigurableResource classes for managing PySpark sessions with flexible configuration and lifecycle management.


```python { .api }
class PySparkResource(ConfigurableResource):
    spark_config: dict[str, Any]

    def setup_for_execution(self, context: InitResourceContext) -> None: ...

    @property
    def spark_session(self) -> Any: ...

    @property
    def spark_context(self) -> Any: ...


class LazyPySparkResource(ConfigurableResource):
    spark_config: dict[str, Any]

    @property
    def spark_session(self) -> Any: ...

    @property
    def spark_context(self) -> Any: ...
```
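The same resource also plugs into software-defined assets; a hedged sketch showing the `spark_context` property for RDD-level work (the asset name and sample data are illustrative, not part of the library):

```python
from dagster import Definitions, asset
from dagster_pyspark import PySparkResource


@asset
def word_counts(pyspark: PySparkResource):
    # spark_context exposes the underlying SparkContext.
    sc = pyspark.spark_context
    lines = sc.parallelize(["hello world", "hello dagster"])
    return dict(lines.flatMap(str.split).countByValue())


defs = Definitions(
    assets=[word_counts],
    resources={
        "pyspark": PySparkResource(spark_config={"spark.executor.memory": "2g"})
    },
)
```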


### Legacy Resource Functions


Legacy resource factory functions for backwards compatibility with existing Dagster codebases.


```python { .api }
from dagster import resource
from dagster_spark.configs_spark import spark_config


@resource({"spark_conf": spark_config()})
def pyspark_resource(init_context) -> PySparkResource: ...


@resource({"spark_conf": spark_config()})
def lazy_pyspark_resource(init_context: InitResourceContext) -> LazyPySparkResource: ...
```
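`lazy_pyspark_resource` is configured the same way as `pyspark_resource` in the legacy example above; a minimal sketch (the op and memory setting are illustrative):

```python
from dagster import job, op
from dagster_pyspark import lazy_pyspark_resource


@op(required_resource_keys={"pyspark"})
def lazy_op(context):
    # The session is only created when spark_session is first read.
    return context.resources.pyspark.spark_session.range(5).count()


my_lazy_resource = lazy_pyspark_resource.configured({
    "spark_conf": {"spark.executor.memory": "1g"}
})


@job(resource_defs={"pyspark": my_lazy_resource})
def lazy_legacy_job():
    lazy_op()
```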


### DataFrame Type and Data Loading


Comprehensive data loading capabilities with support for multiple file formats, database connections, and extensive configuration options.


```python { .api }
DataFrame = PythonObjectDagsterType(
    python_type=pyspark.sql.DataFrame,
    name="PySparkDataFrame",
    description="A PySpark data frame.",
    loader=dataframe_loader
)
```
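As a sketch, the exported `DataFrame` type can annotate op inputs and outputs so that Dagster type-checks PySpark frames at op boundaries (the op bodies and file path are illustrative):

```python
from dagster import In, Out, op
from dagster_pyspark import DataFrame, PySparkResource


@op(out=Out(DataFrame))
def load_events(pyspark: PySparkResource):
    return pyspark.spark_session.read.json("path/to/events.json")


@op(ins={"events": In(DataFrame)}, out=Out(DataFrame))
def positive_events(events):
    # Dagster verifies both input and output are pyspark.sql.DataFrame instances.
    return events.where("value > 0")
```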


[Data Loading and DataFrame Operations](./data-loading.md)


## Types


### Core Types


```python { .api }
class PySparkResource(ConfigurableResource):
    """Resource providing access to a PySpark Session for executing PySpark code within Dagster."""

    spark_config: dict[str, Any]

    def setup_for_execution(self, context: InitResourceContext) -> None: ...


class LazyPySparkResource(ConfigurableResource):
    """Lazily-created PySpark resource that avoids session creation until accessed."""

    spark_config: dict[str, Any]
```