or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

artifact-management.mdconfiguration.mdindex.mdmonitoring-ui.mdplan-processing.mdplugin-system.mdserver-management.mdsession-management.md
tile.json

configuration.mddocs/

Configuration

The Spark Connect Server provides comprehensive configuration options for server behavior, security, performance tuning, and plugin management. All configuration is handled through Spark's standard configuration system.

Core Configuration Object

Connect Configuration

Central configuration object with all Connect-specific settings.

object Connect {
  // Server binding and network
  val CONNECT_GRPC_BINDING_PORT: ConfigEntry[Int]
  val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE: ConfigEntry[Long]
  val CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT: ConfigEntry[Int]
  
  // Interceptors and middleware
  val CONNECT_GRPC_INTERCEPTOR_CLASSES: ConfigEntry[Seq[String]]
  
  // Execution management
  val CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT: ConfigEntry[String]
  val CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL: ConfigEntry[String]
  val CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE: ConfigEntry[Int]
  val CONNECT_EXECUTE_REATTACHABLE_ENABLED: ConfigEntry[Boolean]
  val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION: ConfigEntry[String]
  val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE: ConfigEntry[Long]
  val CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE: ConfigEntry[Int]
  
  // Plugin system
  val CONNECT_EXTENSIONS_RELATION_CLASSES: ConfigEntry[Seq[String]]
  val CONNECT_EXTENSIONS_EXPRESSION_CLASSES: ConfigEntry[Seq[String]]
  val CONNECT_EXTENSIONS_COMMAND_CLASSES: ConfigEntry[Seq[String]]
  
  // Error handling and debugging
  val CONNECT_JVM_STACK_TRACE_MAX_SIZE: ConfigEntry[Int]
  
  // UI and monitoring
  val CONNECT_UI_STATEMENT_LIMIT: ConfigEntry[Int]
  val CONNECT_UI_SESSION_LIMIT: ConfigEntry[Int]
  
  // Arrow and data transfer
  val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE: ConfigEntry[Int]
  
  // File system operations
  val CONNECT_COPY_FROM_LOCAL_TO_FS_ALLOW_DEST_LOCAL: ConfigEntry[Boolean]
}

Server Configuration

Network and Binding Settings

// Server port (default: 15002)
spark.connect.grpc.binding.port=15002

// Maximum inbound message size (default: 134217728 = 128MB)
spark.connect.grpc.max.inbound.message.size=134217728

// gRPC marshaller recursion limit (default: 100)
spark.connect.grpc.marshaller.recursion.limit=100

Connection and Protocol Settings

// Enable TLS encryption
spark.connect.grpc.tls.enabled=true
spark.connect.grpc.tls.keystore.path=/path/to/keystore.jks
spark.connect.grpc.tls.keystore.password=password
spark.connect.grpc.tls.truststore.path=/path/to/truststore.jks
spark.connect.grpc.tls.truststore.password=password

// Client authentication
spark.connect.grpc.client.auth.enabled=true
spark.connect.grpc.client.cert.required=true

Execution Configuration

Execution Management

// Enable reattachable executions (default: true)
spark.connect.execute.reattachable.enabled=true

// Detached execution timeout (default: "2min")
spark.connect.execute.manager.detached.timeout=2min

// Maximum concurrent executions per session
spark.connect.execute.manager.max.concurrent=10

// Execution result caching
spark.connect.execute.result.cache.enabled=true
spark.connect.execute.result.cache.max.size=1000

Session Management

// Session timeout for idle connections
spark.connect.session.timeout=30min

// Maximum sessions per server
spark.connect.session.max.total=1000

// Session cleanup interval
spark.connect.session.cleanup.interval=5min

// Enable session isolation
spark.connect.session.isolation.enabled=true

Plugin Configuration

Extension Classes

// Custom relation plugins
spark.connect.extensions.relation.classes=com.mycompany.MyRelationPlugin,com.mycompany.AnotherRelationPlugin

// Custom expression plugins  
spark.connect.extensions.expression.classes=com.mycompany.MyExpressionPlugin

// Custom command plugins
spark.connect.extensions.command.classes=com.mycompany.MyCommandPlugin,com.mycompany.AdminCommandPlugin

Plugin Settings

// Plugin loading timeout
spark.connect.extensions.loading.timeout=30s

// Enable plugin validation
spark.connect.extensions.validation.enabled=true

// Plugin class loader isolation
spark.connect.extensions.classloader.isolation=true

Interceptor Configuration

gRPC Interceptors

// Custom interceptor classes
spark.connect.grpc.interceptor.classes=com.mycompany.AuthInterceptor,com.mycompany.LoggingInterceptor

// Built-in interceptor settings
spark.connect.grpc.interceptor.logging.enabled=true
spark.connect.grpc.interceptor.logging.level=INFO
spark.connect.grpc.interceptor.logging.requests=true
spark.connect.grpc.interceptor.logging.responses=false

// Local properties cleanup
spark.connect.grpc.interceptor.properties.cleanup.enabled=true

Data Transfer Configuration

Arrow and Serialization

// Arrow batch size for large results (default: 10000)
spark.connect.grpc.arrow.max.batch.size=10000

// Enable Arrow optimization
spark.connect.grpc.arrow.enabled=true

// Compression for large responses
spark.connect.grpc.compression.enabled=true
spark.connect.grpc.compression.algorithm=gzip

Streaming Configuration

// Streaming query cache size
spark.connect.streaming.query.cache.size=100

// Streaming result buffer size
spark.connect.streaming.result.buffer.size=1000

// Enable streaming query monitoring
spark.connect.streaming.monitoring.enabled=true

Security Configuration

Authentication and Authorization

// Enable authentication
spark.connect.security.auth.enabled=true
spark.connect.security.auth.provider=com.mycompany.AuthProvider

// User identity mapping
spark.connect.security.user.mapping.enabled=true
spark.connect.security.user.mapping.class=com.mycompany.UserMapper

// Authorization provider
spark.connect.security.authorization.enabled=true
spark.connect.security.authorization.provider=com.mycompany.AuthzProvider

Artifact Security

// Artifact validation
spark.connect.artifacts.validation.enabled=true
spark.connect.artifacts.allowed.types=jar,py,zip

// Maximum artifact sizes
spark.connect.artifacts.max.file.size=100MB
spark.connect.artifacts.max.session.size=1GB

// Artifact scanning
spark.connect.artifacts.scanning.enabled=true
spark.connect.artifacts.quarantine.enabled=true

Monitoring and UI Configuration

Web UI Settings

// Enable Connect UI tab
spark.connect.ui.enabled=true

// UI data retention limits
spark.connect.ui.statement.limit=200
spark.connect.ui.session.limit=100
spark.connect.ui.execution.limit=1000

// UI refresh intervals
spark.connect.ui.refresh.interval=5s
spark.connect.ui.auto.refresh.enabled=true

Metrics and Logging

// Enable detailed metrics collection
spark.connect.metrics.enabled=true
spark.connect.metrics.collection.interval=10s

// JVM stack trace limits for errors
spark.connect.jvm.stack.trace.max.size=2048

// Event listener configuration
spark.connect.listener.enabled=true
spark.connect.listener.async=true
spark.connect.listener.queue.size=10000

Performance Configuration

Resource Limits

// Memory limits per session
spark.connect.session.memory.limit=2GB

// Thread pool configuration
spark.connect.executor.thread.pool.size=50
spark.connect.executor.thread.pool.queue.size=1000

// Request timeout settings
spark.connect.request.timeout=300s
spark.connect.response.timeout=600s

Caching and Optimization

// Plan caching
spark.connect.plan.cache.enabled=true
spark.connect.plan.cache.size=1000
spark.connect.plan.cache.ttl=1h

// Result caching
spark.connect.result.cache.enabled=true
spark.connect.result.cache.max.size=100MB
spark.connect.result.cache.ttl=30min

Usage Examples

Basic Server Configuration

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Connect Server")
  .config("spark.connect.grpc.binding.port", "15002")
  .config("spark.connect.grpc.max.inbound.message.size", "268435456") // 256MB
  .config("spark.connect.execute.reattachable.enabled", "true")
  .config("spark.connect.session.timeout", "60min")
  .getOrCreate()

Security-Enabled Configuration

val spark = SparkSession.builder()
  .appName("Secure Connect Server")
  // TLS configuration
  .config("spark.connect.grpc.tls.enabled", "true")
  .config("spark.connect.grpc.tls.keystore.path", "/etc/spark/keystore.jks")
  .config("spark.connect.grpc.tls.keystore.password", "keystore-password")
  
  // Authentication
  .config("spark.connect.security.auth.enabled", "true")
  .config("spark.connect.security.auth.provider", "com.company.KerberosAuthProvider")
  
  // Artifact security
  .config("spark.connect.artifacts.validation.enabled", "true")
  .config("spark.connect.artifacts.max.file.size", "50MB")
  .getOrCreate()

High-Performance Configuration

val spark = SparkSession.builder()
  .appName("High Performance Connect Server")
  // Increase message size and batch limits
  .config("spark.connect.grpc.max.inbound.message.size", "536870912") // 512MB
  .config("spark.connect.grpc.arrow.max.batch.size", "50000")
  
  // Optimize execution
  .config("spark.connect.execute.manager.max.concurrent", "20")
  .config("spark.connect.executor.thread.pool.size", "100")
  
  // Enable caching
  .config("spark.connect.plan.cache.enabled", "true")
  .config("spark.connect.result.cache.enabled", "true")
  .config("spark.connect.result.cache.max.size", "500MB")
  .getOrCreate()

Plugin-Enabled Configuration

val spark = SparkSession.builder()
  .appName("Connect Server with Plugins")
  // Enable Spark Connect plugin
  .config("spark.sql.extensions", "org.apache.spark.sql.connect.SparkConnectPlugin")
  
  // Configure custom plugins
  .config("spark.connect.extensions.relation.classes", 
    "com.company.CustomDataSourcePlugin,com.company.CachePlugin")
  .config("spark.connect.extensions.expression.classes",
    "com.company.CustomFunctionPlugin")
  .config("spark.connect.extensions.command.classes",
    "com.company.AdminCommandPlugin")
    
  // Plugin settings  
  .config("spark.connect.extensions.validation.enabled", "true")
  .config("spark.connect.extensions.loading.timeout", "60s")
  .getOrCreate()

Environment Variables

Server Environment

# Server binding
export SPARK_CONNECT_GRPC_PORT=15002
export SPARK_CONNECT_GRPC_HOST=0.0.0.0

# JVM settings
export SPARK_CONNECT_JAVA_OPTS="-Xmx4g -XX:+UseG1GC"

# Security
export SPARK_CONNECT_TLS_KEYSTORE_PATH=/etc/spark/keystore.jks
export SPARK_CONNECT_TLS_KEYSTORE_PASSWORD_FILE=/etc/spark/keystore.password

Development Environment

# Enable debug logging
export SPARK_CONNECT_LOG_LEVEL=DEBUG

# Development mode settings
export SPARK_CONNECT_DEV_MODE=true
export SPARK_CONNECT_PLUGIN_RELOAD=true

Configuration Validation

Validation Rules

The server validates configuration at startup:

// Configuration validation example
object ConfigValidator {
  def validateConfig(conf: SparkConf): Unit = {
    // Port validation
    val port = conf.get(CONNECT_GRPC_BINDING_PORT)
    require(port > 0 && port < 65536, s"Invalid port: $port")
    
    // Memory limits
    val messageSize = conf.get(CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE)
    require(messageSize > 0, s"Message size must be positive: $messageSize")
    
    // Plugin class validation
    val relationPlugins = conf.get(CONNECT_EXTENSIONS_RELATION_CLASSES)
    relationPlugins.foreach { className =>
      require(isValidClassName(className), s"Invalid plugin class: $className")
    }
  }
}

Common Configuration Issues

  • Port Conflicts: Ensure the configured port is available
  • Memory Limits: Set appropriate heap size for message limits
  • Plugin Loading: Verify plugin classes are in the classpath
  • Security Settings: Ensure certificates and keys are accessible
  • Network Configuration: Check firewall and network policies

Migration and Compatibility

Version Compatibility

Configuration compatibility across Spark versions:

  • Spark 3.4+: Full Connect server support
  • Spark 3.5+: Enhanced plugin system and UI features
  • Future Versions: Forward compatibility for core settings

Configuration Migration

// Migration helper for configuration updates
object ConfigMigration {
  def migrateConfig(oldConf: SparkConf): SparkConf = {
    val newConf = oldConf.clone()
    
    // Migrate deprecated settings
    if (oldConf.contains("spark.connect.server.port")) {
      val port = oldConf.get("spark.connect.server.port")
      newConf.set("spark.connect.grpc.binding.port", port)
      newConf.remove("spark.connect.server.port")
    }
    
    newConf
  }
}