CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-2-10

Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.

Pending
Overview
Eval results
Files

docs/job-management.md

Job Execution and Management

The job execution and management capabilities provide the core APIs for creating, submitting, monitoring, and controlling Flink dataflow programs. These APIs are essential for any application that needs to execute distributed stream processing or batch jobs on a Flink cluster.

Core Job Components

JobGraph

The JobGraph is the central data structure representing a Flink dataflow program at the JobManager level. It defines the topology of operations and data flow between them.

/**
 * Central data structure representing a Flink dataflow program at the
 * JobManager level: a set of {@link JobVertex} nodes plus job-wide settings.
 * Serializable so it can be shipped to the cluster on submission.
 */
public class JobGraph implements Serializable {
    // Constructors: the single-name variant generates a fresh JobID;
    // the varargs variant pre-populates the graph with the given vertices.
    public JobGraph(String jobName);
    public JobGraph(JobID jobId, String jobName);
    public JobGraph(JobVertex... vertices);
    
    // Attach vertices and distributed artifacts (user JARs / BLOBs) to the job.
    public void addVertex(JobVertex vertex);
    public void addJar(Path jar);
    public void addBlob(BlobKey key);
    
    // Vertex accessors. The topological variant may throw if the graph
    // contains cycles outside of supported iteration constructs.
    public JobVertex[] getVerticesAsArray();
    public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException;
    public Iterable<JobVertex> getVertices();
    public int getNumberOfVertices();
    
    // Job identity.
    public String getName();
    public JobID getJobID();
    
    // Job-wide configuration. Note there is no setter for the whole
    // Configuration — callers write entries into the returned instance.
    public Configuration getJobConfiguration();
    
    // Scheduling strategy for the vertices (see ScheduleMode below).
    public void setScheduleMode(ScheduleMode scheduleMode);
    public ScheduleMode getScheduleMode();
    
    // Session lifetime — presumably how long the job's session is retained
    // after completion; confirm units (likely seconds) against the implementation.
    public void setSessionTimeout(long sessionTimeout);
    public long getSessionTimeout();
    
    // Whether tasks may be queued when not enough slots are available.
    public void setAllowQueuedScheduling(boolean allowQueuedScheduling);
    public boolean getAllowQueuedScheduling();
    
    // Savepoint to restore from on (re)start, if any.
    public SavepointRestoreSettings getSavepointRestoreSettings();
    public void setSavepointRestoreSettings(SavepointRestoreSettings settings);
    
    // The ExecutionConfig is serialized on set (hence the IOException) so it
    // can travel with the graph; read back in serialized form.
    public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException;
    public SerializedValue<ExecutionConfig> getSerializedExecutionConfig();
    
    // Checkpointing configuration — this is the hook that enables
    // checkpointing for the job (there is no boolean enable flag on JobGraph).
    public JobCheckpointingSettings getCheckpointingSettings();
    public void setSnapshotSettings(JobCheckpointingSettings settings);
    
    // Additional classpath URLs shipped to the task managers.
    public List<URL> getClasspaths();
    public void setClasspaths(List<URL> paths);
}

JobVertex

Represents individual operations or vertices in the job graph, each corresponding to a task in the execution.

/**
 * A node in the JobGraph: one logical operation that is executed as one or
 * more parallel task instances at runtime.
 */
public class JobVertex implements Serializable {
    // Constructors: an explicit JobVertexID may be supplied, otherwise one
    // is generated.
    public JobVertex(String name);
    public JobVertex(String name, JobVertexID id);
    
    // The task class instantiated on the TaskManager to run this vertex.
    public void setInvokableClass(Class<? extends AbstractInvokable> invokable);
    public Class<? extends AbstractInvokable> getInvokableClass();
    
    // Runtime parallelism (number of parallel subtasks).
    public void setParallelism(int parallelism);
    public int getParallelism();
    
    // Upper bound on parallelism — relevant for rescaling keyed state.
    public void setMaxParallelism(int maxParallelism);
    public int getMaxParallelism();
    
    // Vertex identity.
    public JobVertexID getID();
    public String getName();
    
    // Per-vertex task configuration.
    public Configuration getConfiguration();
    public void setConfiguration(Configuration configuration);
    
    // Topology accessors: incoming edges and the intermediate data sets
    // this vertex produces for downstream consumers.
    public List<JobEdge> getInputs();
    public List<IntermediateDataSet> getProducedDataSets();
}

JobEdge

Defines connections between job vertices, specifying how data flows between operations.

/**
 * A directed connection in the JobGraph: routes an upstream vertex's
 * {@link IntermediateDataSet} into a downstream target vertex.
 */
public class JobEdge implements Serializable {
    // distributionPattern controls how producer subtasks connect to
    // consumer subtasks (e.g. all-to-all vs. pointwise).
    public JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern);
    
    public IntermediateDataSet getSource();
    public JobVertex getTarget();
    public DistributionPattern getDistributionPattern();
    
    // Ship strategy — presumably a hint for how records are transported
    // along this edge (hash/broadcast/forward); confirm against the enum.
    public void setShipStrategy(ShipStrategyType shipStrategy);
    public ShipStrategyType getShipStrategy();
}

Job Client and Execution

JobClient

The main client class providing static methods for job submission, monitoring, and management. JobClient bridges between JobManager's asynchronous actor messages and synchronous method calls.

/**
 * Static entry points for job submission and monitoring. Bridges the
 * JobManager's asynchronous actor messages to synchronous method calls —
 * note that this class exposes only static helpers, not an instance API.
 */
public class JobClient {
    // Bootstraps the local actor system used to talk to the JobManager.
    public static ActorSystem startJobClientActorSystem(Configuration config) throws IOException;
    
    // Asynchronous submission: returns a listening context that can later
    // be passed to awaitJobResult(...) to block for the outcome.
    // sysoutLogUpdates = true prints job status changes to stdout.
    public static JobListeningContext submitJob(
        ActorSystem actorSystem, 
        Configuration config, 
        HighAvailabilityServices highAvailabilityServices, 
        ActorGateway jobManagerGateway, 
        JobGraph jobGraph, 
        Time timeout, 
        boolean sysoutLogUpdates, 
        ClassLoader userCodeClassLoader
    ) throws JobExecutionException;
    
    // Attach to a job that is already running, identified by its JobID.
    public static JobListeningContext attachToRunningJob(
        JobID jobID, 
        ActorGateway jobManagerGateway, 
        Configuration configuration, 
        ActorSystem actorSystem, 
        HighAvailabilityServices highAvailabilityServices, 
        Time timeout, 
        boolean sysoutLogUpdates
    );
    
    // Blocks until the job tracked by the context reaches a terminal state;
    // throws if the job fails.
    public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) 
        throws JobExecutionException;
    
    // Convenience: submitJob(...) + awaitJobResult(...) in one blocking call.
    public static JobExecutionResult submitJobAndWait(
        ActorSystem actorSystem, 
        Configuration config, 
        HighAvailabilityServices highAvailabilityServices, 
        ActorGateway jobManagerGateway, 
        JobGraph jobGraph, 
        Time timeout, 
        boolean sysoutLogUpdates, 
        ClassLoader userCodeClassLoader
    ) throws JobExecutionException;
    
    // Fire-and-forget submission: returns as soon as the JobManager has
    // acknowledged the job, without waiting for completion.
    public static void submitJobDetached(
        ActorGateway jobManagerGateway, 
        Configuration config, 
        JobGraph jobGraph, 
        Time timeout, 
        ClassLoader userCodeClassLoader
    ) throws JobExecutionException;
}

JobExecutionResult

Contains the results and metadata from a completed job execution.

/**
 * Result of a completed job: its ID, net runtime, and the final values of
 * all user accumulators.
 */
public class JobExecutionResult implements Serializable {
    public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulatorResults);
    
    public JobID getJobID();
    // Net execution time — presumably milliseconds; confirm units against
    // the implementation before displaying.
    public long getNetRuntime();
    public Map<String, Object> getAllAccumulatorResults();
    // Typed lookup of a single accumulator by name; the unchecked cast to T
    // is the caller's responsibility.
    public <T> T getAccumulatorResult(String accumulatorName);
}

Job Status and Lifecycle

JobStatus

Enumeration of all possible job execution states.

/**
 * All possible states of a job's lifecycle on the JobManager.
 */
public enum JobStatus {
    CREATED,      // Job has been created but not yet submitted
    RUNNING,      // Job is currently executing
    FAILING,      // Job is in the process of failing (tasks being torn down)
    FAILED,       // Job has failed and is no longer executing
    CANCELLING,   // Job is being cancelled
    CANCELED,     // Job has been cancelled
    FINISHED,     // Job completed successfully
    RESTARTING,   // Job is restarting from a failure
    SUSPENDED;    // Job suspended, e.g. after losing JobManager leadership —
                  // NOTE(review): in Flink this is an HA state, not a
                  // savepoint state, and the job may later be resumed; confirm.
    
    // Globally terminal: the job is finished for good (FAILED/CANCELED/
    // FINISHED) and will not be resumed by another JobManager.
    public boolean isGloballyTerminalState();
    // Terminal on this JobManager only — presumably also true for SUSPENDED,
    // which another leader may resume; confirm against the implementation.
    public boolean isTerminalState();
}

ScheduleMode

Defines how job vertices are scheduled for execution.

/**
 * Strategy for deploying a job's vertices to slots.
 */
public enum ScheduleMode {
    LAZY_FROM_SOURCES,  // Schedule vertices lazily when input data is available (typical for batch)
    EAGER;              // Schedule all vertices immediately upon job start (typical for streaming)
}

Exception Handling

JobExecutionException

Base exception for job execution failures.

/**
 * Base exception for failures during job execution; carries the ID of the
 * affected job so callers can correlate the failure.
 */
public class JobExecutionException extends FlinkException {
    public JobExecutionException(JobID jobId, String msg);
    public JobExecutionException(JobID jobId, String msg, Throwable cause);
    
    public JobID getJobID();
}

JobSubmissionException

Thrown when job submission fails.

/**
 * Thrown when submitting a job to the JobManager fails (before execution
 * starts). NOTE(review): in the actual Flink codebase this type extends
 * JobExecutionException and carries a JobID — confirm the hierarchy shown
 * here against the targeted Flink version.
 */
public class JobSubmissionException extends FlinkException {
    public JobSubmissionException(String message);
    public JobSubmissionException(String message, Throwable cause);
}

JobException

General job-related exceptions.

/**
 * General job-related exception for errors that are not specific to
 * submission or execution.
 */
public class JobException extends FlinkException {
    public JobException(String message);
    public JobException(String message, Throwable cause);
}

Usage Examples

Creating and Configuring a JobGraph

import org.apache.flink.runtime.jobgraph.*;
import org.apache.flink.configuration.Configuration;

// Create a new job (a JobID is generated automatically by this constructor).
JobGraph jobGraph = new JobGraph("Data Processing Job");

// Configure job-level settings. JobGraph exposes its Configuration only via
// getJobConfiguration() — there is no setJobConfiguration(...) — so entries
// are written into the returned instance directly.
Configuration jobConfig = jobGraph.getJobConfiguration();
jobConfig.setString("job.checkpoint.dir", "file:///checkpoints");

// Enable checkpointing through the documented snapshot-settings hook.
// NOTE(review): JobCheckpointingSettings takes the trigger/ack/confirm
// vertex lists plus interval and timeout values — confirm its constructor
// signature for your Flink version before copying this line.
jobGraph.setSnapshotSettings(checkpointingSettings);

// Set scheduling mode (EAGER deploys all vertices at job start).
jobGraph.setScheduleMode(ScheduleMode.EAGER);

// Create vertices: each one names the task class to run and its parallelism.
JobVertex sourceVertex = new JobVertex("Data Source");
sourceVertex.setInvokableClass(MySourceTask.class);
sourceVertex.setParallelism(4);

JobVertex transformVertex = new JobVertex("Data Transform");
transformVertex.setInvokableClass(MyTransformTask.class);
transformVertex.setParallelism(8);

JobVertex sinkVertex = new JobVertex("Data Sink");
sinkVertex.setInvokableClass(MySinkTask.class);
sinkVertex.setParallelism(2);

// Add vertices to the job.
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(transformVertex);
jobGraph.addVertex(sinkVertex);

// Connect vertices. Edges are created by the consuming vertex via
// connectNewDataSetAsInput(...), which registers the IntermediateDataSet
// and JobEdge internally — JobGraph itself has no addEdge(...) method.
// DistributionPattern offers ALL_TO_ALL and POINTWISE (there is no FORWARD).
transformVertex.connectNewDataSetAsInput(
    sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
sinkVertex.connectNewDataSetAsInput(
    transformVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

Job Submission and Monitoring

import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;

// Submit the job and block until it reaches a terminal state, using the
// static API documented above (JobClient has no instance methods).
// submitJobAndWait(...) combines submitJob(...) and awaitJobResult(...);
// passing sysoutLogUpdates = true prints status changes to stdout, which
// replaces manual status polling for simple setups.
try {
    JobExecutionResult result = JobClient.submitJobAndWait(
        actorSystem,               // from JobClient.startJobClientActorSystem(config)
        config,
        highAvailabilityServices,
        jobManagerGateway,
        jobGraph,
        timeout,
        true,                      // log status updates to stdout
        userCodeClassLoader);

    System.out.println("Job completed in " + result.getNetRuntime() + " ms");

    // Access accumulator results collected during the run.
    Map<String, Object> accumulators = result.getAllAccumulatorResults();
    Long recordCount = (Long) accumulators.get("records-processed");

} catch (JobExecutionException e) {
    System.err.println("Job failed: " + e.getMessage());
    // Handle job failure (e.getJobID() identifies the job; e.getCause()
    // carries the root failure).
}

// For fire-and-forget submission use submitJobDetached(...); to observe an
// already-running job, combine attachToRunningJob(...) with awaitJobResult(...).

Handling Job Cancellation

// Cancel a running job.
// NOTE(review): the JobClient documented above exposes only static
// submission helpers — the instance-style `jobClient` handle used here
// comes from a higher-level cluster client not defined in this document;
// confirm its availability in your setup.
try {
    jobClient.cancel();
    System.out.println("Job cancellation initiated");

    // Poll until the job reaches a globally terminal state. Using
    // isGloballyTerminalState() also covers FINISHED, which a check for
    // only CANCELED/FAILED would miss if the job completes during cancel.
    while (true) {
        JobStatus status = jobClient.getJobStatus();
        if (status.isGloballyTerminalState()) {
            System.out.println("Job terminated with status: " + status);
            break;
        }
        Thread.sleep(1000);
    }

} catch (JobExecutionException e) {
    System.err.println("Failed to cancel job: " + e.getMessage());
} catch (InterruptedException e) {
    // Thread.sleep throws this checked exception; restore the interrupt
    // flag so callers can observe the interruption.
    Thread.currentThread().interrupt();
}

Common Patterns

Configuring Job Parameters

// Set job-level configuration. JobGraph exposes its Configuration through
// getJobConfiguration() — there is no setJobConfiguration(...) in the API —
// so entries are written into the returned instance directly.
Configuration jobConfig = jobGraph.getJobConfiguration();
jobConfig.setString("state.backend", "filesystem");
jobConfig.setString("state.checkpoints.dir", "file:///checkpoints");
jobConfig.setLong("execution.checkpointing.interval", 60000L);
jobConfig.setInteger("parallelism.default", 4);

Setting Up Fault Tolerance

// Enable checkpointing for fault tolerance via the documented
// snapshot-settings hook — JobGraph has no setCheckpointingEnabled(...) or
// setCheckpointingInterval(...) methods; the interval lives inside
// JobCheckpointingSettings.
// NOTE(review): confirm the JobCheckpointingSettings constructor signature
// (trigger/ack/confirm vertices, interval, timeout) for your Flink version.
jobGraph.setSnapshotSettings(checkpointingSettings);

// Additional checkpoint tuning in the job configuration.
Configuration jobConfig = jobGraph.getJobConfiguration();
jobConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
jobConfig.setLong("execution.checkpointing.timeout", 600000L); // 10 minutes
jobConfig.setInteger("execution.checkpointing.max-concurrent-checkpoints", 1);

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10

docs

data-exchange.md

execution-graph.md

high-availability.md

index.md

job-management.md

message-passing.md

metrics.md

mini-cluster.md

rpc-framework.md

state-management.md

task-execution.md

tile.json