The Spark Connect Server provides comprehensive configuration options for server behavior, security, performance tuning, and plugin management. All configuration is handled through Spark's standard configuration system.
Central configuration object with all Connect-specific settings.
// API summary: configuration entry declarations (signatures only; no defaults shown here).
object Connect {
// Server binding and network
val CONNECT_GRPC_BINDING_PORT: ConfigEntry[Int]
val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE: ConfigEntry[Long]
val CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT: ConfigEntry[Int]
// Interceptors and middleware
val CONNECT_GRPC_INTERCEPTOR_CLASSES: ConfigEntry[Seq[String]]
// Execution management
val CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT: ConfigEntry[String]
val CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL: ConfigEntry[String]
val CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE: ConfigEntry[Int]
val CONNECT_EXECUTE_REATTACHABLE_ENABLED: ConfigEntry[Boolean]
val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION: ConfigEntry[String]
val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE: ConfigEntry[Long]
val CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE: ConfigEntry[Int]
// Plugin system
val CONNECT_EXTENSIONS_RELATION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_EXPRESSION_CLASSES: ConfigEntry[Seq[String]]
val CONNECT_EXTENSIONS_COMMAND_CLASSES: ConfigEntry[Seq[String]]
// Error handling and debugging
val CONNECT_JVM_STACK_TRACE_MAX_SIZE: ConfigEntry[Int]
// UI and monitoring
val CONNECT_UI_STATEMENT_LIMIT: ConfigEntry[Int]
val CONNECT_UI_SESSION_LIMIT: ConfigEntry[Int]
// Arrow and data transfer
val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE: ConfigEntry[Int]
// File system operations
val CONNECT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL: ConfigEntry[Boolean]
}
// Server port (default: 15002)
spark.connect.grpc.binding.port=15002
// Maximum inbound message size (default: 134217728 = 128MB)
spark.connect.grpc.max.inbound.message.size=134217728
// gRPC marshaller recursion limit (default: 100)
spark.connect.grpc.marshaller.recursion.limit=100
// Enable TLS encryption
spark.connect.grpc.tls.enabled=true
spark.connect.grpc.tls.keystore.path=/path/to/keystore.jks
spark.connect.grpc.tls.keystore.password=password
spark.connect.grpc.tls.truststore.path=/path/to/truststore.jks
spark.connect.grpc.tls.truststore.password=password
// Client authentication
spark.connect.grpc.client.auth.enabled=true
spark.connect.grpc.client.cert.required=true
// Enable reattachable executions (default: true)
spark.connect.execute.reattachable.enabled=true
// Detached execution timeout (default: "2min")
spark.connect.execute.manager.detached.timeout=2min
// Maximum concurrent executions per session
spark.connect.execute.manager.max.concurrent=10
// Execution result caching
spark.connect.execute.result.cache.enabled=true
spark.connect.execute.result.cache.max.size=1000
// Session timeout for idle connections
spark.connect.session.timeout=30min
// Maximum sessions per server
spark.connect.session.max.total=1000
// Session cleanup interval
spark.connect.session.cleanup.interval=5min
// Enable session isolation
spark.connect.session.isolation.enabled=true
// Custom relation plugins
spark.connect.extensions.relation.classes=com.mycompany.MyRelationPlugin,com.mycompany.AnotherRelationPlugin
// Custom expression plugins
spark.connect.extensions.expression.classes=com.mycompany.MyExpressionPlugin
// Custom command plugins
spark.connect.extensions.command.classes=com.mycompany.MyCommandPlugin,com.mycompany.AdminCommandPlugin
// Plugin loading timeout
spark.connect.extensions.loading.timeout=30s
// Enable plugin validation
spark.connect.extensions.validation.enabled=true
// Plugin class loader isolation
spark.connect.extensions.classloader.isolation=true
// Custom interceptor classes
spark.connect.grpc.interceptor.classes=com.mycompany.AuthInterceptor,com.mycompany.LoggingInterceptor
// Built-in interceptor settings
spark.connect.grpc.interceptor.logging.enabled=true
spark.connect.grpc.interceptor.logging.level=INFO
spark.connect.grpc.interceptor.logging.requests=true
spark.connect.grpc.interceptor.logging.responses=false
// Local properties cleanup
spark.connect.grpc.interceptor.properties.cleanup.enabled=true
// Arrow batch size for large results (default: 10000)
spark.connect.grpc.arrow.max.batch.size=10000
// Enable Arrow optimization
spark.connect.grpc.arrow.enabled=true
// Compression for large responses
spark.connect.grpc.compression.enabled=true
spark.connect.grpc.compression.algorithm=gzip
// Streaming query cache size
spark.connect.streaming.query.cache.size=100
// Streaming result buffer size
spark.connect.streaming.result.buffer.size=1000
// Enable streaming query monitoring
spark.connect.streaming.monitoring.enabled=true
// Enable authentication
spark.connect.security.auth.enabled=true
spark.connect.security.auth.provider=com.mycompany.AuthProvider
// User identity mapping
spark.connect.security.user.mapping.enabled=true
spark.connect.security.user.mapping.class=com.mycompany.UserMapper
// Authorization provider
spark.connect.security.authorization.enabled=true
spark.connect.security.authorization.provider=com.mycompany.AuthzProvider
// Artifact validation
spark.connect.artifacts.validation.enabled=true
spark.connect.artifacts.allowed.types=jar,py,zip
// Maximum artifact sizes
spark.connect.artifacts.max.file.size=100MB
spark.connect.artifacts.max.session.size=1GB
// Artifact scanning
spark.connect.artifacts.scanning.enabled=true
spark.connect.artifacts.quarantine.enabled=true
// Enable Connect UI tab
spark.connect.ui.enabled=true
// UI data retention limits
spark.connect.ui.statement.limit=200
spark.connect.ui.session.limit=100
spark.connect.ui.execution.limit=1000
// UI refresh intervals
spark.connect.ui.refresh.interval=5s
spark.connect.ui.auto.refresh.enabled=true
// Enable detailed metrics collection
spark.connect.metrics.enabled=true
spark.connect.metrics.collection.interval=10s
// JVM stack trace limits for errors
spark.connect.jvm.stack.trace.max.size=2048
// Event listener configuration
spark.connect.listener.enabled=true
spark.connect.listener.async=true
spark.connect.listener.queue.size=10000
// Memory limits per session
spark.connect.session.memory.limit=2GB
// Thread pool configuration
spark.connect.executor.thread.pool.size=50
spark.connect.executor.thread.pool.queue.size=1000
// Request timeout settings
spark.connect.request.timeout=300s
spark.connect.response.timeout=600s
// Plan caching
spark.connect.plan.cache.enabled=true
spark.connect.plan.cache.size=1000
spark.connect.plan.cache.ttl=1h
// Result caching
spark.connect.result.cache.enabled=true
spark.connect.result.cache.max.size=100MB
spark.connect.result.cache.ttl=30min

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Connect Server")
.config("spark.connect.grpc.binding.port", "15002")
.config("spark.connect.grpc.max.inbound.message.size", "268435456") // 256MB
.config("spark.connect.execute.reattachable.enabled", "true")
.config("spark.connect.session.timeout", "60min")
.getOrCreate()

val spark = SparkSession.builder()
.appName("Secure Connect Server")
// TLS configuration
.config("spark.connect.grpc.tls.enabled", "true")
.config("spark.connect.grpc.tls.keystore.path", "/etc/spark/keystore.jks")
.config("spark.connect.grpc.tls.keystore.password", "keystore-password")
// Authentication
.config("spark.connect.security.auth.enabled", "true")
.config("spark.connect.security.auth.provider", "com.company.KerberosAuthProvider")
// Artifact security
.config("spark.connect.artifacts.validation.enabled", "true")
.config("spark.connect.artifacts.max.file.size", "50MB")
.getOrCreate()

val spark = SparkSession.builder()
.appName("High Performance Connect Server")
// Increase message size and batch limits
.config("spark.connect.grpc.max.inbound.message.size", "536870912") // 512MB
.config("spark.connect.grpc.arrow.max.batch.size", "50000")
// Optimize execution
.config("spark.connect.execute.manager.max.concurrent", "20")
.config("spark.connect.executor.thread.pool.size", "100")
// Enable caching
.config("spark.connect.plan.cache.enabled", "true")
.config("spark.connect.result.cache.enabled", "true")
.config("spark.connect.result.cache.max.size", "500MB")
.getOrCreate()

val spark = SparkSession.builder()
.appName("Connect Server with Plugins")
// Enable Spark Connect plugin
.config("spark.sql.extensions", "org.apache.spark.sql.connect.SparkConnectPlugin")
// Configure custom plugins
.config("spark.connect.extensions.relation.classes",
"com.company.CustomDataSourcePlugin,com.company.CachePlugin")
.config("spark.connect.extensions.expression.classes",
"com.company.CustomFunctionPlugin")
.config("spark.connect.extensions.command.classes",
"com.company.AdminCommandPlugin")
// Plugin settings
.config("spark.connect.extensions.validation.enabled", "true")
.config("spark.connect.extensions.loading.timeout", "60s")
.getOrCreate()

# Server binding
export SPARK_CONNECT_GRPC_PORT=15002
export SPARK_CONNECT_GRPC_HOST=0.0.0.0
# JVM settings
export SPARK_CONNECT_JAVA_OPTS="-Xmx4g -XX:+UseG1GC"
# Security
export SPARK_CONNECT_TLS_KEYSTORE_PATH=/etc/spark/keystore.jks
export SPARK_CONNECT_TLS_KEYSTORE_PASSWORD_FILE=/etc/spark/keystore.password
# Enable debug logging
export SPARK_CONNECT_LOG_LEVEL=DEBUG
# Development mode settings
export SPARK_CONNECT_DEV_MODE=true
export SPARK_CONNECT_PLUGIN_RELOAD=true

The server validates configuration at startup:
// Configuration validation example
object ConfigValidator {
  /**
   * Fails fast with `IllegalArgumentException` when Connect settings are out of range.
   * Intended to run once at server startup, before the gRPC service binds.
   */
  def validateConfig(conf: SparkConf): Unit = {
    // The bound port must be a usable, non-zero TCP port.
    val port = conf.get(CONNECT_GRPC_BINDING_PORT)
    require(port > 0 && port < 65536, s"Invalid port: $port")

    // The inbound gRPC message cap must be strictly positive.
    val messageSize = conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE)
    require(messageSize > 0, s"Message size must be positive: $messageSize")

    // Every configured relation plugin must name a syntactically valid class.
    for (className <- conf.get(CONNECT_EXTENSIONS_RELATION_CLASSES)) {
      require(isValidClassName(className), s"Invalid plugin class: $className")
    }
  }
}
Configuration compatibility across Spark versions:
// Migration helper for configuration updates
object ConfigMigration {
def migrateConfig(oldConf: SparkConf): SparkConf = {
val newConf = oldConf.clone()
// Migrate deprecated settings
if (oldConf.contains("spark.connect.server.port")) {
val port = oldConf.get("spark.connect.server.port")
newConf.set("spark.connect.grpc.binding.port", port)
newConf.remove("spark.connect.server.port")
}
newConf
}
}