tessl/pypi-apache-airflow-providers-apache-spark

Provider package for Apache Spark integration with Apache Airflow, offering operators, hooks, sensors, and decorators for distributed data processing workflows.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/apache-airflow-providers-apache-spark@5.3.x

To install, run:

npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-spark@5.3.0


# Apache Airflow Providers Apache Spark

A comprehensive provider package that enables seamless integration between Apache Airflow and the Apache Spark distributed computing framework. It provides operators, hooks, and decorators for orchestrating Spark jobs within Airflow workflows, supporting multiple Spark interfaces: spark-submit, Spark SQL, Spark JDBC operations, and the modern Spark Connect protocol.

## Package Information

- **Package Name**: apache-airflow-providers-apache-spark
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-spark`
- **Requires**: Apache Airflow >= 2.10.0, PySpark >= 3.5.2, grpcio-status >= 1.59.0


## Core Imports

```python
# Operators for executing Spark jobs
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# Hooks for Spark connections
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook
from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook
from airflow.providers.apache.spark.hooks.spark_jdbc import SparkJDBCHook
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook

# Task decorator for PySpark functions
from airflow.providers.apache.spark.decorators.pyspark import pyspark_task
```


## Basic Usage

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.decorators.pyspark import pyspark_task

# Define DAG
dag = DAG(
    'spark_example',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Example Spark workflow',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

# Submit a Spark application
spark_job = SparkSubmitOperator(
    task_id='run_spark_job',
    application='/path/to/spark_app.py',
    conn_id='spark_default',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
    },
    dag=dag,
)

# Execute SQL query with Spark
spark_sql = SparkSqlOperator(
    task_id='run_spark_sql',
    sql='SELECT COUNT(*) FROM users WHERE active = true',
    conn_id='spark_sql_default',
    dag=dag,
)

# PySpark task decorator example
@pyspark_task(task_id='process_data', dag=dag)
def process_user_data(spark):
    # `spark` is the SparkSession injected by the decorator
    df = spark.read.parquet('/data/users.parquet')
    result = df.filter(df.active == True).groupBy('region').count()
    result.write.mode('overwrite').parquet('/data/user_counts.parquet')
    return result.count()

process_task = process_user_data()

# Set task dependencies
spark_job >> spark_sql >> process_task
```


## Architecture

The Apache Spark provider follows Airflow's standard provider pattern with distinct layers:

- **Operators**: Task-level components that execute Spark jobs within Airflow workflows
- **Hooks**: Connection management for the various Spark interfaces (Submit, SQL, JDBC, Connect)
- **Decorators**: Pythonic task decorators that automatically inject Spark sessions
- **Connection Types**: Pre-configured connection interfaces for different Spark deployment modes

This design enables flexible integration with a range of Spark deployment architectures, including local mode, YARN, Kubernetes, standalone clusters, and cloud-managed Spark services, through consistent Airflow abstractions.


## Capabilities


### Spark Job Execution

Execute Spark applications with the spark-submit binary, with comprehensive configuration support. Handles Spark application submission, monitoring, and resource management across different cluster managers.

```python { .api }
class SparkSubmitOperator(BaseOperator):
    def __init__(
        self,
        application: str = None,
        conf: dict = None,
        conn_id: str = 'spark_default',
        files: str = None,
        py_files: str = None,
        archives: str = None,
        driver_class_path: str = None,
        jars: str = None,
        java_class: str = None,
        packages: str = None,
        exclude_packages: str = None,
        repositories: str = None,
        total_executor_cores: int = None,
        executor_cores: int = None,
        executor_memory: str = None,
        driver_memory: str = None,
        keytab: str = None,
        principal: str = None,
        proxy_user: str = None,
        name: str = None,
        num_executors: int = None,
        application_args: list = None,
        env_vars: dict = None,
        verbose: bool = False,
        spark_binary: str = 'spark-submit',
        properties_file: str = None,
        **kwargs
    ): ...

    def execute(self, context): ...
    def on_kill(self): ...
```
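As a complement to the Python application shown in Basic Usage, the following hedged sketch submits a packaged JVM application through the same operator. It uses only constructor parameters listed above; the jar path, main class, Maven coordinates, and DAG settings are illustrative assumptions rather than values defined by this package.

```python
import pendulum
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_submit_jvm_example",  # assumed DAG id for illustration
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    aggregate_events = SparkSubmitOperator(
        task_id="aggregate_events",
        application="/opt/spark/jobs/events-assembly.jar",  # placeholder jar path
        java_class="com.example.events.AggregateJob",       # placeholder main class
        packages="org.apache.spark:spark-avro_2.12:3.5.2",  # extra Maven dependencies
        application_args=["--date", "{{ ds }}"],            # Jinja-templated runtime args
        executor_memory="4g",
        num_executors=4,
        conn_id="spark_default",
    )
```

The application path must be reachable from wherever the spark-submit binary runs, which is typically the Airflow worker executing the task.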


[Spark Operators](./spark-operators.md)


### Spark Connection Management

Manage connections to the various Spark interfaces, including traditional spark-submit, Spark SQL, JDBC operations, and the modern Spark Connect protocol. Provides connection configuration, authentication, and cluster communication.

```python { .api }
class SparkSubmitHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark"
    hook_name = "Spark"

    def submit(self, application: str, **kwargs) -> None: ...
    def on_kill(self) -> None: ...
    def get_conn(self): ...

class SparkConnectHook(BaseHook):
    conn_name_attr = "conn_id"
    default_conn_name = "spark_connect_default"
    conn_type = "spark_connect"
    hook_name = "Spark Connect"

    def get_connection_url(self) -> str: ...
```
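For cases where an operator is not a good fit, for example a custom Python task that needs to drive a submission directly, the hooks can be used on their own. The sketch below is hedged: it relies only on the methods listed above, and the `conn_id` values are assumed to exist in your Airflow connection store.

```python
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Submit an application through the spark-submit interface using the
# connection registered under "spark_default".
submit_hook = SparkSubmitHook(conn_id="spark_default")
submit_hook.submit(application="/path/to/spark_app.py")

# Resolve the Spark Connect URL from the "spark_connect_default" connection,
# e.g. to build a remote SparkSession yourself.
connect_hook = SparkConnectHook(conn_id="spark_connect_default")
connect_url = connect_hook.get_connection_url()
```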


[Spark Hooks](./spark-hooks.md)


### PySpark Task Integration

Create PySpark tasks that automatically receive a SparkSession object, enabling seamless integration of PySpark code into Airflow workflows with automatic session management and cleanup.

```python { .api }
def pyspark_task(
    python_callable: Callable | None = None,
    multiple_outputs: bool | None = None,
    **kwargs,
) -> TaskDecorator: ...

# Usage example:
@pyspark_task
def my_spark_function(spark):
    """Function receives SparkSession as 'spark' parameter"""
    df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'value'])
    return df.count()
```
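The provider also registers this decorator under Airflow's generic `task` namespace, so it is commonly used as `@task.pyspark`. The sketch below assumes that form plus a `conn_id` argument and the injected `spark`/`sc` parameters; these match the provider's documented behaviour, but verify them against your installed version.

```python
import pendulum
from airflow.decorators import dag, task


@dag(
    dag_id="pyspark_decorator_example",  # assumed DAG id for illustration
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
)
def pyspark_decorator_example():
    @task.pyspark(conn_id="spark_default")
    def count_active_users(spark, sc):
        # `spark` is the injected SparkSession; `sc` is the underlying SparkContext.
        df = spark.createDataFrame([(1, True), (2, False)], ["id", "active"])
        return df.filter(df.active).count()

    count_active_users()


pyspark_decorator_example()
```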


[PySpark Decorators](./pyspark-decorators.md)


## Connection Types

The provider registers these connection types in Airflow:

1. **`spark`** - For SparkSubmitHook connections to Spark clusters
2. **`spark_sql`** - For SparkSqlHook connections to execute SQL queries
3. **`spark_jdbc`** - For SparkJDBCHook connections for database transfers
4. **`spark_connect`** - For SparkConnectHook connections using the Spark Connect protocol

Each connection type provides custom UI fields for configuration, including cluster URLs, authentication credentials, SSL settings, and deployment-specific parameters.
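Connections are normally created through the Airflow UI or CLI, but for local testing they can also be supplied as `AIRFLOW_CONN_*` environment variables. The hedged sketch below uses Airflow's JSON connection format; the host, port, and extra keys (such as `deploy-mode`) are assumptions based on typical Spark connection settings, so check them against the provider's connection documentation.

```python
import json
import os

# Classic spark-submit connection: the host carries the master URL and the
# extra field carries spark-submit related options.
os.environ["AIRFLOW_CONN_SPARK_DEFAULT"] = json.dumps(
    {
        "conn_type": "spark",
        "host": "spark://spark-master",
        "port": 7077,
        "extra": {"deploy-mode": "cluster"},  # assumed extra key
    }
)

# Spark Connect connection consumed by SparkConnectHook.get_connection_url();
# the hook assembles the sc:// URL from host and port.
os.environ["AIRFLOW_CONN_SPARK_CONNECT_DEFAULT"] = json.dumps(
    {
        "conn_type": "spark_connect",
        "host": "spark-connect-server",
        "port": 15002,
    }
)
```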


## Error Handling

Common exceptions that may be raised:

- **`AirflowException`** - General Spark job execution failures, configuration errors
- **`AirflowNotFoundException`** - Missing Spark applications, connection configurations
- **Connection errors** - Cluster connectivity issues, authentication failures
- **Spark application errors** - Application-specific failures, resource constraints

Handle these exceptions in your DAG's error handling and retry logic as appropriate for your use case.
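As a hedged illustration of the retry guidance above, the sketch below pairs task-level retries with a failure callback around a Spark submission; the callback body and DAG settings are placeholders built from standard Airflow task arguments, not provider-specific APIs.

```python
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator


def notify_on_failure(context):
    # Placeholder: route the failure to your alerting system of choice.
    ti = context["task_instance"]
    print(f"Spark task {ti.task_id} failed: {context.get('exception')}")


with DAG(
    dag_id="spark_error_handling_example",  # assumed DAG id for illustration
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    SparkSubmitOperator(
        task_id="run_spark_job_with_retries",
        application="/path/to/spark_app.py",
        conn_id="spark_default",
        retries=3,                              # retried before the task is marked failed
        retry_delay=timedelta(minutes=10),
        retry_exponential_backoff=True,         # increase the delay between attempts
        on_failure_callback=notify_on_failure,  # runs after the final failed attempt
    )
```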