# Application Submission

The application submission layer provides complete integration with Spark's `spark-submit` tool, enabling seamless deployment of Spark applications to Kubernetes clusters using familiar Spark interfaces.

## Core Components

### KubernetesClientApplication { .api }

The main entry point for application submission that integrates with Spark's application framework:

```scala
class KubernetesClientApplication extends SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}
```

**Integration Points**:
- Automatically invoked by `spark-submit` when using `--master k8s://` and `--deploy-mode cluster`
- Parses command-line arguments and converts them to Kubernetes-specific configuration
- Orchestrates the complete application submission workflow

**Workflow Overview**:
1. Parse command-line arguments using `ClientArguments`
2. Create `KubernetesDriverConf` from Spark configuration
3. Build driver pod specification using `KubernetesDriverBuilder`
4. Submit driver pod to Kubernetes using `Client`
5. Monitor application status using `LoggingPodStatusWatcher`

### ClientArguments { .api }

Encapsulates and validates arguments passed to the submission client:

```scala
case class ClientArguments(
  mainAppResource: MainAppResource,
  mainClass: String,
  driverArgs: Array[String]
)
```

**Companion Object**:
```scala
object ClientArguments {
  def fromCommandLineArgs(args: Array[String]): ClientArguments
}
```

**Supported Resource Types**:
```scala
// Java/Scala applications
JavaMainAppResource(primaryResource: Option[String])

// Python applications
PythonMainAppResource(primaryResource: String)

// R applications
RMainAppResource(primaryResource: String)
```

**Usage Example**:
```scala
// Parsing spark-submit arguments
val args = Array(
  "--class", "org.apache.spark.examples.SparkPi",
  "--conf", "spark.kubernetes.container.image=spark:latest",
  "local:///opt/spark/examples/jars/spark-examples.jar",
  "1000"
)

val clientArgs = ClientArguments.fromCommandLineArgs(args)
// Results in:
// - mainClass: "org.apache.spark.examples.SparkPi"
// - mainAppResource: JavaMainAppResource(Some("local:///opt/spark/examples/jars/spark-examples.jar"))
// - driverArgs: Array("1000")
```

### Client { .api }

Manages the actual submission process and lifecycle of the Spark application:

```scala
class Client(
  conf: KubernetesDriverConf,
  builder: KubernetesDriverBuilder,
  kubernetesClient: KubernetesClient,
  watcher: LoggingPodStatusWatcher
)
```

**Key Methods**:
```scala
def run(): Unit
```

**Submission Process** (sketched in code below):
1. **Driver Spec Building**: Uses `KubernetesDriverBuilder` to create complete pod specification
2. **Resource Creation**: Submits driver pod and associated Kubernetes resources
3. **Status Monitoring**: Watches pod status and logs application progress
4. **Cleanup**: Handles cleanup of resources on completion or failure
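
Putting the four stages together, here is a minimal sketch of `run()`. It is illustrative, not Spark's actual implementation: the exact client calls (e.g., `pods().create`, `resource(...).createOrReplace()`) and the merging of the driver container into the pod are simplified here.

```scala
// Hedged sketch of Client.run(); simplified relative to Spark's real code,
// which also merges the driver container into the pod spec and sets owner
// references so dependent resources are garbage-collected with the pod.
def run(): Unit = {
  // 1. Driver spec building
  val spec: KubernetesDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)

  // 2. Resource creation: driver pod first, then dependent resources
  val driverPod = kubernetesClient.pods().create(spec.pod.pod)
  spec.driverKubernetesResources.foreach { resource =>
    kubernetesClient.resource(resource).createOrReplace()
  }

  // 3. Status monitoring: block until the driver reaches a terminal state
  watcher.watchUntilCompletion()

  // 4. Cleanup is then left to Kubernetes garbage collection via owner references
}
```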

## Application Resource Types

### MainAppResource Hierarchy { .api }

Sealed trait representing different types of application resources:

```scala
sealed trait MainAppResource

// Base for non-JVM languages
sealed trait NonJVMResource extends MainAppResource

// JVM-based applications (Java/Scala)
case class JavaMainAppResource(primaryResource: Option[String]) extends MainAppResource

// Python applications
case class PythonMainAppResource(primaryResource: String) extends NonJVMResource

// R applications
case class RMainAppResource(primaryResource: String) extends NonJVMResource
```

**Resource Resolution** (see the sketch after this list):
- **Local Resources**: `local://` URLs reference files within the container image
- **Remote Resources**: `http://`, `https://`, `s3a://` URLs are downloaded at runtime
- **No Resource**: For applications bundled directly in the container image
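
As an illustration of how a primary resource maps onto this hierarchy, here is a hypothetical helper; `resolveMainAppResource` is not part of the actual API, and the real dispatch happens inside spark-submit's argument handling:

```scala
// Hypothetical helper: pick a MainAppResource from the primary resource URL.
def resolveMainAppResource(primaryResource: Option[String]): MainAppResource =
  primaryResource match {
    case Some(r) if r.endsWith(".py") => PythonMainAppResource(r)
    case Some(r) if r.endsWith(".R")  => RMainAppResource(r)
    case other                        => JavaMainAppResource(other) // jar, or none at all
  }

resolveMainAppResource(Some("local:///opt/spark/examples/src/main/python/pi.py"))
// => PythonMainAppResource("local:///opt/spark/examples/src/main/python/pi.py")
```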

## Driver Builder System

### KubernetesDriverBuilder { .api }

Constructs complete driver pod specifications using the feature step pattern:

```scala
class KubernetesDriverBuilder {
  def buildFromFeatures(
    conf: KubernetesDriverConf,
    client: KubernetesClient
  ): KubernetesDriverSpec
}
```

**Feature Step Integration**:
```scala
// Automatic feature step selection based on configuration
val featureSteps: Seq[KubernetesFeatureConfigStep] = Seq(
  new BasicDriverFeatureStep(conf),
  new DriverServiceFeatureStep(conf),
  new DriverCommandFeatureStep(conf),
  new DriverKubernetesCredentialsFeatureStep(conf),
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new MountVolumesFeatureStep(conf),
  new LocalDirsFeatureStep(conf)
)

// Apply all feature steps, accumulating the pod transformations,
// additional resources, and system properties into the final spec
val finalSpec = featureSteps.foldLeft(initialSpec) { (spec, step) =>
  spec.copy(
    pod = step.configurePod(spec.pod),
    driverKubernetesResources =
      spec.driverKubernetesResources ++ step.getAdditionalKubernetesResources(),
    systemProperties =
      spec.systemProperties ++ step.getAdditionalPodSystemProperties()
  )
}
```

### KubernetesDriverSpec { .api }

Complete specification for driver pod and associated resources:

```scala
case class KubernetesDriverSpec(
  pod: SparkPod,
  driverKubernetesResources: Seq[HasMetadata],
  systemProperties: Map[String, String]
)
```

**Components** (an illustrative example follows this list):
- **Pod**: Complete driver pod specification with container, volumes, and metadata
- **Resources**: Additional Kubernetes resources (Services, ConfigMaps, Secrets)
- **System Properties**: JVM system properties to be passed to the driver
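
For a feel of what a built spec contains, here is a hedged, hand-assembled example using fabric8's builder API; the names and the `spark.driver.host` property are illustrative, and a spec produced by the real feature steps carries far more detail:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, ServiceBuilder}

// Hand-assembled KubernetesDriverSpec; illustrative only.
val driverPod = new PodBuilder()
  .withNewMetadata().withName("spark-pi-driver").endMetadata()
  .withNewSpec().endSpec()
  .build()

val driverContainer = new ContainerBuilder()
  .withName("spark-kubernetes-driver")
  .withImage("spark:latest")
  .build()

val driverService = new ServiceBuilder()
  .withNewMetadata().withName("spark-pi-driver-svc").endMetadata()
  .build()

val spec = KubernetesDriverSpec(
  pod = SparkPod(driverPod, driverContainer),
  driverKubernetesResources = Seq(driverService),
  systemProperties = Map("spark.driver.host" -> "spark-pi-driver-svc")
)
```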

## Submission Workflow

### Complete Submission Process

```scala
// 1. Entry point - spark-submit invokes KubernetesClientApplication
class KubernetesClientApplication extends SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit = {

    // 2. Parse command-line arguments
    val clientArgs = ClientArguments.fromCommandLineArgs(args)

    // 3. Create driver configuration
    // (the UUID-based app id mirrors Spark's generated Kubernetes app id)
    val appName = conf.get("spark.app.name")
    val appId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    val kubernetesConf = KubernetesConf.createDriverConf(
      conf,
      appName = appName,
      appResourceNamePrefix = s"$appName-${UUID.randomUUID().toString.take(8)}",
      appId = appId,
      mainAppResource = clientArgs.mainAppResource,
      mainClass = clientArgs.mainClass,
      appArgs = clientArgs.driverArgs
    )

    // 4. Create Kubernetes client
    val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
      ClientType.Submission,
      kubernetesConf
    )

    // 5. Build driver specification
    val builder = new KubernetesDriverBuilder()
    val driverSpec = builder.buildFromFeatures(kubernetesConf, kubernetesClient)

    // 6. Create status watcher
    val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf, kubernetesClient, new SystemClock())

    // 7. Submit application
    val client = new Client(kubernetesConf, builder, kubernetesClient, watcher)
    client.run()
  }
}
```

## Status Monitoring

### LoggingPodStatusWatcher { .api }

Monitors application status and provides real-time feedback:

```scala
trait LoggingPodStatusWatcher {
  def watchUntilCompletion(): Unit
  def stop(): Unit
}

class LoggingPodStatusWatcherImpl(
  conf: KubernetesDriverConf,
  kubernetesClient: KubernetesClient,
  clock: Clock
) extends LoggingPodStatusWatcher
```

**Monitoring Capabilities** (usage sketched below):
- Real-time pod status updates
- Driver log streaming
- Application completion detection
- Error condition reporting
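
A minimal usage sketch, assuming the implementation also registers as a fabric8 `Watcher[Pod]` to receive pod events; the driver pod name shown is an assumed naming scheme:

```scala
// Hedged sketch: attach the status watcher to the driver pod, then block.
val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf, kubernetesClient, new SystemClock())

kubernetesClient.pods()
  .inNamespace(kubernetesConf.namespace)
  .withName(s"${kubernetesConf.resourceNamePrefix}-driver") // assumed pod name
  .watch(watcher) // assumes LoggingPodStatusWatcherImpl extends Watcher[Pod]

watcher.watchUntilCompletion() // logs status updates until completion or failure
```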

## spark-submit Integration

### Command Line Usage

```bash
# Basic submission
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:latest \
  local:///opt/spark/examples/jars/spark-examples.jar

# Python application
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name pyspark-job \
  --conf spark.kubernetes.container.image=spark-py:latest \
  local:///opt/spark/examples/src/main/python/pi.py

# With additional configuration
spark-submit \
  --master k8s://https://kubernetes.example.com:443 \
  --deploy-mode cluster \
  --name my-spark-app \
  --class com.example.MyApp \
  --conf spark.kubernetes.container.image=my-org/spark:v3.0.1 \
  --conf spark.kubernetes.namespace=production \
  --conf spark.driver.memory=2g \
  --conf spark.executor.instances=4 \
  --conf spark.executor.memory=4g \
  --conf spark.executor.cores=2 \
  s3a://my-bucket/jars/my-app.jar \
  --input s3a://my-bucket/data/ \
  --output s3a://my-bucket/results/
```

### Configuration Mapping

`spark-submit` options are automatically mapped to standard Spark configuration keys:

```text
--name            ->  spark.app.name
--driver-memory   ->  spark.driver.memory
--executor-memory ->  spark.executor.memory
--executor-cores  ->  spark.executor.cores
--num-executors   ->  spark.executor.instances
```

`--class` is not mapped to a configuration key; it is carried through as the application's main class (see `ClientArguments.mainClass`).
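
The same settings can equally be supplied programmatically through `SparkConf`; a minimal sketch:

```scala
import org.apache.spark.SparkConf

// Equivalent to: spark-submit --name my-app --driver-memory 2g \
//   --executor-memory 4g --executor-cores 2 --num-executors 4 ...
val conf = new SparkConf()
  .setAppName("my-app")                 // --name
  .set("spark.driver.memory", "2g")     // --driver-memory
  .set("spark.executor.memory", "4g")   // --executor-memory
  .set("spark.executor.cores", "2")     // --executor-cores
  .set("spark.executor.instances", "4") // --num-executors
```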

## Error Handling and Diagnostics

### Submission Validation

Comprehensive validation of submission parameters:

```scala
// Configuration validation
def validateSubmission(conf: KubernetesDriverConf): Unit = {
  // Required configurations
  require(conf.get(CONTAINER_IMAGE).nonEmpty,
    "Container image must be specified")
  require(conf.namespace.nonEmpty,
    "Kubernetes namespace must be specified")

  // Resource validation
  val driverMemory = conf.get(KUBERNETES_DRIVER_MEMORY)
  require(driverMemory > 0, "Driver memory must be positive")

  // Image pull policy validation
  val pullPolicy = conf.imagePullPolicy
  require(Set("Always", "Never", "IfNotPresent").contains(pullPolicy),
    s"Invalid image pull policy: $pullPolicy")
}
```

### Common Error Scenarios

```scala
// Image pull failures
case class ImagePullError(imageName: String, reason: String)

// Insufficient resources
case class InsufficientResourcesError(requested: String, available: String)

// RBAC permission issues
case class PermissionError(operation: String, resource: String)

// Network connectivity issues
case class ApiServerConnectionError(endpoint: String, cause: Throwable)
```
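
These modeled failure cases (they are this document's own types, not Spark classes) suggest a straightforward reporting pattern:

```scala
// Illustrative reporting over the modeled failure cases above.
def reportSubmissionFailure(error: Any): Unit = error match {
  case ImagePullError(image, reason) =>
    println(s"Cannot pull '$image': $reason. Check the image tag and registry credentials.")
  case InsufficientResourcesError(requested, available) =>
    println(s"Cluster cannot satisfy the request: $requested requested, $available available.")
  case PermissionError(operation, resource) =>
    println(s"RBAC denied '$operation' on '$resource'. Check the service account's role bindings.")
  case ApiServerConnectionError(endpoint, cause) =>
    println(s"Cannot reach the API server at $endpoint: ${cause.getMessage}")
  case other =>
    println(s"Unexpected submission failure: $other")
}
```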

### Diagnostic Information

```scala
// Collect diagnostic information for troubleshooting
def collectDiagnostics(conf: KubernetesDriverConf): Map[String, Any] = Map(
  "kubernetesVersion" -> kubernetesClient.getVersion.getGitVersion,
  "namespace" -> conf.namespace,
  "serviceAccount" -> conf.get(KUBERNETES_SERVICE_ACCOUNT_NAME),
  "containerImage" -> conf.get(CONTAINER_IMAGE),
  "driverPodName" -> (conf.resourceNamePrefix + "-driver"),
  "submissionTime" -> System.currentTimeMillis(),
  "sparkVersion" -> org.apache.spark.SPARK_VERSION
)
```

## Advanced Submission Features

### Custom Driver Configuration

```scala
// Advanced driver pod customization
spark.conf.set("spark.kubernetes.driver.podTemplateFile", "/templates/driver.yaml")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/scrape", "true")
spark.conf.set("spark.kubernetes.driver.label.version", "v1.2.3")

// Resource limits and requests
spark.conf.set("spark.kubernetes.driver.limit.cores", "2")
spark.conf.set("spark.kubernetes.driver.request.cores", "1")
spark.conf.set("spark.driver.memory", "2g")
spark.conf.set("spark.driver.memoryOverhead", "512m")
```

### Dependency Management

```scala
// Remote dependency resolution
spark.conf.set("spark.jars", "s3a://my-bucket/jars/dependency1.jar,s3a://my-bucket/jars/dependency2.jar")
spark.conf.set("spark.files", "s3a://my-bucket/conf/app.properties")

// Python package dependencies
spark.conf.set("spark.kubernetes.pyspark.pythonVersion", "3")
spark.conf.set("spark.submit.pyFiles", "s3a://my-bucket/python/modules.zip")
```

### Security Configuration

```scala
// Service account and RBAC
spark.conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver-sa")

// Secret mounting
spark.conf.set("spark.kubernetes.driver.secrets.mysecret", "/opt/spark/secrets")
spark.conf.set("spark.kubernetes.executor.secrets.mysecret", "/opt/spark/secrets")

// Environment variable secrets
spark.conf.set("spark.kubernetes.driver.secretKeyRef.DB_PASSWORD", "db-secret:password")
```

### Monitoring and Logging

```scala
// Persist driver logs on a PersistentVolumeClaim mounted as a driver volume
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.logs.options.claimName", "driver-logs")
spark.conf.set("spark.kubernetes.driver.volumes.persistentVolumeClaim.logs.mount.path", "/opt/spark/logs")

// Metrics configuration
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/scrape", "true")
spark.conf.set("spark.kubernetes.driver.annotation.prometheus.io/port", "4040")
```

## Best Practices

### Application Packaging

Container image best practices:

1. Use multi-stage builds for smaller images
2. Include all dependencies in the image
3. Set an appropriate `USER` directive
4. Use specific version tags

```dockerfile
# Example Dockerfile
FROM spark:3.0.1
USER spark
COPY target/my-app.jar /opt/spark/jars/
```

### Resource Planning

```scala
// Right-sizing resources
val conf = new SparkConf()
  // Driver sizing for application coordination
  .set("spark.driver.memory", "2g")
  .set("spark.kubernetes.driver.limit.cores", "1")
  // Executor sizing for parallel processing
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")
  .set("spark.executor.instances", "10")
  // Memory overhead for Kubernetes pod management
  .set("spark.kubernetes.memoryOverheadFactor", "0.1")
```

### Network Configuration

```scala
// Service and networking (Spark exposes the driver through a headless ClusterIP service)
spark.conf.set("spark.kubernetes.driver.pod.name", "my-app-driver")

// Port configuration for driver-executor communication
spark.conf.set("spark.driver.port", "7077")
spark.conf.set("spark.driver.blockManager.port", "7078")
```

The application submission layer provides a seamless bridge between Spark's familiar submission interface and Kubernetes-native deployment, enabling developers to leverage Kubernetes benefits without changing their existing Spark workflows.