Interactive Scala shell for Apache Flink that provides a REPL environment for developing and testing Flink applications
```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala-shell-2-11@1.14.0
```

Flink Scala Shell is an interactive REPL environment for Apache Flink that lets developers develop, test, and experiment with Flink applications written in Scala. It provides access to both the DataStream and DataSet APIs and supports local, remote, and YARN cluster execution modes.

Alternatively, add the Maven dependency:

```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala-shell_2.11</artifactId>
<version>1.14.6</version>
</dependency>
```

The shell automatically provides these pre-imported packages and environment variables:

```scala
// Core Flink APIs are pre-imported:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.annotation.Internal
import java.io.{BufferedReader, File}
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
// Environment variables are pre-configured:
// benv: ExecutionEnvironment (for DataSet API)
// senv: StreamExecutionEnvironment (for DataStream API)
```

Start the shell with `start-scala-shell.sh` in one of three execution modes:

```bash
# Local cluster mode
start-scala-shell.sh local
# Remote cluster mode
start-scala-shell.sh remote <host> <port>
# YARN cluster mode
start-scala-shell.sh yarn
```

Typical usage inside the shell:

```scala
// DataStream API - Streaming operations
val dataStream = senv.fromElements(1, 2, 3, 4)
dataStream
.countWindowAll(2)
.sum(0)
.executeAndCollect()
.foreach(println)
// Table API - SQL operations
val tenv = StreamTableEnvironment.create(senv)
val table = tenv.fromValues(row("Alice", 1), row("Bob", 2)).as("name", "score")
table
.groupBy($"name")
.select($"name", $"score".sum)
.execute()
.print()
// DataSet API - Batch operations (legacy)
val dataSet = benv.readTextFile("/path/to/data")
dataSet.writeAsText("/path/to/output")
benv.execute("My batch program")
```
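As a fuller illustration of working against the pre-wired `senv`, here is a hypothetical shell session that runs a streaming word count; the input sentences and variable names are made up for the example.

```scala
// Hypothetical shell input: word count over a bounded stream.
val counts = senv
  .fromElements("to be or not to be", "that is the question")
  .flatMap(_.toLowerCase.split("\\W+")) // split lines into words
  .map(word => (word, 1))               // pair each word with a count of 1
  .keyBy(_._1)                          // group by the word
  .sum(1)                               // sum the counts per word

counts.executeAndCollect().foreach(println)
```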
The Flink Scala Shell consists of several key components:

FlinkShell: the core command-line interface for starting the shell with different execution modes and configuration options.

```scala
object FlinkShell {
def main(args: Array[String]): Unit
def startShell(config: Config): Unit
@Internal def ensureYarnConfig(config: Config): YarnConfig
@Internal def fetchConnectionInfo(config: Config, flinkConfig: Configuration): (Configuration, Option[ClusterClient[_]])
def parseArgList(config: Config, mode: String): Array[String]
}
```
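As a sketch of how these entry points compose, a local shell could be started programmatically. This assumes the shell's classes live in `org.apache.flink.api.scala` (as in the Flink source tree) and relies on `Config`'s default values for the unset fields; it mirrors what `main` does after parsing the command-line arguments.

```scala
// Sketch only: start the interactive shell against a local cluster.
import org.apache.flink.api.scala.FlinkShell
import org.apache.flink.api.scala.FlinkShell.{Config, ExecutionMode}

val config = Config(executionMode = ExecutionMode.LOCAL)
FlinkShell.startShell(config) // runs the REPL session
```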
FlinkILoop: the interactive Scala REPL environment with Flink-specific functionality and pre-configured execution environments.

```scala
class FlinkILoop(
val flinkConfig: Configuration,
val externalJars: Option[Array[String]],
in0: Option[BufferedReader],
out0: JPrintWriter
) extends ILoop(in0, out0) {
val scalaBenv: ExecutionEnvironment
val scalaSenv: StreamExecutionEnvironment
def this(flinkConfig: Configuration, externalJars: Option[Array[String]], in0: BufferedReader, out: JPrintWriter)
def this(flinkConfig: Configuration, externalJars: Option[Array[String]])
def this(flinkConfig: Configuration, in0: BufferedReader, out: JPrintWriter)
override def createInterpreter(): Unit
def writeFilesToDisk(): File
override def printWelcome(): Unit
def getExternalJars(): Array[String]
}
```
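A minimal sketch of constructing the loop directly, using the two-argument convenience constructor listed above with an empty `Configuration` and no external jars; the purpose attributed to `writeFilesToDisk` in the comment is an assumption based on its name and signature.

```scala
// Sketch only: build the REPL loop outside of FlinkShell.
import org.apache.flink.api.scala.FlinkILoop // assumed package, alongside FlinkShell
import org.apache.flink.configuration.Configuration

val loop = new FlinkILoop(new Configuration(), None)

// Presumably persists classes generated in the REPL so they can be
// shipped with submitted jobs; returns a java.io.File per the signature.
val dir: java.io.File = loop.writeFilesToDisk()
```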
Config, YarnConfig, and ExecutionMode: the configuration system for managing cluster connections, execution modes, and YARN settings.

```scala
case class Config(
host: Option[String] = None,
port: Option[Int] = None,
externalJars: Option[Array[String]] = None,
executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
yarnConfig: Option[YarnConfig] = None,
configDir: Option[String] = None
)
case class YarnConfig(
jobManagerMemory: Option[String] = None,
name: Option[String] = None,
queue: Option[String] = None,
slots: Option[Int] = None,
taskManagerMemory: Option[String] = None
)
object ExecutionMode extends Enumeration {
val UNDEFINED, LOCAL, REMOTE, YARN = Value
}
```
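For example, a `Config` targeting a YARN session with explicit resource settings might be assembled like this (all values illustrative):

```scala
// Illustrative values only; every field of YarnConfig is optional.
val yarn = YarnConfig(
  jobManagerMemory = Some("1024m"),
  taskManagerMemory = Some("2048m"),
  queue = Some("default"),
  slots = Some(4),
  name = Some("Flink Scala Shell")
)

val config = Config(
  executionMode = ExecutionMode.YARN,
  yarnConfig = Some(yarn)
)
```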