CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-core

Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications

Pending
Overview
Eval results
Files

docs/execution-jobs.md

Execution and Jobs

Apache Flink Core provides comprehensive APIs for job execution, runtime contexts, and job lifecycle management. These components enable applications to interact with the Flink runtime, access execution metadata, and manage distributed job execution.

Job Execution

JobExecutionResult

Access results and statistics after job completion.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobExecutionExample {
    
    /**
     * Demonstrates synchronous job execution and reading execution
     * statistics from the returned {@code JobExecutionResult}.
     */
    public static void basicJobExecution() throws Exception {
        final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Assemble the pipeline: double each element and print it.
        environment.fromElements(1, 2, 3, 4, 5)
                   .map(x -> x * 2)
                   .print();
        
        // Blocks until the job finishes, then hands back its result.
        final JobExecutionResult outcome = environment.execute("My Job");
        
        // Pull execution metadata off the result.
        final String jobName = outcome.getJobName();
        final long netRuntime = outcome.getNetRuntime();
        final JobID jobId = outcome.getJobID();
        
        System.out.println("Job '" + jobName + "' completed in " + netRuntime + "ms");
        System.out.println("Job ID: " + jobId);
        
        // Dump every accumulator the job registered, if any.
        final Map<String, Object> accumulators = outcome.getAllAccumulatorResults();
        accumulators.forEach((name, total) ->
            System.out.println("Accumulator " + name + ": " + total));
    }
    
    /**
     * Submits a job in detached mode: {@code execute()} returns without
     * waiting for the job, which keeps running in the background.
     */
    public static void detachedJobExecution() throws Exception {
        final Configuration config = new Configuration();
        config.setBoolean(DeploymentOptions.ATTACHED, false); // Detached mode
        
        final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.createLocalEnvironment(config);
        
        // Assemble the pipeline: upper-case each word and print it.
        environment.fromElements("hello", "world", "flink")
                   .map(String::toUpperCase)
                   .print();
        
        // In detached mode this call does not wait for the job to finish.
        final JobExecutionResult outcome = environment.execute("Detached Job");
        
        // A detached submission yields a DetachedJobExecutionResult placeholder.
        if (outcome instanceof DetachedJobExecutionResult) {
            System.out.println("Job submitted in detached mode");
            System.out.println("Job ID: " + outcome.getJobID());
        }
    }
}

JobClient Interface

Interact with running jobs programmatically.

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.api.common.JobStatus;

public class JobClientExample {
    
    /**
     * Submits a job asynchronously, inspects its status, and either cancels
     * it or waits for the final result.
     */
    public static void jobClientUsage() throws Exception {
        final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Assemble a deliberately slow pipeline.
        environment.fromElements(1, 2, 3, 4, 5)
                   .map(x -> {
                       // Simulate long-running computation
                       Thread.sleep(1000);
                       return x * 2;
                   })
                   .print();
        
        // Non-blocking submission: returns a handle to the running job.
        final JobClient client = environment.executeAsync("Long Running Job");
        
        // Query the current status of the job.
        final JobStatus status = client.getJobStatus().get();
        System.out.println("Initial job status: " + status);
        
        // Future that completes when the job does.
        final CompletableFuture<JobExecutionResult> resultFuture = client.getJobExecutionResult();
        
        if (shouldCancelJob()) {
            // Request cancellation and block until it takes effect.
            client.cancel().get();
            System.out.println("Job cancelled");
        } else {
            // Otherwise block until the job finishes.
            final JobExecutionResult result = resultFuture.get();
            System.out.println("Job completed: " + result.getJobName());
        }
    }
    
    /**
     * Waits for an asynchronously submitted job with a hard timeout and
     * cancels the job when the deadline is exceeded.
     */
    public static void jobClientWithTimeout() throws Exception {
        final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Configure job
        environment.fromCollection(generateLargeDataset())
                   .keyBy(Record::getKey)
                   .sum("value")
                   .print();
        
        final JobClient client = environment.executeAsync("Batch Processing Job");
        
        try {
            // Give the job at most ten minutes to complete.
            JobExecutionResult result = client.getJobExecutionResult()
                .get(10, TimeUnit.MINUTES);
            
            System.out.println("Job completed successfully");
            
        } catch (TimeoutException e) {
            System.out.println("Job is taking too long, cancelling...");
            client.cancel();
            
        } catch (ExecutionException e) {
            System.out.println("Job failed: " + e.getCause().getMessage());
        }
    }
    
    // Implementation-specific logic to determine if job should be cancelled
    private static boolean shouldCancelJob() {
        return false;
    }
    
    // Builds 10k test records spread over 100 distinct keys.
    private static List<Record> generateLargeDataset() {
        final List<Record> records = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            records.add(new Record("key" + (i % 100), i));
        }
        return records;
    }
}

Runtime Context

Accessing Runtime Information

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.OpenContext;

public class RuntimeContextExample extends RichMapFunction<String, String> {
    
    /**
     * Logs task, configuration, cache, and job metadata available from the
     * {@code RuntimeContext} when the operator is opened.
     */
    @Override
    public void open(OpenContext openContext) throws Exception {
        final RuntimeContext runtimeContext = getRuntimeContext();
        
        // Identity of this parallel task instance.
        final String taskName = runtimeContext.getTaskName();
        final int subtask = runtimeContext.getIndexOfThisSubtask();
        final int parallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
        final int attempt = runtimeContext.getAttemptNumber();
        
        System.out.println("Task: " + taskName);
        System.out.println("Subtask: " + subtask + "/" + parallelSubtasks);
        System.out.println("Attempt: " + attempt);
        
        // Job-wide execution settings.
        final ExecutionConfig executionConfig = runtimeContext.getExecutionConfig();
        final int parallelism = executionConfig.getParallelism();
        final boolean closureCleanerOn = executionConfig.isClosureCleanerEnabled();
        
        // Files registered with the distributed cache are materialized locally.
        final File cachedConfig = runtimeContext.getDistributedCache().getFile("my-config-file");
        if (cachedConfig != null && cachedConfig.exists()) {
            loadConfigurationFromFile(cachedConfig);
        }
        
        // Metadata about the job this task belongs to.
        final JobInfo jobInfo = runtimeContext.getJobInfo();
        System.out.println("Job: " + jobInfo.getJobName() + " (" + jobInfo.getJobId() + ")");
    }
    
    /**
     * Tags each record with the index of the subtask that processed it.
     */
    @Override
    public String map(String value) throws Exception {
        final int subtask = getRuntimeContext().getIndexOfThisSubtask();
        return "[Subtask-" + subtask + "] " + value;
    }
    
    // Load configuration from the locally cached file.
    private void loadConfigurationFromFile(File configFile) {
    }
}

Metrics and Accumulators

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;

public class MetricsAndAccumulatorsFunction extends RichMapFunction<String, String> {
    
    // Accumulators for job-level statistics, aggregated across subtasks and
    // available from the JobExecutionResult once the job finishes.
    private IntCounter processedRecords;
    private LongCounter totalBytes;
    
    // Metrics for runtime monitoring, exposed live via Flink's metric system.
    private Counter metricsCounter;
    private Histogram processingLatency;
    private Meter throughput;
    
    /**
     * Registers accumulators and metrics before any records are processed.
     *
     * @param openContext context handed to the operator on open
     * @throws Exception if accumulator or metric registration fails
     */
    @Override
    public void open(OpenContext openContext) throws Exception {
        RuntimeContext ctx = getRuntimeContext();
        
        // Initialize accumulators
        processedRecords = new IntCounter();
        totalBytes = new LongCounter();
        
        ctx.addAccumulator("processed-records", processedRecords);
        ctx.addAccumulator("total-bytes", totalBytes);
        
        // Initialize metrics under a per-subtask group for easy filtering.
        // NOTE(review): some Flink versions require passing a Histogram/Meter
        // implementation to histogram()/meter() — verify against the version in use.
        MetricGroup metricGroup = ctx.getMetricGroup()
            .addGroup("my-operator")
            .addGroup("subtask", String.valueOf(ctx.getIndexOfThisSubtask()));
        
        metricsCounter = metricGroup.counter("records-processed");
        processingLatency = metricGroup.histogram("processing-latency");
        throughput = metricGroup.meter("throughput");
        
        // Custom gauge, polled by the metrics system on each report.
        metricGroup.gauge("queue-size", () -> getCurrentQueueSize());
    }
    
    /**
     * Processes one record while updating accumulators and metrics.
     *
     * @param value incoming record
     * @return the processed (upper-cased) record
     * @throws Exception propagated from processing
     */
    @Override
    public String map(String value) throws Exception {
        long startTime = System.nanoTime();
        
        try {
            // Process the value
            String result = processValue(value);
            
            // Update accumulators. Use an explicit charset: the no-arg
            // String.getBytes() depends on the platform default encoding,
            // which would make the byte count vary between environments.
            processedRecords.add(1);
            totalBytes.add(value.getBytes(StandardCharsets.UTF_8).length);
            
            // Update metrics
            metricsCounter.inc();
            throughput.markEvent();
            
            return result;
            
        } finally {
            // Record processing latency even when processing throws.
            long latency = System.nanoTime() - startTime;
            processingLatency.update(latency / 1_000_000); // Convert to milliseconds
        }
    }
    
    // Simulated per-record processing step.
    private String processValue(String value) {
        return value.toUpperCase();
    }
    
    // Current queue depth reported by the "queue-size" gauge.
    private int getCurrentQueueSize() {
        return 0; // Implementation specific
    }
}

Job Lifecycle Management

Job Listeners and Hooks

import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.JobStatusChangedListener;

// Job listener for execution events
public class CustomJobListener implements JobListener {
    
    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        if (throwable == null) {
            System.out.println("Job submitted successfully: " + jobClient.getJobID());
            
            // Start monitoring job
            startJobMonitoring(jobClient);
            
        } else {
            System.err.println("Job submission failed: " + throwable.getMessage());
            handleJobSubmissionFailure(throwable);
        }
    }
    
    @Override
    public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
        if (throwable == null) {
            System.out.println("Job executed successfully: " + jobExecutionResult.getJobName());
            System.out.println("Runtime: " + jobExecutionResult.getNetRuntime() + "ms");
            
            // Process job results
            processJobResults(jobExecutionResult);
            
        } else {
            System.err.println("Job execution failed: " + throwable.getMessage());
            handleJobExecutionFailure(throwable);
        }
    }
    
    private void startJobMonitoring(JobClient jobClient) {
        // Start background monitoring
        CompletableFuture.runAsync(() -> {
            try {
                while (true) {
                    JobStatus status = jobClient.getJobStatus().get();
                    System.out.println("Job status: " + status);
                    
                    if (status.isTerminalState()) {
                        break;
                    }
                    
                    Thread.sleep(5000); // Check every 5 seconds
                }
            } catch (Exception e) {
                System.err.println("Job monitoring failed: " + e.getMessage());
            }
        });
    }
    
    private void handleJobSubmissionFailure(Throwable throwable) {
        // Handle submission failure (e.g., notify admin, retry, etc.)
    }
    
    private void processJobResults(JobExecutionResult result) {
        // Process successful job results
        Map<String, Object> accumulators = result.getAllAccumulatorResults();
        for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
            System.out.println("Final " + entry.getKey() + ": " + entry.getValue());
        }
    }
    
    private void handleJobExecutionFailure(Throwable throwable) {
        // Handle execution failure
    }
}

// Job status change listener
public class JobStatusChangeListener implements JobStatusChangedListener {
    
    @Override
    public void onEvent(JobStatusChangedEvent event) {
        JobID jobId = event.getJobId();
        JobStatus oldStatus = event.getOldJobStatus();
        JobStatus newStatus = event.getNewJobStatus();
        
        System.out.println("Job " + jobId + " status changed: " + oldStatus + " -> " + newStatus);
        
        switch (newStatus) {
            case RUNNING:
                handleJobStarted(jobId);
                break;
            case FINISHED:
                handleJobCompleted(jobId);
                break;
            case FAILED:
                handleJobFailed(jobId, event.getThrowable());
                break;
            case CANCELED:
                handleJobCancelled(jobId);
                break;
        }
    }
    
    private void handleJobStarted(JobID jobId) {
        System.out.println("Job " + jobId + " has started execution");
        // Start monitoring, notifications, etc.
    }
    
    private void handleJobCompleted(JobID jobId) {
        System.out.println("Job " + jobId + " completed successfully");
        // Clean up resources, send notifications, etc.
    }
    
    private void handleJobFailed(JobID jobId, Throwable throwable) {
        System.err.println("Job " + jobId + " failed: " + 
            (throwable != null ? throwable.getMessage() : "Unknown error"));
        // Send alerts, trigger retries, etc.
    }
    
    private void handleJobCancelled(JobID jobId) {
        System.out.println("Job " + jobId + " was cancelled");
        // Clean up resources, update status, etc.
    }
}

Using Job Listeners

public class JobWithListeners {
    
    /**
     * Executes a job with a registered {@code JobListener}; the listener is
     * notified on submission and again on completion.
     */
    public static void executeWithListeners() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Register job listener
        env.registerJobListener(new CustomJobListener());
        
        // Build pipeline
        env.fromElements("hello", "world", "flink")
           .map(new MetricsAndAccumulatorsFunction())
           .print();
        
        // Execute job (listener will be notified)
        env.execute("Job with Listeners");
    }
    
    /**
     * Submits a job asynchronously and manages its lifecycle by hand:
     * periodic status polling, waiting for the result, and cancelling on error.
     */
    public static void manualJobLifecycleManagement() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Build pipeline
        DataStream<String> stream = env.fromElements("data1", "data2", "data3")
            .map(new ProcessingFunction());
        
        stream.print();
        
        // Execute asynchronously
        JobClient jobClient = env.executeAsync("Manual Lifecycle Job");
        
        try {
            // Monitor job progress
            JobStatus initialStatus = jobClient.getJobStatus().get();
            
            if (initialStatus == JobStatus.RUNNING) {
                System.out.println("Job is running, monitoring progress...");
                
                // Set up periodic status checks
                ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
                ScheduledFuture<?> monitoring = scheduler.scheduleWithFixedDelay(() -> {
                    try {
                        JobStatus currentStatus = jobClient.getJobStatus().get();
                        System.out.println("Current status: " + currentStatus);
                    } catch (Exception e) {
                        System.err.println("Failed to get job status: " + e.getMessage());
                    }
                }, 0, 5, TimeUnit.SECONDS);
                
                try {
                    // Wait for completion
                    JobExecutionResult result = jobClient.getJobExecutionResult().get();
                    System.out.println("Job completed: " + result.getJobName());
                } finally {
                    // Always stop the poller and release its thread — the
                    // previous version leaked the scheduler when waiting for
                    // the result threw.
                    monitoring.cancel(true);
                    scheduler.shutdown();
                }
                
            } else {
                System.err.println("Job failed to start, status: " + initialStatus);
            }
            
        } catch (Exception e) {
            System.err.println("Job management failed: " + e.getMessage());
            
            // Attempt to cancel job on error
            try {
                jobClient.cancel().get(30, TimeUnit.SECONDS);
                System.out.println("Job cancelled due to management failure");
            } catch (Exception cancelException) {
                System.err.println("Failed to cancel job: " + cancelException.getMessage());
            }
        }
    }
    
    // Simulates slow per-record processing so the job stays RUNNING long
    // enough for the status polling to be observable.
    private static class ProcessingFunction extends RichMapFunction<String, String> {
        
        @Override
        public String map(String value) throws Exception {
            // Simulate processing time
            Thread.sleep(1000);
            return "Processed: " + value;
        }
    }
}

Pipeline Execution

Custom Pipeline Executors

import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;

// Custom pipeline executor factory
public class CustomPipelineExecutorFactory implements PipelineExecutorFactory {
    
    public static final String CUSTOM_EXECUTOR_NAME = "custom";
    
    @Override
    public String getName() {
        return CUSTOM_EXECUTOR_NAME;
    }
    
    @Override
    public boolean isCompatibleWith(Configuration configuration) {
        // Check if this executor is compatible with the configuration
        String executorName = configuration.getString(DeploymentOptions.TARGET);
        return CUSTOM_EXECUTOR_NAME.equals(executorName);
    }
    
    @Override
    public PipelineExecutor getExecutor(Configuration configuration) {
        return new CustomPipelineExecutor(configuration);
    }
}

// Custom pipeline executor implementation
public class CustomPipelineExecutor implements PipelineExecutor {
    
    private final Configuration configuration;
    
    public CustomPipelineExecutor(Configuration configuration) {
        this.configuration = configuration;
    }
    
    @Override
    public CompletableFuture<JobClient> execute(Pipeline pipeline, 
                                               Configuration configuration, 
                                               ClassLoader userClassloader) throws Exception {
        
        System.out.println("Executing pipeline with custom executor");
        
        // Custom execution logic
        JobGraph jobGraph = buildJobGraph(pipeline, configuration);
        
        // Submit job to custom runtime
        return submitJob(jobGraph);
    }
    
    private JobGraph buildJobGraph(Pipeline pipeline, Configuration config) {
        // Convert pipeline to JobGraph
        // This would typically involve more complex logic
        return new JobGraph("Custom Job");
    }
    
    private CompletableFuture<JobClient> submitJob(JobGraph jobGraph) {
        // Submit job to execution runtime
        return CompletableFuture.supplyAsync(() -> {
            // Simulate job submission
            JobID jobId = JobID.generate();
            return new CustomJobClient(jobId);
        });
    }
    
    private static class CustomJobClient implements JobClient {
        private final JobID jobId;
        
        public CustomJobClient(JobID jobId) {
            this.jobId = jobId;
        }
        
        @Override
        public JobID getJobID() {
            return jobId;
        }
        
        @Override
        public CompletableFuture<JobStatus> getJobStatus() {
            return CompletableFuture.completedFuture(JobStatus.RUNNING);
        }
        
        @Override
        public CompletableFuture<Void> cancel() {
            return CompletableFuture.completedFuture(null);
        }
        
        @Override
        public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime, 
                                                          String savepointDirectory, 
                                                          SavepointFormatType formatType) {
            return CompletableFuture.completedFuture("savepoint-path");
        }
        
        @Override
        public CompletableFuture<String> triggerSavepoint(String savepointDirectory, 
                                                        SavepointFormatType formatType) {
            return CompletableFuture.completedFuture("savepoint-path");
        }
        
        @Override
        public CompletableFuture<Map<String, Object>> getAccumulators() {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        
        @Override
        public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
            // Simulate job completion
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000); // Simulate execution time
                    return new JobExecutionResult(jobId, 5000, Collections.emptyMap());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }
}

Distributed Cache

Using Distributed Cache

import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.OpenContext;

public class DistributedCacheExample extends RichMapFunction<String, String> {
    
    private Properties configProperties;
    private List<String> referenceData;
    
    @Override
    public void open(OpenContext openContext) throws Exception {
        RuntimeContext ctx = getRuntimeContext();
        DistributedCache cache = ctx.getDistributedCache();
        
        // Access cached files
        File configFile = cache.getFile("config.properties");
        if (configFile != null && configFile.exists()) {
            configProperties = new Properties();
            try (FileInputStream fis = new FileInputStream(configFile)) {
                configProperties.load(fis);
            }
        }
        
        // Access cached directory
        File dataDir = cache.getFile("reference-data");
        if (dataDir != null && dataDir.isDirectory()) {
            referenceData = loadReferenceData(dataDir);
        }
    }
    
    @Override
    public String map(String value) throws Exception {
        // Use cached configuration and data
        String prefix = configProperties.getProperty("output.prefix", "");
        
        // Lookup reference data
        boolean isValid = referenceData.contains(value);
        
        return prefix + value + (isValid ? " [VALID]" : " [INVALID]");
    }
    
    private List<String> loadReferenceData(File dataDir) {
        List<String> data = new ArrayList<>();
        
        File[] files = dataDir.listFiles((dir, name) -> name.endsWith(".txt"));
        if (files != null) {
            for (File file : files) {
                try (BufferedReader reader = Files.newBufferedReader(file.toPath())) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        data.add(line.trim());
                    }
                } catch (IOException e) {
                    System.err.println("Error reading reference data file: " + e.getMessage());
                }
            }
        }
        
        return data;
    }
    
    public static void registerCachedFiles() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Register files with distributed cache
        env.registerCachedFile("/path/to/config.properties", "config.properties");
        env.registerCachedFile("/path/to/reference-data/", "reference-data", true); // true for directory
        
        // Use the function that accesses cached files
        env.fromElements("item1", "item2", "item3")
           .map(new DistributedCacheExample())
           .print();
        
        env.execute("Distributed Cache Example");
    }
}

Apache Flink's execution and job management APIs provide comprehensive control over job lifecycle, runtime information access, and execution monitoring. By leveraging these capabilities, you can build robust applications with proper monitoring, error handling, and resource management.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-core

docs

configuration.md

connectors.md

event-time-watermarks.md

execution-jobs.md

functions-and-operators.md

index.md

state-management.md

type-system-serialization.md

utilities.md

tile.json