Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The Mini Cluster provides an embedded Flink cluster implementation designed for testing, development, and local execution scenarios. It allows you to run Flink jobs locally without setting up a full distributed cluster, making it ideal for unit tests, integration tests, and rapid development cycles.
The main class for creating and managing an embedded Flink cluster with configurable resources and services.
public class MiniCluster {
public MiniCluster();
public MiniCluster(MiniClusterConfiguration config);
@Deprecated
public MiniCluster(Configuration config);
@Deprecated
public MiniCluster(Configuration config, boolean singleRpcService);
public void start() throws Exception;
public void close();
public void runDetached(JobGraph job) throws JobExecutionException;
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;
public boolean isRunning();
public Configuration getConfiguration();
}

Configuration class that defines the setup and resource allocation for the mini cluster.
public class MiniClusterConfiguration {
public static class Builder {
public Builder setNumTaskManagers(int numTaskManagers);
public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing);
public Builder setConfiguration(Configuration configuration);
public MiniClusterConfiguration build();
}
public int getNumTaskManagers();
public int getNumSlotsPerTaskManager();
public RpcServiceSharing getRpcServiceSharing();
public Configuration getConfiguration();
}

Enumeration defining RPC service sharing strategies in the mini cluster.
public enum RpcServiceSharing {
SHARED, // Single RPC service shared across all components
DEDICATED; // Separate RPC service for each component
}

Submits a job for execution without waiting for completion. The job runs asynchronously in the background.
/**
* Starts a Flink job in detached mode. The method returns immediately after job submission.
* The job continues to run asynchronously in the cluster.
*
* @param job The Flink job to execute
* @throws JobExecutionException Thrown if anything went amiss during initial job launch,
* or if the job terminally failed.
*/
public void runDetached(JobGraph job) throws JobExecutionException;

Submits a job and waits for its completion, returning the execution result.
/**
* Starts a Flink job and waits until it completes or fails.
*
* @param job The Flink job to execute
* @return The result of the job execution
* @throws JobExecutionException Thrown if anything went amiss during initial job launch,
* or if the job terminally failed.
* @throws InterruptedException Thrown if the thread waiting for the job result is interrupted
*/
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;

import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
// Create basic mini cluster configuration
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(2)
.setNumSlotsPerTaskManager(4)
.build();
// Start the mini cluster
MiniCluster miniCluster = new MiniCluster(config);
miniCluster.start();
try {
// Submit and execute job synchronously
JobGraph jobGraph = createJobGraph();
JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);
System.out.println("Job completed successfully in " + result.getNetRuntime() + " ms");
} finally {
// Always clean up
miniCluster.close();
}

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
// Create advanced configuration with custom Flink settings
Configuration flinkConfig = new Configuration();
flinkConfig.setString("taskmanager.memory.segment-size", "32768");
flinkConfig.setInteger("parallelism.default", 4);
// Build mini cluster with custom configuration
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(2)
.setNumSlotsPerTaskManager(8)
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setConfiguration(flinkConfig)
.build();
MiniCluster miniCluster = new MiniCluster(config);
miniCluster.start();
// Submit job for detached execution
JobGraph jobGraph = createAsyncJobGraph();
miniCluster.runDetached(jobGraph);
// Continue with other work while job runs...

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
 * Example JUnit 5 test that runs Flink jobs against an embedded {@link MiniCluster}.
 *
 * <p>A new mini cluster is started before every test method and closed after it,
 * so each test gets its own cluster instance.
 */
public class FlinkJobTest {
    private MiniCluster miniCluster;

    /**
     * Builds and starts a mini cluster with one TaskManager and four slots,
     * passing checkpointing options through the Flink {@code Configuration}.
     *
     * @throws Exception if the cluster fails to start
     */
    @BeforeEach
    public void setup() throws Exception {
        Configuration config = new Configuration();
        config.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
        config.setLong("execution.checkpointing.interval", 1000L);

        MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder()
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(4)
                .setConfiguration(config)
                .build();

        miniCluster = new MiniCluster(clusterConfig);
        miniCluster.start();
    }

    /**
     * Closes the mini cluster created in {@link #setup()}.
     *
     * @throws Exception if closing the cluster fails
     */
    @AfterEach
    public void teardown() throws Exception {
        // setup() may have failed before the field was assigned
        if (miniCluster != null) {
            miniCluster.close();
        }
    }

    /** Runs a job to completion and checks its runtime and accumulator results. */
    @Test
    public void testJobExecution() throws Exception {
        // Create test job
        JobGraph jobGraph = createTestJobGraph();

        // Submit job and wait for completion
        JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);

        // Verify successful execution
        assertThat(result.getNetRuntime()).isGreaterThan(0);

        // Verify accumulator results
        Map<String, Object> accumulators = result.getAllAccumulatorResults();
        assertThat(accumulators.get("records-processed")).isEqualTo(1000L);
    }

    /** Submits a job in detached mode and checks that the cluster is still running. */
    @Test
    public void testDetachedExecution() throws Exception {
        JobGraph jobGraph = createLongRunningJobGraph();

        // Submit job for detached execution; the call returns right after submission
        miniCluster.runDetached(jobGraph);

        // The job now runs asynchronously; this checks that the cluster itself is still up.
        // Uses the same assertThat style as the other test in this class.
        assertThat(miniCluster.isRunning()).isTrue();

        // Continue with other test logic...
    }
}
// Create mini cluster with default configuration:
// - One JobManager
// - One TaskManager
// - One task slot per TaskManager
// - Shared RPC service
MiniCluster miniCluster = new MiniCluster();
miniCluster.start();
try {
JobGraph simpleJob = createSimpleJobGraph();
JobExecutionResult result = miniCluster.runJobBlocking(simpleJob);
System.out.println("Simple job completed: " + result.getNetRuntime() + " ms");
} finally {
miniCluster.close();
}

// Configure mini cluster for resource-constrained environments
Configuration resourceConfig = new Configuration();
resourceConfig.setString("taskmanager.memory.process.size", "512m");
resourceConfig.setString("taskmanager.memory.jvm-metaspace.size", "64m");
resourceConfig.setInteger("taskmanager.numberOfTaskSlots", 2);
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(2)
.setConfiguration(resourceConfig)
.build();
MiniCluster miniCluster = new MiniCluster(config);
miniCluster.start();
try {
    // Submit resource-conscious job
    JobGraph lightweightJob = createLightweightJobGraph();
    JobExecutionResult result = miniCluster.runJobBlocking(lightweightJob);
} finally {
    // Always close the cluster, even if the job fails, so its resources are
    // released (same pattern as the other blocking examples)
    miniCluster.close();
}
// Shared RPC service (recommended for most testing scenarios)
MiniClusterConfiguration sharedConfig = new MiniClusterConfiguration.Builder()
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumTaskManagers(3)
.setNumSlotsPerTaskManager(2)
.build();
// Dedicated RPC services (for testing RPC isolation)
MiniClusterConfiguration dedicatedConfig = new MiniClusterConfiguration.Builder()
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.setNumTaskManagers(2)
.setNumSlotsPerTaskManager(4)
.build();

Configuration flinkConfig = new Configuration();
// Checkpointing configuration
flinkConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
flinkConfig.setLong("execution.checkpointing.interval", 5000L);
flinkConfig.setString("state.backend", "filesystem");
flinkConfig.setString("state.checkpoints.dir", "file:///tmp/test-checkpoints");
// Memory configuration
flinkConfig.setString("taskmanager.memory.process.size", "1024m");
flinkConfig.setString("taskmanager.memory.managed.fraction", "0.4");
// Networking configuration
flinkConfig.setString("taskmanager.network.memory.fraction", "0.1");
flinkConfig.setInteger("taskmanager.network.numberOfBuffers", 2048);
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
.setConfiguration(flinkConfig)
.setNumTaskManagers(2)
.setNumSlotsPerTaskManager(4)
.build();

Exception thrown during job execution failures.
public class JobExecutionException extends FlinkException {
public JobExecutionException(JobID jobId, String msg);
public JobExecutionException(JobID jobId, String msg, Throwable cause);
public JobID getJobID();
}

Result object containing job execution information and accumulated results.
public class JobExecutionResult implements Serializable {
public JobID getJobID();
public long getNetRuntime();
public Map<String, Object> getAllAccumulatorResults();
public <T> T getAccumulatorResult(String accumulatorName);
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10