tessl/maven-org-apache-flink--flink-scala-shell-2-11

Interactive Scala shell for Apache Flink that provides a REPL environment for developing and testing Flink applications

Describes: pkg:maven/org.apache.flink/flink-scala-shell_2.11@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala-shell-2-11@1.14.0


Flink Scala Shell

Flink Scala Shell is an interactive REPL environment for Apache Flink that enables developers to interactively develop, test, and experiment with Flink applications written in Scala. It provides access to both DataStream and DataSet APIs with support for local, remote, and YARN cluster execution modes.

Package Information

  • Package Name: flink-scala-shell_2.11
  • Package Type: maven
  • Language: Scala
  • Version: 1.14.6
  • Installation: Add as a Maven dependency or use as part of the Flink distribution:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala-shell_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
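
For sbt-based projects, an equivalent dependency line would look like this (a sketch; the coordinates are taken from the Maven snippet above):

// build.sbt (sketch): same coordinates as the Maven dependency above
libraryDependencies += "org.apache.flink" % "flink-scala-shell_2.11" % "1.14.6"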

Core Imports

The shell automatically provides these pre-imported packages and pre-bound execution environments:

// Core Flink APIs are pre-imported:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.annotation.Internal
import java.io.{BufferedReader, File}
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}

// Pre-bound execution environments:
// benv: ExecutionEnvironment (for the DataSet API)
// senv: StreamExecutionEnvironment (for the DataStream API)

Basic Usage

Starting the Shell

# Local cluster mode
start-scala-shell.sh local

# Remote cluster mode  
start-scala-shell.sh remote <host> <port>

# YARN cluster mode
start-scala-shell.sh yarn
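
All three modes also accept an --addclasspath option for putting external JARs on the shell's classpath (per the upstream Flink documentation for the Scala shell):

# Add an external JAR to the shell classpath
start-scala-shell.sh local --addclasspath <path/to/jar>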

Interactive Development

// DataStream API - Streaming operations
val dataStream = senv.fromElements(1, 2, 3, 4)
dataStream
  .countWindowAll(2)
  .sum(0)
  .executeAndCollect()
  .foreach(println)

// Table API - SQL operations
val tenv = StreamTableEnvironment.create(senv)
val table = tenv.fromValues(row("Alice", 1), row("Bob", 2)).as("name", "score")
table
  .groupBy($"name")
  .select($"name", $"score".sum)
  .execute()
  .print()

// DataSet API - Batch operations (legacy)
val dataSet = benv.readTextFile("/path/to/data")
dataSet.writeAsText("/path/to/output")
benv.execute("My batch program")

Architecture

The Flink Scala Shell consists of several key components:

  • FlinkShell: Main entry point handling command-line parsing and cluster connection
  • FlinkILoop: Interactive REPL extending Scala's ILoop with Flink-specific functionality
  • Execution Modes: Support for local, remote, and YARN cluster execution
  • Environment Setup: Automatic configuration of batch (benv) and streaming (senv) environments
  • JAR Packaging: Compilation and packaging of REPL code for cluster execution (see the sketch below)
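
The packaging step is what makes interactively defined classes usable on a cluster: code compiled in the REPL is written to disk, bundled into a JAR, and shipped with each submitted job. A sketch of a shell session that depends on this (the WordCount class and the sample sentence are illustrative):

// Classes defined in the REPL exist only in shell-compiled code, so the
// shell must package them into a JAR and ship it with the job.
case class WordCount(word: String, count: Int)

senv.fromElements("to be or not to be".split(" "): _*)
  .map(w => WordCount(w, 1))
  .keyBy(_.word)
  .sum("count")
  .executeAndCollect()
  .foreach(println)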

Capabilities

Command-Line Interface

Core command-line interface for starting the shell with different execution modes and configuration options.

object FlinkShell {
  def main(args: Array[String]): Unit
  def startShell(config: Config): Unit
  @Internal def ensureYarnConfig(config: Config): YarnConfig
  @Internal def fetchConnectionInfo(config: Config, flinkConfig: Configuration): (Configuration, Option[ClusterClient[_]])
  def parseArgList(config: Config, mode: String): Array[String]
}
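
Since start-scala-shell.sh is a thin wrapper around this entry point, the shell can also be launched from code; a minimal sketch, assuming the script's argument syntax is forwarded unchanged to main:

// Sketch: start a local shell programmatically (assumes script-style args)
FlinkShell.main(Array("local"))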

See docs/command-line-interface.md for the full reference.

Interactive REPL Environment

Interactive Scala REPL environment with Flink-specific functionality and pre-configured execution environments.

class FlinkILoop(
  val flinkConfig: Configuration,
  val externalJars: Option[Array[String]],
  in0: Option[BufferedReader],
  out0: JPrintWriter
) extends ILoop(in0, out0) {
  val scalaBenv: ExecutionEnvironment
  val scalaSenv: StreamExecutionEnvironment
  
  def this(flinkConfig: Configuration, externalJars: Option[Array[String]], in0: BufferedReader, out: JPrintWriter)
  def this(flinkConfig: Configuration, externalJars: Option[Array[String]])
  def this(flinkConfig: Configuration, in0: BufferedReader, out: JPrintWriter)
  
  override def createInterpreter(): Unit
  def writeFilesToDisk(): File
  override def printWelcome(): Unit
  def getExternalJars(): Array[String]
}
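
The auxiliary constructors above make it possible to embed the REPL outside the shell script. A minimal sketch using only the members shown in the listing:

import java.io.File
import org.apache.flink.configuration.Configuration

// Sketch: embed the REPL with a default configuration and no external JARs
val loop = new FlinkILoop(new Configuration(), None)
loop.createInterpreter()

// Package REPL-compiled classes for cluster submission
val packagedCode: File = loop.writeFilesToDisk()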

See docs/interactive-repl.md for the full reference.

Configuration Management

Configuration system for managing cluster connections, execution modes, and YARN settings.

case class Config(
  host: Option[String] = None,
  port: Option[Int] = None,
  externalJars: Option[Array[String]] = None,
  executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
  yarnConfig: Option[YarnConfig] = None,
  configDir: Option[String] = None
)

case class YarnConfig(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)

object ExecutionMode extends Enumeration {
  val UNDEFINED, LOCAL, REMOTE, YARN = Value
}
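
For example, a remote-mode configuration and a YARN session with explicit resource settings could be built like this (host, port, and memory sizes are illustrative):

// Illustrative values only
val remote = Config(
  host = Some("jobmanager.example.com"),
  port = Some(8081),
  executionMode = ExecutionMode.REMOTE
)

val onYarn = Config(
  executionMode = ExecutionMode.YARN,
  yarnConfig = Some(YarnConfig(
    jobManagerMemory = Some("1024m"),
    taskManagerMemory = Some("2048m"),
    slots = Some(4)
  ))
)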

See docs/configuration.md for the full reference.

Types

object ExecutionMode extends Enumeration {
  val UNDEFINED: ExecutionMode.Value
  val LOCAL: ExecutionMode.Value
  val REMOTE: ExecutionMode.Value
  val YARN: ExecutionMode.Value
}

case class Config(
  host: Option[String],
  port: Option[Int],
  externalJars: Option[Array[String]],
  executionMode: ExecutionMode.Value,
  yarnConfig: Option[YarnConfig],
  configDir: Option[String]
)

case class YarnConfig(
  jobManagerMemory: Option[String],
  name: Option[String],
  queue: Option[String],
  slots: Option[Int],
  taskManagerMemory: Option[String]
)