# Spark Hooks

Connection management hooks for various Spark interfaces, including spark-submit, Spark SQL, JDBC operations, and the Spark Connect protocol. These hooks handle authentication, connection configuration, and communication with different Spark deployment modes.

## Capabilities

### Spark Submit Connection Management

Manage connections for spark-submit operations across different cluster managers, with support for comprehensive Spark configuration and resource management.

```python { .api }
class SparkSubmitHook(BaseHook):
    """
    Hook for spark-submit binary execution with extensive configuration support.

    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_default"
    - conn_type: "spark"
    - hook_name: "Spark"

    Parameters:
    - conf (dict): Spark configuration properties
    - conn_id (str): Connection ID (default: 'spark_default')
    - files (str): Files to place in working directory
    - py_files (str): Python files for Python path
    - archives (str): Archives to extract
    - driver_class_path (str): Extra classpath for driver
    - jars (str): JAR files to include
    - java_class (str): Main class for Java/Scala apps
    - packages (str): Maven packages to include
    - exclude_packages (str): Maven packages to exclude
    - repositories (str): Additional repositories
    - total_executor_cores (int): Total cores for executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - keytab (str): Kerberos keytab path
    - principal (str): Kerberos principal
    - proxy_user (str): User to impersonate
    - name (str): Application name
    - num_executors (int): Number of executors
    - status_poll_interval (int): Poll interval in seconds (default: 1)
    - application_args (list): Application arguments
    - env_vars (dict): Environment variables
    - verbose (bool): Verbose output (default: False)
    - spark_binary (str): Spark binary (default: 'spark-submit')
    - properties_file (str): Properties file path
    - yarn_queue (str): YARN queue name
    - deploy_mode (str): Deploy mode (client or cluster)
    - use_krb5ccache (bool): Use Kerberos credential cache (default: False)
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark"
    hook_name = "Spark"

    def __init__(
        self,
        conf: dict = None,
        conn_id: str = 'spark_default',
        files: str = None,
        py_files: str = None,
        archives: str = None,
        driver_class_path: str = None,
        jars: str = None,
        java_class: str = None,
        packages: str = None,
        exclude_packages: str = None,
        repositories: str = None,
        total_executor_cores: int = None,
        executor_cores: int = None,
        executor_memory: str = None,
        driver_memory: str = None,
        keytab: str = None,
        principal: str = None,
        proxy_user: str = None,
        name: str = 'default-name',
        num_executors: int = None,
        status_poll_interval: int = 1,
        application_args: list = None,
        env_vars: dict = None,
        verbose: bool = False,
        spark_binary: str = 'spark-submit',
        properties_file: str = None,
        yarn_queue: str = None,
        deploy_mode: str = None,
        *,
        use_krb5ccache: bool = False
    ): ...

    def submit(self, application: str, **kwargs) -> None:
        """
        Submit Spark application for execution.

        Parameters:
        - application (str): Path to Spark application file
        - **kwargs: Additional arguments override hook defaults
        """

    def on_kill(self) -> None:
        """Kill the running Spark job."""

    def get_conn(self):
        """Get connection (no-op for Spark submit)."""

    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for connection form."""

    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""
```

#### Usage Example

```python
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Create hook with configuration
hook = SparkSubmitHook(
    conn_id='spark_cluster',
    conf={
        'spark.executor.memory': '4g',
        'spark.executor.cores': '2',
        'spark.driver.memory': '2g',
    },
    packages='org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0',
    verbose=True
)

# Submit application
hook.submit(
    application='/path/to/app.py',
    application_args=['--input', 'hdfs://data/input', '--output', 'hdfs://data/output']
)
```

### Spark SQL Connection Management

Manage connections for Spark SQL operations with support for Hive integration and distributed SQL execution.

```python { .api }
class SparkSqlHook(BaseHook):
    """
    Hook for spark-sql binary execution with SQL query support.

    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_sql_default"
    - conn_type: "spark_sql"
    - hook_name: "Spark SQL"

    Parameters:
    - sql (str): SQL query to execute
    - conf (dict): Spark configuration properties
    - conn_id (str): Connection ID (default: 'spark_sql_default')
    - total_executor_cores (int): Total cores for executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - keytab (str): Kerberos keytab path
    - principal (str): Kerberos principal
    - master (str): Cluster manager URL (default: 'yarn')
    - name (str): Application name (default: 'default')
    - num_executors (int): Number of executors
    - verbose (bool): Verbose output (default: True)
    - yarn_queue (str): YARN queue (default: 'default')
    - properties_file (str): Properties file path
    - application_args (list): Additional arguments
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_sql_default"
    conn_type = "spark_sql"
    hook_name = "Spark SQL"

    def __init__(
        self,
        sql: str = None,
        conf: dict = None,
        conn_id: str = 'spark_sql_default',
        total_executor_cores: int = None,
        executor_cores: int = None,
        executor_memory: str = None,
        keytab: str = None,
        principal: str = None,
        master: str = 'yarn',
        name: str = 'default',
        num_executors: int = None,
        verbose: bool = True,
        yarn_queue: str = 'default',
        properties_file: str = None,
        application_args: list = None
    ): ...

    def run_query(self, cmd: str = None, **kwargs) -> None:
        """
        Execute Spark SQL query.

        Parameters:
        - cmd (str): SQL command to execute (overrides sql parameter)
        - **kwargs: Additional execution parameters
        """

    def kill(self) -> None:
        """Kill the running SQL query."""

    def get_conn(self):
        """Get connection (no-op for Spark SQL)."""

    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for connection form."""

    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""
```

#### Usage Example

```python
from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook

# Create hook for SQL execution
hook = SparkSqlHook(
    conn_id='spark_sql_cluster',
    conf={
        'spark.sql.adaptive.enabled': 'true',
        'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    },
    master='yarn',
    yarn_queue='analytics'
)

# Execute SQL query
hook.run_query("""
    CREATE TABLE daily_summary AS
    SELECT date, region, COUNT(*) as transactions
    FROM sales
    WHERE date = current_date()
    GROUP BY date, region
""")
```

### Spark JDBC Connection Management

Manage connections for Spark JDBC operations with database integration support, including authentication and connection pooling.

```python { .api }
class SparkJDBCHook(SparkSubmitHook):
    """
    Hook for Spark JDBC operations, extends SparkSubmitHook.

    Connection Information:
    - conn_name_attr: "spark_conn_id"
    - default_conn_name: "spark_default"
    - conn_type: "spark_jdbc"
    - hook_name: "Spark JDBC"

    Parameters:
    - spark_app_name (str): Spark application name (default: 'airflow-spark-jdbc')
    - spark_conn_id (str): Spark connection ID (default: 'spark_default')
    - spark_conf (dict): Spark configuration
    - spark_py_files (str): Python files
    - spark_files (str): Additional files
    - spark_jars (str): JAR files (include JDBC drivers)
    - num_executors (int): Number of executors
    - executor_cores (int): Cores per executor
    - executor_memory (str): Memory per executor
    - driver_memory (str): Driver memory
    - verbose (bool): Verbose output
    - keytab (str): Kerberos keytab
    - principal (str): Kerberos principal
    - cmd_type (str): Operation type ('spark_to_jdbc', 'jdbc_to_spark')
    - jdbc_table (str): JDBC table name
    - jdbc_conn_id (str): JDBC connection ID (default: 'jdbc_default')
    - jdbc_driver (str): JDBC driver class
    - metastore_table (str): Spark metastore table
    - jdbc_truncate (bool): Truncate table before write
    - save_mode (str): Save mode for writes
    - save_format (str): Data format
    - batch_size (int): JDBC batch size
    - fetch_size (int): JDBC fetch size
    - num_partitions (int): Number of partitions
    - partition_column (str): Partitioning column
    - lower_bound (str): Partition lower bound
    - upper_bound (str): Partition upper bound
    - create_table_column_types (str): Column types for table creation
    - use_krb5ccache (bool): Use Kerberos credential cache
    """

    conn_name_attr = "spark_conn_id"
    default_conn_name = "spark_default"
    conn_type = "spark_jdbc"
    hook_name = "Spark JDBC"

    def __init__(
        self,
        spark_app_name: str = 'airflow-spark-jdbc',
        spark_conn_id: str = 'spark_default',
        spark_conf: dict = None,
        spark_py_files: str = None,
        spark_files: str = None,
        spark_jars: str = None,
        num_executors: int = None,
        executor_cores: int = None,
        executor_memory: str = None,
        driver_memory: str = None,
        verbose: bool = False,
        keytab: str = None,
        principal: str = None,
        cmd_type: str = 'spark_to_jdbc',
        jdbc_table: str = None,
        jdbc_conn_id: str = 'jdbc_default',
        jdbc_driver: str = None,
        metastore_table: str = None,
        jdbc_truncate: bool = False,
        save_mode: str = None,
        save_format: str = None,
        batch_size: int = None,
        fetch_size: int = None,
        num_partitions: int = None,
        partition_column: str = None,
        lower_bound: str = None,
        upper_bound: str = None,
        create_table_column_types: str = None,
        use_krb5ccache: bool = False
    ): ...

    def submit_jdbc_job(self) -> None:
        """Submit Spark JDBC transfer job."""

    def get_conn(self):
        """Get connection (no-op for Spark JDBC)."""
```

#### Usage Example

```python
from airflow.providers.apache.spark.hooks.spark_jdbc import SparkJDBCHook

# Create hook for database transfer
hook = SparkJDBCHook(
    spark_conn_id='spark_cluster',
    jdbc_conn_id='postgres_warehouse',
    cmd_type='jdbc_to_spark',
    jdbc_table='customer_data',
    metastore_table='customers',
    jdbc_driver='org.postgresql.Driver',
    spark_jars='postgresql-42.2.18.jar',
    num_partitions=8,
    partition_column='customer_id',
    lower_bound='1',
    upper_bound='1000000'
)

# Execute transfer
hook.submit_jdbc_job()
```

### Spark Connect Connection Management

Manage connections that use the Spark Connect protocol, which decouples client applications from the Spark driver and lets them connect to a remote Spark cluster.

```python { .api }
class SparkConnectHook(BaseHook):
    """
    Hook for Spark Connect protocol connections.

    Connection Information:
    - conn_name_attr: "conn_id"
    - default_conn_name: "spark_connect_default"
    - conn_type: "spark_connect"
    - hook_name: "Spark Connect"

    Constants:
    - PARAM_USE_SSL: "use_ssl"
    - PARAM_TOKEN: "token"
    - PARAM_USER_ID: "user_id"

    Parameters:
    - conn_id (str): Connection ID (default: 'spark_connect_default')
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_connect_default"
    conn_type = "spark_connect"
    hook_name = "Spark Connect"

    PARAM_USE_SSL = "use_ssl"
    PARAM_TOKEN = "token"
    PARAM_USER_ID = "user_id"

    def __init__(self, conn_id: str = 'spark_connect_default'): ...

    def get_connection_url(self) -> str:
        """
        Build Spark Connect connection URL.

        Returns:
        str: Complete Spark Connect URL for client connections
        """

    @staticmethod
    def get_ui_field_behaviour() -> dict:
        """Return UI field configuration for connection form."""

    @staticmethod
    def get_connection_form_widgets() -> dict:
        """Return connection form widgets configuration."""
```

#### Usage Example

```python
from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook

# Create Spark Connect hook
hook = SparkConnectHook(conn_id='spark_connect_cluster')

# Get connection URL for Spark Connect client
connect_url = hook.get_connection_url()
# Returns: sc://hostname:15002/;token=abc123;user_id=airflow

# Use with Spark Connect client
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .remote(connect_url) \
    .appName("Airflow Spark Connect") \
    .getOrCreate()
```

## Connection Configuration

### Spark Connection (`spark`)

- **Host**: Cluster manager URL (e.g., `yarn`, `spark://master:7077`, `k8s://api-server`)
- **Extra**: JSON with additional Spark configuration
- **Login/Password**: For cluster authentication if required
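
For illustration, a `spark` connection with these fields can be built programmatically and exposed through Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable mechanism. This is a minimal sketch: the connection ID, master URL, and the `deploy-mode` extra key are placeholders, and the exact extra field names accepted by the provider may vary by version.

```python
import os

from airflow.models.connection import Connection

# Minimal sketch of a `spark` connection; host is the cluster manager URL and
# the extra JSON carries additional Spark configuration. All values here are
# placeholders.
spark_conn = Connection(
    conn_id="spark_cluster",
    conn_type="spark",
    host="spark://master:7077",
    extra='{"deploy-mode": "cluster"}',
)

# Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables,
# so the URI form can be exported instead of writing to the metadata database.
os.environ["AIRFLOW_CONN_SPARK_CLUSTER"] = spark_conn.get_uri()
```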

### Spark SQL Connection (`spark_sql`)

- **Host**: Cluster manager URL
- **Extra**: JSON with Spark SQL-specific configuration
- **Schema**: Default database/schema

### Spark JDBC Connection (`spark_jdbc`)

- **Inherits**: Spark connection configuration
- **Extra**: JDBC-specific settings and driver configuration

### Spark Connect Connection (`spark_connect`)

- **Host**: Spark Connect server hostname
- **Port**: Spark Connect server port (default: 15002)
- **Extra**: JSON with SSL settings, tokens, and user authentication
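
A Spark Connect connection follows the same pattern; the sketch below uses the `use_ssl`, `token`, and `user_id` extra keys that correspond to the constants documented on `SparkConnectHook`. Hostname, token, and user values are placeholders.

```python
from airflow.models.connection import Connection

# Sketch of a `spark_connect` connection. The extra keys mirror the
# PARAM_USE_SSL / PARAM_TOKEN / PARAM_USER_ID constants above; all values
# are placeholders.
spark_connect_conn = Connection(
    conn_id="spark_connect_cluster",
    conn_type="spark_connect",
    host="spark-connect.example.com",
    port=15002,
    extra='{"use_ssl": true, "token": "abc123", "user_id": "airflow"}',
)

print(spark_connect_conn.get_uri())
```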

## Error Handling and Best Practices

### Connection Management

- **Connection pooling**: Configure appropriate pool sizes for JDBC operations
- **Authentication**: Ensure proper Kerberos or token-based authentication setup (see the sketch below)
- **SSL/TLS**: Use secure connections for production environments
- **Timeouts**: Configure appropriate connection and query timeouts
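
As one illustration of the authentication point above, the Kerberos-related parameters documented on `SparkSubmitHook` (`keytab`, `principal`, `use_krb5ccache`) can be passed when building the hook. The keytab path, principal, and connection ID below are placeholders.

```python
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Kerberos-authenticated submission; the keytab path, principal, and
# connection ID are placeholders for this sketch.
hook = SparkSubmitHook(
    conn_id="spark_cluster",
    keytab="/etc/security/keytabs/airflow.keytab",
    principal="airflow@EXAMPLE.COM",
    deploy_mode="cluster",
)

hook.submit(application="/path/to/app.py")
```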

### Resource Management

- **Memory allocation**: Balance driver and executor memory based on workload
- **Core allocation**: Optimize core allocation for cluster utilization
- **Dynamic allocation**: Use Spark's dynamic allocation for variable workloads (example configuration below)
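
As a sketch of the dynamic-allocation point, the standard Spark properties can be passed through the hook's `conf` dictionary; the executor bounds and memory sizes below are illustrative and should be tuned to the cluster.

```python
from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook

# Dynamic allocation lets Spark scale executors with the workload.
# The executor bounds here are illustrative placeholders.
hook = SparkSubmitHook(
    conn_id="spark_cluster",
    conf={
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "2",
        "spark.dynamicAllocation.maxExecutors": "20",
        # An external shuffle service (or shuffle tracking) is typically
        # required when dynamic allocation is used on YARN.
        "spark.shuffle.service.enabled": "true",
    },
    driver_memory="2g",
    executor_memory="4g",
)
```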

### Error Recovery

- **Retry logic**: Implement retry mechanisms for transient failures (a simple sketch follows this list)
- **Graceful degradation**: Handle cluster unavailability scenarios
- **Resource monitoring**: Monitor cluster resources and job progress
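
Inside a DAG, operator-level `retries` and `retry_delay` settings usually cover transient failures. For standalone hook usage, a manual retry loop like the minimal sketch below is one option; the connection ID, attempt count, and backoff values are placeholders.

```python
import time

from airflow.providers.apache.spark.hooks.spark_submit import SparkSubmitHook


def submit_with_retries(application: str, attempts: int = 3, backoff: int = 60) -> None:
    """Retry a spark-submit a few times before giving up; a deliberately simple sketch."""
    for attempt in range(1, attempts + 1):
        hook = SparkSubmitHook(conn_id="spark_cluster")
        try:
            hook.submit(application=application)
            return
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(backoff * attempt)  # linear backoff between attempts


submit_with_retries("/path/to/app.py")
```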