Interactive Scala shell for Apache Flink that provides a REPL environment for developing and testing Flink applications
—
The Flink Scala Shell provides comprehensive configuration management for cluster connections, execution modes, and resource allocation across local, remote, and YARN environments.
import org.apache.flink.api.scala.FlinkShell.{Config, ExecutionMode, YarnConfig}
import org.apache.flink.configuration.Configuration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterClient}

Main configuration container for shell execution settings.
/**
* Configuration container for Flink Scala Shell execution settings
* @param host Optional remote cluster host address
* @param port Optional remote cluster port number
* @param externalJars Optional array of external JAR file paths
* @param executionMode Cluster execution mode (LOCAL, REMOTE, YARN, UNDEFINED)
* @param yarnConfig Optional YARN-specific configuration settings
* @param configDir Optional path to Flink configuration directory
*/
case class Config(
host: Option[String] = None,
port: Option[Int] = None,
externalJars: Option[Array[String]] = None,
executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
yarnConfig: Option[YarnConfig] = None,
configDir: Option[String] = None
)

Usage Examples:
// Local execution configuration
val localConfig = Config(executionMode = ExecutionMode.LOCAL)
// Remote execution configuration
val remoteConfig = Config(
host = Some("flink-cluster.example.com"),
port = Some(8081),
executionMode = ExecutionMode.REMOTE
)
// YARN execution with external JARs
val yarnConfig = Config(
executionMode = ExecutionMode.YARN,
externalJars = Some(Array("/path/to/lib1.jar", "/path/to/lib2.jar")),
yarnConfig = Some(YarnConfig(
jobManagerMemory = Some("1024m"),
taskManagerMemory = Some("2048m")
))
)

Defines available cluster execution modes for the shell.
/**
* Enumeration of supported execution modes
*/
object ExecutionMode extends Enumeration {
/**
* Undefined execution mode (initial state)
*/
val UNDEFINED: ExecutionMode.Value
/**
* Local mini-cluster execution mode for development and testing
*/
val LOCAL: ExecutionMode.Value
/**
* Remote cluster connection mode for existing Flink clusters
*/
val REMOTE: ExecutionMode.Value
/**
* YARN cluster execution mode for Hadoop environments
*/
val YARN: ExecutionMode.Value
}

Specialized configuration for YARN cluster deployments with resource management options.
/**
* YARN-specific configuration for cluster resource allocation and job management
* @param jobManagerMemory Optional memory allocation for JobManager container (e.g., "1024m", "2g")
* @param name Optional custom application name displayed in YARN UI
* @param queue Optional YARN queue for job submission and resource isolation
* @param slots Optional number of task slots per TaskManager for parallelism control
* @param taskManagerMemory Optional memory allocation per TaskManager container
*/
case class YarnConfig(
jobManagerMemory: Option[String] = None,
name: Option[String] = None,
queue: Option[String] = None,
slots: Option[Int] = None,
taskManagerMemory: Option[String] = None
)

Usage Examples:
// Basic YARN configuration
val basicYarn = YarnConfig(
jobManagerMemory = Some("1024m"),
taskManagerMemory = Some("2048m")
)
// Advanced YARN configuration with resource management
val advancedYarn = YarnConfig(
jobManagerMemory = Some("2g"),
taskManagerMemory = Some("4g"),
name = Some("Flink Scala Shell - Data Analysis"),
queue = Some("analytics"),
slots = Some(8)
)

Handles cluster connection setup and configuration resolution based on execution mode.
/**
* Fetches connection information and sets up cluster configuration
* @param config Shell configuration with execution mode and parameters
* @param flinkConfig Base Flink configuration
* @return Tuple of effective configuration and optional cluster client
*/
@Internal
def fetchConnectionInfo(
config: Config,
flinkConfig: Configuration
): (Configuration, Option[ClusterClient[_]])

Creates and configures a local mini-cluster for development and testing.
/**
* Creates local mini-cluster configuration and client
*
* The declared return type fixes the second tuple element to `Some[MiniClusterClient]`,
* so this method always hands back a client (it cannot return `None`).
*
* @param flinkConfig Base Flink configuration
* @return Tuple of effective configuration and cluster client (always present, wrapped in `Some`)
*/
private def createLocalClusterAndConfig(
flinkConfig: Configuration
): (Configuration, Some[MiniClusterClient])
/**
* Creates local mini-cluster instance with specified configuration
* @param flinkConfig Configuration for cluster setup
* @return Started MiniCluster instance
*/
private def createLocalCluster(flinkConfig: Configuration): MiniCluster

Local Cluster Features:
Configures connection to existing remote Flink clusters.
/**
* Creates configuration for remote cluster connection
*
* No cluster client is produced on this path: the second tuple element is statically `None`.
* NOTE(review): presumably `config.host` and `config.port` must both be defined before this is
* called (see the remote-mode validation pattern shown later in this document) — confirm
* against the implementation.
*
* @param config Shell configuration with host and port
* @param flinkConfig Base Flink configuration
* @return Tuple of effective configuration and None (no local client)
*/
private def createRemoteConfig(
config: Config,
flinkConfig: Configuration
): (Configuration, None.type)
/**
* Sets JobManager connection information in configuration
* @param config Configuration to modify
* @param host JobManager host address
* @param port JobManager port number
*/
private def setJobManagerInfoToConfig(
config: Configuration,
host: String,
port: Integer
): Unit

Remote Configuration Requirements:
Handles YARN cluster deployment and connection configuration.
/**
* Creates or connects to YARN cluster based on configuration
*
* The cluster client in the result is optional.
* NOTE(review): presumably the client is `Some` when a new cluster is deployed
* (`deployNewYarnCluster` returns `Some`) and `None` when an already deployed cluster is used
* (`fetchDeployedYarnClusterInfo` returns `None`) — confirm against the implementation.
*
* @param config Shell configuration with YARN settings
* @param flinkConfig Base Flink configuration
* @return Tuple of effective configuration and optional cluster client
*/
private def createYarnClusterIfNeededAndGetConfig(
config: Config,
flinkConfig: Configuration
): (Configuration, Option[ClusterClient[_]])
/**
* Deploys new YARN cluster with specified configuration
*
* The declared return type fixes the second tuple element to `Some[ClusterClient[_]]`,
* so a client for the newly deployed cluster is always returned.
*
* @param config Shell configuration
* @param flinkConfig Base Flink configuration
* @return Tuple of effective configuration and cluster client (always present, wrapped in `Some`)
*/
private def deployNewYarnCluster(
config: Config,
flinkConfig: Configuration
): (Configuration, Some[ClusterClient[_]])
/**
* Fetches configuration information from existing YARN cluster
*
* No cluster client is produced on this path: the second tuple element is statically `None`.
*
* @param config Shell configuration
* @param flinkConfig Base Flink configuration
* @param mode Execution mode string ("yarn-cluster", "default")
* @return Tuple of effective configuration and None (no local client)
*/
private def fetchDeployedYarnClusterInfo(
config: Config,
flinkConfig: Configuration,
mode: String
): (Configuration, None.type)
/**
* Ensures YarnConfig exists, creating default if necessary
* @param config Shell configuration
* @return YarnConfig instance (existing or default)
*/
@Internal
def ensureYarnConfig(config: Config): YarnConfig

Converts configuration objects to command-line arguments for cluster deployment.
/**
* Converts configuration to command-line argument array for cluster deployment
* @param config Shell configuration
* @param mode Execution mode string ("local", "remote", "yarn-cluster", "default")
* @return Array of command-line arguments
*/
def parseArgList(config: Config, mode: String): Array[String]

Generated Arguments Examples:
// YARN mode arguments
parseArgList(yarnConfig, "yarn-cluster")
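// Returns, for a Config whose YarnConfig sets jobManagerMemory = "1024m",
// taskManagerMemory = "2048m" and name = "MyApp":
// Array("-m", "yarn-cluster", "-yjm", "1024m", "-ytm", "2048m", "-ynm", "MyApp")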
// Default mode (no specific arguments)
parseArgList(defaultConfig, "default")
// Returns: Array()

Manages Flink configuration directory discovery and loading.
/**
* Resolves configuration directory from multiple sources
* Priority: command-line option > environment variable > default
*
* The command-line value arrives as `config.configDir`, which is an `Option[String]`
* on `Config`; per the priority above, the other sources only matter when it is not set.
*
* @param config Shell configuration with optional configDir
* @return Resolved configuration directory path
*/
private def getConfigDir(config: Config): String
/**
* Loads global Flink configuration from resolved directory
* @param config Shell configuration
* @return Loaded Configuration instance with merged settings
*/
private def getGlobalConfig(config: Config): Configuration

Configuration Resolution Order:
1. --configDir command-line option (highest priority)
2. FLINK_CONF_DIR environment variable
3. CliFrontend.getConfigurationDirectoryFromEnv() (Flink default)

Comprehensive validation and error handling for configuration issues.
Common Configuration Errors:
Error Handling Patterns:
// Remote mode validation
if (config.host.isEmpty || config.port.isEmpty) {
throw new IllegalArgumentException("<host> or <port> is not specified!")
}
// Execution mode validation
config.executionMode match {
case ExecutionMode.UNDEFINED =>
throw new IllegalArgumentException("please specify execution mode: [local | remote <host> <port> | yarn]")
case _ => // Continue with valid mode
}

val devConfig = Config(
executionMode = ExecutionMode.LOCAL,
externalJars = Some(Array("/path/to/test-data.jar")),
configDir = Some("/opt/flink/conf")
)

val prodConfig = Config(
host = Some("prod-flink-cluster.company.com"),
port = Some(8081),
executionMode = ExecutionMode.REMOTE,
externalJars = Some(Array(
"/shared/libs/company-commons.jar",
"/shared/libs/data-connectors.jar"
))
)

val analyticsConfig = Config(
executionMode = ExecutionMode.YARN,
yarnConfig = Some(YarnConfig(
jobManagerMemory = Some("4g"),
taskManagerMemory = Some("8g"),
name = Some("Analytics Shell - User Research"),
queue = Some("analytics-queue"),
slots = Some(16)
)),
externalJars = Some(Array("/analytics/libs/ml-algorithms.jar"))
)

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-shell-2-11