or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-launcher.md · core-engine.md · graph-processing.md · index.md · machine-learning.md · sql-dataframes.md · stream-processing.md

docs/application-launcher.md

0

# Application Launcher

1

2

The Spark Launcher provides a programmatic way to launch Spark applications from Java or Scala code. This is useful for building tools and applications that need to submit Spark jobs programmatically.

3

4

## SparkLauncher

5

6

The main class for launching Spark applications programmatically.

7

8

```java { .api }

9

public class SparkLauncher {

10

// Constructors

11

public SparkLauncher()

12

public SparkLauncher(Map<String, String> env)

13

14

// Application configuration

15

public SparkLauncher setAppName(String appName)

16

public SparkLauncher setMaster(String master)

17

public SparkLauncher setAppResource(String resource)

18

public SparkLauncher setMainClass(String mainClass)

19

public SparkLauncher addAppArgs(String... args)

20

public SparkLauncher addJar(String jar)

21

public SparkLauncher addPyFile(String file)

22

public SparkLauncher addFile(String file)

23

24

// Spark configuration

25

public SparkLauncher setConf(String key, String value)

26

public SparkLauncher setPropertiesFile(String path)

27

public SparkLauncher setVerbose(boolean verbose)

28

29

// Environment configuration

30

public SparkLauncher setSparkHome(String sparkHome)

31

public SparkLauncher redirectError()

32

public SparkLauncher redirectError(ProcessBuilder.Redirect redirect)

33

public SparkLauncher redirectOutput(ProcessBuilder.Redirect redirect)

34

public SparkLauncher redirectToLog(String loggerName)

35

36

// Launching methods

37

public Process launch() throws IOException

38

public SparkAppHandle startApplication() throws IOException

39

public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException

40

41

// Constants for configuration keys

42

public static final String SPARK_HOME = "spark.home"

43

public static final String EXECUTOR_MEMORY = "spark.executor.memory"

44

public static final String EXECUTOR_CORES = "spark.executor.cores"

45

public static final String EXECUTOR_INSTANCES = "spark.executor.instances"

46

public static final String DRIVER_MEMORY = "spark.driver.memory"

47

public static final String DRIVER_CORES = "spark.driver.cores"

48

public static final String DRIVER_CLASS_PATH = "spark.driver.extraClassPath"

49

public static final String DRIVER_JAVA_OPTIONS = "spark.driver.extraJavaOptions"

50

public static final String DRIVER_LIBRARY_PATH = "spark.driver.extraLibraryPath"

51

public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"

52

public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions"

53

public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath"

54

public static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python"

55

public static final String PYSPARK_PYTHON = "spark.pyspark.python"

56

57

// Builder for constructing SparkLauncher instances

58

public static class Builder {

59

public Builder setSparkHome(String sparkHome)

60

public Builder setPropertiesFile(String path)

61

public Builder setConf(String key, String value)

62

public Builder setAppName(String name)

63

public Builder setMaster(String master)

64

public Builder setMainClass(String mainClass)

65

public Builder setAppResource(String resource)

66

public Builder addAppArgs(String... args)

67

public Builder addJar(String jar)

68

public Builder addPyFile(String file)

69

public Builder addFile(String file)

70

public Builder setVerbose(boolean verbose)

71

public SparkLauncher build()

72

}

73

}

74

```

75

76

### Usage Examples

77

78

```java

79

import org.apache.spark.launcher.SparkLauncher;

80

import org.apache.spark.launcher.SparkAppHandle;

81

82

// Basic launcher setup

83

SparkLauncher launcher = new SparkLauncher()

84

.setAppName("MySparkApp")

85

.setMaster("yarn")

86

.setAppResource("/path/to/my-app.jar")

87

.setMainClass("com.example.MyMainClass")

88

.addAppArgs("arg1", "arg2", "arg3")

89

.setConf(SparkLauncher.DRIVER_MEMORY, "2g")

90

.setConf(SparkLauncher.EXECUTOR_MEMORY, "4g")

91

.setConf(SparkLauncher.EXECUTOR_CORES, "4")

92

.setConf(SparkLauncher.EXECUTOR_INSTANCES, "10")

93

.setVerbose(true);

94

95

// Launch and get handle

96

SparkAppHandle handle = launcher.startApplication();

97

98

// Launch as process

99

Process process = launcher.launch();

100

101

// With environment variables

102

Map<String, String> env = new HashMap<>();

103

env.put("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk");

104

SparkLauncher envLauncher = new SparkLauncher(env);

105

```

106

107

```scala

108

// Scala usage

109

import org.apache.spark.launcher.{SparkLauncher, SparkAppHandle}

110

import scala.collection.JavaConverters._

111

112

val launcher = new SparkLauncher()

113

.setAppName("MySparkApp")

114

.setMaster("local[*]")

115

.setAppResource("/path/to/my-app.jar")

116

.setMainClass("com.example.MyMainClass")

117

.addAppArgs("arg1", "arg2")

118

.setConf("spark.sql.adaptive.enabled", "true")

119

.setConf("spark.sql.adaptive.coalescePartitions.enabled", "true")

120

121

val handle = launcher.startApplication()

122

```

123

124

## SparkAppHandle

125

126

Interface for monitoring and controlling launched Spark applications.

127

128

```java { .api }

129

public interface SparkAppHandle {

130

// Application information

131

String getAppId()

132

State getState()

133

134

// Control operations

135

void kill()

136

void disconnect()

137

138

// Monitoring

139

void addListener(Listener listener)

140

141

// State enumeration

142

enum State {

143

UNKNOWN, // The application state is not known.

144

CONNECTED, // The application has just been submitted.

145

SUBMITTED, // The application has been submitted to the cluster manager.

146

RUNNING, // The application is running.

147

FINISHED, // The application finished with a successful status.

148

FAILED, // The application finished with a failed status.

149

KILLED, // The application was killed.

150

LOST // The Spark Launcher is not able to contact the application.

151

}

152

153

// Listener interface for state changes

154

interface Listener {

155

void stateChanged(SparkAppHandle handle)

156

void infoChanged(SparkAppHandle handle)

157

}

158

}

159

```

160

161

### Usage Examples

162

163

```java

164

import org.apache.spark.launcher.SparkAppHandle;

165

166

// Launch application and monitor

167

SparkAppHandle handle = launcher.startApplication();

168

169

// Add listener for state changes

170

handle.addListener(new SparkAppHandle.Listener() {

171

@Override

172

public void stateChanged(SparkAppHandle handle) {

173

System.out.println("Application state changed to: " + handle.getState());

174

if (handle.getState().isFinal()) {

175

System.out.println("Application " + handle.getAppId() + " finished.");

176

}

177

}

178

179

@Override

180

public void infoChanged(SparkAppHandle handle) {

181

System.out.println("Application info changed for: " + handle.getAppId());

182

}

183

});

184

185

// Monitor application state

186

SparkAppHandle.State state = handle.getState();

187

while (!state.isFinal()) {

188

Thread.sleep(1000);

189

state = handle.getState();

190

System.out.println("Current state: " + state);

191

}

192

193

// Get final application ID

194

String appId = handle.getAppId();

195

System.out.println("Application ID: " + appId);

196

197

// Kill application if needed

198

if (someCondition) {

199

handle.kill();

200

}

201

202

// Disconnect when done monitoring

203

handle.disconnect();

204

```

205

206

```scala

207

// Scala usage with future

208

import scala.concurrent.{Future, Promise}

209

import scala.concurrent.ExecutionContext.Implicits.global

210

211

def launchAndWait(launcher: SparkLauncher): Future[SparkAppHandle.State] = {

212

val promise = Promise[SparkAppHandle.State]()

213

val handle = launcher.startApplication()

214

215

handle.addListener(new SparkAppHandle.Listener {

216

override def stateChanged(handle: SparkAppHandle): Unit = {

217

val state = handle.getState

218

println(s"State changed to: $state")

219

if (state.isFinal) {

220

promise.success(state)

221

}

222

}

223

224

override def infoChanged(handle: SparkAppHandle): Unit = {

225

println(s"Info changed for app: ${handle.getAppId}")

226

}

227

})

228

229

promise.future

230

}

231

232

// Usage

233

val finalState = launchAndWait(launcher)

234

finalState.foreach { state =>

235

println(s"Application finished with state: $state")

236

}

237

```

238

239

## InProcessLauncher

240

241

For launching applications in the same JVM process (useful for testing).

242

243

```java { .api }

244

public class InProcessLauncher extends SparkLauncher {

245

public InProcessLauncher()

246

public InProcessLauncher(Map<String, String> env)

247

248

// Override launch methods to run in-process

249

@Override

250

public SparkAppHandle startApplication() throws IOException

251

@Override

252

public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException

253

}

254

```

255

256

### Usage Examples

257

258

```java

259

// For testing or embedding Spark applications

260

import org.apache.spark.launcher.InProcessLauncher;

261

262

InProcessLauncher inProcessLauncher = new InProcessLauncher()

263

.setAppName("TestApp")

264

.setMaster("local[2]")

265

.setAppResource("local:/path/to/app.jar")

266

.setMainClass("com.example.TestApp");

267

268

SparkAppHandle handle = inProcessLauncher.startApplication();

269

```

270

271

## Launcher Configuration

272

273

Common configuration patterns and utilities.

274

275

### Environment Variables

276

277

```java

278

// Setting up environment for launcher

279

Map<String, String> env = new HashMap<>();

280

env.put("SPARK_HOME", "/opt/spark");

281

env.put("JAVA_HOME", "/usr/lib/jvm/java-8-openjdk");

282

env.put("HADOOP_HOME", "/opt/hadoop");

283

env.put("YARN_CONF_DIR", "/etc/hadoop/conf");

284

285

SparkLauncher launcher = new SparkLauncher(env);

286

```

287

288

### Resource Configuration

289

290

```java

291

// Memory and CPU configuration

292

launcher

293

.setConf(SparkLauncher.DRIVER_MEMORY, "4g")

294

.setConf(SparkLauncher.DRIVER_CORES, "2")

295

.setConf(SparkLauncher.EXECUTOR_MEMORY, "8g")

296

.setConf(SparkLauncher.EXECUTOR_CORES, "4")

297

.setConf(SparkLauncher.EXECUTOR_INSTANCES, "20");

298

299

// Dynamic allocation

300

launcher

301

.setConf("spark.dynamicAllocation.enabled", "true")

302

.setConf("spark.dynamicAllocation.minExecutors", "5")

303

.setConf("spark.dynamicAllocation.maxExecutors", "50")

304

.setConf("spark.dynamicAllocation.initialExecutors", "10");

305

```

306

307

### Cluster Configuration

308

309

```java

310

// YARN configuration

311

launcher

312

.setMaster("yarn")

313

.setConf("spark.submit.deployMode", "cluster")

314

.setConf("spark.yarn.queue", "production")

315

.setConf("spark.yarn.jars", "hdfs://namenode:port/spark/jars/*");

316

317

// Kubernetes configuration

318

launcher

319

.setMaster("k8s://https://kubernetes-api-server:443")

320

.setConf("spark.kubernetes.container.image", "spark:latest")

321

.setConf("spark.kubernetes.namespace", "spark-jobs")

322

.setConf("spark.executor.instances", "10");

323

324

// Standalone cluster

325

launcher

326

.setMaster("spark://master-host:7077")

327

.setConf("spark.cores.max", "100")

328

.setConf("spark.executor.memory", "4g");

329

```

330

331

### Dependency Management

332

333

```java

334

// Adding JARs and files

335

launcher

336

.addJar("hdfs://namenode:port/path/to/dependency1.jar")

337

.addJar("hdfs://namenode:port/path/to/dependency2.jar")

338

.addFile("hdfs://namenode:port/path/to/config.properties")

339

.addPyFile("hdfs://namenode:port/path/to/module.py");

340

341

// Python-specific configuration

342

launcher

343

.setConf(SparkLauncher.PYSPARK_DRIVER_PYTHON, "/usr/bin/python3")

344

.setConf(SparkLauncher.PYSPARK_PYTHON, "/usr/bin/python3")

345

.addPyFile("hdfs://path/to/dependencies.zip");

346

```

347

348

### Error Handling and Monitoring

349

350

```java

351

public class SparkJobManager {

352

public void launchWithRetry(SparkLauncher launcher, int maxRetries) {

353

int attempts = 0;

354

while (attempts < maxRetries) {

355

try {

356

SparkAppHandle handle = launcher.startApplication();

357

358

handle.addListener(new SparkAppHandle.Listener() {

359

@Override

360

public void stateChanged(SparkAppHandle handle) {

361

SparkAppHandle.State state = handle.getState();

362

System.out.println("State: " + state + " for app: " + handle.getAppId());

363

364

switch (state) {

365

case FAILED:

366

System.err.println("Application failed: " + handle.getAppId());

367

break;

368

case KILLED:

369

System.out.println("Application killed: " + handle.getAppId());

370

break;

371

case FINISHED:

372

System.out.println("Application completed successfully: " + handle.getAppId());

373

break;

374

}

375

}

376

377

@Override

378

public void infoChanged(SparkAppHandle handle) {

379

// Handle info changes if needed

380

}

381

});

382

383

// Wait for completion

384

SparkAppHandle.State finalState = waitForCompletion(handle);

385

if (finalState == SparkAppHandle.State.FINISHED) {

386

return; // Success

387

}

388

389

} catch (IOException e) {

390

System.err.println("Launch attempt " + (attempts + 1) + " failed: " + e.getMessage());

391

}

392

393

attempts++;

394

if (attempts < maxRetries) {

395

try {

396

Thread.sleep(5000); // Wait before retry

397

} catch (InterruptedException ie) {

398

Thread.currentThread().interrupt();

399

return;

400

}

401

}

402

}

403

404

throw new RuntimeException("Failed to launch application after " + maxRetries + " attempts");

405

}

406

407

private SparkAppHandle.State waitForCompletion(SparkAppHandle handle) {

408

SparkAppHandle.State state = handle.getState();

409

while (!state.isFinal()) {

410

try {

411

Thread.sleep(1000);

412

state = handle.getState();

413

} catch (InterruptedException e) {

414

Thread.currentThread().interrupt();

415

return state;

416

}

417

}

418

return state;

419

}

420

}

421

```

422

423

## Builder Pattern Usage

424

425

Alternative fluent API for creating launchers.

426

427

```java

428

// Using builder pattern

429

SparkLauncher launcher = new SparkLauncher.Builder()

430

.setSparkHome("/opt/spark")

431

.setAppName("MyApp")

432

.setMaster("yarn")

433

.setMainClass("com.example.MyApp")

434

.setAppResource("hdfs://namenode:port/path/to/app.jar")

435

.setConf("spark.executor.memory", "4g")

436

.setConf("spark.executor.cores", "2")

437

.addAppArgs("--input", "/data/input", "--output", "/data/output")

438

.setVerbose(true)

439

.build();

440

441

SparkAppHandle handle = launcher.startApplication();

442

```

443

444

This comprehensive documentation covers all the major aspects of programmatically launching and monitoring Spark applications using the Launcher API.