0
# Mini Cluster (Testing/Embedded)
1
2
The Mini Cluster provides an embedded Flink cluster implementation designed for testing, development, and local execution scenarios. It allows you to run Flink jobs locally without setting up a full distributed cluster, making it ideal for unit tests, integration tests, and rapid development cycles.
3
4
## Core Components
5
6
### MiniCluster
7
8
The main class for creating and managing an embedded Flink cluster with configurable resources and services.
9
10
```java { .api }
11
public class MiniCluster {
12
public MiniCluster();
13
public MiniCluster(MiniClusterConfiguration config);
14
@Deprecated
15
public MiniCluster(Configuration config);
16
@Deprecated
17
public MiniCluster(Configuration config, boolean singleRpcService);
18
19
public void start() throws Exception;
20
public void close();
21
22
public void runDetached(JobGraph job) throws JobExecutionException;
23
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;
24
25
public boolean isRunning();
26
public Configuration getConfiguration();
27
}
28
```
29
30
### MiniClusterConfiguration
31
32
Configuration class that defines the setup and resource allocation for the mini cluster.
33
34
```java { .api }
35
public class MiniClusterConfiguration {
36
public static class Builder {
37
public Builder setNumTaskManagers(int numTaskManagers);
38
public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
39
public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing);
40
public Builder setConfiguration(Configuration configuration);
41
42
public MiniClusterConfiguration build();
43
}
44
45
public int getNumTaskManagers();
46
public int getNumSlotsPerTaskManager();
47
public RpcServiceSharing getRpcServiceSharing();
48
public Configuration getConfiguration();
49
}
50
```
51
52
### RpcServiceSharing
53
54
Enumeration defining RPC service sharing strategies in the mini cluster.
55
56
```java { .api }
57
public enum RpcServiceSharing {
58
SHARED, // Single RPC service shared across all components
59
DEDICATED; // Separate RPC service for each component
60
}
61
```
62
63
## Job Execution Methods
64
65
### Detached Execution
66
67
Submits a job for execution without waiting for completion. The job runs asynchronously in the background.
68
69
```java { .api }
70
/**
71
* Starts a Flink job in detached mode. The method returns immediately after job submission.
72
* The job continues to run asynchronously in the cluster.
73
*
74
* @param job The Flink job to execute
75
* @throws JobExecutionException Thrown if anything went amiss during initial job launch,
76
* or if the job terminally failed.
77
*/
78
public void runDetached(JobGraph job) throws JobExecutionException;
79
```
80
81
### Blocking Execution
82
83
Submits a job and waits for its completion, returning the execution result.
84
85
```java { .api }
86
/**
87
* Starts a Flink job and waits until it completes or fails.
88
*
89
* @param job The Flink job to execute
90
* @return The result of the job execution
91
* @throws JobExecutionException Thrown if anything went amiss during initial job launch,
92
* or if the job terminally failed.
93
* @throws InterruptedException Thrown if the thread waiting for the job result is interrupted
94
*/
95
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;
96
```
97
98
## Usage Examples
99
100
### Basic Mini Cluster Setup
101
102
```java
103
import org.apache.flink.runtime.minicluster.MiniCluster;
104
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
105
import org.apache.flink.api.common.JobExecutionResult;
106
import org.apache.flink.runtime.jobgraph.JobGraph;
107
108
// Create basic mini cluster configuration
109
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
110
.setNumTaskManagers(2)
111
.setNumSlotsPerTaskManager(4)
112
.build();
113
114
// Start the mini cluster
115
MiniCluster miniCluster = new MiniCluster(config);
116
miniCluster.start();
117
118
try {
119
// Submit and execute job synchronously
120
JobGraph jobGraph = createJobGraph();
121
JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);
122
123
System.out.println("Job completed successfully in " + result.getNetRuntime() + " ms");
124
125
} finally {
126
// Always clean up
127
miniCluster.close();
128
}
129
```
130
131
### Advanced Configuration
132
133
```java
134
import org.apache.flink.configuration.Configuration;
135
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
136
137
// Create advanced configuration with custom Flink settings
138
Configuration flinkConfig = new Configuration();
139
flinkConfig.setString("taskmanager.memory.segment-size", "32768");
140
flinkConfig.setInteger("parallelism.default", 4);
141
142
// Build mini cluster with custom configuration
143
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
144
.setNumTaskManagers(2)
145
.setNumSlotsPerTaskManager(8)
146
.setRpcServiceSharing(RpcServiceSharing.SHARED)
147
.setConfiguration(flinkConfig)
148
.build();
149
150
MiniCluster miniCluster = new MiniCluster(config);
151
miniCluster.start();
152
153
// Submit job for detached execution
154
JobGraph jobGraph = createAsyncJobGraph();
155
miniCluster.runDetached(jobGraph);
156
157
// Continue with other work while job runs...
158
```
159
160
### Testing Framework Integration
161
162
```java
163
import org.junit.jupiter.api.AfterEach;
164
import org.junit.jupiter.api.BeforeEach;
165
import org.junit.jupiter.api.Test;
166
167
public class FlinkJobTest {
168
private MiniCluster miniCluster;
169
170
@BeforeEach
171
public void setup() throws Exception {
172
Configuration config = new Configuration();
173
config.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
174
config.setLong("execution.checkpointing.interval", 1000L);
175
176
MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder()
177
.setNumTaskManagers(1)
178
.setNumSlotsPerTaskManager(4)
179
.setConfiguration(config)
180
.build();
181
182
miniCluster = new MiniCluster(clusterConfig);
183
miniCluster.start();
184
}
185
186
@AfterEach
187
public void teardown() throws Exception {
188
if (miniCluster != null) {
189
miniCluster.close();
190
}
191
}
192
193
@Test
194
public void testJobExecution() throws Exception {
195
// Create test job
196
JobGraph jobGraph = createTestJobGraph();
197
198
// Submit job and wait for completion
199
JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);
200
201
// Verify successful execution
202
assertThat(result.getNetRuntime()).isGreaterThan(0);
203
204
// Verify accumulator results
205
Map<String, Object> accumulators = result.getAllAccumulatorResults();
206
assertThat(accumulators.get("records-processed")).isEqualTo(1000L);
207
}
208
209
@Test
210
public void testDetachedExecution() throws Exception {
211
JobGraph jobGraph = createLongRunningJobGraph();
212
213
// Submit job for detached execution
214
miniCluster.runDetached(jobGraph);
215
216
// Job is now running asynchronously
217
assertTrue(miniCluster.isRunning());
218
219
// Continue with other test logic...
220
}
221
}
222
```
223
224
### Default Constructor Usage
225
226
```java
227
// Create mini cluster with default configuration:
228
// - One JobManager
229
// - One TaskManager
230
// - One task slot per TaskManager
231
// - Shared RPC service
232
MiniCluster miniCluster = new MiniCluster();
233
miniCluster.start();
234
235
try {
236
JobGraph simpleJob = createSimpleJobGraph();
237
JobExecutionResult result = miniCluster.runJobBlocking(simpleJob);
238
239
System.out.println("Simple job completed: " + result.getNetRuntime() + " ms");
240
} finally {
241
miniCluster.close();
242
}
243
```
244
245
### Resource-Constrained Testing
246
247
```java
248
// Configure mini cluster for resource-constrained environments
249
Configuration resourceConfig = new Configuration();
250
resourceConfig.setString("taskmanager.memory.process.size", "512m");
251
resourceConfig.setString("taskmanager.memory.jvm-metaspace.size", "64m");
252
resourceConfig.setInteger("taskmanager.numberOfTaskSlots", 2);
253
254
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
255
.setNumTaskManagers(1)
256
.setNumSlotsPerTaskManager(2)
257
.setConfiguration(resourceConfig)
258
.build();
259
260
MiniCluster miniCluster = new MiniCluster(config);
261
miniCluster.start();
262
263
// Submit resource-conscious job
264
JobGraph lightweightJob = createLightweightJobGraph();
265
JobExecutionResult result = miniCluster.runJobBlocking(lightweightJob);
266
```
267
268
## Configuration Patterns
269
270
### Shared vs Dedicated RPC Services
271
272
```java
273
// Shared RPC service (recommended for most testing scenarios)
274
MiniClusterConfiguration sharedConfig = new MiniClusterConfiguration.Builder()
275
.setRpcServiceSharing(RpcServiceSharing.SHARED)
276
.setNumTaskManagers(3)
277
.setNumSlotsPerTaskManager(2)
278
.build();
279
280
// Dedicated RPC services (for testing RPC isolation)
281
MiniClusterConfiguration dedicatedConfig = new MiniClusterConfiguration.Builder()
282
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
283
.setNumTaskManagers(2)
284
.setNumSlotsPerTaskManager(4)
285
.build();
286
```
287
288
### Custom Flink Configuration
289
290
```java
291
Configuration flinkConfig = new Configuration();
292
293
// Checkpointing configuration
294
flinkConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
295
flinkConfig.setLong("execution.checkpointing.interval", 5000L);
296
flinkConfig.setString("state.backend", "filesystem");
297
flinkConfig.setString("state.checkpoints.dir", "file:///tmp/test-checkpoints");
298
299
// Memory configuration
300
flinkConfig.setString("taskmanager.memory.process.size", "1024m");
301
flinkConfig.setString("taskmanager.memory.managed.fraction", "0.4");
302
303
// Networking configuration
304
flinkConfig.setString("taskmanager.network.memory.fraction", "0.1");
305
flinkConfig.setInteger("taskmanager.network.numberOfBuffers", 2048);
306
307
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
308
.setConfiguration(flinkConfig)
309
.setNumTaskManagers(2)
310
.setNumSlotsPerTaskManager(4)
311
.build();
312
```
313
314
## Common Types
315
316
### JobExecutionException
317
318
Exception thrown during job execution failures.
319
320
```java { .api }
321
public class JobExecutionException extends FlinkException {
322
public JobExecutionException(JobID jobId, String msg);
323
public JobExecutionException(JobID jobId, String msg, Throwable cause);
324
public JobID getJobID();
325
}
326
```
327
328
### JobExecutionResult
329
330
Result object containing job execution information and accumulated results.
331
332
```java { .api }
333
public class JobExecutionResult implements Serializable {
334
public JobID getJobID();
335
public long getNetRuntime();
336
public Map<String, Object> getAllAccumulatorResults();
337
public <T> T getAccumulatorResult(String accumulatorName);
338
}
339
```