
# Application Submission

The application submission layer integrates with Spark's `spark-submit` tool, so Spark applications can be deployed to Kubernetes clusters through the familiar Spark interfaces.

## Core Components

### KubernetesClientApplication { .api }

The main entry point for application submission, integrating with Spark's application framework:

```scala
class KubernetesClientApplication extends SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}
```

**Integration Points**:
- Automatically invoked by `spark-submit` when using `--master k8s://` and `--deploy-mode cluster`
- Parses command-line arguments and converts them to Kubernetes-specific configuration
- Orchestrates the complete application submission workflow

**Workflow Overview**:
1. Parse command-line arguments using `ClientArguments`
2. Create `KubernetesDriverConf` from the Spark configuration
3. Build the driver pod specification using `KubernetesDriverBuilder`
4. Submit the driver pod to Kubernetes using `Client`
5. Monitor application status using `LoggingPodStatusWatcher`

### ClientArguments { .api }

Encapsulates and validates the arguments passed to the submission client:

```scala
case class ClientArguments(
  mainAppResource: MainAppResource,
  mainClass: String,
  driverArgs: Array[String]
)
```

**Companion Object**:
```scala
object ClientArguments {
  def fromCommandLineArgs(args: Array[String]): ClientArguments
}
```

**Supported Resource Types**:
```scala
// Java/Scala applications
JavaMainAppResource(primaryResource: Option[String])

// Python applications
PythonMainAppResource(primaryResource: String)

// R applications
RMainAppResource(primaryResource: String)
```

**Usage Example**:
```scala
// Parsing spark-submit arguments
val args = Array(
  "--class", "org.apache.spark.examples.SparkPi",
  "--conf", "spark.kubernetes.container.image=spark:latest",
  "local:///opt/spark/examples/jars/spark-examples.jar",
  "1000"
)

val clientArgs = ClientArguments.fromCommandLineArgs(args)
// Results in:
// - mainClass: "org.apache.spark.examples.SparkPi"
// - mainAppResource: JavaMainAppResource(Some("local:///opt/spark/examples/jars/spark-examples.jar"))
// - driverArgs: Array("1000")
```

### Client { .api }

Manages the actual submission process and the lifecycle of the Spark application:

```scala
class Client(
  conf: KubernetesDriverConf,
  builder: KubernetesDriverBuilder,
  kubernetesClient: KubernetesClient,
  watcher: LoggingPodStatusWatcher
)
```

**Key Methods**:
```scala
def run(): Unit
```

**Submission Process**:
1. **Driver Spec Building**: Uses `KubernetesDriverBuilder` to create the complete pod specification
2. **Resource Creation**: Submits the driver pod and associated Kubernetes resources
3. **Status Monitoring**: Watches pod status and logs application progress
4. **Cleanup**: Handles cleanup of resources on completion or failure
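
The four steps above can be sketched roughly as follows. This is a minimal sketch, assuming the fabric8 `KubernetesClient` API and that the watcher also implements fabric8's `Watcher[Pod]`; the actual `Client.run()` additionally sets owner references on the created resources and handles errors and retries:

```scala
// Minimal sketch of Client.run(); not the actual Spark implementation.
def run(): Unit = {
  // 1. Driver spec building
  val spec: KubernetesDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)

  // Attach the driver container produced by the feature steps to the pod
  val resolvedDriverPod = new PodBuilder(spec.pod.pod)
    .editOrNewSpec()
      .addToContainers(spec.pod.container)
    .endSpec()
    .build()

  // 2. Resource creation: driver pod first, then the resources that accompany it
  val createdPod = kubernetesClient.pods()
    .inNamespace(conf.namespace)
    .create(resolvedDriverPod)
  kubernetesClient.resourceList(spec.driverKubernetesResources: _*)
    .inNamespace(conf.namespace)
    .createOrReplace()

  // 3. Status monitoring: watch the driver pod and log progress until it finishes
  val watch = kubernetesClient.pods()
    .inNamespace(conf.namespace)
    .withName(createdPod.getMetadata.getName)
    .watch(watcher) // assumes the watcher is also a fabric8 Watcher[Pod]
  try {
    watcher.watchUntilCompletion()
  } finally {
    // 4. Cleanup: stop watching whether the application succeeded or failed
    watch.close()
    watcher.stop()
  }
}
```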

## Application Resource Types

### MainAppResource Hierarchy { .api }

Sealed trait representing the different types of application resources:

```scala
sealed trait MainAppResource

// Base for non-JVM languages
sealed trait NonJVMResource extends MainAppResource

// JVM-based applications (Java/Scala)
case class JavaMainAppResource(primaryResource: Option[String]) extends MainAppResource

// Python applications
case class PythonMainAppResource(primaryResource: String) extends NonJVMResource

// R applications
case class RMainAppResource(primaryResource: String) extends NonJVMResource
```

**Resource Resolution**:
- **Local Resources**: `local://` URLs reference files already present in the container image
- **Remote Resources**: `http://`, `https://`, `s3a://` URLs are downloaded at runtime
- **No Resource**: for applications bundled directly in the container image
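
How the primary resource selects one of these types can be illustrated with a small helper. This is a sketch only; the real mapping lives in Spark's submit code and also considers the language flags passed to `spark-submit`. The `resolveMainAppResource` name is hypothetical:

```scala
// Illustrative sketch: choose a MainAppResource from the primary resource string.
def resolveMainAppResource(primaryResource: String): MainAppResource =
  primaryResource match {
    case r if r.endsWith(".py")                    => PythonMainAppResource(r)
    case r if r.endsWith(".R") || r.endsWith(".r") => RMainAppResource(r)
    case ""                                        => JavaMainAppResource(None) // class bundled in the image
    case r                                         => JavaMainAppResource(Some(r))
  }

resolveMainAppResource("local:///opt/spark/examples/src/main/python/pi.py")
// => PythonMainAppResource("local:///opt/spark/examples/src/main/python/pi.py")
```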

## Driver Builder System

### KubernetesDriverBuilder { .api }

Constructs complete driver pod specifications using the feature step pattern:

```scala
class KubernetesDriverBuilder {
  def buildFromFeatures(
    conf: KubernetesDriverConf,
    client: KubernetesClient
  ): KubernetesDriverSpec
}
```

**Feature Step Integration**:
```scala
// Feature steps are selected automatically based on configuration
val featureSteps: Seq[KubernetesFeatureConfigStep] = Seq(
  new BasicDriverFeatureStep(conf),
  new DriverServiceFeatureStep(conf),
  new DriverCommandFeatureStep(conf),
  new DriverKubernetesCredentialsFeatureStep(conf),
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new MountVolumesFeatureStep(conf),
  new LocalDirsFeatureStep(conf)
)

// Apply every feature step in turn, accumulating the configured pod, additional
// Kubernetes resources, and system properties into the final specification.
// initialSpec is an empty KubernetesDriverSpec seeded with a basic SparkPod.
val finalSpec = featureSteps.foldLeft(initialSpec) { (spec, step) =>
  KubernetesDriverSpec(
    pod = step.configurePod(spec.pod),
    driverKubernetesResources =
      spec.driverKubernetesResources ++ step.getAdditionalKubernetesResources(),
    systemProperties =
      spec.systemProperties ++ step.getAdditionalPodSystemProperties()
  )
}
```

### KubernetesDriverSpec { .api }

Complete specification for the driver pod and its associated resources:

```scala
case class KubernetesDriverSpec(
  pod: SparkPod,
  driverKubernetesResources: Seq[HasMetadata],
  systemProperties: Map[String, String]
)
```

**Components**:
- **Pod**: Complete driver pod specification with container, volumes, and metadata
- **Resources**: Additional Kubernetes resources (Services, ConfigMaps, Secrets)
- **System Properties**: JVM system properties passed to the driver
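
For illustration, a spec can be assembled by hand with the fabric8 model builders; in practice `KubernetesDriverBuilder` produces it through feature steps, and the names used here (`spark-pi-driver`, `spark-pi-driver-svc`) are made up:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, ServiceBuilder}

// Hypothetical, hand-built spec showing the shape of the three components.
val driverContainer = new ContainerBuilder()
  .withName("spark-kubernetes-driver")
  .withImage("spark:latest")
  .build()

val driverPod = new PodBuilder()
  .withNewMetadata().withName("spark-pi-driver").endMetadata()
  .withNewSpec().withContainers(driverContainer).endSpec()
  .build()

val driverService = new ServiceBuilder()
  .withNewMetadata().withName("spark-pi-driver-svc").endMetadata()
  .build()

val spec = KubernetesDriverSpec(
  pod = SparkPod(driverPod, driverContainer),
  driverKubernetesResources = Seq(driverService),
  systemProperties = Map("spark.driver.host" -> "spark-pi-driver-svc")
)
```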

## Submission Workflow

### Complete Submission Process

```scala
// 1. Entry Point - spark-submit invokes KubernetesClientApplication
class KubernetesClientApplication extends SparkApplication {

  def start(args: Array[String], conf: SparkConf): Unit = {

    // 2. Parse command-line arguments
    val clientArgs = ClientArguments.fromCommandLineArgs(args)

    // 3. Create driver configuration
    val appName = conf.get("spark.app.name")
    val appId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    val kubernetesConf = KubernetesConf.createDriverConf(
      conf,
      appName = appName,
      appResourceNamePrefix = s"$appName-${UUID.randomUUID().toString.take(8)}",
      appId = appId,
      mainAppResource = clientArgs.mainAppResource,
      mainClass = clientArgs.mainClass,
      appArgs = clientArgs.driverArgs
    )

    // 4. Create Kubernetes client
    val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
      ClientType.Submission,
      kubernetesConf
    )

    // 5. Build driver specification
    val builder = new KubernetesDriverBuilder()
    val driverSpec = builder.buildFromFeatures(kubernetesConf, kubernetesClient)

    // 6. Create status watcher
    val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf, kubernetesClient, new SystemClock())

    // 7. Submit application
    val client = new Client(kubernetesConf, builder, kubernetesClient, watcher)
    client.run()
  }
}
```

### Status Monitoring

The submission client reports progress through a dedicated pod status watcher.

### LoggingPodStatusWatcher { .api }

Monitors application status and provides real-time feedback:

```scala
trait LoggingPodStatusWatcher {
  def watchUntilCompletion(): Unit
  def stop(): Unit
}

class LoggingPodStatusWatcherImpl(
  conf: KubernetesDriverConf,
  kubernetesClient: KubernetesClient,
  clock: Clock
) extends LoggingPodStatusWatcher
```

**Monitoring Capabilities**:
- Real-time pod status updates
- Driver log streaming
- Application completion detection
- Error condition reporting
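
A rough sketch of the completion-detection part, assuming the fabric8 client and simple polling (the real `LoggingPodStatusWatcherImpl` reacts to watch events rather than polling):

```scala
import io.fabric8.kubernetes.client.KubernetesClient

// Simplified polling sketch of completion detection; illustrative only.
def waitForDriverCompletion(
    kubernetesClient: KubernetesClient,
    namespace: String,
    driverPodName: String,
    pollIntervalMs: Long = 5000L): Unit = {
  var phase = "Pending"
  while (phase != "Succeeded" && phase != "Failed") {
    val pod = kubernetesClient.pods().inNamespace(namespace).withName(driverPodName).get()
    phase = Option(pod)
      .flatMap(p => Option(p.getStatus))
      .flatMap(s => Option(s.getPhase))
      .getOrElse("Unknown")
    println(s"Driver pod $driverPodName phase: $phase") // real-time status update
    if (phase != "Succeeded" && phase != "Failed") Thread.sleep(pollIntervalMs)
  }
}
```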

## spark-submit Integration

### Command Line Usage

```bash
# Basic submission
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:latest \
  local:///opt/spark/examples/jars/spark-examples.jar

# Python application
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name pyspark-job \
  --conf spark.kubernetes.container.image=spark-py:latest \
  local:///opt/spark/examples/src/main/python/pi.py

# With additional configuration
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name my-spark-app \
  --class com.example.MyApp \
  --conf spark.kubernetes.container.image=my-org/spark:v3.0.1 \
  --conf spark.kubernetes.namespace=production \
  --conf spark.driver.memory=2g \
  --conf spark.executor.instances=4 \
  --conf spark.executor.memory=4g \
  --conf spark.executor.cores=2 \
  s3a://my-bucket/jars/my-app.jar \
  --input s3a://my-bucket/data/ \
  --output s3a://my-bucket/results/
```

### Configuration Mapping

spark-submit options are mapped to standard Spark configuration properties, which the Kubernetes backend then reads:

```text
# spark-submit option -> Spark configuration
--name            -> spark.app.name
--class           -> main class passed to the driver (ClientArguments.mainClass)
--driver-memory   -> spark.driver.memory
--executor-memory -> spark.executor.memory
--executor-cores  -> spark.executor.cores
--num-executors   -> spark.executor.instances
```
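
As an illustration of this mapping, the third `spark-submit` invocation above corresponds to setting the mapped properties directly on a `SparkConf`:

```scala
import org.apache.spark.SparkConf

// The mapped properties can equally be set programmatically or via --conf flags.
val conf = new SparkConf()
  .setAppName("my-spark-app")                                   // --name
  .set("spark.driver.memory", "2g")                             // --driver-memory
  .set("spark.executor.memory", "4g")                           // --executor-memory
  .set("spark.executor.cores", "2")                             // --executor-cores
  .set("spark.executor.instances", "4")                         // --num-executors
  .set("spark.kubernetes.container.image", "my-org/spark:v3.0.1")
  .set("spark.kubernetes.namespace", "production")
```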

## Error Handling and Diagnostics

### Submission Validation

Comprehensive validation of submission parameters:

```scala
// Configuration validation
def validateSubmission(conf: KubernetesDriverConf): Unit = {
  // Required configurations
  require(conf.get(CONTAINER_IMAGE).nonEmpty,
    "Container image must be specified")
  require(conf.namespace.nonEmpty,
    "Kubernetes namespace must be specified")

  // Resource validation
  val driverMemory = conf.get(KUBERNETES_DRIVER_MEMORY)
  require(driverMemory > 0, "Driver memory must be positive")

  // Image pull policy validation
  val pullPolicy = conf.imagePullPolicy
  require(Set("Always", "Never", "IfNotPresent").contains(pullPolicy),
    s"Invalid image pull policy: $pullPolicy")
}
```

### Common Error Scenarios

Typical failure modes surfaced during submission, modeled here as illustrative case classes:

```scala
// Image pull failures
case class ImagePullError(imageName: String, reason: String)

// Insufficient resources
case class InsufficientResourcesError(requested: String, available: String)

// RBAC permission issues
case class PermissionError(operation: String, resource: String)

// Network connectivity issues
case class ApiServerConnectionError(endpoint: String, cause: Throwable)
```
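
A submission client could turn these cases into actionable messages; the handler below is a sketch over the illustrative case classes above, not part of Spark:

```scala
// Illustrative error reporting over the case classes defined above.
def reportSubmissionError(error: Any): String = error match {
  case ImagePullError(image, reason) =>
    s"Failed to pull image '$image' ($reason); check the image name, tag, and registry credentials."
  case InsufficientResourcesError(requested, available) =>
    s"Requested $requested but only $available is schedulable; reduce requests or scale the cluster."
  case PermissionError(operation, resource) =>
    s"RBAC denied '$operation' on '$resource'; verify the submitting service account's roles."
  case ApiServerConnectionError(endpoint, cause) =>
    s"Could not reach the API server at $endpoint: ${cause.getMessage}; check the --master URL and network access."
  case other =>
    s"Unexpected submission failure: $other"
}
```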

### Diagnostic Information

```scala
// Collect diagnostic information for troubleshooting
def collectDiagnostics(
    conf: KubernetesDriverConf,
    kubernetesClient: KubernetesClient): Map[String, Any] = Map(
  "kubernetesVersion" -> kubernetesClient.getVersion.getGitVersion,
  "namespace" -> conf.namespace,
  "serviceAccount" -> conf.get(KUBERNETES_SERVICE_ACCOUNT_NAME),
  "containerImage" -> conf.get(CONTAINER_IMAGE),
  "driverPodName" -> s"${conf.resourceNamePrefix}-driver",
  "submissionTime" -> System.currentTimeMillis(),
  "sparkVersion" -> org.apache.spark.SPARK_VERSION
)
```

## Advanced Submission Features

### Custom Driver Configuration

```scala
// Advanced driver pod customization
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/templates/driver.yaml")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/scrape", "true")
spark.conf.set("spark.kubernetes.driver.label.version", "v1.2.3")

// Resource limits and requests
spark.conf.set("spark.kubernetes.driver.limit.cores", "2")
spark.conf.set("spark.kubernetes.driver.request.cores", "1")
spark.conf.set("spark.driver.memory", "2g")
spark.conf.set("spark.driver.memoryOverhead", "512m")
```

### Dependency Management

```scala
// Remote dependency resolution
spark.conf.set("spark.jars", "s3a://my-bucket/jars/dependency1.jar,s3a://my-bucket/jars/dependency2.jar")
spark.conf.set("spark.files", "s3a://my-bucket/conf/app.properties")

// Python package dependencies
spark.conf.set("spark.kubernetes.pyspark.pythonVersion", "3")
spark.conf.set("spark.submit.pyFiles", "s3a://my-bucket/python/modules.zip")
```

### Security Configuration

```scala
// Service account and RBAC
spark.conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver-sa")

// Secret mounting
spark.conf.set("spark.kubernetes.driver.secrets.mysecret", "/opt/spark/secrets")
spark.conf.set("spark.kubernetes.executor.secrets.mysecret", "/opt/spark/secrets")

// Environment variable secrets
spark.conf.set("spark.kubernetes.driver.secretKeyRef.DB_PASSWORD", "db-secret:password")
```

### Monitoring and Logging

```scala
// Enable driver log collection
spark.conf.set("spark.kubernetes.driver.log.persistentVolumeClaim.claimName", "driver-logs")
spark.conf.set("spark.kubernetes.driver.log.persistentVolumeClaim.mountPath", "/opt/spark/logs")

// Metrics configuration
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/scrape", "true")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/port", "4040")
```

## Best Practices

### Application Packaging

Container image guidelines:
1. Use multi-stage builds for smaller images
2. Include all dependencies in the image
3. Set an appropriate `USER` directive
4. Use specific version tags

```dockerfile
# Dockerfile example
FROM spark:3.0.1 AS base
USER spark
COPY target/my-app.jar /opt/spark/jars/
```

### Resource Planning

```scala
// Right-sizing resources
val conf = new SparkConf()
  // Driver sizing for application coordination
  .set("spark.driver.memory", "2g")
  .set("spark.kubernetes.driver.limit.cores", "1")
  // Executor sizing for parallel processing
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "10")
  // Memory overhead for Kubernetes pod management
  .set("spark.kubernetes.memoryOverheadFactor", "0.1")
```

### Network Configuration

```scala
// Service and networking
spark.conf.set("spark.kubernetes.driver.pod.name", "my-app-driver")
spark.conf.set("spark.kubernetes.driver.service.type", "ClusterIP")

// Port configuration for external access
spark.conf.set("spark.driver.port", "7077")
spark.conf.set("spark.driver.blockManager.port", "7078")
```

The application submission layer bridges Spark's familiar submission interface and Kubernetes-native deployment, letting developers take advantage of Kubernetes without changing their existing Spark workflows.