The runtime module of Apache Flink. It provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications.
—
Core APIs for defining and configuring Flink jobs as directed acyclic graphs (DAGs) of operations. The JobGraph represents the logical structure of a Flink job, while JobVertex represents individual operations within the graph.
The main container representing a complete Flink job with its configuration, vertices, and execution parameters.
/**
 * Represents a Flink dataflow program at the low level that the JobManager accepts.
 * All programs from higher level APIs are transformed into JobGraphs.
 */
public class JobGraph implements ExecutionPlan {
/** Creates a new JobGraph with the given name. */
public JobGraph(String jobName);
/** Creates a new JobGraph with a specific job ID and name. */
public JobGraph(JobID jobId, String jobName);
/** Adds a vertex (operation) to this job graph. */
public void addVertex(JobVertex vertex);
/** Returns all vertices currently contained in this job graph. */
public Collection<JobVertex> getVertices();
/** Looks up a specific vertex by its ID. */
public JobVertex findVertexByID(JobVertexID id);
/** Returns the job ID. */
public JobID getJobID();
/** Returns the job name. */
public String getName();
/** Sets the job type (BATCH or STREAMING). */
public void setJobType(JobType jobType);
/** Returns the job type. */
public JobType getJobType();
/** Sets whether this job uses dynamic graph changes. */
public void setDynamic(boolean dynamic);
/** Returns whether this job uses dynamic graph changes. */
public boolean isDynamic();
/** Returns the job-level configuration. */
public Configuration getJobConfiguration();
/** Sets the job-level configuration. */
public void setJobConfiguration(Configuration jobConfiguration);
/** Enables checkpointing for this job with the given settings. */
public void setSnapshotSettings(JobCheckpointingSettings settings);
/**
 * Returns the checkpointing settings.
 * NOTE(review): the return type is wrapped in {@code SerializedValue}, unlike the raw
 * settings accepted by {@link #setSnapshotSettings} — mind the asymmetry when reading back.
 */
public SerializedValue<JobCheckpointingSettings> getCheckpointingSettings();
/** Sets the savepoint restore settings used when resuming from a savepoint. */
public void setSavepointRestoreSettings(SavepointRestoreSettings settings);
/** Returns the savepoint restore settings. */
public SavepointRestoreSettings getSavepointRestoreSettings();
}

Usage Examples:
// Create a basic job graph
JobGraph jobGraph = new JobGraph("Data Processing Pipeline");
jobGraph.setJobType(JobType.STREAMING);
// Configure job-level settings
Configuration jobConfig = new Configuration();
jobConfig.setString(ConfigConstants.FLINK_TM_JVM_PARAMS, "-Xmx1024m");
jobGraph.setJobConfiguration(jobConfig);
// Enable checkpointing: 5s interval, 60s timeout, at most one checkpoint in flight.
// NOTE(review): vertexIdsToTrigger / vertexIdsToWaitFor / vertexIdsToCommitTo are
// assumed to be declared earlier in the example — confirm against the full listing.
JobCheckpointingSettings checkpointSettings = new JobCheckpointingSettings(
vertexIdsToTrigger,
vertexIdsToWaitFor,
vertexIdsToCommitTo,
new CheckpointCoordinatorConfiguration.Builder()
.setCheckpointInterval(5000L)
.setCheckpointTimeout(60000L)
.setMaxConcurrentCheckpoints(1)
.build(),
null // last argument intentionally unset here — confirm its meaning in JobCheckpointingSettings
);
jobGraph.setSnapshotSettings(checkpointSettings);

Represents a single operation (vertex) in the job graph, such as a source, transformation, or sink.
/**
 * The base class for job vertices representing operations in the job graph.
 */
public class JobVertex implements Serializable {
/** Creates a job vertex with the given name. */
public JobVertex(String name);
/** Creates a job vertex with a specific ID and name. */
public JobVertex(String name, JobVertexID id);
/** Returns the vertex ID. */
public JobVertexID getId();
/** Returns the vertex name. */
public String getName();
/** Sets the name of this vertex. */
public void setName(String name);
/** Sets the parallelism (number of parallel subtasks) for this vertex. */
public void setParallelism(int parallelism);
/** Returns the parallelism of this vertex. */
public int getParallelism();
/** Sets the maximum parallelism of this vertex. */
public void setMaxParallelism(int maxParallelism);
/** Returns the maximum parallelism of this vertex. */
public int getMaxParallelism();
/** Sets the invokable class that implements the runtime logic of this vertex. */
public void setInvokableClass(Class<? extends TaskInvokable> invokable);
/** Returns the invokable class. */
public Class<? extends TaskInvokable> getInvokableClass();
/** Sets both the minimum and preferred resource requirements for this vertex. */
public void setResources(ResourceSpec minResources, ResourceSpec preferredResources);
/** Returns the minimum resource requirements. */
public ResourceSpec getMinResources();
/** Returns the preferred resource requirements. */
public ResourceSpec getPreferredResources();
/** Creates a new data set produced by {@code input} and wires it as an input of this vertex. */
public void connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType
);
/** Wires an existing intermediate data set as an input of this vertex. */
public void connectDataSetAsInput(
IntermediateDataSet dataSet,
DistributionPattern distPattern
);
/** Returns all input edges of this vertex. */
public List<JobEdge> getInputs();
/** Returns all intermediate data sets produced by this vertex. */
public List<IntermediateDataSet> getProducedDataSets();
/** Sets the slot sharing group used for co-locating this vertex with others. */
public void setSlotSharingGroup(SlotSharingGroup slotSharingGroup);
/** Returns the slot sharing group. */
public SlotSharingGroup getSlotSharingGroup();
/** Sets the co-location group for strict co-location of subtasks. */
public void setCoLocationGroup(CoLocationGroup coLocationGroup);
/** Returns the co-location group. */
public CoLocationGroup getCoLocationGroup();
/** Adds a (pre-serialized) operator coordinator provider to this vertex. */
public void addOperatorCoordinator(SerializedValue<OperatorCoordinator.Provider> coordinator);
/** Returns the registered operator coordinator providers. */
public List<SerializedValue<OperatorCoordinator.Provider>> getOperatorCoordinators();
}

Usage Examples:
// Create the source vertex with 4 parallel subtasks
JobVertex sourceVertex = new JobVertex("Kafka Source");
sourceVertex.setParallelism(4);
sourceVertex.setInvokableClass(KafkaSourceTask.class); // KafkaSourceTask assumed defined elsewhere — confirm
// Set resource requirements: a minimum and a preferred spec
ResourceSpec minResources = ResourceSpec.newBuilder()
.setCpuCores(1.0)
.setHeapMemoryInMB(512)
.build();
ResourceSpec preferredResources = ResourceSpec.newBuilder()
.setCpuCores(2.0)
.setHeapMemoryInMB(1024)
.build();
sourceVertex.setResources(minResources, preferredResources);
// Create the transformation vertex
JobVertex mapVertex = new JobVertex("Data Transformation");
mapVertex.setParallelism(8);
mapVertex.setInvokableClass(MapTask.class);
// Connect the vertices: ALL_TO_ALL with a pipelined (streaming) exchange
mapVertex.connectNewDataSetAsInput(
sourceVertex,
DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED
);
// Put both vertices in one slot sharing group for efficient resource usage
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
sourceVertex.setSlotSharingGroup(slotSharingGroup);
mapVertex.setSlotSharingGroup(slotSharingGroup);
// Register the vertices with the job graph (jobGraph from the earlier example)
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(mapVertex);

Additional configuration options for job vertices, including input/output formats and custom parameters.
/**
 * Sets the input split source for vertices that read from external systems.
 */
public void setInputSplitSource(InputSplitSource<?> inputSplitSource);
/**
 * Returns the input split source.
 */
public InputSplitSource<?> getInputSplitSource();
/**
 * Sets the per-vertex configuration.
 */
public void setConfiguration(Configuration configuration);
/**
 * Returns the per-vertex configuration.
 */
public Configuration getConfiguration();
/**
 * Adds a jar file needed by this vertex at runtime.
 */
public void addJar(Path jarPath);
/**
 * Returns the registered jar files.
 */
public List<Path> getJarFiles();
/**
 * Adds classpath entries for user code.
 */
public void addClasspaths(Collection<URL> classpaths);
/**
 * Returns the user classpath entries.
 */
public Collection<URL> getUserClasspaths();

Utility methods for job graph manipulation and validation.
/**
 * Utility class with static helpers for job graph validation and analysis.
 */
public class JobGraphUtils {
/** Validates the job graph structure and configuration. */
public static void validateJobGraph(JobGraph jobGraph);
/** Returns the vertices of the job graph in topological order. */
public static List<JobVertex> getVerticesTopologically(JobGraph jobGraph);
/** Returns true if the job graph contains cycles. */
public static boolean hasCycles(JobGraph jobGraph);
/** Calculates the total resource requirements across all vertices. */
public static ResourceProfile calculateTotalResources(JobGraph jobGraph);
}

// Job graph identifiers
/** Unique identifier for a {@link JobVertex}. */
public class JobVertexID implements Serializable {
/** Creates a new vertex ID. */
public JobVertexID();
/** Creates a vertex ID from the given raw bytes. */
public JobVertexID(byte[] bytes);
/** Generates a fresh vertex ID. */
public static JobVertexID generate();
/** Returns the raw bytes of this ID. */
public byte[] getBytes();
}
/** Unique identifier for an intermediate data set produced by a vertex. */
public class IntermediateDataSetID implements Serializable {
/** Creates a new intermediate data set ID. */
public IntermediateDataSetID();
/** Creates an intermediate data set ID from the given raw bytes. */
public IntermediateDataSetID(byte[] bytes);
/** Generates a fresh intermediate data set ID. */
public static IntermediateDataSetID generate();
}
// Distribution patterns
/** Defines how producing subtasks are connected to consuming subtasks. */
public enum DistributionPattern {
/** Each producing task sends data to all consuming tasks. */
ALL_TO_ALL,
/** Each producing task sends data to exactly one consuming task. */
POINTWISE
}
// Result partition types
/** Characterizes how an intermediate result partition is produced and consumed. */
public enum ResultPartitionType {
/** Pipelined partitions for streaming data exchange. */
PIPELINED,
/** Pipelined partitions with a bounded buffer. */
PIPELINED_BOUNDED,
/** Blocking partitions for batch processing. */
BLOCKING,
/** Blocking partitions that persist and can be consumed multiple times. */
BLOCKING_PERSISTENT
}
// Job types
/** Distinguishes the two execution modes a Flink job can run in. */
public enum JobType {
    /** Batch processing job with finite input. */
    BATCH("BATCH"),
    /** Streaming job with continuous data processing. */
    STREAMING("STREAMING");

    /** Human-readable identifier for this job type. */
    private final String displayName;

    JobType(String displayName) {
        this.displayName = displayName;
    }

    /** Returns the name of this job type. */
    public String getName() {
        return displayName;
    }
}
// Savepoint restore settings
/** Settings that control how a job is restored from a savepoint. */
public class SavepointRestoreSettings implements Serializable {
/** Returns settings indicating that no savepoint should be restored. */
public static SavepointRestoreSettings none();
/** Returns settings that restore from the given savepoint path. */
public static SavepointRestoreSettings forPath(String restorePath);
/** Returns settings that restore from the given path, optionally allowing state that is not restored. */
public static SavepointRestoreSettings forPath(String restorePath, boolean allowNonRestoredState);
/** Returns true if a savepoint should be restored. */
public boolean restoreSavepoint();
/** Returns the savepoint path to restore from. */
public String getRestorePath();
/** Returns whether non-restored state is allowed. */
public boolean allowNonRestoredState();
}

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-flink--flink-runtime