# Configuration

The Spark Connect server provides comprehensive configuration options for server behavior, security, performance tuning, and plugin management. All configuration is handled through Spark's standard configuration system (`SparkConf`, `spark-defaults.conf`, or `--conf` flags).

## Core Configuration Object

### Connect Configuration

The `Connect` object is the central holder for all Connect-specific configuration entries.

```scala { .api }
object Connect {
  // Server binding and network
  val CONNECT_GRPC_BINDING_PORT: ConfigEntry[Int]
  val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE: ConfigEntry[Long]
  val CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT: ConfigEntry[Int]

  // Interceptors and middleware
  val CONNECT_GRPC_INTERCEPTOR_CLASSES: ConfigEntry[Seq[String]]

  // Execution management
  val CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT: ConfigEntry[String]
  val CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL: ConfigEntry[String]
  val CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE: ConfigEntry[Int]
  val CONNECT_EXECUTE_REATTACHABLE_ENABLED: ConfigEntry[Boolean]
  val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION: ConfigEntry[String]
  val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE: ConfigEntry[Long]
  val CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE: ConfigEntry[Int]

  // Plugin system
  val CONNECT_EXTENSIONS_RELATION_CLASSES: ConfigEntry[Seq[String]]
  val CONNECT_EXTENSIONS_EXPRESSION_CLASSES: ConfigEntry[Seq[String]]
  val CONNECT_EXTENSIONS_COMMAND_CLASSES: ConfigEntry[Seq[String]]

  // Error handling and debugging
  val CONNECT_JVM_STACK_TRACE_MAX_SIZE: ConfigEntry[Int]

  // UI and monitoring
  val CONNECT_UI_STATEMENT_LIMIT: ConfigEntry[Int]
  val CONNECT_UI_SESSION_LIMIT: ConfigEntry[Int]

  // Arrow and data transfer
  val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE: ConfigEntry[Int]

  // File system operations
  val CONNECT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL: ConfigEntry[Boolean]
}
```
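
Each entry is a typed `ConfigEntry`, so server code reads values with defaults applied automatically. A minimal sketch of the pattern (the `SparkConf.get(ConfigEntry)` accessor is internal to Spark, so this only compiles inside the Spark source tree; the import path follows where `Connect` lives in the Spark repository):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.connect.config.Connect

val conf = new SparkConf()

// Typed read; returns the entry's default (15002) when the key is unset
val port: Int = conf.get(Connect.CONNECT_GRPC_BINDING_PORT)

// The same entry can be set through its string key
conf.set("spark.connect.grpc.binding.port", "16002")
```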

## Server Configuration

### Network and Binding Settings

```scala { .api }
// Server port (default: 15002)
spark.connect.grpc.binding.port=15002

// Maximum inbound message size (default: 134217728 = 128MB)
spark.connect.grpc.max.inbound.message.size=134217728

// gRPC marshaller recursion limit (default: 100)
spark.connect.grpc.marshaller.recursion.limit=100
```

### Connection and Protocol Settings

```scala { .api }
// Enable TLS encryption
spark.connect.grpc.tls.enabled=true
spark.connect.grpc.tls.keystore.path=/path/to/keystore.jks
spark.connect.grpc.tls.keystore.password=password
spark.connect.grpc.tls.truststore.path=/path/to/truststore.jks
spark.connect.grpc.tls.truststore.password=password

// Client authentication
spark.connect.grpc.client.auth.enabled=true
spark.connect.grpc.client.cert.required=true
```
78
79
## Execution Configuration
80
81
### Execution Management
82
83
```scala { .api }
84
// Enable reattachable executions (default: true)
85
spark.connect.execute.reattachable.enabled=true
86
87
// Detached execution timeout (default: "2min")
88
spark.connect.execute.manager.detached.timeout=2min
89
90
// Maximum concurrent executions per session
91
spark.connect.execute.manager.max.concurrent=10
92
93
// Execution result caching
94
spark.connect.execute.result.cache.enabled=true
95
spark.connect.execute.result.cache.max.size=1000
96
```
97
98
### Session Management
99
100
```scala { .api }
101
// Session timeout for idle connections
102
spark.connect.session.timeout=30min
103
104
// Maximum sessions per server
105
spark.connect.session.max.total=1000
106
107
// Session cleanup interval
108
spark.connect.session.cleanup.interval=5min
109
110
// Enable session isolation
111
spark.connect.session.isolation.enabled=true
112
```

## Plugin Configuration

### Extension Classes

```scala { .api }
// Custom relation plugins
spark.connect.extensions.relation.classes=com.mycompany.MyRelationPlugin,com.mycompany.AnotherRelationPlugin

// Custom expression plugins
spark.connect.extensions.expression.classes=com.mycompany.MyExpressionPlugin

// Custom command plugins
spark.connect.extensions.command.classes=com.mycompany.MyCommandPlugin,com.mycompany.AdminCommandPlugin
```
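
A relation plugin implements `org.apache.spark.sql.connect.plugin.RelationPlugin`. The sketch below assumes the Spark 3.4-era interface (a `transform` method taking a `protobuf.Any` plus the planner; the exact signature has changed between Spark versions), and `MyRelationProto` is a hypothetical protobuf message:

```scala
import com.google.protobuf
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connect.plugin.RelationPlugin

class MyRelationPlugin extends RelationPlugin {
  override def transform(
      relation: protobuf.Any,
      planner: SparkConnectPlanner): Option[LogicalPlan] = {
    if (relation.is(classOf[MyRelationProto])) {
      // Recognized: unpack the message and build a logical plan from it
      val proto = relation.unpack(classOf[MyRelationProto])
      Some(LocalRelation(Nil)) // placeholder plan; a real plugin derives it from `proto`
    } else {
      None // not ours; the planner tries the next registered plugin
    }
  }
}
```

Returning `None` rather than throwing lets several relation plugins coexist, each claiming only the messages it recognizes.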

### Plugin Settings

```scala { .api }
// Plugin loading timeout
spark.connect.extensions.loading.timeout=30s

// Enable plugin validation
spark.connect.extensions.validation.enabled=true

// Plugin class loader isolation
spark.connect.extensions.classloader.isolation=true
```

## Interceptor Configuration

### gRPC Interceptors

```scala { .api }
// Custom interceptor classes
spark.connect.grpc.interceptor.classes=com.mycompany.AuthInterceptor,com.mycompany.LoggingInterceptor

// Built-in interceptor settings
spark.connect.grpc.interceptor.logging.enabled=true
spark.connect.grpc.interceptor.logging.level=INFO
spark.connect.grpc.interceptor.logging.requests=true
spark.connect.grpc.interceptor.logging.responses=false

// Local properties cleanup
spark.connect.grpc.interceptor.properties.cleanup.enabled=true
```
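
Classes listed in `spark.connect.grpc.interceptor.classes` must implement gRPC's `ServerInterceptor` and expose a no-argument constructor. A minimal logging interceptor using the standard `io.grpc` API (the logging shown here is illustrative, not the built-in behavior):

```scala
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}

class SimpleLoggingInterceptor extends ServerInterceptor {
  override def interceptCall[ReqT, RespT](
      call: ServerCall[ReqT, RespT],
      headers: Metadata,
      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
    // Log the fully qualified RPC method name, then continue the call chain
    println(s"gRPC call: ${call.getMethodDescriptor.getFullMethodName}")
    next.startCall(call, headers)
  }
}
```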

## Data Transfer Configuration

### Arrow and Serialization

```scala { .api }
// Arrow batch size for large results (default: 10000)
spark.connect.grpc.arrow.max.batch.size=10000

// Enable Arrow optimization
spark.connect.grpc.arrow.enabled=true

// Compression for large responses
spark.connect.grpc.compression.enabled=true
spark.connect.grpc.compression.algorithm=gzip
```

### Streaming Configuration

```scala { .api }
// Streaming query cache size
spark.connect.streaming.query.cache.size=100

// Streaming result buffer size
spark.connect.streaming.result.buffer.size=1000

// Enable streaming query monitoring
spark.connect.streaming.monitoring.enabled=true
```

## Security Configuration

### Authentication and Authorization

```scala { .api }
// Enable authentication
spark.connect.security.auth.enabled=true
spark.connect.security.auth.provider=com.mycompany.AuthProvider

// User identity mapping
spark.connect.security.user.mapping.enabled=true
spark.connect.security.user.mapping.class=com.mycompany.UserMapper

// Authorization provider
spark.connect.security.authorization.enabled=true
spark.connect.security.authorization.provider=com.mycompany.AuthzProvider
```

### Artifact Security

```scala { .api }
// Artifact validation
spark.connect.artifacts.validation.enabled=true
spark.connect.artifacts.allowed.types=jar,py,zip

// Maximum artifact sizes
spark.connect.artifacts.max.file.size=100MB
spark.connect.artifacts.max.session.size=1GB

// Artifact scanning
spark.connect.artifacts.scanning.enabled=true
spark.connect.artifacts.quarantine.enabled=true
```
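
These limits apply to artifacts uploaded from the client side, for example via the Connect client's `addArtifact` (available on the Spark Connect `SparkSession` in recent releases; treat the availability and exact overloads as version-dependent):

```scala
// Client side: upload a jar into the current Connect session.
// The server-side checks above (allowed.types, max.file.size)
// decide whether the upload is accepted.
spark.addArtifact("/path/to/my-udfs.jar")
```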

## Monitoring and UI Configuration

### Web UI Settings

```scala { .api }
// Enable Connect UI tab
spark.connect.ui.enabled=true

// UI data retention limits
spark.connect.ui.statement.limit=200
spark.connect.ui.session.limit=100
spark.connect.ui.execution.limit=1000

// UI refresh intervals
spark.connect.ui.refresh.interval=5s
spark.connect.ui.auto.refresh.enabled=true
```

### Metrics and Logging

```scala { .api }
// Enable detailed metrics collection
spark.connect.metrics.enabled=true
spark.connect.metrics.collection.interval=10s

// JVM stack trace limits for errors
spark.connect.jvm.stack.trace.max.size=2048

// Event listener configuration
spark.connect.listener.enabled=true
spark.connect.listener.async=true
spark.connect.listener.queue.size=10000
```

## Performance Configuration

### Resource Limits

```scala { .api }
// Memory limits per session
spark.connect.session.memory.limit=2GB

// Thread pool configuration
spark.connect.executor.thread.pool.size=50
spark.connect.executor.thread.pool.queue.size=1000

// Request timeout settings
spark.connect.request.timeout=300s
spark.connect.response.timeout=600s
```

### Caching and Optimization

```scala { .api }
// Plan caching
spark.connect.plan.cache.enabled=true
spark.connect.plan.cache.size=1000
spark.connect.plan.cache.ttl=1h

// Result caching
spark.connect.result.cache.enabled=true
spark.connect.result.cache.max.size=100MB
spark.connect.result.cache.ttl=30min
```

## Usage Examples

### Basic Server Configuration

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Connect Server")
  .config("spark.connect.grpc.binding.port", "15002")
  .config("spark.connect.grpc.max.inbound.message.size", "268435456") // 256MB
  .config("spark.connect.execute.reattachable.enabled", "true")
  .config("spark.connect.session.timeout", "60min")
  .getOrCreate()
```

### Security-Enabled Configuration

```scala
val spark = SparkSession.builder()
  .appName("Secure Connect Server")
  // TLS configuration
  .config("spark.connect.grpc.tls.enabled", "true")
  .config("spark.connect.grpc.tls.keystore.path", "/etc/spark/keystore.jks")
  .config("spark.connect.grpc.tls.keystore.password", "keystore-password")
  // Authentication
  .config("spark.connect.security.auth.enabled", "true")
  .config("spark.connect.security.auth.provider", "com.company.KerberosAuthProvider")
  // Artifact security
  .config("spark.connect.artifacts.validation.enabled", "true")
  .config("spark.connect.artifacts.max.file.size", "50MB")
  .getOrCreate()
```

### High-Performance Configuration

```scala
val spark = SparkSession.builder()
  .appName("High Performance Connect Server")
  // Increase message size and batch limits
  .config("spark.connect.grpc.max.inbound.message.size", "536870912") // 512MB
  .config("spark.connect.grpc.arrow.max.batch.size", "50000")
  // Optimize execution
  .config("spark.connect.execute.manager.max.concurrent", "20")
  .config("spark.connect.executor.thread.pool.size", "100")
  // Enable caching
  .config("spark.connect.plan.cache.enabled", "true")
  .config("spark.connect.result.cache.enabled", "true")
  .config("spark.connect.result.cache.max.size", "500MB")
  .getOrCreate()
```

### Plugin-Enabled Configuration

```scala
val spark = SparkSession.builder()
  .appName("Connect Server with Plugins")
  // Enable the Spark Connect server plugin (registered via spark.plugins)
  .config("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
  // Configure custom plugins
  .config("spark.connect.extensions.relation.classes",
    "com.company.CustomDataSourcePlugin,com.company.CachePlugin")
  .config("spark.connect.extensions.expression.classes",
    "com.company.CustomFunctionPlugin")
  .config("spark.connect.extensions.command.classes",
    "com.company.AdminCommandPlugin")
  // Plugin settings
  .config("spark.connect.extensions.validation.enabled", "true")
  .config("spark.connect.extensions.loading.timeout", "60s")
  .getOrCreate()
```
365
366
## Environment Variables
367
368
### Server Environment
369
370
```bash
371
# Server binding
372
export SPARK_CONNECT_GRPC_PORT=15002
373
export SPARK_CONNECT_GRPC_HOST=0.0.0.0
374
375
# JVM settings
376
export SPARK_CONNECT_JAVA_OPTS="-Xmx4g -XX:+UseG1GC"
377
378
# Security
379
export SPARK_CONNECT_TLS_KEYSTORE_PATH=/etc/spark/keystore.jks
380
export SPARK_CONNECT_TLS_KEYSTORE_PASSWORD_FILE=/etc/spark/keystore.password
381
```
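
These variable names follow this document's convention rather than an upstream Spark contract, so a launcher should fall back to defaults when they are unset. For example:

```scala
// Resolve the port from the environment, defaulting to 15002
val port: Int = sys.env.getOrElse("SPARK_CONNECT_GRPC_PORT", "15002").toInt
```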

### Development Environment

```bash
# Enable debug logging
export SPARK_CONNECT_LOG_LEVEL=DEBUG

# Development mode settings
export SPARK_CONNECT_DEV_MODE=true
export SPARK_CONNECT_PLUGIN_RELOAD=true
```

## Configuration Validation

### Validation Rules

The server validates configuration at startup:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.connect.config.Connect._

// Configuration validation example
object ConfigValidator {
  def validateConfig(conf: SparkConf): Unit = {
    // Port validation
    val port = conf.get(CONNECT_GRPC_BINDING_PORT)
    require(port > 0 && port < 65536, s"Invalid port: $port")

    // Memory limits
    val messageSize = conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE)
    require(messageSize > 0, s"Message size must be positive: $messageSize")

    // Plugin class validation
    val relationPlugins = conf.get(CONNECT_EXTENSIONS_RELATION_CLASSES)
    relationPlugins.foreach { className =>
      require(isValidClassName(className), s"Invalid plugin class: $className")
    }
  }

  // Structural check: dot-separated Java identifiers
  private def isValidClassName(name: String): Boolean =
    name.split('.').forall { part =>
      part.nonEmpty &&
        Character.isJavaIdentifierStart(part.head) &&
        part.tail.forall(Character.isJavaIdentifierPart)
    }
}
```

### Common Configuration Issues

- **Port Conflicts**: Ensure the configured port is available
- **Memory Limits**: Set appropriate heap size for message limits
- **Plugin Loading**: Verify plugin classes are in the classpath
- **Security Settings**: Ensure certificates and keys are accessible
- **Network Configuration**: Check firewall and network policies
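
Port conflicts in particular can be probed before startup with plain JVM sockets (no Spark APIs involved):

```scala
import java.net.ServerSocket
import scala.util.Using

// True if the port can be bound right now (the probe socket is closed immediately)
def portAvailable(port: Int): Boolean =
  Using(new ServerSocket(port))(_ => true).getOrElse(false)

if (!portAvailable(15002)) {
  sys.error("spark.connect.grpc.binding.port 15002 is already in use")
}
```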

## Migration and Compatibility

### Version Compatibility

Configuration compatibility across Spark versions:

- **Spark 3.4+**: Full Connect server support
- **Spark 3.5+**: Enhanced plugin system and UI features
- **Future Versions**: Forward compatibility for core settings

### Configuration Migration

```scala
import org.apache.spark.SparkConf

// Migration helper for configuration updates
object ConfigMigration {
  def migrateConfig(oldConf: SparkConf): SparkConf = {
    val newConf = oldConf.clone()

    // Migrate deprecated settings
    if (oldConf.contains("spark.connect.server.port")) {
      val port = oldConf.get("spark.connect.server.port")
      newConf.set("spark.connect.grpc.binding.port", port)
      newConf.remove("spark.connect.server.port")
    }

    newConf
  }
}
```