or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-exchange.md execution-graph.md high-availability.md index.md job-management.md message-passing.md metrics.md mini-cluster.md rpc-framework.md state-management.md task-execution.md

docs/mini-cluster.md

0

# Mini Cluster (Testing/Embedded)

1

2

The Mini Cluster provides an embedded Flink cluster implementation designed for testing, development, and local execution scenarios. It allows you to run Flink jobs locally without setting up a full distributed cluster, making it ideal for unit tests, integration tests, and rapid development cycles.

3

4

## Core Components

5

6

### MiniCluster

7

8

The main class for creating and managing an embedded Flink cluster with configurable resources and services.

9

10

```java { .api }

11

public class MiniCluster {

12

public MiniCluster();

13

public MiniCluster(MiniClusterConfiguration config);

14

@Deprecated

15

public MiniCluster(Configuration config);

16

@Deprecated

17

public MiniCluster(Configuration config, boolean singleRpcService);

18

19

public void start() throws Exception;

20

public void close();

21

22

public void runDetached(JobGraph job) throws JobExecutionException;

23

public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;

24

25

public boolean isRunning();

26

public Configuration getConfiguration();

27

}

28

```

29

30

### MiniClusterConfiguration

31

32

Configuration class that defines the setup and resource allocation for the mini cluster.

33

34

```java { .api }

35

public class MiniClusterConfiguration {

36

public static class Builder {

37

public Builder setNumTaskManagers(int numTaskManagers);

38

public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);

39

public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing);

40

public Builder setConfiguration(Configuration configuration);

41

42

public MiniClusterConfiguration build();

43

}

44

45

public int getNumTaskManagers();

46

public int getNumSlotsPerTaskManager();

47

public RpcServiceSharing getRpcServiceSharing();

48

public Configuration getConfiguration();

49

}

50

```

51

52

### RpcServiceSharing

53

54

Enumeration defining RPC service sharing strategies in the mini cluster.

55

56

```java { .api }

57

public enum RpcServiceSharing {

58

SHARED, // Single RPC service shared across all components

59

DEDICATED; // Separate RPC service for each component

60

}

61

```

62

63

## Job Execution Methods

64

65

### Detached Execution

66

67

Submits a job for execution without waiting for completion. The call returns no execution result; the job runs asynchronously in the background.

68

69

```java { .api }

70

/**

71

* Starts a Flink job in detached mode. The method returns immediately after job submission.

72

* The job continues to run asynchronously in the cluster.

73

*

74

* @param job The Flink job to execute

75

* @throws JobExecutionException Thrown if anything went amiss during initial job launch,

76

* or if the job terminally failed.

77

*/

78

public void runDetached(JobGraph job) throws JobExecutionException;

79

```

80

81

### Blocking Execution

82

83

Submits a job and waits for its completion, returning the execution result.

84

85

```java { .api }

86

/**

87

* Starts a Flink job and waits until it completes or fails.

88

*

89

* @param job The Flink job to execute

90

* @return The result of the job execution

91

* @throws JobExecutionException Thrown if anything went amiss during initial job launch,

92

* or if the job terminally failed.

93

* @throws InterruptedException Thrown if the thread waiting for the job result is interrupted

94

*/

95

public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;

96

```

97

98

## Usage Examples

99

100

### Basic Mini Cluster Setup

101

102

```java

103

import org.apache.flink.runtime.minicluster.MiniCluster;

104

import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

105

import org.apache.flink.api.common.JobExecutionResult;

106

import org.apache.flink.runtime.jobgraph.JobGraph;

107

108

// Create basic mini cluster configuration

109

MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()

110

.setNumTaskManagers(2)

111

.setNumSlotsPerTaskManager(4)

112

.build();

113

114

// Start the mini cluster

115

MiniCluster miniCluster = new MiniCluster(config);

116

miniCluster.start();

117

118

try {

119

// Submit and execute job synchronously

120

JobGraph jobGraph = createJobGraph();

121

JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);

122

123

System.out.println("Job completed successfully in " + result.getNetRuntime() + " ms");

124

125

} finally {

126

// Always clean up

127

miniCluster.close();

128

}

129

```

130

131

### Advanced Configuration

132

133

```java

134

import org.apache.flink.configuration.Configuration;

135

import org.apache.flink.runtime.minicluster.RpcServiceSharing;

136

137

// Create advanced configuration with custom Flink settings

138

Configuration flinkConfig = new Configuration();

139

flinkConfig.setString("taskmanager.memory.segment-size", "32768");

140

flinkConfig.setInteger("parallelism.default", 4);

141

142

// Build mini cluster with custom configuration

143

MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()

144

.setNumTaskManagers(2)

145

.setNumSlotsPerTaskManager(8)

146

.setRpcServiceSharing(RpcServiceSharing.SHARED)

147

.setConfiguration(flinkConfig)

148

.build();

149

150

MiniCluster miniCluster = new MiniCluster(config);

151

miniCluster.start();

152

153

// Submit job for detached execution

154

JobGraph jobGraph = createAsyncJobGraph();

155

miniCluster.runDetached(jobGraph);

156

157

// Continue with other work while job runs...

158

```

159

160

### Testing Framework Integration

161

162

```java

163

import org.junit.jupiter.api.AfterEach;

164

import org.junit.jupiter.api.BeforeEach;

165

import org.junit.jupiter.api.Test;

166

167

public class FlinkJobTest {

168

private MiniCluster miniCluster;

169

170

@BeforeEach

171

public void setup() throws Exception {

172

Configuration config = new Configuration();

173

config.setString("execution.checkpointing.mode", "EXACTLY_ONCE");

174

config.setLong("execution.checkpointing.interval", 1000L);

175

176

MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder()

177

.setNumTaskManagers(1)

178

.setNumSlotsPerTaskManager(4)

179

.setConfiguration(config)

180

.build();

181

182

miniCluster = new MiniCluster(clusterConfig);

183

miniCluster.start();

184

}

185

186

@AfterEach

187

public void teardown() throws Exception {

188

if (miniCluster != null) {

189

miniCluster.close();

190

}

191

}

192

193

@Test

194

public void testJobExecution() throws Exception {

195

// Create test job

196

JobGraph jobGraph = createTestJobGraph();

197

198

// Submit job and wait for completion

199

JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);

200

201

// Verify successful execution

202

assertThat(result.getNetRuntime()).isGreaterThan(0);

203

204

// Verify accumulator results

205

Map<String, Object> accumulators = result.getAllAccumulatorResults();

206

assertThat(accumulators.get("records-processed")).isEqualTo(1000L);

207

}

208

209

@Test

210

public void testDetachedExecution() throws Exception {

211

JobGraph jobGraph = createLongRunningJobGraph();

212

213

// Submit job for detached execution

214

miniCluster.runDetached(jobGraph);

215

216

// Job is now running asynchronously

217

assertTrue(miniCluster.isRunning());

218

219

// Continue with other test logic...

220

}

221

}

222

```

223

224

### Default Constructor Usage

225

226

```java

227

// Create mini cluster with default configuration:

228

// - One JobManager

229

// - One TaskManager

230

// - One task slot per TaskManager

231

// - Shared RPC service

232

MiniCluster miniCluster = new MiniCluster();

233

miniCluster.start();

234

235

try {

236

JobGraph simpleJob = createSimpleJobGraph();

237

JobExecutionResult result = miniCluster.runJobBlocking(simpleJob);

238

239

System.out.println("Simple job completed: " + result.getNetRuntime() + " ms");

240

} finally {

241

miniCluster.close();

242

}

243

```

244

245

### Resource-Constrained Testing

246

247

```java

248

// Configure mini cluster for resource-constrained environments

249

Configuration resourceConfig = new Configuration();

250

resourceConfig.setString("taskmanager.memory.process.size", "512m");

251

resourceConfig.setString("taskmanager.memory.jvm-metaspace.size", "64m");

252

resourceConfig.setInteger("taskmanager.numberOfTaskSlots", 2);

253

254

MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()

255

.setNumTaskManagers(1)

256

.setNumSlotsPerTaskManager(2)

257

.setConfiguration(resourceConfig)

258

.build();

259

260

MiniCluster miniCluster = new MiniCluster(config);

261

miniCluster.start();

262

263

// Submit resource-conscious job

264

JobGraph lightweightJob = createLightweightJobGraph();

265

JobExecutionResult result = miniCluster.runJobBlocking(lightweightJob);

266

```

267

268

## Configuration Patterns

269

270

### Shared vs Dedicated RPC Services

271

272

```java

273

// Shared RPC service (recommended for most testing scenarios)

274

MiniClusterConfiguration sharedConfig = new MiniClusterConfiguration.Builder()

275

.setRpcServiceSharing(RpcServiceSharing.SHARED)

276

.setNumTaskManagers(3)

277

.setNumSlotsPerTaskManager(2)

278

.build();

279

280

// Dedicated RPC services (for testing RPC isolation)

281

MiniClusterConfiguration dedicatedConfig = new MiniClusterConfiguration.Builder()

282

.setRpcServiceSharing(RpcServiceSharing.DEDICATED)

283

.setNumTaskManagers(2)

284

.setNumSlotsPerTaskManager(4)

285

.build();

286

```

287

288

### Custom Flink Configuration

289

290

```java

291

Configuration flinkConfig = new Configuration();

292

293

// Checkpointing configuration

294

flinkConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");

295

flinkConfig.setLong("execution.checkpointing.interval", 5000L);

296

flinkConfig.setString("state.backend", "filesystem");

297

flinkConfig.setString("state.checkpoints.dir", "file:///tmp/test-checkpoints");

298

299

// Memory configuration

300

flinkConfig.setString("taskmanager.memory.process.size", "1024m");

301

flinkConfig.setString("taskmanager.memory.managed.fraction", "0.4");

302

303

// Networking configuration

304

flinkConfig.setString("taskmanager.network.memory.fraction", "0.1");

305

flinkConfig.setInteger("taskmanager.network.numberOfBuffers", 2048);

306

307

MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()

308

.setConfiguration(flinkConfig)

309

.setNumTaskManagers(2)

310

.setNumSlotsPerTaskManager(4)

311

.build();

312

```

313

314

## Common Types

315

316

### JobExecutionException

317

318

Exception thrown during job execution failures.

319

320

```java { .api }

321

public class JobExecutionException extends FlinkException {

322

public JobExecutionException(JobID jobId, String msg);

323

public JobExecutionException(JobID jobId, String msg, Throwable cause);

324

public JobID getJobID();

325

}

326

```

327

328

### JobExecutionResult

329

330

Result object containing job execution information and accumulated results.

331

332

```java { .api }

333

public class JobExecutionResult implements Serializable {

334

public JobID getJobID();

335

public long getNetRuntime();

336

public Map<String, Object> getAllAccumulatorResults();

337

public <T> T getAccumulatorResult(String accumulatorName);

338

}

339

```