Interactive Scala shell for Apache Flink that provides a REPL environment for developing and testing Flink applications
—
The FlinkILoop class provides an interactive Scala REPL environment with Flink-specific functionality, pre-configured execution environments, and automatic imports for seamless development experience.
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.java.{ScalaShellEnvironment, ScalaShellStreamEnvironment}
import java.io.{BufferedReader, File}
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}

Creates and configures the interactive Scala interpreter with Flink environments.
/**
* FlinkILoop primary constructor with full configuration
* @param flinkConfig Flink cluster configuration
* @param externalJars Optional array of external JAR file paths
* @param in0 Optional buffered reader for input (primarily for testing)
* @param out0 Print writer for output
*/
class FlinkILoop(
val flinkConfig: Configuration,
val externalJars: Option[Array[String]],
in0: Option[BufferedReader],
out0: JPrintWriter
) extends ILoop(in0, out0) {
/**
* Auxiliary constructor with explicit I/O streams
* @param flinkConfig Flink cluster configuration
* @param externalJars Optional array of external JAR file paths
* @param in0 Buffered reader for input
* @param out Print writer for output
*/
def this(
flinkConfig: Configuration,
externalJars: Option[Array[String]],
in0: BufferedReader,
out: JPrintWriter
)
/**
* Auxiliary constructor using standard console I/O
* @param flinkConfig Flink cluster configuration
* @param externalJars Optional array of external JAR file paths
*/
def this(flinkConfig: Configuration, externalJars: Option[Array[String]])
/**
* Auxiliary constructor without external JARs
* @param flinkConfig Flink cluster configuration
* @param in0 Buffered reader for input
* @param out Print writer for output
*/
def this(flinkConfig: Configuration, in0: BufferedReader, out: JPrintWriter)
}

Pre-configured Flink execution environments available as REPL variables.
/**
* Batch execution environment for DataSet API operations
*/
val scalaBenv: ExecutionEnvironment
/**
* Streaming execution environment for DataStream API operations
*/
val scalaSenv: StreamExecutionEnvironment
/**
* Remote batch environment (internal)
*/
private val remoteBenv: ScalaShellEnvironment
/**
* Remote streaming environment (internal)
*/
private val remoteSenv: ScalaShellStreamEnvironment

Usage in REPL:
// Batch operations using benv
val dataset = benv.readTextFile("/path/to/input.txt")
dataset.flatMap(_.split("\\s+")).writeAsText("/path/to/output")
benv.execute("Word Count Batch Job")
// Streaming operations using senv
val stream = senv.fromElements("hello", "world", "flink")
stream.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(0)
.sum(1)
.print()
senv.execute("Word Count Stream Job")

Sets up the Scala interpreter with Flink-specific imports and environment bindings.
/**
* Creates and configures the Scala interpreter with Flink imports and environments
* Automatically imports common Flink packages and binds execution environments
*/
override def createInterpreter(): Unit

Automatically Imported Packages:
org.apache.flink.core.fs._
org.apache.flink.api.scala._
org.apache.flink.streaming.api.scala._
org.apache.flink.table.api._
org.apache.flink.table.api.bridge.scala._
org.apache.flink.api.common.functions._
org.apache.flink.types.Row

Pre-bound Variables:

benv - Batch ExecutionEnvironment
senv - Stream ExecutionEnvironment

Compiles and packages REPL-generated classes for cluster execution.
/**
* Packages compiled classes from the current shell session into a JAR file
* for execution on a Flink cluster
* @return File path to the created JAR file
*/
def writeFilesToDisk(): File

Usage Example:
// After defining functions and classes in the REPL
val jarFile = writeFilesToDisk()
println(s"Created JAR: ${jarFile.getAbsolutePath}")

Handles external JAR dependencies for extended functionality.
/**
* Returns array of external JAR file paths
* @return Array of JAR paths, empty array if none specified
*/
def getExternalJars(): Array[String]

Displays informative welcome message with usage examples and environment information.
/**
* Displays ASCII art welcome message and usage examples
* Shows available environment variables and sample code snippets
*/
override def printWelcome(): Unit

Welcome Message Content: the banner lists the pre-bound environment variables (benv, senv) together with sample code snippets.

Manages temporary directories and files for compiled classes and JAR packaging.
/**
* Base temporary directory for shell operations
*/
private val tmpDirBase: File
/**
* Directory for compiled shell commands
*/
private val tmpDirShell: File
/**
* JAR file for packaged shell commands
*/
private val tmpJarShell: File

The REPL automatically imports comprehensive Flink API packages:
import org.apache.flink.core.fs._
import org.apache.flink.core.fs.local._
import org.apache.flink.api.common.io._
import org.apache.flink.api.common.aggregators._
import org.apache.flink.api.common.accumulators._
import org.apache.flink.api.common.distributions._
import org.apache.flink.api.common.operators._
import org.apache.flink.api.common.functions._
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.functions._
import org.apache.flink.types.Row
import org.apache.flink.api.java.io._
import org.apache.flink.api.java.aggregation._
import org.apache.flink.api.java.functions._
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.sampling._

// Create a stream from elements
val dataStream = senv.fromElements(1, 2, 3, 4, 5)
// Apply transformations
val processed = dataStream
.filter(_ > 2)
.map(_ * 2)
.keyBy(identity)
.sum(0)
// Execute and collect results
processed.executeAndCollect().foreach(println)

// Create table environment
val tenv = StreamTableEnvironment.create(senv)
// Create table from values
val table = tenv.fromValues(
row("Alice", 25),
row("Bob", 30),
row("Charlie", 35)
).as("name", "age")
// SQL queries
val result = tenv.sqlQuery("SELECT name, age FROM " + table + " WHERE age > 28")
result.execute().print()

// Read from file
val dataset = benv.readTextFile("/path/to/input.txt")
// Word count example
val counts = dataset
.flatMap(_.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
.map((_, 1))
.groupBy(0)
.sum(1)
// Write results
counts.writeAsCsv("/path/to/output.csv")
benv.execute("Word Count Job")

// Define custom function in REPL
class MyMapFunction extends MapFunction[String, String] {
override def map(value: String): String = {
value.toUpperCase + "!"
}
}
// Use custom function
val stream = senv.fromElements("hello", "world")
stream.map(new MyMapFunction()).print()
senv.execute("Custom Function Job")

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-scala-shell-2-11