CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime

The runtime module of Apache Flink that provides the core execution engine, scheduling, fault tolerance, checkpointing, and resource management capabilities for distributed stream processing applications

Pending
Overview
Eval results
Files

docs/job-graph.md

Job Graph Management

Core APIs for defining and configuring Flink jobs as directed acyclic graphs (DAGs) of operations. The JobGraph represents the logical structure of a Flink job, while JobVertex represents individual operations within the graph.

Capabilities

JobGraph

The main container representing a complete Flink job with its configuration, vertices, and execution parameters.

/**
 * Represents a Flink dataflow program at the low level that the JobManager accepts.
 * All programs from higher level APIs are transformed into JobGraphs.
 */
public class JobGraph implements ExecutionPlan {
    /** Create a new JobGraph with the given name */
    public JobGraph(String jobName);
    
    /** Create a new JobGraph with specific ID and name */
    public JobGraph(JobID jobId, String jobName);
    
    /** Add a vertex to this job graph */
    public void addVertex(JobVertex vertex);
    
    /** Get all vertices in this job graph */
    public Collection<JobVertex> getVertices();
    
    /** Get specific vertex by ID */
    public JobVertex findVertexByID(JobVertexID id);
    
    /** Get the job ID */
    public JobID getJobID();
    
    /** Get the job name */
    public String getName();
    
    /** Set the job type (BATCH or STREAMING) */
    public void setJobType(JobType jobType);
    
    /** Get the job type */
    public JobType getJobType();
    
    /** Set whether this job uses dynamic graph changes */
    public void setDynamic(boolean dynamic);
    
    /** Check if this job uses dynamic graph changes */
    public boolean isDynamic();
    
    /** Get the job configuration */
    public Configuration getJobConfiguration();
    
    /** Set the job configuration */
    public void setJobConfiguration(Configuration jobConfiguration);
    
    /** Enable checkpointing for this job */
    public void setSnapshotSettings(JobCheckpointingSettings settings);
    
    /** Get checkpointing settings */
    public SerializedValue<JobCheckpointingSettings> getCheckpointingSettings();
    
    /** Set savepoint restore settings */
    public void setSavepointRestoreSettings(SavepointRestoreSettings settings);
    
    /** Get savepoint restore settings */
    public SavepointRestoreSettings getSavepointRestoreSettings();
}

Usage Examples:

// Create a basic job graph
JobGraph jobGraph = new JobGraph("Data Processing Pipeline");
jobGraph.setJobType(JobType.STREAMING);

// Configure job-level settings
Configuration jobConfig = new Configuration();
jobConfig.setString(ConfigConstants.FLINK_TM_JVM_PARAMS, "-Xmx1024m");
jobGraph.setJobConfiguration(jobConfig);

// Enable checkpointing
JobCheckpointingSettings checkpointSettings = new JobCheckpointingSettings(
    vertexIdsToTrigger,
    vertexIdsToWaitFor,
    vertexIdsToCommitTo,
    new CheckpointCoordinatorConfiguration.Builder()
        .setCheckpointInterval(5000L)
        .setCheckpointTimeout(60000L)
        .setMaxConcurrentCheckpoints(1)
        .build(),
    null
);
jobGraph.setSnapshotSettings(checkpointSettings);

JobVertex

Represents a single operation (vertex) in the job graph, such as a source, transformation, or sink.

/**
 * The base class for job vertices representing operations in the job graph.
 */
public class JobVertex implements Serializable {
    /** Create a job vertex with default name */
    public JobVertex(String name);
    
    /** Create a job vertex with specific ID and name */
    public JobVertex(String name, JobVertexID id);
    
    /** Get the vertex ID */
    public JobVertexID getId();
    
    /** Get the vertex name */
    public String getName();
    
    /** Set the name of this vertex */
    public void setName(String name);
    
    /** Set the parallelism for this vertex */
    public void setParallelism(int parallelism);
    
    /** Get the parallelism of this vertex */
    public int getParallelism();
    
    /** Set the maximum parallelism for this vertex */
    public void setMaxParallelism(int maxParallelism);
    
    /** Get the maximum parallelism of this vertex */
    public int getMaxParallelism();
    
    /** Set the invokable class that implements the vertex logic */
    public void setInvokableClass(Class<? extends TaskInvokable> invokable);
    
    /** Get the invokable class */
    public Class<? extends TaskInvokable> getInvokableClass();
    
    /** Set minimum and preferred resource requirements */
    public void setResources(ResourceSpec minResources, ResourceSpec preferredResources);
    
    /** Get minimum resource requirements */
    public ResourceSpec getMinResources();
    
    /** Get preferred resource requirements */
    public ResourceSpec getPreferredResources();
    
    /** Add a new data set as input from another vertex */
    public void connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType
    );
    
    /** Connect to an existing intermediate data set */
    public void connectDataSetAsInput(
        IntermediateDataSet dataSet,
        DistributionPattern distPattern
    );
    
    /** Get all input edges */
    public List<JobEdge> getInputs();
    
    /** Get all produced data sets */
    public List<IntermediateDataSet> getProducedDataSets();
    
    /** Set slot sharing group for co-location */
    public void setSlotSharingGroup(SlotSharingGroup slotSharingGroup);
    
    /** Get slot sharing group */
    public SlotSharingGroup getSlotSharingGroup();
    
    /** Set co-location group for strict co-location */
    public void setCoLocationGroup(CoLocationGroup coLocationGroup);
    
    /** Get co-location group */
    public CoLocationGroup getCoLocationGroup();
    
    /** Add operator coordinator */
    public void addOperatorCoordinator(SerializedValue<OperatorCoordinator.Provider> coordinator);
    
    /** Get operator coordinators */
    public List<SerializedValue<OperatorCoordinator.Provider>> getOperatorCoordinators();
}

Usage Examples:

// Create source vertex
JobVertex sourceVertex = new JobVertex("Kafka Source");
sourceVertex.setParallelism(4);
sourceVertex.setInvokableClass(KafkaSourceTask.class);

// Set resource requirements
ResourceSpec minResources = ResourceSpec.newBuilder()
    .setCpuCores(1.0)
    .setHeapMemoryInMB(512)
    .build();
ResourceSpec preferredResources = ResourceSpec.newBuilder()
    .setCpuCores(2.0)
    .setHeapMemoryInMB(1024)
    .build();
sourceVertex.setResources(minResources, preferredResources);

// Create transformation vertex
JobVertex mapVertex = new JobVertex("Data Transformation");
mapVertex.setParallelism(8);
mapVertex.setInvokableClass(MapTask.class);

// Connect vertices
mapVertex.connectNewDataSetAsInput(
    sourceVertex,
    DistributionPattern.ALL_TO_ALL,
    ResultPartitionType.PIPELINED
);

// Set up slot sharing for efficient resource usage
SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
sourceVertex.setSlotSharingGroup(slotSharingGroup);
mapVertex.setSlotSharingGroup(slotSharingGroup);

// Add vertices to job graph
jobGraph.addVertex(sourceVertex);
jobGraph.addVertex(mapVertex);

JobVertex Configuration

Additional configuration options for job vertices including input/output formats and custom parameters.

/**
 * Set the input split source for vertices that read from external sources
 */
public void setInputSplitSource(InputSplitSource<?> inputSplitSource);

/**
 * Get input split source
 */
public InputSplitSource<?> getInputSplitSource();

/**
 * Set configuration for this vertex
 */
public void setConfiguration(Configuration configuration);

/**
 * Get vertex configuration
 */
public Configuration getConfiguration();

/**
 * Add a jar file needed by this vertex
 */
public void addJar(Path jarPath);

/**
 * Get jar files
 */
public List<Path> getJarFiles();

/**
 * Add classpath entries for this vertex
 */
public void addClasspaths(Collection<URL> classpaths);

/**
 * Get classpaths
 */
public Collection<URL> getUserClasspaths();

JobGraph Utilities

Utility methods for job graph manipulation and validation.

/**
 * Utility class for job graph operations
 */
public class JobGraphUtils {
    /** Validate job graph structure and configuration */
    public static void validateJobGraph(JobGraph jobGraph);
    
    /** Get topologically sorted vertices */
    public static List<JobVertex> getVerticesTopologically(JobGraph jobGraph);
    
    /** Check if job graph contains cycles */
    public static boolean hasCycles(JobGraph jobGraph);
    
    /** Calculate total resource requirements */
    public static ResourceProfile calculateTotalResources(JobGraph jobGraph);
}

Types

// Job graph identifiers
public class JobVertexID implements Serializable {
    public JobVertexID();
    public JobVertexID(byte[] bytes);
    public static JobVertexID generate();
    public byte[] getBytes();
}

public class IntermediateDataSetID implements Serializable {
    public IntermediateDataSetID();
    public IntermediateDataSetID(byte[] bytes);
    public static IntermediateDataSetID generate();
}

// Distribution patterns
public enum DistributionPattern {
    /** Each producing task sends data to all consuming tasks */
    ALL_TO_ALL,
    /** Each producing task sends data to exactly one consuming task */
    POINTWISE
}

// Result partition types
public enum ResultPartitionType {
    /** Pipelined partitions for streaming data exchange */
    PIPELINED,
    /** Pipelined partitions with bounded buffer */
    PIPELINED_BOUNDED,
    /** Blocking partitions for batch processing */
    BLOCKING,
    /** Blocking partitions that can be consumed multiple times */
    BLOCKING_PERSISTENT
}

// Job types
public enum JobType {
    /** Batch processing job with finite input */
    BATCH("BATCH"),
    /** Streaming job with continuous data processing */
    STREAMING("STREAMING");
    
    private final String name;
    
    JobType(String name) {
        this.name = name;
    }
    
    public String getName() {
        return name;
    }
}

// Savepoint restore settings
public class SavepointRestoreSettings implements Serializable {
    public static SavepointRestoreSettings none();
    public static SavepointRestoreSettings forPath(String restorePath);
    public static SavepointRestoreSettings forPath(String restorePath, boolean allowNonRestoredState);
    
    public boolean restoreSavepoint();
    public String getRestorePath();
    public boolean allowNonRestoredState();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime

docs

execution-scheduling.md

high-availability.md

index.md

job-graph.md

resource-management.md

state-checkpointing.md

task-execution.md

tile.json