# Interactive REPL Environment

The `FlinkILoop` class provides an interactive Scala REPL with Flink-specific functionality: pre-configured batch and streaming execution environments and automatic imports of common Flink packages.

## Imports

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.java.{ScalaShellEnvironment, ScalaShellStreamEnvironment}
import java.io.{BufferedReader, File}
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
```

## Capabilities

### REPL Initialization

Creates and configures the interactive Scala interpreter with Flink environments.

```scala { .api }
/**
 * FlinkILoop primary constructor with full configuration
 * @param flinkConfig Flink cluster configuration
 * @param externalJars Optional array of external JAR file paths
 * @param in0 Optional buffered reader for input (primarily for testing)
 * @param out0 Print writer for output
 */
class FlinkILoop(
  val flinkConfig: Configuration,
  val externalJars: Option[Array[String]],
  in0: Option[BufferedReader],
  out0: JPrintWriter
) extends ILoop(in0, out0) {

  /**
   * Auxiliary constructor with explicit I/O streams
   * @param flinkConfig Flink cluster configuration
   * @param externalJars Optional array of external JAR file paths
   * @param in0 Buffered reader for input
   * @param out Print writer for output
   */
  def this(
    flinkConfig: Configuration,
    externalJars: Option[Array[String]],
    in0: BufferedReader,
    out: JPrintWriter
  )

  /**
   * Auxiliary constructor using standard console I/O
   * @param flinkConfig Flink cluster configuration
   * @param externalJars Optional array of external JAR file paths
   */
  def this(flinkConfig: Configuration, externalJars: Option[Array[String]])

  /**
   * Auxiliary constructor without external JARs
   * @param flinkConfig Flink cluster configuration
   * @param in0 Buffered reader for input
   * @param out Print writer for output
   */
  def this(flinkConfig: Configuration, in0: BufferedReader, out: JPrintWriter)
}
```
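
The auxiliary constructors above are listed as signatures only. As a minimal sketch of how such constructors can delegate to a primary constructor, the following uses a stand-in class `ShellLoop` and a `Map[String, String]` in place of Flink's `Configuration`. Both are hypothetical and chosen so that the snippet runs without Flink on the classpath; this document does not show how `FlinkILoop` itself wires its constructors.

```scala
import java.io.{BufferedReader, PrintWriter, StringReader, StringWriter}

// Hypothetical stand-in for FlinkILoop; not the real class.
class ShellLoop(
    val config: Map[String, String], // stand-in for Flink's Configuration
    val externalJars: Option[Array[String]],
    val in: Option[BufferedReader],
    val out: PrintWriter
) {
  // Explicit I/O streams: wrap the reader in Some(...)
  def this(
      config: Map[String, String],
      externalJars: Option[Array[String]],
      in: BufferedReader,
      out: PrintWriter
  ) = this(config, externalJars, Some(in), out)

  // Console I/O: no custom reader, output goes to the console
  def this(config: Map[String, String], externalJars: Option[Array[String]]) =
    this(config, externalJars, None, new PrintWriter(Console.out, true))

  // No external JARs
  def this(config: Map[String, String], in: BufferedReader, out: PrintWriter) =
    this(config, None, in, out)
}

object ShellLoopDemo {
  val reader = new BufferedReader(new StringReader(":quit\n"))
  val writer = new PrintWriter(new StringWriter())

  // Interactive use: console I/O plus one external JAR (path is a placeholder)
  val consoleLoop = new ShellLoop(Map.empty[String, String], Some(Array("/path/to/udf.jar")))

  // Test use: scripted input, captured output, no external JARs
  val testLoop = new ShellLoop(Map.empty[String, String], reader, writer)
}
```

The variant that takes a reader and writer is the one a test harness would typically use, since it lets input be scripted and output be captured.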

### Execution Environments

Pre-configured Flink execution environments, exposed in the REPL as the variables `benv` and `senv`.

```scala { .api }
/**
 * Batch execution environment for DataSet API operations
 */
val scalaBenv: ExecutionEnvironment

/**
 * Streaming execution environment for DataStream API operations
 */
val scalaSenv: StreamExecutionEnvironment

/**
 * Remote batch environment (internal)
 */
private val remoteBenv: ScalaShellEnvironment

/**
 * Remote streaming environment (internal)
 */
private val remoteSenv: ScalaShellStreamEnvironment
```

**Usage in REPL:**
```scala
// Batch operations using benv: split lines into words and write them out
val dataset = benv.readTextFile("/path/to/input.txt")
dataset.flatMap(_.split("\\s+")).writeAsText("/path/to/output")
benv.execute("Word Count Batch Job")

// Streaming operations using senv
val stream = senv.fromElements("hello", "world", "flink")
stream.flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(_._1) // key selector function; index-based keyBy(0) is deprecated in recent Flink versions
  .sum(1)
  .print()
senv.execute("Word Count Stream Job")
```
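
A keyed `sum` on a stream emits an updated total for every incoming element rather than one final total per key. The sketch below models that behavior with plain Scala collections so that it can be run without a cluster. `runningCounts` is a hypothetical helper, and the sketch does not reproduce the output ordering of a parallel Flink job.

```scala
object RunningCountSketch {
  // For each word, in arrival order, emit (word, count so far).
  def runningCounts(words: Seq[String]): Seq[(String, Int)] = {
    val start = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, emitted) = words.foldLeft(start) {
      case ((totals, out), word) =>
        val updated = totals.getOrElse(word, 0) + 1
        (totals.updated(word, updated), out :+ (word -> updated))
    }
    emitted
  }

  val result: Seq[(String, Int)] = runningCounts(Seq("hello", "world", "hello"))
  // result == Vector(("hello", 1), ("world", 1), ("hello", 2))
}
```

The second `"hello"` produces `("hello", 2)` in addition to the earlier `("hello", 1)`, which is why a streaming word count prints several lines for a repeated word.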

### Interpreter Configuration

Sets up the Scala interpreter with Flink-specific imports and environment bindings.

```scala { .api }
/**
 * Creates and configures the Scala interpreter with Flink imports and environments
 * Automatically imports common Flink packages and binds execution environments
 */
override def createInterpreter(): Unit
```

**Automatically Imported Packages:**
- `org.apache.flink.core.fs._`
- `org.apache.flink.api.scala._`
- `org.apache.flink.streaming.api.scala._`
- `org.apache.flink.table.api._`
- `org.apache.flink.table.api.bridge.scala._`
- `org.apache.flink.api.common.functions._`
- `org.apache.flink.types.Row`
- Additional Flink API packages (see Pre-imported Packages)

**Pre-bound Variables:**
- `benv` - batch `ExecutionEnvironment` (the `scalaBenv` field)
- `senv` - streaming `StreamExecutionEnvironment` (the `scalaSenv` field)

### JAR Compilation and Packaging

Compiles and packages REPL-generated classes for cluster execution.

```scala { .api }
/**
 * Packages compiled classes from the current shell session into a JAR file
 * for execution on a Flink cluster
 * @return File path to the created JAR file
 */
def writeFilesToDisk(): File
```

**Usage Example:**
```scala
// After defining functions and classes in the REPL session.
// `repl` is assumed to be the FlinkILoop instance running the session;
// writeFilesToDisk() is a method on that instance, not a pre-bound REPL variable.
val jarFile = repl.writeFilesToDisk()
println(s"Created JAR: ${jarFile.getAbsolutePath}")
```
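
To make the packaging step concrete, here is a minimal sketch of writing a directory of compiled class files into a JAR with the JDK's `java.util.jar` API. The object `JarPackagingSketch` and its method `packageDirectory` are hypothetical names; this illustrates the general technique under that assumption and is not the body of `writeFilesToDisk()`.

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files
import java.util.jar.{JarEntry, JarOutputStream}

object JarPackagingSketch {
  // Recursively list regular files under `dir`.
  private def listFiles(dir: File): Seq[File] = {
    val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    children.flatMap(f => if (f.isDirectory) listFiles(f) else Seq(f))
  }

  // Write every file under `classDir` into `jarFile`. Entry names are paths
  // relative to `classDir`, using '/' as the separator as JAR files expect.
  def packageDirectory(classDir: File, jarFile: File): File = {
    val jar = new JarOutputStream(new FileOutputStream(jarFile))
    try {
      for (file <- listFiles(classDir)) {
        val entryName = classDir.toPath
          .relativize(file.toPath)
          .toString
          .replace(File.separatorChar, '/')
        jar.putNextEntry(new JarEntry(entryName))
        Files.copy(file.toPath, jar) // stream the file's bytes into the current entry
        jar.closeEntry()
      }
    } finally jar.close()
    jarFile
  }
}
```

A call such as `JarPackagingSketch.packageDirectory(new File("/tmp/classes"), new File("/tmp/session.jar"))` would produce a JAR whose entries mirror the package layout of the class directory; both paths are placeholders.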

### External JAR Management

Handles external JAR dependencies for extended functionality.

```scala { .api }
/**
 * Returns array of external JAR file paths
 * @return Array of JAR paths, empty array if none specified
 */
def getExternalJars(): Array[String]
```
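
The documented contract (an empty array when no JARs were specified) maps naturally onto `Option.getOrElse`. The following sketch shows that pattern with a hypothetical helper `externalJarsOrEmpty`; it illustrates the contract and is not copied from the `FlinkILoop` source.

```scala
object ExternalJarsSketch {
  // Hypothetical helper mirroring the documented contract of getExternalJars():
  // return the configured paths, or an empty array when none were given.
  def externalJarsOrEmpty(externalJars: Option[Array[String]]): Array[String] =
    externalJars.getOrElse(Array.empty[String])

  val none: Array[String] = externalJarsOrEmpty(None)
  val some: Array[String] =
    externalJarsOrEmpty(Some(Array("/path/to/a.jar", "/path/to/b.jar")))
}
```

Because the result is never `null`, callers can iterate over it directly without an extra `Option` check.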

### Welcome Message and Help

Displays a welcome message with usage examples and environment information.

```scala { .api }
/**
 * Displays ASCII art welcome message and usage examples
 * Shows available environment variables and sample code snippets
 */
override def printWelcome(): Unit
```

**Welcome Message Content:**
- ASCII art Flink logo
- Environment variable descriptions (`benv`, `senv`)
- DataStream API examples
- Table API examples
- DataSet API examples (legacy)

### Temporary File Management

Manages temporary directories and files for compiled classes and JAR packaging.

```scala { .api }
/**
 * Base temporary directory for shell operations
 */
private val tmpDirBase: File

/**
 * Directory for compiled shell commands
 */
private val tmpDirShell: File

/**
 * JAR file for packaged shell commands
 */
private val tmpJarShell: File
```
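
One way to picture how the three fields relate is a base directory containing a subdirectory for compiled classes and a sibling JAR path. The sketch below builds such a layout with the JDK only. The directory and file names (`scala_shell_tmp`, `scala_shell_commands`, `scala_shell_commands.jar`) are assumptions made for illustration, and the object `TempLayoutSketch` is hypothetical.

```scala
import java.io.File
import java.nio.file.Files

object TempLayoutSketch {
  // Base directory, created fresh for each session (prefix is an assumption).
  val tmpDirBase: File = Files.createTempDirectory("scala_shell_tmp").toFile

  // Subdirectory that would receive compiled classes (name is an assumption).
  val tmpDirShell: File = new File(tmpDirBase, "scala_shell_commands")

  // JAR path next to it; the file itself is only created when classes are packaged.
  val tmpJarShell: File = new File(tmpDirBase, "scala_shell_commands.jar")

  tmpDirShell.mkdirs()
}
```

Keeping the class directory and the JAR under one base directory means a single recursive delete can clean up everything a session produced.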

## Pre-imported Packages

The REPL automatically imports a broad set of Flink API packages, including:

### Core Filesystem and I/O
```scala
import org.apache.flink.core.fs._
import org.apache.flink.core.fs.local._
import org.apache.flink.api.common.io._
```

### Common API Components
```scala
import org.apache.flink.api.common.aggregators._
import org.apache.flink.api.common.accumulators._
import org.apache.flink.api.common.distributions._
import org.apache.flink.api.common.operators._
import org.apache.flink.api.common.functions._
```

### Scala APIs
```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._
```

### Table API and SQL
```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.functions._
import org.apache.flink.types.Row
```

### Java API Components
```scala
import org.apache.flink.api.java.io._
import org.apache.flink.api.java.aggregation._
import org.apache.flink.api.java.functions._
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.sampling._
```

## Usage Patterns

### DataStream API Development
```scala
// Create a stream from elements
val dataStream = senv.fromElements(1, 2, 3, 4, 5)

// Apply transformations
val processed = dataStream
  .filter(_ > 2)
  .map(_ * 2)
  .keyBy(identity) // key each element by its own value
  .sum(0)          // running sum per key

// Execute and collect results
processed.executeAndCollect().foreach(println)
```

### Table API Development
```scala
// Create table environment
val tenv = StreamTableEnvironment.create(senv)

// Create table from values
val table = tenv.fromValues(
  row("Alice", 25),
  row("Bob", 30),
  row("Charlie", 35)
).as("name", "age")

// SQL queries
// Concatenating the Table into the query string registers it under a generated name
val result = tenv.sqlQuery("SELECT name, age FROM " + table + " WHERE age > 28")
result.execute().print()
```

### Batch Processing (DataSet API)
```scala
// Read from file
val dataset = benv.readTextFile("/path/to/input.txt")

// Word count example
val counts = dataset
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .groupBy(0)
  .sum(1)

// Write results
counts.writeAsCsv("/path/to/output.csv")
benv.execute("Word Count Job")
```
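
The `filter(_.nonEmpty)` step matters because `split("\\W+")` yields a leading empty string when a line starts with a non-word character. The following collections-only sketch reproduces the tokenizing and counting logic so that the result can be checked locally. `tokenize` and `wordCounts` are hypothetical helpers, and the sketch does not use the DataSet API.

```scala
object BatchWordCountSketch {
  // Same tokenization as the batch example: lower-case, split on non-word runs.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").toSeq

  // Final count per word, as a batch job would produce.
  def wordCounts(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(tokenize)
      .filter(_.nonEmpty) // drop the empty token produced by leading separators
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  val rawTokens: Seq[String] = tokenize("  Hello, hello Flink")
  // rawTokens == Seq("", "hello", "hello", "flink")

  val counts: Map[String, Int] = wordCounts(Seq("  Hello, hello Flink", "flink"))
  // counts == Map("hello" -> 2, "flink" -> 2)
}
```

Without the filter, the empty string would be counted as a word of its own.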

### Custom Function Development
```scala
// Define custom function in REPL
class MyMapFunction extends MapFunction[String, String] {
  override def map(value: String): String = {
    value.toUpperCase + "!"
  }
}

// Use custom function
val stream = senv.fromElements("hello", "world")
stream.map(new MyMapFunction()).print()
senv.execute("Custom Function Job")
```