CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-2-10

Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.

Pending
Overview
Eval results
Files

mini-cluster.mddocs/

Mini Cluster (Testing/Embedded)

The Mini Cluster provides an embedded Flink cluster implementation designed for testing, development, and local execution scenarios. It allows you to run Flink jobs locally without setting up a full distributed cluster, making it ideal for unit tests, integration tests, and rapid development cycles.

Core Components

MiniCluster

The main class for creating and managing an embedded Flink cluster with configurable resources and services.

public class MiniCluster {
    public MiniCluster();
    public MiniCluster(MiniClusterConfiguration config);
    @Deprecated
    public MiniCluster(Configuration config);
    @Deprecated  
    public MiniCluster(Configuration config, boolean singleRpcService);
    
    public void start() throws Exception;
    public void close();
    
    public void runDetached(JobGraph job) throws JobExecutionException;
    public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;
    
    public boolean isRunning();
    public Configuration getConfiguration();
}

MiniClusterConfiguration

Configuration class that defines the setup and resource allocation for the mini cluster.

public class MiniClusterConfiguration {
    public static class Builder {
        public Builder setNumTaskManagers(int numTaskManagers);
        public Builder setNumSlotsPerTaskManager(int numSlotsPerTaskManager);
        public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing);
        public Builder setConfiguration(Configuration configuration);
        
        public MiniClusterConfiguration build();
    }
    
    public int getNumTaskManagers();
    public int getNumSlotsPerTaskManager();
    public RpcServiceSharing getRpcServiceSharing();
    public Configuration getConfiguration();
}

RpcServiceSharing

Enumeration defining RPC service sharing strategies in the mini cluster.

public enum RpcServiceSharing {
    SHARED,      // Single RPC service shared across all components
    DEDICATED;   // Separate RPC service for each component
}

Job Execution Methods

Detached Execution

Submits a job for execution without waiting for completion. The job runs asynchronously in the background.

/**
 * Starts a Flink job in detached mode. The method returns immediately after job submission.
 * The job continues to run asynchronously in the cluster.
 * 
 * @param job The Flink job to execute
 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 *         or if the job terminally failed.
 */
public void runDetached(JobGraph job) throws JobExecutionException;

Blocking Execution

Submits a job and waits for its completion, returning the execution result.

/**
 * Starts a Flink job and waits until it completes or fails.
 * 
 * @param job The Flink job to execute
 * @return The result of the job execution
 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 *         or if the job terminally failed.
 * @throws InterruptedException Thrown if the thread waiting for the job result is interrupted
 */
public JobExecutionResult runJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException;

Usage Examples

Basic Mini Cluster Setup

import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Create basic mini cluster configuration
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
    .setNumTaskManagers(2)
    .setNumSlotsPerTaskManager(4)
    .build();

// Start the mini cluster
MiniCluster miniCluster = new MiniCluster(config);
miniCluster.start();

try {
    // Submit and execute job synchronously
    JobGraph jobGraph = createJobGraph();
    JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);
    
    System.out.println("Job completed successfully in " + result.getNetRuntime() + " ms");
    
} finally {
    // Always clean up
    miniCluster.close();
}

Advanced Configuration

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;

// Create advanced configuration with custom Flink settings
Configuration flinkConfig = new Configuration();
flinkConfig.setString("taskmanager.memory.segment-size", "32768");
flinkConfig.setInteger("parallelism.default", 4);

// Build mini cluster with custom configuration
MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
    .setNumTaskManagers(2)
    .setNumSlotsPerTaskManager(8)
    .setRpcServiceSharing(RpcServiceSharing.SHARED)
    .setConfiguration(flinkConfig)
    .build();

MiniCluster miniCluster = new MiniCluster(config);
miniCluster.start();

// Submit job for detached execution
JobGraph jobGraph = createAsyncJobGraph();
miniCluster.runDetached(jobGraph);

// Continue with other work while job runs...

Testing Framework Integration

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class FlinkJobTest {
    private MiniCluster miniCluster;
    
    @BeforeEach
    public void setup() throws Exception {
        Configuration config = new Configuration();
        config.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
        config.setLong("execution.checkpointing.interval", 1000L);
        
        MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder()
            .setNumTaskManagers(1)
            .setNumSlotsPerTaskManager(4)
            .setConfiguration(config)
            .build();
            
        miniCluster = new MiniCluster(clusterConfig);
        miniCluster.start();
    }
    
    @AfterEach
    public void teardown() throws Exception {
        if (miniCluster != null) {
            miniCluster.close();
        }
    }
    
    @Test
    public void testJobExecution() throws Exception {
        // Create test job
        JobGraph jobGraph = createTestJobGraph();
        
        // Submit job and wait for completion
        JobExecutionResult result = miniCluster.runJobBlocking(jobGraph);
        
        // Verify successful execution
        assertThat(result.getNetRuntime()).isGreaterThan(0);
        
        // Verify accumulator results
        Map<String, Object> accumulators = result.getAllAccumulatorResults();
        assertThat(accumulators.get("records-processed")).isEqualTo(1000L);
    }
    
    @Test
    public void testDetachedExecution() throws Exception {
        JobGraph jobGraph = createLongRunningJobGraph();
        
        // Submit job for detached execution
        miniCluster.runDetached(jobGraph);
        
        // Job is now running asynchronously
        assertTrue(miniCluster.isRunning());
        
        // Continue with other test logic...
    }
}

Default Constructor Usage

// Create mini cluster with default configuration:
// - One JobManager
// - One TaskManager  
// - One task slot per TaskManager
// - Shared RPC service
MiniCluster miniCluster = new MiniCluster();
miniCluster.start();

try {
    JobGraph simpleJob = createSimpleJobGraph();
    JobExecutionResult result = miniCluster.runJobBlocking(simpleJob);
    
    System.out.println("Simple job completed: " + result.getNetRuntime() + " ms");
} finally {
    miniCluster.close();
}

Resource-Constrained Testing

// Configure mini cluster for resource-constrained environments
Configuration resourceConfig = new Configuration();
resourceConfig.setString("taskmanager.memory.process.size", "512m");
resourceConfig.setString("taskmanager.memory.jvm-metaspace.size", "64m");
resourceConfig.setInteger("taskmanager.numberOfTaskSlots", 2);

MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
    .setNumTaskManagers(1)
    .setNumSlotsPerTaskManager(2)
    .setConfiguration(resourceConfig)
    .build();

MiniCluster miniCluster = new MiniCluster(config);
miniCluster.start();

// Submit resource-conscious job
JobGraph lightweightJob = createLightweightJobGraph();
JobExecutionResult result = miniCluster.runJobBlocking(lightweightJob);

Configuration Patterns

Shared vs Dedicated RPC Services

// Shared RPC service (recommended for most testing scenarios)
MiniClusterConfiguration sharedConfig = new MiniClusterConfiguration.Builder()
    .setRpcServiceSharing(RpcServiceSharing.SHARED)
    .setNumTaskManagers(3)
    .setNumSlotsPerTaskManager(2)
    .build();

// Dedicated RPC services (for testing RPC isolation)
MiniClusterConfiguration dedicatedConfig = new MiniClusterConfiguration.Builder()
    .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
    .setNumTaskManagers(2)
    .setNumSlotsPerTaskManager(4)
    .build();

Custom Flink Configuration

Configuration flinkConfig = new Configuration();

// Checkpointing configuration
flinkConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
flinkConfig.setLong("execution.checkpointing.interval", 5000L);
flinkConfig.setString("state.backend", "filesystem");
flinkConfig.setString("state.checkpoints.dir", "file:///tmp/test-checkpoints");

// Memory configuration
flinkConfig.setString("taskmanager.memory.process.size", "1024m");
flinkConfig.setString("taskmanager.memory.managed.fraction", "0.4");

// Networking configuration
flinkConfig.setString("taskmanager.network.memory.fraction", "0.1");
flinkConfig.setInteger("taskmanager.network.numberOfBuffers", 2048);

MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
    .setConfiguration(flinkConfig)
    .setNumTaskManagers(2)
    .setNumSlotsPerTaskManager(4)
    .build();

Common Types

JobExecutionException

Exception thrown during job execution failures.

public class JobExecutionException extends FlinkException {
    public JobExecutionException(JobID jobId, String msg);
    public JobExecutionException(JobID jobId, String msg, Throwable cause);
    public JobID getJobID();
}

JobExecutionResult

Result object containing job execution information and accumulated results.

public class JobExecutionResult implements Serializable {
    public JobID getJobID();
    public long getNetRuntime();
    public Map<String, Object> getAllAccumulatorResults();
    public <T> T getAccumulatorResult(String accumulatorName);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10

docs

data-exchange.md

execution-graph.md

high-availability.md

index.md

job-management.md

message-passing.md

metrics.md

mini-cluster.md

rpc-framework.md

state-management.md

task-execution.md

tile.json