
tessl/maven-org-apache-flink--flink-scala-shell-2-11

Interactive Scala shell for Apache Flink that provides a REPL environment for developing and testing Flink applications.

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.flink/flink-scala-shell_2.11@1.14.x

To install, run:

```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala-shell-2-11@1.14.0
```

# Flink Scala Shell

Flink Scala Shell is an interactive REPL environment for Apache Flink that lets developers develop, test, and experiment with Flink applications written in Scala. It provides access to both the DataStream and DataSet APIs, with support for local, remote, and YARN cluster execution modes.

## Package Information

- **Package Name**: flink-scala-shell_2.11
- **Package Type**: maven
- **Language**: Scala
- **Version**: 1.14.6
- **Installation**: Add as a Maven dependency or use as part of the Flink distribution

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala-shell_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
```

## Core Imports

The shell automatically provides these pre-imported packages and pre-configured environment variables:

```scala
// Core Flink APIs are pre-imported:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.annotation.Internal
import java.io.{BufferedReader, File}
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}

// Environment variables are pre-configured:
// benv: ExecutionEnvironment (for the DataSet API)
// senv: StreamExecutionEnvironment (for the DataStream API)
```

## Basic Usage

### Starting the Shell

```bash
# Local cluster mode
start-scala-shell.sh local

# Remote cluster mode
start-scala-shell.sh remote <host> <port>

# YARN cluster mode
start-scala-shell.sh yarn
```

### Interactive Development

```scala
// DataStream API - streaming operations
val dataStream = senv.fromElements(1, 2, 3, 4)
dataStream
  .countWindowAll(2)
  .sum(0)
  .executeAndCollect()
  .foreach(println)

// Table API - SQL-style operations
val tenv = StreamTableEnvironment.create(senv)
val table = tenv.fromValues(row("Alice", 1), row("Bob", 2)).as("name", "score")
table
  .groupBy($"name")
  .select($"name", $"score".sum)
  .execute()
  .print()

// DataSet API - batch operations (legacy)
val dataSet = benv.readTextFile("/path/to/data")
dataSet.writeAsText("/path/to/output")
benv.execute("My batch program")
```

## Architecture

The Flink Scala Shell consists of several key components:

- **FlinkShell**: Main entry point handling command-line parsing and cluster connection
- **FlinkILoop**: Interactive REPL extending Scala's ILoop with Flink-specific functionality
- **Execution Modes**: Support for local, remote, and YARN cluster execution
- **Environment Setup**: Automatic configuration of the batch (benv) and streaming (senv) environments
- **JAR Packaging**: Compilation and packaging of REPL code for cluster execution
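The dispatch from a parsed configuration to an execution target can be sketched with a self-contained stand-in for the shell's `ExecutionMode` and `Config` shapes. This is an illustrative mirror using hypothetical names (`Mode`, `ShellConfig`, `describeTarget`), not the actual `FlinkShell` implementation:

```scala
// Illustrative sketch only: local stand-ins mirroring the documented
// ExecutionMode enumeration and Config fields, not the Flink classes.
object Mode extends Enumeration { val UNDEFINED, LOCAL, REMOTE, YARN = Value }

final case class ShellConfig(
  host: Option[String] = None,
  port: Option[Int] = None,
  executionMode: Mode.Value = Mode.UNDEFINED
)

// Decide where jobs submitted from the REPL would run, per mode.
def describeTarget(config: ShellConfig): String = config.executionMode match {
  case Mode.LOCAL     => "start an embedded local cluster"
  case Mode.REMOTE    => s"connect to ${config.host.getOrElse("?")}:${config.port.getOrElse(-1)}"
  case Mode.YARN      => "deploy or attach to a YARN session"
  case Mode.UNDEFINED => "error: no execution mode selected"
}

val remote = ShellConfig(Some("jobmanager"), Some(8081), Mode.REMOTE)
println(describeTarget(remote)) // connect to jobmanager:8081
```

In the real shell this decision is made when `FlinkShell.fetchConnectionInfo` turns a `Config` into a cluster connection.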

## Capabilities

### Command-Line Interface

Core command-line interface for starting the shell with different execution modes and configuration options.

```scala { .api }
object FlinkShell {
  def main(args: Array[String]): Unit
  def startShell(config: Config): Unit
  @Internal def ensureYarnConfig(config: Config): YarnConfig
  @Internal def fetchConnectionInfo(config: Config, flinkConfig: Configuration): (Configuration, Option[ClusterClient[_]])
  def parseArgList(config: Config, mode: String): Array[String]
}
```

[Command-Line Interface](./command-line-interface.md)

### Interactive REPL Environment

Interactive Scala REPL environment with Flink-specific functionality and pre-configured execution environments.

```scala { .api }
class FlinkILoop(
  val flinkConfig: Configuration,
  val externalJars: Option[Array[String]],
  in0: Option[BufferedReader],
  out0: JPrintWriter
) extends ILoop(in0, out0) {
  val scalaBenv: ExecutionEnvironment
  val scalaSenv: StreamExecutionEnvironment

  def this(flinkConfig: Configuration, externalJars: Option[Array[String]], in0: BufferedReader, out: JPrintWriter)
  def this(flinkConfig: Configuration, externalJars: Option[Array[String]])
  def this(flinkConfig: Configuration, in0: BufferedReader, out: JPrintWriter)

  override def createInterpreter(): Unit
  def writeFilesToDisk(): File
  override def printWelcome(): Unit
  def getExternalJars(): Array[String]
}
```

[Interactive REPL Environment](./interactive-repl.md)

### Configuration Management

Configuration system for managing cluster connections, execution modes, and YARN settings.

```scala { .api }
case class Config(
  host: Option[String] = None,
  port: Option[Int] = None,
  externalJars: Option[Array[String]] = None,
  executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
  yarnConfig: Option[YarnConfig] = None,
  configDir: Option[String] = None
)

case class YarnConfig(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)

object ExecutionMode extends Enumeration {
  val UNDEFINED, LOCAL, REMOTE, YARN = Value
}
```

[Configuration Management](./configuration.md)
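Because every field of `Config` and `YarnConfig` defaults to `None`, callers typically start from an empty instance and override only what they need via `copy`. A self-contained sketch using hypothetical stand-in case classes (`ConfigDemo`, `YarnConfigDemo`) that mirror the documented fields:

```scala
// Stand-ins mirroring the documented case classes; not the Flink classes.
case class YarnConfigDemo(
  jobManagerMemory: Option[String] = None,
  name: Option[String] = None,
  queue: Option[String] = None,
  slots: Option[Int] = None,
  taskManagerMemory: Option[String] = None
)

case class ConfigDemo(
  host: Option[String] = None,
  port: Option[Int] = None,
  yarnConfig: Option[YarnConfigDemo] = None
)

// Fill in a default YarnConfig only when one is absent, then override
// a single field - the kind of step an ensureYarnConfig-style helper does.
val base = ConfigDemo()
val withYarn = base.copy(
  yarnConfig = Some(base.yarnConfig.getOrElse(YarnConfigDemo()).copy(slots = Some(4)))
)

println(withYarn.yarnConfig.flatMap(_.slots)) // Some(4)
```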

## Types

```scala { .api }
object ExecutionMode extends Enumeration {
  val UNDEFINED: ExecutionMode.Value
  val LOCAL: ExecutionMode.Value
  val REMOTE: ExecutionMode.Value
  val YARN: ExecutionMode.Value
}

case class Config(
  host: Option[String],
  port: Option[Int],
  externalJars: Option[Array[String]],
  executionMode: ExecutionMode.Value,
  yarnConfig: Option[YarnConfig],
  configDir: Option[String]
)

case class YarnConfig(
  jobManagerMemory: Option[String],
  name: Option[String],
  queue: Option[String],
  slots: Option[Int],
  taskManagerMemory: Option[String]
)
```
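Since `ExecutionMode` is a standard `scala.Enumeration`, mode values can be parsed from user input with `withName`. A self-contained sketch with a stand-in enumeration (`ModeDemo` and `parseMode` are hypothetical names for illustration):

```scala
// Stand-in mirroring ExecutionMode; plain scala.Enumeration behavior.
object ModeDemo extends Enumeration { val UNDEFINED, LOCAL, REMOTE, YARN = Value }

// Parse a mode string such as a CLI argument, falling back to UNDEFINED
// because withName throws NoSuchElementException on unknown names.
def parseMode(s: String): ModeDemo.Value =
  scala.util.Try(ModeDemo.withName(s.toUpperCase)).getOrElse(ModeDemo.UNDEFINED)

println(parseMode("yarn"))  // YARN
println(parseMode("bogus")) // UNDEFINED
```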