
# Interactive REPL Environment

The `FlinkILoop` class provides an interactive Scala REPL environment with Flink-specific functionality, pre-configured execution environments, and automatic imports for a seamless development experience.

## Imports

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.java.{ScalaShellEnvironment, ScalaShellStreamEnvironment}
import java.io.{BufferedReader, File}
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
```

## Capabilities

### REPL Initialization

Creates and configures the interactive Scala interpreter with Flink environments.

```scala { .api }
/**
 * FlinkILoop primary constructor with full configuration
 * @param flinkConfig Flink cluster configuration
 * @param externalJars Optional array of external JAR file paths
 * @param in0 Optional buffered reader for input (primarily for testing)
 * @param out0 Print writer for output
 */
class FlinkILoop(
    val flinkConfig: Configuration,
    val externalJars: Option[Array[String]],
    in0: Option[BufferedReader],
    out0: JPrintWriter
) extends ILoop(in0, out0) {

  /**
   * Auxiliary constructor with explicit I/O streams
   * @param flinkConfig Flink cluster configuration
   * @param externalJars Optional array of external JAR file paths
   * @param in0 Buffered reader for input
   * @param out Print writer for output
   */
  def this(
      flinkConfig: Configuration,
      externalJars: Option[Array[String]],
      in0: BufferedReader,
      out: JPrintWriter
  )

  /**
   * Auxiliary constructor using standard console I/O
   * @param flinkConfig Flink cluster configuration
   * @param externalJars Optional array of external JAR file paths
   */
  def this(flinkConfig: Configuration, externalJars: Option[Array[String]])

  /**
   * Auxiliary constructor without external JARs
   * @param flinkConfig Flink cluster configuration
   * @param in0 Buffered reader for input
   * @param out Print writer for output
   */
  def this(flinkConfig: Configuration, in0: BufferedReader, out: JPrintWriter)
}
```
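For reference, a minimal construction sketch using the console-I/O constructor. The JobManager address and port here are illustrative placeholders, not values prescribed by this API:

```scala
import org.apache.flink.configuration.{Configuration, JobManagerOptions}

// Hypothetical cluster coordinates -- substitute your own
val conf = new Configuration()
conf.setString(JobManagerOptions.ADDRESS, "localhost")
conf.setInteger(JobManagerOptions.PORT, 6123)

// Console I/O, no external JARs
val repl = new FlinkILoop(conf, None)
```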

### Execution Environments

Pre-configured Flink execution environments available as REPL variables.

```scala { .api }
/**
 * Batch execution environment for DataSet API operations
 */
val scalaBenv: ExecutionEnvironment

/**
 * Streaming execution environment for DataStream API operations
 */
val scalaSenv: StreamExecutionEnvironment

/**
 * Remote batch environment (internal)
 */
private val remoteBenv: ScalaShellEnvironment

/**
 * Remote streaming environment (internal)
 */
private val remoteSenv: ScalaShellStreamEnvironment
```

**Usage in REPL:**

```scala
// Batch operations using benv
val dataset = benv.readTextFile("/path/to/input.txt")
dataset.flatMap(_.split("\\s+")).writeAsText("/path/to/output")
benv.execute("Word Count Batch Job")

// Streaming operations using senv
val stream = senv.fromElements("hello", "world", "flink")
stream.flatMap(_.split("\\s+"))
  .map((_, 1))
  .keyBy(0)
  .sum(1)
  .print()
senv.execute("Word Count Stream Job")
```

### Interpreter Configuration

Sets up the Scala interpreter with Flink-specific imports and environment bindings.

```scala { .api }
/**
 * Creates and configures the Scala interpreter with Flink imports and environments.
 * Automatically imports common Flink packages and binds the execution environments.
 */
override def createInterpreter(): Unit
```

**Automatically Imported Packages:**

- `org.apache.flink.core.fs._`
- `org.apache.flink.api.scala._`
- `org.apache.flink.streaming.api.scala._`
- `org.apache.flink.table.api._`
- `org.apache.flink.table.api.bridge.scala._`
- `org.apache.flink.api.common.functions._`
- `org.apache.flink.types.Row`
- And many more Flink API packages

**Pre-bound Variables:**

- `benv` - batch `ExecutionEnvironment`
- `senv` - `StreamExecutionEnvironment`
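Because these packages are imported up front, shell snippets can reference Flink types without any explicit `import` statements. A sketch of what a shell session can rely on:

```scala
// No imports needed inside the shell:
val row = Row.of("alice", Integer.valueOf(42)) // from org.apache.flink.types
val window = Time.seconds(5)                   // from ...streaming.api.windowing.time
```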

### JAR Compilation and Packaging

Compiles and packages REPL-generated classes for cluster execution.

```scala { .api }
/**
 * Packages compiled classes from the current shell session into a JAR file
 * for execution on a Flink cluster
 * @return File path to the created JAR file
 */
def writeFilesToDisk(): File
```

**Usage Example:**

```scala
// After defining functions and classes in the REPL
val jarFile = writeFilesToDisk()
println(s"Created JAR: ${jarFile.getAbsolutePath}")
```

### External JAR Management

Handles external JAR dependencies for extended functionality.

```scala { .api }
/**
 * Returns array of external JAR file paths
 * @return Array of JAR paths, empty array if none specified
 */
def getExternalJars(): Array[String]
```
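A usage sketch, combining this accessor with the console-I/O constructor shown earlier (the JAR path is a hypothetical example):

```scala
val repl = new FlinkILoop(
  new Configuration(),
  Some(Array("/opt/flink/lib/my-udfs.jar"))
)
repl.getExternalJars().foreach(println)
```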

### Welcome Message and Help

Displays an informative welcome message with usage examples and environment information.

```scala { .api }
/**
 * Displays ASCII art welcome message and usage examples.
 * Shows available environment variables and sample code snippets.
 */
override def printWelcome(): Unit
```

**Welcome Message Content:**

- ASCII art Flink logo
- Environment variable descriptions (`benv`, `senv`)
- DataStream API examples
- Table API examples
- DataSet API examples (legacy)

### Temporary File Management

Manages temporary directories and files for compiled classes and JAR packaging.

```scala { .api }
/**
 * Base temporary directory for shell operations
 */
private val tmpDirBase: File

/**
 * Directory for compiled shell commands
 */
private val tmpDirShell: File

/**
 * JAR file for packaged shell commands
 */
private val tmpJarShell: File
```

## Pre-imported Packages

The REPL automatically imports a comprehensive set of Flink API packages:

### Core Filesystem and I/O

```scala
import org.apache.flink.core.fs._
import org.apache.flink.core.fs.local._
import org.apache.flink.api.common.io._
```

### Common API Components

```scala
import org.apache.flink.api.common.aggregators._
import org.apache.flink.api.common.accumulators._
import org.apache.flink.api.common.distributions._
import org.apache.flink.api.common.operators._
import org.apache.flink.api.common.functions._
```

### Scala APIs

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time._
```

### Table API and SQL

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.table.functions._
import org.apache.flink.types.Row
```

243

244

### Java API Components

245

```scala

246

import org.apache.flink.api.java.io._

247

import org.apache.flink.api.java.aggregation._

248

import org.apache.flink.api.java.functions._

249

import org.apache.flink.api.java.operators._

250

import org.apache.flink.api.java.sampling._

251

```

## Usage Patterns

### DataStream API Development

```scala
// Create a stream from elements
val dataStream = senv.fromElements(1, 2, 3, 4, 5)

// Apply transformations
val processed = dataStream
  .filter(_ > 2)
  .map(_ * 2)
  .keyBy(identity)
  .sum(0)

// Execute and collect results
processed.executeAndCollect().foreach(println)
```

### Table API Development

```scala
// Create a table environment on top of the streaming environment
val tenv = StreamTableEnvironment.create(senv)

// Create a table from values
val table = tenv.fromValues(
  row("Alice", 25),
  row("Bob", 30),
  row("Charlie", 35)
).as("name", "age")

// SQL queries
val result = tenv.sqlQuery("SELECT name, age FROM " + table + " WHERE age > 28")
result.execute().print()
```

### Batch Processing (DataSet API)

```scala
// Read from file
val dataset = benv.readTextFile("/path/to/input.txt")

// Word count example
val counts = dataset
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .groupBy(0)
  .sum(1)

// Write results
counts.writeAsCsv("/path/to/output.csv")
benv.execute("Word Count Job")
```

### Custom Function Development

```scala
// Define a custom function in the REPL
class MyMapFunction extends MapFunction[String, String] {
  override def map(value: String): String = {
    value.toUpperCase + "!"
  }
}

// Use the custom function
val stream = senv.fromElements("hello", "world")
stream.map(new MyMapFunction()).print()
senv.execute("Custom Function Job")
```