Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The job execution and management capabilities provide the core APIs for creating, submitting, monitoring, and controlling Flink dataflow programs. These APIs are essential for any application that needs to execute distributed stream processing or batch jobs on a Flink cluster.
The JobGraph is the central data structure representing a Flink dataflow program at the JobManager level. It defines the topology of operations and data flow between them.
public class JobGraph implements Serializable {
public JobGraph(String jobName);
public JobGraph(JobID jobId, String jobName);
public JobGraph(JobVertex... vertices);
public void addVertex(JobVertex vertex);
public void addJar(Path jar);
public void addBlob(BlobKey key);
public JobVertex[] getVerticesAsArray();
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException;
public Iterable<JobVertex> getVertices();
public int getNumberOfVertices();
public String getName();
public JobID getJobID();
public Configuration getJobConfiguration();
public void setScheduleMode(ScheduleMode scheduleMode);
public ScheduleMode getScheduleMode();
public void setSessionTimeout(long sessionTimeout);
public long getSessionTimeout();
public void setAllowQueuedScheduling(boolean allowQueuedScheduling);
public boolean getAllowQueuedScheduling();
public SavepointRestoreSettings getSavepointRestoreSettings();
public void setSavepointRestoreSettings(SavepointRestoreSettings settings);
public void setExecutionConfig(ExecutionConfig executionConfig) throws IOException;
public SerializedValue<ExecutionConfig> getSerializedExecutionConfig();
public JobCheckpointingSettings getCheckpointingSettings();
public void setSnapshotSettings(JobCheckpointingSettings settings);
public List<URL> getClasspaths();
public void setClasspaths(List<URL> paths);
}

Represents an individual operation (vertex) in the job graph, each corresponding to a task in the execution.
public class JobVertex implements Serializable {
public JobVertex(String name);
public JobVertex(String name, JobVertexID id);
public void setInvokableClass(Class<? extends AbstractInvokable> invokable);
public Class<? extends AbstractInvokable> getInvokableClass();
public void setParallelism(int parallelism);
public int getParallelism();
public void setMaxParallelism(int maxParallelism);
public int getMaxParallelism();
public JobVertexID getID();
public String getName();
public Configuration getConfiguration();
public void setConfiguration(Configuration configuration);
public List<JobEdge> getInputs();
public List<IntermediateDataSet> getProducedDataSets();
}

Defines a connection between job vertices, specifying how data flows between operations.
public class JobEdge implements Serializable {
public JobEdge(IntermediateDataSet source, JobVertex target, DistributionPattern distributionPattern);
public IntermediateDataSet getSource();
public JobVertex getTarget();
public DistributionPattern getDistributionPattern();
public void setShipStrategy(ShipStrategyType shipStrategy);
public ShipStrategyType getShipStrategy();
}

The main client class providing static methods for job submission, monitoring, and management. JobClient bridges between the JobManager's asynchronous actor messages and synchronous method calls.
public class JobClient {
public static ActorSystem startJobClientActorSystem(Configuration config) throws IOException;
public static JobListeningContext submitJob(
ActorSystem actorSystem,
Configuration config,
HighAvailabilityServices highAvailabilityServices,
ActorGateway jobManagerGateway,
JobGraph jobGraph,
Time timeout,
boolean sysoutLogUpdates,
ClassLoader userCodeClassLoader
) throws JobExecutionException;
public static JobListeningContext attachToRunningJob(
JobID jobID,
ActorGateway jobManagerGateway,
Configuration configuration,
ActorSystem actorSystem,
HighAvailabilityServices highAvailabilityServices,
Time timeout,
boolean sysoutLogUpdates
);
public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext)
throws JobExecutionException;
public static JobExecutionResult submitJobAndWait(
ActorSystem actorSystem,
Configuration config,
HighAvailabilityServices highAvailabilityServices,
ActorGateway jobManagerGateway,
JobGraph jobGraph,
Time timeout,
boolean sysoutLogUpdates,
ClassLoader userCodeClassLoader
) throws JobExecutionException;
public static void submitJobDetached(
ActorGateway jobManagerGateway,
Configuration config,
JobGraph jobGraph,
Time timeout,
ClassLoader userCodeClassLoader
) throws JobExecutionException;
}

Contains the results and metadata from a completed job execution.
public class JobExecutionResult implements Serializable {
public JobExecutionResult(JobID jobID, long netRuntime, Map<String, Object> accumulatorResults);
public JobID getJobID();
public long getNetRuntime();
public Map<String, Object> getAllAccumulatorResults();
public <T> T getAccumulatorResult(String accumulatorName);
}

Enumeration of all possible job execution states.
public enum JobStatus {
CREATED, // Job has been created but not yet submitted
RUNNING, // Job is currently executing
FAILING, // Job is in the process of failing
FAILED, // Job has failed and is no longer executing
CANCELLING, // Job is being cancelled
CANCELED, // Job has been cancelled
FINISHED, // Job completed successfully
RESTARTING, // Job is restarting from a failure
SUSPENDED; // Job has been suspended (locally terminal, e.g. on loss of JobManager leadership — not globally terminal)
public boolean isGloballyTerminalState();
public boolean isTerminalState();
}

Defines how job vertices are scheduled for execution.
public enum ScheduleMode {
LAZY_FROM_SOURCES, // Schedule vertices lazily when input data is available
EAGER; // Schedule all vertices immediately upon job start
}

Base exception for job execution failures.
public class JobExecutionException extends FlinkException {
public JobExecutionException(JobID jobId, String msg);
public JobExecutionException(JobID jobId, String msg, Throwable cause);
public JobID getJobID();
}

Thrown when job submission fails.
public class JobSubmissionException extends FlinkException {
public JobSubmissionException(String message);
public JobSubmissionException(String message, Throwable cause);
}

General job-related exceptions.
public class JobException extends FlinkException {
public JobException(String message);
public JobException(String message, Throwable cause);
}

Example: building a JobGraph.

import org.apache.flink.runtime.jobgraph.*;
import org.apache.flink.configuration.Configuration;
// Create a new job
JobGraph jobGraph = new JobGraph("Data Processing Job");
// Configure job-level settings
Configuration jobConfig = new Configuration();
jobConfig.setString("job.checkpoint.dir", "file:///checkpoints");
jobGraph.getJobConfiguration().addAll(jobConfig); // JobGraph exposes its Configuration only via getJobConfiguration()
// Enable checkpointing. JobGraph has no setCheckpointingEnabled/setCheckpointingInterval
// methods; checkpointing is enabled by attaching JobCheckpointingSettings (which carries
// the interval, e.g. 30000 ms) via the documented setter:
jobGraph.setSnapshotSettings(checkpointingSettings); // JobCheckpointingSettings built beforehand
// Set scheduling mode
jobGraph.setScheduleMode(ScheduleMode.EAGER);
// Create vertices
JobVertex sourceVertex = new JobVertex("Data Source");
sourceVertex.setInvokableClass(MySourceTask.class);
sourceVertex.setParallelism(4);
JobVertex transformVertex = new JobVertex("Data Transform");
transformVertex.setInvokableClass(MyTransformTask.class);
transformVertex.setParallelism(8);
JobVertex sinkVertex = new JobVertex("Data Sink");
sinkVertex.setInvokableClass(MySinkTask.class);
sinkVertex.setParallelism(2);
// Add vertices to job
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(transformVertex);
jobGraph.addVertex(sinkVertex);
// Connect vertices with edges. Edges are registered by connecting the downstream
// vertex's input to the upstream vertex (JobGraph itself has no addEdge method).
// Note: DistributionPattern defines ALL_TO_ALL and POINTWISE; FORWARD is a
// ShipStrategyType, not a DistributionPattern.
transformVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
sinkVertex.connectNewDataSetAsInput(transformVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

Example: submitting and monitoring a job.

import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobgraph.JobStatus;
// Submit job (using a cluster client or mini cluster). Note: the handle returned
// here is the higher-level client-side JobClient interface with instance methods
// (getJobStatus, cancel, ...), distinct from the static runtime JobClient helper
// class documented above.
JobClient jobClient = clusterClient.submitJob(jobGraph);
// Monitor job status
JobStatus status = jobClient.getJobStatus();
System.out.println("Job status: " + status);
// Wait for completion and get results
try {
JobExecutionResult result = jobClient.getJobExecutionResult();
System.out.println("Job completed in " + result.getNetRuntime() + " ms");
// Access accumulator results
Map<String, Object> accumulators = result.getAllAccumulatorResults();
Long recordCount = (Long) accumulators.get("records-processed");
} catch (JobExecutionException e) {
System.err.println("Job failed: " + e.getMessage());
// Handle job failure
}

// Cancel a running job
try {
jobClient.cancel();
System.out.println("Job cancellation initiated");
// Wait for cancellation to complete
while (true) {
JobStatus status = jobClient.getJobStatus();
if (status == JobStatus.CANCELED || status == JobStatus.FAILED) {
System.out.println("Job terminated with status: " + status);
break;
}
Thread.sleep(1000);
}
} catch (JobExecutionException e) {
System.err.println("Failed to cancel job: " + e.getMessage());
}

// Set job-level configuration
Configuration jobConfig = new Configuration();
jobConfig.setString("state.backend", "filesystem");
jobConfig.setString("state.checkpoints.dir", "file:///checkpoints");
jobConfig.setLong("execution.checkpointing.interval", 60000L);
jobConfig.setInteger("parallelism.default", 4);
jobGraph.getJobConfiguration().addAll(jobConfig); // JobGraph exposes its Configuration only via getJobConfiguration()

// Enable checkpointing for fault tolerance. JobGraph has no setCheckpointingEnabled/
// setCheckpointingInterval methods; attach JobCheckpointingSettings (carrying the
// 30000 ms interval) via the documented setSnapshotSettings(...):
jobGraph.setSnapshotSettings(checkpointingSettings);
// Configure checkpoint settings in job configuration
Configuration jobConfig = jobGraph.getJobConfiguration();
jobConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
jobConfig.setLong("execution.checkpointing.timeout", 600000L); // 10 minutes
jobConfig.setInteger("execution.checkpointing.max-concurrent-checkpoints", 1);

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10