tessl/maven-org-apache-flink--flink-scala-shell-2-11

Interactive Scala shell for Apache Flink that provides a REPL environment for developing and testing Flink applications

Configuration Management

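The Flink Scala Shell's configuration layer selects how the shell reaches a cluster (a local mini-cluster, an existing remote cluster, or a YARN session). It resolves the Flink configuration directory and builds the effective Configuration used for the connection, including YARN resource settings.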

Imports

import org.apache.flink.api.scala.FlinkShell.{Config, ExecutionMode, YarnConfig}
import org.apache.flink.configuration.Configuration
import org.apache.flink.client.program.{ClusterClient, MiniClusterClient}
import org.apache.flink.runtime.minicluster.MiniCluster

Capabilities

Core Configuration

Main configuration container for shell execution settings.

/**
 * Configuration container for Flink Scala Shell execution settings
 * @param host Optional remote cluster host address
 * @param port Optional remote cluster port number
 * @param externalJars Optional array of external JAR file paths
 * @param executionMode Cluster execution mode (LOCAL, REMOTE, YARN, UNDEFINED)
 * @param yarnConfig Optional YARN-specific configuration settings
 * @param configDir Optional path to Flink configuration directory
 */
case class Config(
  host: Option[String] = None,
  port: Option[Int] = None,
  externalJars: Option[Array[String]] = None,
  executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
  yarnConfig: Option[YarnConfig] = None,
  configDir: Option[String] = None
)

Usage Examples:

// Local execution configuration
val localConfig = Config(executionMode = ExecutionMode.LOCAL)

// Remote execution configuration  
val remoteConfig = Config(
  host = Some("flink-cluster.example.com"),
  port = Some(8081),
  executionMode = ExecutionMode.REMOTE
)

// YARN execution with external JARs
val yarnConfig = Config(
  executionMode = ExecutionMode.YARN,
  externalJars = Some(Array("/path/to/lib1.jar", "/path/to/lib2.jar")),
  yarnConfig = Some(YarnConfig(
    jobManagerMemory = Some("1024m"),
    taskManagerMemory = Some("2048m")
  ))
)

Execution Mode Enumeration

Defines available cluster execution modes for the shell.

/**
 * Enumeration of supported execution modes
 */
object ExecutionMode extends Enumeration {
  /** Undefined execution mode (initial state, before a mode is chosen) */
  val UNDEFINED = Value

  /** Local mini-cluster execution mode for development and testing */
  val LOCAL = Value

  /** Remote cluster connection mode for existing Flink clusters */
  val REMOTE = Value

  /** YARN cluster execution mode for Hadoop environments */
  val YARN = Value
}

YARN Configuration

Specialized configuration for YARN cluster deployments with resource management options.

/**
 * YARN-specific configuration for cluster resource allocation and job management
 * @param jobManagerMemory Optional memory allocation for JobManager container (e.g., "1024m", "2g")
 * @param name Optional custom application name displayed in YARN UI
 * @param queue Optional YARN queue for job submission and resource isolation
 * @param slots Optional number of task slots per TaskManager for parallelism control
 * @param taskManagerMemory Optional memory allocation per TaskManager container
 */
case class YarnConfig(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)

Usage Examples:

// Basic YARN configuration
val basicYarn = YarnConfig(
  jobManagerMemory = Some("1024m"),
  taskManagerMemory = Some("2048m")
)

// Advanced YARN configuration with resource management
val advancedYarn = YarnConfig(
  jobManagerMemory = Some("2g"),
  taskManagerMemory = Some("4g"),
  name = Some("Flink Scala Shell - Data Analysis"),
  queue = Some("analytics"),
  slots = Some(8)
)

Connection Management

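Dispatches on config.executionMode to the local, remote, or YARN setup routine. It returns the effective Configuration, plus a cluster client when the shell itself created the cluster (a local mini-cluster or a newly deployed YARN session). An UNDEFINED mode raises IllegalArgumentException.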

/**
 * Fetches connection information and sets up cluster configuration
 * @param config Shell configuration with execution mode and parameters
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and optional cluster client
 */
@Internal
def fetchConnectionInfo(
  config: Config,
  flinkConfig: Configuration
): (Configuration, Option[ClusterClient[_]])
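The dispatch can be sketched as a single match on the execution mode. The types below are minimal local stand-ins for the shell's Config and ExecutionMode, so the sketch compiles without Flink. describeSetup is a hypothetical helper: it does not call anything, but returns the name of the private routine that fetchConnectionInfo is expected to delegate to.

```scala
object ConnectionDispatchSketch {
  // Local stand-ins; only the field needed for dispatch is kept.
  object ExecutionMode extends Enumeration {
    val UNDEFINED, LOCAL, REMOTE, YARN = Value
  }

  case class Config(executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)

  // Names the setup routine chosen for each mode; UNDEFINED is rejected.
  def describeSetup(config: Config): String = config.executionMode match {
    case ExecutionMode.LOCAL  => "createLocalClusterAndConfig"
    case ExecutionMode.REMOTE => "createRemoteConfig"
    case ExecutionMode.YARN   => "createYarnClusterIfNeededAndGetConfig"
    case ExecutionMode.UNDEFINED =>
      throw new IllegalArgumentException(
        "please specify execution mode: [local | remote <host> <port> | yarn]")
  }
}
```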

Local Cluster Configuration

Creates and configures local mini-cluster for development and testing.

/**
 * Creates local mini-cluster configuration and client
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and cluster client
 */
private def createLocalClusterAndConfig(
  flinkConfig: Configuration
): (Configuration, Some[MiniClusterClient])

/**
 * Creates local mini-cluster instance with specified configuration
 * @param flinkConfig Configuration for cluster setup
 * @return Started MiniCluster instance
 */
private def createLocalCluster(flinkConfig: Configuration): MiniCluster

Local Cluster Features:

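  • Automatic port allocation: the JobManager port is set to 0 so that a free port is assigned, and the shell then connects to the started cluster on localhost
  • JobManager and TaskManagers run together inside the shell's JVM
  • The number of TaskManagers and slots per TaskManager are read from the Flink configuration (local.number-taskmanager and taskmanager.numberOfTaskSlots)
  • The cluster is started when the shell launches, before the REPL becomes available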
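Binding to port 0 is the operating system's standard way of requesting any free port, and the assigned port must be read back after binding. The sketch below shows only that mechanism, using a plain java.net.ServerSocket as an analogy. It does not start a MiniCluster, and allocateFreePort is a hypothetical name. In the shell, the corresponding read-back appears to be taking the port from the started MiniCluster's REST address.

```scala
import java.net.ServerSocket

object PortZeroSketch {
  // Binds to port 0 so the OS picks a free port, reads back the port that
  // was actually assigned, and closes the socket before returning.
  def allocateFreePort(): Int = {
    val socket = new ServerSocket(0)
    try socket.getLocalPort
    finally socket.close()
  }
}
```

Because the socket is closed before returning, another process could claim the port afterwards. This is why reading the port from the running server, as the shell does with the cluster, is more reliable than reserving one in advance.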

Remote Cluster Configuration

Configures connection to existing remote Flink clusters.

/**
 * Creates configuration for remote cluster connection
 * @param config Shell configuration with host and port
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and None (no local client)
 */
private def createRemoteConfig(
  config: Config,
  flinkConfig: Configuration
): (Configuration, None.type)

/**
 * Sets JobManager connection information in configuration
 * @param config Configuration to modify
 * @param host JobManager host address
 * @param port JobManager port number
 */
private def setJobManagerInfoToConfig(
  config: Configuration,
  host: String,
  port: Integer
): Unit

Remote Configuration Requirements:

  • Valid host address (IP or hostname)
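  • Reachable port on that host. The shell writes it as both the JobManager RPC port and the REST port, so it is normally the cluster's REST port (8081 by default)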
  • Network connectivity to Flink cluster
  • Compatible Flink version
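The remote setup can be sketched as follows. The sketch assumes that setJobManagerInfoToConfig copies the host and port into both the JobManager RPC options and the REST options. An immutable Map stands in for Flink's Configuration so that the sketch runs without Flink. The key strings are Flink's standard option keys, and remoteSettings is a hypothetical name.

```scala
object RemoteConfigSketch {
  // Stand-in for the shell's Config, trimmed to the fields used here.
  case class Config(host: Option[String] = None, port: Option[Int] = None)

  // Rejects a missing host or port, then returns the connection settings
  // assumed to be written for the JobManager RPC and REST endpoints.
  def remoteSettings(config: Config): Map[String, String] = {
    if (config.host.isEmpty || config.port.isEmpty) {
      throw new IllegalArgumentException("<host> or <port> is not specified!")
    }
    val host = config.host.get
    val port = config.port.get.toString
    Map(
      "jobmanager.rpc.address" -> host,
      "jobmanager.rpc.port" -> port,
      "rest.address" -> host,
      "rest.port" -> port
    )
  }
}
```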

YARN Configuration Management

Handles YARN cluster deployment and connection configuration.

/**
 * Creates or connects to YARN cluster based on configuration
 * @param config Shell configuration with YARN settings
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and optional cluster client
 */
private def createYarnClusterIfNeededAndGetConfig(
  config: Config,
  flinkConfig: Configuration
): (Configuration, Option[ClusterClient[_]])

/**
 * Deploys new YARN cluster with specified configuration
 * @param config Shell configuration
 * @param flinkConfig Base Flink configuration
 * @return Tuple of effective configuration and cluster client
 */
private def deployNewYarnCluster(
  config: Config,
  flinkConfig: Configuration
): (Configuration, Some[ClusterClient[_]])

/**
 * Fetches configuration information from existing YARN cluster
 * @param config Shell configuration
 * @param flinkConfig Base Flink configuration  
 * @param mode Execution mode string ("yarn-cluster", "default")
 * @return Tuple of effective configuration and None (no local client)
 */
private def fetchDeployedYarnClusterInfo(
  config: Config,
  flinkConfig: Configuration,
  mode: String
): (Configuration, None.type)
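The choice between deploying a new YARN session and reusing an existing one appears to depend only on whether a YarnConfig is present. The sketch below models that branch with trimmed stand-in types and a hypothetical yarnArgMode helper. The helper returns the mode string that is later passed to parseArgList.

```scala
object YarnModeSketch {
  // Trimmed stand-ins for the shell's YarnConfig and Config.
  case class YarnConfig(
    jobManagerMemory: Option[String] = None,
    taskManagerMemory: Option[String] = None
  )
  case class Config(yarnConfig: Option[YarnConfig] = None)

  def yarnArgMode(config: Config): String = config.yarnConfig match {
    // YARN options given: deploy a new session (deployNewYarnCluster).
    case Some(_) => "yarn-cluster"
    // No YARN options: reuse an existing session located by Flink's CLI frontend.
    case None => "default"
  }
}
```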

/**
 * Ensures YarnConfig exists, creating default if necessary
 * @param config Shell configuration
 * @return YarnConfig instance (existing or default)
 */
@Internal
def ensureYarnConfig(config: Config): YarnConfig
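ensureYarnConfig allows YARN options to be applied one at a time. Each option starts from the existing YarnConfig (or an empty default) and overwrites a single field. The sketch assumes the shell's command-line parser follows this pattern. withQueue and withSlots are hypothetical helpers, and the case classes are local stand-ins.

```scala
object EnsureYarnConfigSketch {
  case class YarnConfig(
    jobManagerMemory: Option[String] = None,
    name: Option[String] = None,
    queue: Option[String] = None,
    slots: Option[Int] = None,
    taskManagerMemory: Option[String] = None
  )
  case class Config(yarnConfig: Option[YarnConfig] = None)

  // Returns the existing YarnConfig, or an all-None default.
  def ensureYarnConfig(config: Config): YarnConfig =
    config.yarnConfig.getOrElse(YarnConfig())

  // Each helper updates one field and keeps fields set by earlier options.
  def withQueue(c: Config, q: String): Config =
    c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(q))))

  def withSlots(c: Config, s: Int): Config =
    c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(s))))
}
```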

Command-Line Argument Parsing

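Despite its name, parseArgList does not parse user input. It turns a Config into the argument array that the shell passes to Flink's CLI frontend when it deploys or looks up a YARN cluster.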

/**
 * Converts configuration to command-line argument array for cluster deployment
 * @param config Shell configuration
 * @param mode Target mode string; "default" adds no -m flag, and any other value (e.g. "yarn-cluster") is emitted as "-m", mode
 * @return Array of command-line arguments
 */
def parseArgList(config: Config, mode: String): Array[String]

Generated Arguments Examples:

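// YARN mode arguments
val argsConfig = Config(yarnConfig = Some(YarnConfig(
  jobManagerMemory = Some("1024m"),
  taskManagerMemory = Some("2048m"),
  name = Some("MyApp"))))
parseArgList(argsConfig, "yarn-cluster")
// Returns: Array("-m", "yarn-cluster", "-yjm", "1024m", "-ytm", "2048m", "-ynm", "MyApp")

// Default mode with no YarnConfig (no arguments)
parseArgList(Config(), "default")
// Returns: Array()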
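The mapping from YarnConfig fields to CLI flags can be sketched as follows, modeled on the shell's source. The flag names are the legacy -y* YARN options of Flink's CLI frontend. The emission order (JobManager memory, TaskManager memory, name, queue, slots) is an assumption taken from that source, and the case classes are trimmed local stand-ins.

```scala
import scala.collection.mutable.ArrayBuffer

object ParseArgListSketch {
  case class YarnConfig(
    jobManagerMemory: Option[String] = None,
    name: Option[String] = None,
    queue: Option[String] = None,
    slots: Option[Int] = None,
    taskManagerMemory: Option[String] = None
  )
  case class Config(yarnConfig: Option[YarnConfig] = None)

  def parseArgList(config: Config, mode: String): Array[String] = {
    // "default" adds no target; any other mode becomes "-m <mode>".
    val args =
      if (mode == "default") ArrayBuffer[String]()
      else ArrayBuffer[String]("-m", mode)

    // Each YARN option that is set becomes a flag/value pair.
    config.yarnConfig.foreach { y =>
      y.jobManagerMemory.foreach(v => args ++= Seq("-yjm", v))
      y.taskManagerMemory.foreach(v => args ++= Seq("-ytm", v))
      y.name.foreach(v => args ++= Seq("-ynm", v))
      y.queue.foreach(v => args ++= Seq("-yqu", v))
      y.slots.foreach(v => args ++= Seq("-ys", v.toString))
    }
    args.toArray
  }
}
```

In "default" mode, YARN options are still emitted when a YarnConfig is present; only the -m target is omitted.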

Configuration Directory Resolution

Manages Flink configuration directory discovery and loading.

/**
 * Resolves configuration directory from multiple sources
 * Priority: command-line option > environment variable > default
 * @param config Shell configuration with optional configDir
 * @return Resolved configuration directory path
 */
private def getConfigDir(config: Config): String

/**
 * Loads global Flink configuration from resolved directory
 * @param config Shell configuration
 * @return Loaded Configuration instance with merged settings
 */
private def getGlobalConfig(config: Config): Configuration

Configuration Resolution Order:

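  1. --configDir command-line option (highest priority)
  2. FLINK_CONF_DIR environment variable, read by CliFrontend.getConfigurationDirectoryFromEnv(); startup fails if it names a directory that does not exist
  3. Fallback directories ../conf, then conf, relative to the working directory (checked by the same method); startup fails if neither exists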
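The resolution order above can be sketched as a pure function. The environment lookup and the directory-existence check are injected as parameters, which is an assumption made for testability rather than the shell's actual signature. The exception messages are placeholders, not Flink's exact wording.

```scala
import java.io.File

object ConfigDirSketch {
  def resolveConfigDir(
      configDir: Option[String],
      env: String => Option[String] = sys.env.get,
      exists: String => Boolean = path => new File(path).exists()): String =
    // An explicit --configDir wins without any existence check here.
    configDir.getOrElse {
      env("FLINK_CONF_DIR") match {
        case Some(dir) if exists(dir) => dir
        case Some(dir) =>
          throw new RuntimeException(s"FLINK_CONF_DIR points to a missing directory: $dir")
        case None =>
          // Fall back to ../conf, then conf.
          Seq("../conf", "conf").find(exists).getOrElse(
            throw new RuntimeException("No configuration directory found"))
      }
    }
}
```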

Error Handling and Validation

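The shell checks the execution mode and the remote host/port before connecting, and reports violations as IllegalArgumentException. Other problems, such as a missing configuration directory, an unreachable cluster, or a version mismatch, surface as errors during startup or job submission.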

Common Configuration Errors:

  • Missing host/port for remote mode
  • Invalid YARN configuration parameters
  • Inaccessible configuration directories
  • Network connectivity issues
  • Incompatible Flink versions

Error Handling Patterns:

// Remote mode validation
if (config.host.isEmpty || config.port.isEmpty) {
  throw new IllegalArgumentException("<host> or <port> is not specified!")
}

// Execution mode validation
config.executionMode match {
  case ExecutionMode.UNDEFINED => 
    throw new IllegalArgumentException("please specify execution mode: [local | remote <host> <port> | yarn]")
  case _ => // Continue with valid mode
}
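The two checks above can be combined into one validation step. The shell's startup code appears to catch IllegalArgumentException at this stage and print the message prefixed with "Error:". The sketch reproduces that reporting pattern with a hypothetical check function, which returns the message instead of exiting the process. The types are local stand-ins.

```scala
object ValidationSketch {
  object ExecutionMode extends Enumeration {
    val UNDEFINED, LOCAL, REMOTE, YARN = Value
  }
  case class Config(
    host: Option[String] = None,
    port: Option[Int] = None,
    executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED
  )

  // Throws for an undefined mode, or for remote mode without host and port.
  def validate(config: Config): Unit = config.executionMode match {
    case ExecutionMode.UNDEFINED =>
      throw new IllegalArgumentException(
        "please specify execution mode: [local | remote <host> <port> | yarn]")
    case ExecutionMode.REMOTE if config.host.isEmpty || config.port.isEmpty =>
      throw new IllegalArgumentException("<host> or <port> is not specified!")
    case _ => ()
  }

  // Turns a validation failure into a one-line "Error: ..." message.
  def check(config: Config): Option[String] =
    try { validate(config); None }
    catch { case e: IllegalArgumentException => Some(s"Error: ${e.getMessage}") }
}
```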

Configuration Examples

Development Setup (Local)

val devConfig = Config(
  executionMode = ExecutionMode.LOCAL,
  externalJars = Some(Array("/path/to/test-data.jar")),
  configDir = Some("/opt/flink/conf")
)

Production Remote Cluster

val prodConfig = Config(
  host = Some("prod-flink-cluster.company.com"),
  port = Some(8081),
  executionMode = ExecutionMode.REMOTE,
  externalJars = Some(Array(
    "/shared/libs/company-commons.jar",
    "/shared/libs/data-connectors.jar"
  ))
)

YARN Analytics Environment

val analyticsConfig = Config(
  executionMode = ExecutionMode.YARN,
  yarnConfig = Some(YarnConfig(
    jobManagerMemory = Some("4g"),
    taskManagerMemory = Some("8g"),
    name = Some("Analytics Shell - User Research"),
    queue = Some("analytics-queue"),
    slots = Some(16)
  )),
  externalJars = Some(Array("/analytics/libs/ml-algorithms.jar"))
)

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-scala-shell-2-11
