Interactive Scala shell for Apache Flink that provides a REPL environment for developing and testing Flink applications
The Flink Scala Shell provides a command-line interface for starting interactive Scala sessions against a local, remote, or YARN Flink cluster.
import org.apache.flink.api.scala.FlinkShell.Config
Entry point that parses command-line arguments and starts the shell.
/**
* Main entry point for the Flink Scala Shell
* @param args Command-line arguments specifying execution mode and options
*/
def main(args: Array[String]): Unit
Usage Examples:
# Start with local cluster
start-scala-shell.sh local
# Start with remote cluster
start-scala-shell.sh remote localhost 8081
# Start with YARN cluster
start-scala-shell.sh yarn --jobManagerMemory 1024m --taskManagerMemory 2048m
Initializes and starts the interactive shell with the provided configuration.
/**
* Starts the Flink Scala Shell with the given configuration
* @param config Configuration object containing cluster and execution settings
*/
def startShell(config: Config): Unit
The shell supports three primary execution modes, each with specific command syntax:
Starts a local Flink mini-cluster for development and testing.
start-scala-shell.sh local [options]
Options:
--addclasspath/-a <path/to/jar> - Add external JARs (colon-separated paths)
Example:
start-scala-shell.sh local --addclasspath /path/to/my-lib.jar:/path/to/other-lib.jar
Connects to an existing remote Flink cluster.
start-scala-shell.sh remote <host> <port> [options]
Required Arguments:
<host> - Remote JobManager host address
<port> - Remote JobManager port number
Options:
--addclasspath/-a <path/to/jar> - Add external JARs (colon-separated paths)
Example:
start-scala-shell.sh remote flink-cluster.example.com 8081 --addclasspath /path/to/deps.jar
Connects to or creates a YARN cluster for Flink execution.
start-scala-shell.sh yarn [options]
YARN-Specific Options:
--jobManagerMemory/-jm <memory> - Memory allocation for JobManager container (e.g., "1024m", "2g")
--taskManagerMemory/-tm <memory> - Memory allocation per TaskManager container
--name/-nm <name> - Custom application name on YARN
--queue/-qu <queue> - YARN queue for job submission
--slots/-s <slots> - Number of slots per TaskManager
--addclasspath/-a <path/to/jar> - Add external JARs (colon-separated paths)
Example:
start-scala-shell.sh yarn \
--jobManagerMemory 1024m \
--taskManagerMemory 2048m \
--name "My Flink Shell Session" \
--queue production \
--slots 4 \
--addclasspath /path/to/dependencies.jar
Options available across all execution modes:
/**
* Global configuration options available for all execution modes
*/
case class GlobalOptions(
configDir: Option[String], // Configuration directory path
addclasspath: Option[String], // External JAR dependencies
help: Boolean // Show help information
)
Global Command-Line Options:
--configDir <directory> - Specify Flink configuration directory path
--help/-h - Display usage information and exit
The shell uses the scopt library for command-line argument parsing, with built-in validation and help generation.
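As a rough illustration of the validation such a parser performs, the sketch below dispatches on the subcommand using only the Scala standard library. It is not the scopt-based parser the shell uses: `ShellArgsSketch`, `Mode`, and `parseMode` are hypothetical names, the port check is an assumption, and the error strings are the example messages quoted in this document.

```scala
import scala.util.Try

// Hypothetical, stdlib-only sketch; the real shell builds its parser with scopt.
object ShellArgsSketch {
  sealed trait Mode
  case object Local extends Mode
  case object Yarn extends Mode
  final case class Remote(host: String, port: Int) extends Mode

  private val MissingHostOrPort = "Error: <host> or <port> is not specified!"
  private val MissingMode =
    "Error: please specify execution mode: [local | remote <host> <port> | yarn]"

  /** Returns the selected mode, or an error message for invalid input. */
  def parseMode(args: Seq[String]): Either[String, Mode] = args.toList match {
    case "local" :: _ => Right(Local)
    case "yarn" :: _  => Right(Yarn)
    // remote needs a host that is not an option flag and a positive integer port
    case "remote" :: host :: port :: _ if !host.startsWith("-") =>
      Try(port.toInt).toOption
        .filter(_ > 0)
        .map(p => Remote(host, p): Mode)
        .toRight(MissingHostOrPort)
    case "remote" :: _ => Left(MissingHostOrPort)
    case _             => Left(MissingMode)
  }
}
```

For example, `ShellArgsSketch.parseMode(Seq("remote", "localhost", "8081"))` yields `Right(Remote("localhost", 8081))`, while `Seq("remote", "localhost")` yields the missing host/port error.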
/**
* Command-line parser configuration with subcommands and options
*/
private val parser: scopt.OptionParser[Config] = new scopt.OptionParser[Config]("start-scala-shell.sh") {
head("Flink Scala Shell")
cmd("local").action((_, c) => c.copy(executionMode = ExecutionMode.LOCAL))
cmd("remote").action((_, c) => c.copy(executionMode = ExecutionMode.REMOTE))
cmd("yarn").action((_, c) => c.copy(executionMode = ExecutionMode.YARN))
}
The command-line interface provides comprehensive error handling for common configuration issues:
Example Error Messages:
Error: <host> or <port> is not specified!
Error: please specify execution mode: [local | remote <host> <port> | yarn]
Could not parse program arguments
The shell automatically locates Flink configuration using:
--configDir command-line option (highest priority)
FLINK_CONF_DIR environment variable
/**
* Resolves configuration directory from command-line options or environment
* @param config Configuration object with optional configDir
* @return Path to configuration directory
*/
private def getConfigDir(config: Config): String
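The lookup order described above can be sketched as follows. This is a minimal illustration and not the body of `getConfigDir`: `ConfigDirSketch` and `resolveConfigDir` are hypothetical names, and the sketch returns an `Option` because the behavior when neither source is set is not described here.

```scala
// Hypothetical sketch of the priority order: --configDir first, then FLINK_CONF_DIR.
object ConfigDirSketch {
  /**
   * @param configDirOption value of --configDir, if it was given
   * @param env             environment variables (defaults to the process environment)
   * @return the chosen directory, or None when neither source is set
   */
  def resolveConfigDir(
      configDirOption: Option[String],
      env: Map[String, String] = sys.env): Option[String] =
    configDirOption.orElse(env.get("FLINK_CONF_DIR"))
}
```

Passing the environment as a parameter keeps the function easy to exercise without changing the real process environment.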
/**
* Loads global Flink configuration from resolved directory
* @param config Configuration object
* @return Loaded Configuration instance
*/
private def getGlobalConfig(config: Config): Configuration
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-shell-2-11