
# Spark Operators

Operators for executing various types of Spark jobs within Airflow workflows. They provide task-level components for Spark application submission, Spark SQL query execution, and JDBC data transfer, with comprehensive configuration and monitoring support.

## Capabilities

### Spark Application Submission

Execute Spark applications using the spark-submit binary with full support for cluster managers, resource configuration, authentication, and dependency management.

```python { .api }
class SparkSubmitOperator(BaseOperator):
    """
    Execute Spark applications via spark-submit binary.

    Parameters:
    - application (str): Path to Spark application file (.py, .jar, .scala, .R) (default: "")
    - conf (dict[Any, Any] | None): Spark configuration properties as key-value pairs
    - conn_id (str): Airflow connection ID for Spark cluster (default: "spark_default")
    - files (str | None): Comma-separated list of files to place in working directory
    - py_files (str | None): Comma-separated list of .zip, .egg, .py files for Python path
    - archives (str | None): Comma-separated list of archives to extract in working directory
    - driver_class_path (str | None): Extra classpath entries for driver
    - jars (str | None): Comma-separated list of JAR files to include
    - java_class (str | None): Main class for Java/Scala applications
    - packages (str | None): Maven coordinates of packages to include
    - exclude_packages (str | None): Maven coordinates of packages to exclude
    - repositories (str | None): Additional remote repositories for dependency resolution
    - total_executor_cores (int | None): Total cores for all executors
    - executor_cores (int | None): Number of cores per executor
    - executor_memory (str | None): Memory per executor (e.g., '4g', '2048m')
    - driver_memory (str | None): Memory for driver (e.g., '2g', '1024m')
    - keytab (str | None): Path to Kerberos keytab file
    - principal (str | None): Kerberos principal
    - proxy_user (str | None): User to impersonate when running job
    - name (str): Name for Spark application (default: "arrow-spark")
    - num_executors (int | None): Number of executors to launch
    - application_args (list[Any] | None): Arguments passed to main method of application
    - env_vars (dict[str, Any] | None): Environment variables for Spark application
    - verbose (bool): Enable verbose output (default: False)
    - spark_binary (str | None): Spark binary to use (uses connection setting if not specified)
    - properties_file (str | None): Path to properties file with Spark configuration
    - yarn_queue (str | None): YARN queue to submit to
    - deploy_mode (str | None): Deploy mode (client or cluster)
    - status_poll_interval (int): Seconds between polls of driver status (default: 1)
    - use_krb5ccache (bool): Use Kerberos credential cache (default: False)
    - openlineage_inject_parent_job_info (bool): Inject OpenLineage parent job info (default: False)
    - openlineage_inject_transport_info (bool): Inject OpenLineage transport info (default: False)

    Template Fields: application, conf, files, py_files, jars, driver_class_path,
    packages, exclude_packages, keytab, principal, proxy_user, name,
    application_args, env_vars, properties_file
    """

    template_fields = (
        "application", "conf", "files", "py_files", "jars",
        "driver_class_path", "packages", "exclude_packages",
        "keytab", "principal", "proxy_user", "name",
        "application_args", "env_vars", "properties_file",
    )

    def __init__(
        self,
        *,
        application: str = "",
        conf: dict[Any, Any] | None = None,
        conn_id: str = "spark_default",
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str = "arrow-spark",
        num_executors: int | None = None,
        status_poll_interval: int = 1,
        application_args: list[Any] | None = None,
        env_vars: dict[str, Any] | None = None,
        verbose: bool = False,
        spark_binary: str | None = None,
        properties_file: str | None = None,
        yarn_queue: str | None = None,
        deploy_mode: str | None = None,
        use_krb5ccache: bool = False,
        openlineage_inject_parent_job_info: bool = False,
        openlineage_inject_transport_info: bool = False,
        **kwargs
    ): ...

    def execute(self, context) -> None:
        """Execute Spark application using SparkSubmitHook."""

    def on_kill(self) -> None:
        """Kill running Spark job."""
```

#### Usage Example

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Submit PySpark application
pyspark_job = SparkSubmitOperator(
    task_id='process_data',
    application='/path/to/data_processing.py',
    conn_id='spark_cluster',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    py_files='s3://bucket/dependencies.zip',
    application_args=['--input', 's3://bucket/input/', '--output', 's3://bucket/output/'],
    env_vars={'SPARK_ENV': 'production'},
    dag=dag,
)

# Submit Scala/Java application
scala_job = SparkSubmitOperator(
    task_id='run_scala_job',
    application='/path/to/app.jar',
    java_class='com.example.SparkApplication',
    jars='/path/to/additional.jar',
    packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
    conf={
        'spark.executor.instances': '10',
        'spark.executor.memory': '8g',
    },
    dag=dag,
)
```
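
Because `application_args` and `env_vars` are template fields, they can carry Jinja expressions that Airflow renders at run time. The sketch below is illustrative only; the paths, bucket layout, and task ID are placeholders, not values from this tile.

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Hypothetical example: pass the DAG's logical date to the job via templated
# arguments; '{{ ds }}' is rendered by Airflow before spark-submit is called.
templated_job = SparkSubmitOperator(
    task_id='process_daily_partition',
    application='/path/to/data_processing.py',
    conn_id='spark_cluster',
    application_args=['--date', '{{ ds }}', '--input', 's3://bucket/input/{{ ds }}/'],
    env_vars={'RUN_DATE': '{{ ds }}'},
    dag=dag,
)
```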

### Spark SQL Execution

Execute Spark SQL queries using the spark-sql binary with support for Hive integration, query templating, and various output formats.

```python { .api }
class SparkSqlOperator(BaseOperator):
    """
    Execute Spark SQL queries via spark-sql binary.

    Parameters:
    - sql (str): SQL query to execute (can be templated)
    - conf (dict[str, Any] | str | None): Spark configuration properties
    - conn_id (str): Airflow connection ID (default: "spark_sql_default")
    - total_executor_cores (int | None): Total cores for all executors
    - executor_cores (int | None): Number of cores per executor
    - executor_memory (str | None): Memory per executor
    - keytab (str | None): Path to Kerberos keytab file
    - principal (str | None): Kerberos principal
    - master (str | None): Cluster manager URL (default: None, uses connection)
    - name (str): Name for Spark application (default: "default-name")
    - num_executors (int | None): Number of executors
    - verbose (bool): Enable verbose output (default: True)
    - yarn_queue (str | None): YARN queue name

    Template Fields: sql
    Template Extensions: .sql, .hql
    """

    template_fields = ("sql",)
    template_ext = (".sql", ".hql")
    template_fields_renderers = {"sql": "sql"}

    def __init__(
        self,
        *,
        sql: str,
        conf: dict[str, Any] | str | None = None,
        conn_id: str = "spark_sql_default",
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        master: str | None = None,
        name: str = "default-name",
        num_executors: int | None = None,
        verbose: bool = True,
        yarn_queue: str | None = None,
        **kwargs
    ): ...

    def execute(self, context) -> None:
        """Execute SQL query using SparkSqlHook."""

    def on_kill(self) -> None:
        """Kill running SQL query."""
```

#### Usage Example

```python
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

# Execute SQL query from string
sql_analysis = SparkSqlOperator(
    task_id='analyze_user_data',
    sql="""
        CREATE TABLE user_summary AS
        SELECT region,
               COUNT(*) as user_count,
               AVG(age) as avg_age,
               SUM(total_purchases) as total_revenue
        FROM users
        WHERE active = true
          AND registration_date >= '{{ ds }}'
        GROUP BY region
    """,
    conn_id='spark_sql_cluster',
    conf={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    dag=dag,
)

# Execute SQL from file
sql_from_file = SparkSqlOperator(
    task_id='run_etl_script',
    sql='etl_queries.sql',  # File in DAG folder or templated path
    conn_id='spark_sql_default',
    dag=dag,
)
```

### Spark JDBC Operations

Transfer data between Spark and JDBC databases with support for batch processing, partitioning, and various save modes.

```python { .api }
class SparkJDBCOperator(SparkSubmitOperator):
    """
    Execute Spark JDBC operations to transfer data between Spark and databases.
    Inherits from SparkSubmitOperator for Spark configuration.

    Parameters:
    - spark_app_name (str): Name for Spark application (default: 'airflow-spark-jdbc')
    - spark_conn_id (str): Spark connection ID (default: 'spark_default')
    - spark_conf (dict): Spark configuration properties
    - spark_py_files (str): Python files for Spark
    - spark_files (str): Files for Spark
    - spark_jars (str): JAR files for Spark (include JDBC drivers)
    - num_executors (int): Number of Spark executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - verbose (bool): Enable verbose output
    - principal (str): Kerberos principal
    - keytab (str): Kerberos keytab path
    - cmd_type (str): Operation type ('spark_to_jdbc' or 'jdbc_to_spark')
    - jdbc_table (str): JDBC table name
    - jdbc_conn_id (str): JDBC connection ID (default: 'jdbc_default')
    - jdbc_driver (str): JDBC driver class name
    - metastore_table (str): Spark metastore table name
    - jdbc_truncate (bool): Truncate JDBC table before write (default: False)
    - save_mode (str): Save mode ('append', 'overwrite', 'ignore', 'error')
    - save_format (str): Save format ('json', 'parquet', 'csv', etc.)
    - batch_size (int): JDBC batch size for writes
    - fetch_size (int): JDBC fetch size for reads
    - num_partitions (int): Number of partitions for parallel reads/writes
    - partition_column (str): Column for partitioning JDBC reads
    - lower_bound (str): Lower bound for partition column
    - upper_bound (str): Upper bound for partition column
    - create_table_column_types (str): Column types for table creation
    - use_krb5ccache (bool): Use Kerberos credential cache
    """

    def __init__(
        self,
        *,
        spark_app_name: str = 'airflow-spark-jdbc',
        spark_conn_id: str = 'spark_default',
        spark_conf: dict[str, Any] | None = None,
        spark_py_files: str | None = None,
        spark_files: str | None = None,
        spark_jars: str | None = None,
        num_executors: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        verbose: bool = False,
        principal: str | None = None,
        keytab: str | None = None,
        cmd_type: str = 'spark_to_jdbc',
        jdbc_table: str | None = None,
        jdbc_conn_id: str = 'jdbc_default',
        jdbc_driver: str | None = None,
        metastore_table: str | None = None,
        jdbc_truncate: bool = False,
        save_mode: str | None = None,
        save_format: str | None = None,
        batch_size: int | None = None,
        fetch_size: int | None = None,
        num_partitions: int | None = None,
        partition_column: str | None = None,
        lower_bound: str | None = None,
        upper_bound: str | None = None,
        create_table_column_types: str | None = None,
        use_krb5ccache: bool = False,
        **kwargs
    ): ...

    def execute(self, context) -> None:
        """Execute JDBC transfer using SparkJDBCHook."""

    def on_kill(self) -> None:
        """Kill running JDBC job."""
```

#### Usage Example

```python
from airflow.providers.apache.spark.operators.spark_jdbc import SparkJDBCOperator

# Transfer data from Spark to database
spark_to_db = SparkJDBCOperator(
    task_id='export_results_to_db',
    spark_conn_id='spark_cluster',
    jdbc_conn_id='postgres_default',
    cmd_type='spark_to_jdbc',
    metastore_table='processed_data',
    jdbc_table='analytics_results',
    jdbc_driver='org.postgresql.Driver',
    spark_jars='postgresql-42.2.18.jar',
    save_mode='overwrite',
    jdbc_truncate=True,
    batch_size=10000,
    dag=dag,
)

# Transfer data from database to Spark with partitioning
db_to_spark = SparkJDBCOperator(
    task_id='load_data_from_db',
    spark_conn_id='spark_cluster',
    jdbc_conn_id='mysql_warehouse',
    cmd_type='jdbc_to_spark',
    jdbc_table='large_table',
    metastore_table='imported_data',
    jdbc_driver='com.mysql.cj.jdbc.Driver',
    spark_jars='mysql-connector-java-8.0.23.jar',
    num_partitions=10,
    partition_column='id',
    lower_bound='1',
    upper_bound='1000000',
    fetch_size=50000,
    dag=dag,
)
```

## Error Handling

Common exceptions and error scenarios:

### Application Errors

- **Application not found**: Verify the application path is accessible to the Spark cluster
- **Class not found**: Ensure the main class exists and dependencies are included
- **Resource allocation failures**: Check cluster capacity and resource requirements

### Configuration Errors

- **Invalid Spark configuration**: Validate `conf` parameters against the Spark documentation
- **Connection failures**: Verify connection configurations and cluster accessibility
- **Authentication errors**: Check Kerberos settings, keytab files, and principals (see the sketch below)
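
When debugging Kerberos failures, it often helps to confirm that the keytab and principal actually reach the operator. This is a minimal sketch using the `keytab` and `principal` parameters documented above; the keytab path, principal, and connection ID are placeholders, not values from this tile.

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Kerberos-authenticated submission; replace the placeholder keytab path,
# principal, and connection ID with values valid for your environment.
secure_job = SparkSubmitOperator(
    task_id='secure_submit',
    application='/path/to/job.py',
    conn_id='spark_default',
    keytab='/etc/security/keytabs/airflow.keytab',  # assumed keytab location
    principal='airflow@EXAMPLE.COM',                # assumed principal
    dag=dag,
)
```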


### JDBC Errors

- **Driver not found**: Include JDBC driver JAR in spark_jars parameter
- **Connection failures**: Verify JDBC connection settings and database accessibility
- **Table/column errors**: Ensure target tables exist and column types are compatible

### Best Practices

- Use connection pooling for JDBC operations
- Configure appropriate batch sizes for data transfers
- Monitor cluster resources and adjust executor settings
- Use partitioning for large dataset transfers
- Implement proper error handling and retry logic in DAGs (see the sketch below)
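
As a rough illustration of the retry and resource-tuning practices above, the sketch below combines Airflow-level retries with explicit executor settings. The retry count, delay, memory sizes, path, and connection ID are illustrative placeholders rather than recommendations.

```python
from datetime import timedelta

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Airflow-level retries (inherited from BaseOperator) absorb transient
# submission failures, while executor settings are sized to the cluster.
resilient_job = SparkSubmitOperator(
    task_id='resilient_spark_job',
    application='/path/to/job.py',
    conn_id='spark_default',
    executor_memory='4g',
    executor_cores=2,
    num_executors=4,
    retries=3,                          # assumed retry policy
    retry_delay=timedelta(minutes=5),   # assumed backoff between attempts
    dag=dag,
)
```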