# Flink Scala Shell

Flink Scala Shell is an interactive REPL environment for Apache Flink that lets developers interactively develop, test, and experiment with Flink applications written in Scala. It provides access to both the DataStream and DataSet APIs, and supports local, remote, and YARN cluster execution modes.

## Package Information

- **Package Name**: flink-scala-shell_2.11
- **Package Type**: maven
- **Language**: Scala
- **Version**: 1.14.6
- **Installation**: Add as a Maven dependency, or use the shell bundled with the Flink distribution

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala-shell_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```
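For sbt-based projects, the same coordinates can be expressed as (sbt syntax, equivalent to the Maven snippet above):

```scala
libraryDependencies += "org.apache.flink" % "flink-scala-shell_2.11" % "1.14.6"
```
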
## Core Imports

The shell starts with the core Flink APIs pre-imported and two execution environments pre-bound:

```scala
// Core Flink APIs are pre-imported into the session:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Used by the shell implementation itself:
import org.apache.flink.configuration.Configuration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.annotation.Internal
import java.io.{BufferedReader, File}
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}

// Pre-bound environment variables:
// benv: ExecutionEnvironment       (DataSet API)
// senv: StreamExecutionEnvironment (DataStream API)
```

## Basic Usage

### Starting the Shell

```bash
# Local cluster mode
start-scala-shell.sh local

# Remote cluster mode
start-scala-shell.sh remote <host> <port>

# YARN cluster mode
start-scala-shell.sh yarn
```

### Interactive Development

```scala
// DataStream API - streaming operations
val dataStream = senv.fromElements(1, 2, 3, 4)
val results = dataStream
  .countWindowAll(2)
  .sum(0)
  .executeAndCollect()          // returns a CloseableIterator
while (results.hasNext) println(results.next())
results.close()

// Table API - SQL-style operations
val tenv = StreamTableEnvironment.create(senv)
val table = tenv.fromValues(row("Alice", 1), row("Bob", 2)).as("name", "score")
table
  .groupBy($"name")
  .select($"name", $"score".sum)
  .execute()
  .print()

// DataSet API - batch operations (legacy)
val dataSet = benv.readTextFile("/path/to/data")
dataSet.writeAsText("/path/to/output")
benv.execute("My batch program")
```

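As a plain-Scala sanity check of the windowed sum above: `countWindowAll(2)` groups the stream into tumbling count windows of two elements, and `sum(0)` emits one total per window, so 1, 2, 3, 4 yields two results.

```scala
// Plain-Scala model of countWindowAll(2).sum(0) over the elements 1, 2, 3, 4:
// tumbling count windows of size two, each window summed.
val windowSums = List(1, 2, 3, 4).grouped(2).map(_.sum).toList
// windowSums == List(3, 7)
```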
## Architecture

The Flink Scala Shell consists of several key components:

- **FlinkShell**: Main entry point handling command-line parsing and cluster connection
- **FlinkILoop**: Interactive REPL extending Scala's ILoop with Flink-specific functionality
- **Execution Modes**: Support for local, remote, and YARN cluster execution
- **Environment Setup**: Automatic configuration of the batch (benv) and streaming (senv) environments
- **JAR Packaging**: Compilation and packaging of REPL code for cluster execution

## Capabilities

### Command-Line Interface

Core command-line interface for starting the shell with different execution modes and configuration options.

```scala { .api }
object FlinkShell {
  def main(args: Array[String]): Unit
  def startShell(config: Config): Unit
  @Internal def ensureYarnConfig(config: Config): YarnConfig
  @Internal def fetchConnectionInfo(config: Config, flinkConfig: Configuration): (Configuration, Option[ClusterClient[_]])
  def parseArgList(config: Config, mode: String): Array[String]
}
```

[Command-Line Interface](./command-line-interface.md)

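The first command-line argument selects the execution mode. A hedged sketch of how that string could map onto an `ExecutionMode`-style enumeration (illustrative only; `ShellMode` and `modeFor` are hypothetical names, not Flink's actual parser):

```scala
// Hypothetical mode-string mapping, mirroring the modes the shell accepts.
object ShellMode extends Enumeration {
  val UNDEFINED, LOCAL, REMOTE, YARN = Value
}

def modeFor(arg: String): ShellMode.Value = arg.toLowerCase match {
  case "local"  => ShellMode.LOCAL
  case "remote" => ShellMode.REMOTE
  case "yarn"   => ShellMode.YARN
  case _        => ShellMode.UNDEFINED   // unknown mode: shell prints usage
}
```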
### Interactive REPL Environment

Interactive Scala REPL environment with Flink-specific functionality and pre-configured execution environments.

```scala { .api }
class FlinkILoop(
  val flinkConfig: Configuration,
  val externalJars: Option[Array[String]],
  in0: Option[BufferedReader],
  out0: JPrintWriter
) extends ILoop(in0, out0) {
  val scalaBenv: ExecutionEnvironment
  val scalaSenv: StreamExecutionEnvironment

  def this(flinkConfig: Configuration, externalJars: Option[Array[String]], in0: BufferedReader, out: JPrintWriter)
  def this(flinkConfig: Configuration, externalJars: Option[Array[String]])
  def this(flinkConfig: Configuration, in0: BufferedReader, out: JPrintWriter)

  override def createInterpreter(): Unit
  def writeFilesToDisk(): File
  override def printWelcome(): Unit
  def getExternalJars(): Array[String]
}
```

[Interactive REPL Environment](./interactive-repl.md)

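`writeFilesToDisk()` supports the JAR-packaging step: classes compiled from REPL input are written out and bundled into a jar that ships with each submitted job. A minimal sketch of that packaging idea (hypothetical `packageClasses` helper; the real `FlinkILoop` does considerably more):

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files
import java.util.jar.{JarEntry, JarOutputStream}

// Recursively bundle a directory of compiled .class files into a jar,
// preserving relative paths as jar entry names.
def packageClasses(classDir: File, jarFile: File): File = {
  val out = new JarOutputStream(new FileOutputStream(jarFile))
  try {
    def add(f: File, prefix: String): Unit =
      if (f.isDirectory) f.listFiles().foreach(add(_, prefix + f.getName + "/"))
      else {
        out.putNextEntry(new JarEntry(prefix + f.getName))
        out.write(Files.readAllBytes(f.toPath))
        out.closeEntry()
      }
    classDir.listFiles().foreach(add(_, ""))
  } finally out.close()
  jarFile
}
```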
### Configuration Management

Configuration system for managing cluster connections, execution modes, and YARN settings.

```scala { .api }
case class Config(
  host: Option[String] = None,
  port: Option[Int] = None,
  externalJars: Option[Array[String]] = None,
  executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
  yarnConfig: Option[YarnConfig] = None,
  configDir: Option[String] = None
)

case class YarnConfig(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)

object ExecutionMode extends Enumeration {
  val UNDEFINED, LOCAL, REMOTE, YARN = Value
}
```

[Configuration Management](./configuration.md)

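Because every field defaults to `None`, only the settings relevant to the chosen mode need to be populated. An abridged, self-contained copy of the types above (for illustration; not imported from Flink, and omitting `executionMode`/`configDir`):

```scala
// Abridged copies of Config/YarnConfig showing the Option-based defaults.
case class YarnConfig(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)

case class Config(
  host: Option[String] = None,
  port: Option[Int] = None,
  externalJars: Option[Array[String]] = None,
  yarnConfig: Option[YarnConfig] = None
)

// Remote mode needs only host/port; YARN mode only its YarnConfig.
val remote = Config(host = Some("jobmanager.example.com"), port = Some(8081))
val yarn   = Config(yarnConfig = Some(YarnConfig(slots = Some(4), queue = Some("default"))))
```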
## Types

```scala { .api }
object ExecutionMode extends Enumeration {
  val UNDEFINED: ExecutionMode.Value
  val LOCAL: ExecutionMode.Value
  val REMOTE: ExecutionMode.Value
  val YARN: ExecutionMode.Value
}

case class Config(
  host: Option[String],
  port: Option[Int],
  externalJars: Option[Array[String]],
  executionMode: ExecutionMode.Value,
  yarnConfig: Option[YarnConfig],
  configDir: Option[String]
)

case class YarnConfig(
  jobManagerMemory: Option[String],
  name: Option[String],
  queue: Option[String],
  slots: Option[Int],
  taskManagerMemory: Option[String]
)
```