tessl/maven-org-apache-flink--flink-scala-shell-2-11

Interactive Scala shell for Apache Flink that provides a REPL environment for developing and testing Flink applications


Interactive REPL Environment

The FlinkILoop class provides an interactive Scala REPL with Flink-specific functionality: pre-configured execution environments and automatic imports for a seamless development experience.

Imports

import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.java.{ScalaShellEnvironment, ScalaShellStreamEnvironment}
import java.io.{BufferedReader, File}
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}

Capabilities

REPL Initialization

Creates and configures the interactive Scala interpreter with Flink environments.

/**
 * FlinkILoop primary constructor with full configuration
 * @param flinkConfig Flink cluster configuration
 * @param externalJars Optional array of external JAR file paths
 * @param in0 Optional buffered reader for input (primarily for testing)
 * @param out0 Print writer for output
 */
class FlinkILoop(
  val flinkConfig: Configuration,
  val externalJars: Option[Array[String]],
  in0: Option[BufferedReader],
  out0: JPrintWriter
) extends ILoop(in0, out0) {

  /**
   * Auxiliary constructor with explicit I/O streams
   * @param flinkConfig Flink cluster configuration
   * @param externalJars Optional array of external JAR file paths
   * @param in0 Buffered reader for input
   * @param out Print writer for output
   */
  def this(
    flinkConfig: Configuration,
    externalJars: Option[Array[String]],
    in0: BufferedReader,
    out: JPrintWriter
  )

  /**
   * Auxiliary constructor using standard console I/O
   * @param flinkConfig Flink cluster configuration
   * @param externalJars Optional array of external JAR file paths
   */
  def this(flinkConfig: Configuration, externalJars: Option[Array[String]])

  /**
   * Auxiliary constructor without external JARs
   * @param flinkConfig Flink cluster configuration
   * @param in0 Buffered reader for input
   * @param out Print writer for output
   */
  def this(flinkConfig: Configuration, in0: BufferedReader, out: JPrintWriter)
}

Execution Environments

Pre-configured Flink execution environments available as REPL variables.

/**
 * Batch execution environment for DataSet API operations
 */
val scalaBenv: ExecutionEnvironment

/**
 * Streaming execution environment for DataStream API operations
 */
val scalaSenv: StreamExecutionEnvironment

/**
 * Remote batch environment (internal)
 */
private val remoteBenv: ScalaShellEnvironment

/**
 * Remote streaming environment (internal)
 */
private val remoteSenv: ScalaShellStreamEnvironment

Usage in REPL:

// Batch operations using benv
val dataset = benv.readTextFile("/path/to/input.txt")
dataset.flatMap(_.split("\\s+")).writeAsText("/path/to/output")
benv.execute("Word Count Batch Job")

// Streaming operations using senv
val stream = senv.fromElements("hello", "world", "flink")
stream.flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(0)
  .sum(1)
  .print()
senv.execute("Word Count Stream Job")

Interpreter Configuration

Sets up the Scala interpreter with Flink-specific imports and environment bindings.

/**
 * Creates and configures the Scala interpreter with Flink imports and environments
 * Automatically imports common Flink packages and binds execution environments
 */
override def createInterpreter(): Unit

Automatically Imported Packages:

  • org.apache.flink.core.fs._
  • org.apache.flink.api.scala._
  • org.apache.flink.streaming.api.scala._
  • org.apache.flink.table.api._
  • org.apache.flink.table.api.bridge.scala._
  • org.apache.flink.api.common.functions._
  • org.apache.flink.types.Row
  • And many more Flink API packages

Pre-bound Variables:

  • benv - Batch ExecutionEnvironment
  • senv - Stream ExecutionEnvironment

JAR Compilation and Packaging

Compiles and packages REPL-generated classes for cluster execution.

/**
 * Packages compiled classes from the current shell session into a JAR file
 * for execution on a Flink cluster
 * @return File path to the created JAR file
 */
def writeFilesToDisk(): File

Usage Example:

// After defining functions and classes in the REPL
val jarFile = writeFilesToDisk()
println(s"Created JAR: ${jarFile.getAbsolutePath}")
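Under the hood, this step amounts to walking the directory of classes compiled during the session and streaming each file into a JAR. A self-contained sketch of that packaging mechanism using only the JDK's `java.util.jar` (the helper name `packageClasses` and the file names are illustrative, not Flink's actual layout):

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files
import java.util.jar.{JarEntry, JarOutputStream}

// Illustrative sketch only: package every file under classDir into jarFile,
// the same shape of work writeFilesToDisk performs for shell-compiled classes.
def packageClasses(classDir: File, jarFile: File): File = {
  val out = new JarOutputStream(new FileOutputStream(jarFile))
  try {
    def add(f: File, base: String): Unit =
      if (f.isDirectory) f.listFiles().foreach(add(_, base + f.getName + "/"))
      else {
        out.putNextEntry(new JarEntry(base + f.getName))
        Files.copy(f.toPath, out)
        out.closeEntry()
      }
    classDir.listFiles().foreach(add(_, ""))
  } finally out.close()
  jarFile
}

// Create a fake "compiled class" and package it.
val classDir = Files.createTempDirectory("shell-classes").toFile
Files.write(new File(classDir, "Demo.class").toPath,
  Array[Byte](0xCA.toByte, 0xFE.toByte))
val jar = packageClasses(classDir, new File(classDir.getParentFile, "shell-commands.jar"))
println(s"Created JAR: ${jar.getName}")
```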

External JAR Management

Handles external JAR dependencies for extended functionality.

/**
 * Returns array of external JAR file paths
 * @return Array of JAR paths, empty array if none specified
 */
def getExternalJars(): Array[String]
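The documented contract (an empty array when no JARs were specified) is simply a flattening of the `Option`; a one-line sketch, where the helper name `resolveExternalJars` is hypothetical:

```scala
// Hypothetical helper illustrating the documented contract of getExternalJars:
// Some(jars) passes through, None becomes an empty array.
def resolveExternalJars(externalJars: Option[Array[String]]): Array[String] =
  externalJars.getOrElse(Array.empty[String])

println(resolveExternalJars(None).length)                        // → 0
println(resolveExternalJars(Some(Array("udf.jar"))).mkString)
```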

Welcome Message and Help

Displays an informative welcome message with usage examples and environment information.

/**
 * Displays ASCII art welcome message and usage examples
 * Shows available environment variables and sample code snippets
 */
override def printWelcome(): Unit

Welcome Message Content:

  • ASCII art Flink logo
  • Environment variable descriptions (benv, senv)
  • DataStream API examples
  • Table API examples
  • DataSet API examples (legacy)

Temporary File Management

Manages temporary directories and files for compiled classes and JAR packaging.

/**
 * Base temporary directory for shell operations
 */
private val tmpDirBase: File

/**
 * Directory for compiled shell commands
 */
private val tmpDirShell: File

/**
 * JAR file for packaged shell commands
 */
private val tmpJarShell: File
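The fields above describe a small temporary layout: a base directory, a directory for compiled shell classes, and the JAR they are packaged into. A sketch with plain JDK calls (the directory and file names here are illustrative, not FlinkILoop's actual ones):

```scala
import java.io.File
import java.nio.file.Files

// Illustrative layout: a base temp dir holding a directory for compiled
// shell classes and the JAR they get packaged into.
val tmpDirBase: File = Files.createTempDirectory("scala-shell").toFile
val tmpDirShell: File = new File(tmpDirBase, "scala_shell_commands")
val tmpJarShell: File = new File(tmpDirBase, "scala_shell_commands.jar")
tmpDirShell.mkdirs()
tmpDirBase.deleteOnExit()
println(tmpDirShell.getAbsolutePath)
```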

Pre-imported Packages

The REPL automatically imports a comprehensive set of Flink API packages:

Core Filesystem and I/O

import org.apache.flink.core.fs._
import org.apache.flink.core.fs.local._
import org.apache.flink.api.common.io._

Common API Components

import org.apache.flink.api.common.aggregators._
import org.apache.flink.api.common.accumulators._
import org.apache.flink.api.common.distributions._
import org.apache.flink.api.common.operators._
import org.apache.flink.api.common.functions._

Scala APIs

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._

Table API and SQL

import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.functions._
import org.apache.flink.types.Row

Java API Components

import org.apache.flink.api.java.io._
import org.apache.flink.api.java.aggregation._
import org.apache.flink.api.java.functions._
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.sampling._

Usage Patterns

DataStream API Development

// Create a stream from elements
val dataStream = senv.fromElements(1, 2, 3, 4, 5)

// Apply transformations
val processed = dataStream
  .filter(_ > 2)
  .map(_ * 2)
  .keyBy(identity)
  .sum(0)

// Execute and collect results
processed.executeAndCollect().foreach(println)

Table API Development

// Create table environment
val tenv = StreamTableEnvironment.create(senv)

// Create table from values
val table = tenv.fromValues(
  row("Alice", 25),
  row("Bob", 30),
  row("Charlie", 35)
).as("name", "age")

// SQL queries
val result = tenv.sqlQuery("SELECT name, age FROM " + table + " WHERE age > 28")
result.execute().print()

Batch Processing (DataSet API)

// Read from file
val dataset = benv.readTextFile("/path/to/input.txt")

// Word count example
val counts = dataset
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .groupBy(0)
  .sum(1)

// Write results
counts.writeAsCsv("/path/to/output.csv")
benv.execute("Word Count Job")
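Because the shell is a full Scala REPL, the same pipeline can be sanity-checked on plain Scala collections before submitting a job. This local sketch mirrors the DataSet transformations above, with hypothetical input lines:

```scala
// Local equivalent of the DataSet word count, using Scala collections only.
val lines = List("to be or not to be", "that is the question")
val counts = lines
  .flatMap(_.toLowerCase.split("\\W+"))   // tokenize, as in the DataSet version
  .filter(_.nonEmpty)
  .groupBy(identity)                      // stands in for groupBy(0)
  .map { case (word, occurrences) => (word, occurrences.size) }

counts.toList.sortBy(-_._2).foreach(println)
```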

Custom Function Development

// Define custom function in REPL
class MyMapFunction extends MapFunction[String, String] {
  override def map(value: String): String = {
    value.toUpperCase + "!"
  }
}

// Use custom function
val stream = senv.fromElements("hello", "world")
stream.map(new MyMapFunction()).print()
senv.execute("Custom Function Job")

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-scala-shell-2-11
