Apache Flink Core runtime components, type system, and foundational APIs for stream processing applications
—
Apache Flink Core provides comprehensive APIs for job execution, runtime contexts, and job lifecycle management. These components enable applications to interact with the Flink runtime, access execution metadata, and manage distributed job execution.
Access results and statistics after job completion.
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demonstrates synchronous job execution with {@code StreamExecutionEnvironment}
 * and inspection of the resulting {@link JobExecutionResult}.
 */
public class JobExecutionExample {

    /**
     * Runs a small pipeline synchronously and prints runtime statistics,
     * the job id, and any accumulator results.
     */
    public static void basicJobExecution() throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Assemble the pipeline: double each element and print it.
        environment.fromElements(1, 2, 3, 4, 5)
                .map(x -> x * 2)
                .print();

        // Blocks until the job finishes, then exposes execution metadata.
        final JobExecutionResult outcome = environment.execute("My Job");

        final long runtimeMillis = outcome.getNetRuntime();
        final String name = outcome.getJobName();
        final JobID id = outcome.getJobID();
        System.out.println("Job '" + name + "' completed in " + runtimeMillis + "ms");
        System.out.println("Job ID: " + id);

        // Report every accumulator the job registered, if any.
        final Map<String, Object> accumulatorResults = outcome.getAllAccumulatorResults();
        accumulatorResults.forEach((key, val) ->
                System.out.println("Accumulator " + key + ": " + val));
    }

    /**
     * Submits a job in detached mode; the call returns while the job keeps
     * running asynchronously.
     */
    public static void detachedJobExecution() throws Exception {
        final Configuration config = new Configuration();
        config.setBoolean(DeploymentOptions.ATTACHED, false); // Detached mode
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.createLocalEnvironment(config);

        // Assemble the pipeline: upper-case each element and print it.
        environment.fromElements("hello", "world", "flink")
                .map(String::toUpperCase)
                .print();

        final JobExecutionResult outcome = environment.execute("Detached Job");

        // A detached submission yields a DetachedJobExecutionResult right away;
        // only the job id is meaningful at this point.
        if (outcome instanceof DetachedJobExecutionResult) {
            System.out.println("Job submitted in detached mode");
            System.out.println("Job ID: " + outcome.getJobID());
        }
    }
}

Interact with running jobs programmatically.
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.api.common.JobStatus;
public class JobClientExample {
public static void jobClientUsage() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Build pipeline
env.fromElements(1, 2, 3, 4, 5)
.map(x -> {
// Simulate long-running computation
Thread.sleep(1000);
return x * 2;
})
.print();
// Execute and get JobClient
JobClient jobClient = env.executeAsync("Long Running Job");
// Monitor job status
CompletableFuture<JobStatus> statusFuture = jobClient.getJobStatus();
JobStatus status = statusFuture.get();
System.out.println("Initial job status: " + status);
// Get job execution result asynchronously
CompletableFuture<JobExecutionResult> resultFuture = jobClient.getJobExecutionResult();
// Cancel job if needed
if (shouldCancelJob()) {
CompletableFuture<Void> cancelFuture = jobClient.cancel();
cancelFuture.get(); // Wait for cancellation
System.out.println("Job cancelled");
} else {
// Wait for job completion
JobExecutionResult result = resultFuture.get();
System.out.println("Job completed: " + result.getJobName());
}
}
public static void jobClientWithTimeout() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure job
env.fromCollection(generateLargeDataset())
.keyBy(Record::getKey)
.sum("value")
.print();
JobClient jobClient = env.executeAsync("Batch Processing Job");
try {
// Wait for completion with timeout
JobExecutionResult result = jobClient.getJobExecutionResult()
.get(10, TimeUnit.MINUTES);
System.out.println("Job completed successfully");
} catch (TimeoutException e) {
System.out.println("Job is taking too long, cancelling...");
jobClient.cancel();
} catch (ExecutionException e) {
System.out.println("Job failed: " + e.getCause().getMessage());
}
}
private static boolean shouldCancelJob() {
// Implementation-specific logic to determine if job should be cancelled
return false;
}
private static List<Record> generateLargeDataset() {
// Generate test data
return IntStream.range(0, 10000)
.mapToObj(i -> new Record("key" + (i % 100), i))
.collect(Collectors.toList());
}
}

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
/**
 * Shows what a rich function can learn about its surroundings through
 * {@link RuntimeContext}: task/subtask identity, execution configuration,
 * the distributed cache, and job-level metadata.
 */
public class RuntimeContextExample extends RichMapFunction<String, String> {

    @Override
    public void open(OpenContext openContext) throws Exception {
        final RuntimeContext context = getRuntimeContext();

        // Identity of this parallel instance of the operator.
        final String task = context.getTaskName();
        final int subtask = context.getIndexOfThisSubtask();
        final int parallelSubtasks = context.getNumberOfParallelSubtasks();
        final int attempt = context.getAttemptNumber();
        System.out.println("Task: " + task);
        System.out.println("Subtask: " + subtask + "/" + parallelSubtasks);
        System.out.println("Attempt: " + attempt);

        // Job-wide execution settings.
        final ExecutionConfig executionConfig = context.getExecutionConfig();
        final int parallelism = executionConfig.getParallelism();
        final boolean closureCleanerEnabled = executionConfig.isClosureCleanerEnabled();

        // Files registered with the distributed cache are available locally.
        final File cachedFile = context.getDistributedCache().getFile("my-config-file");
        if (cachedFile != null && cachedFile.exists()) {
            loadConfigurationFromFile(cachedFile);
        }

        // Metadata about the enclosing job.
        final JobInfo jobInfo = context.getJobInfo();
        final String jobName = jobInfo.getJobName();
        final JobID jobId = jobInfo.getJobId();
        System.out.println("Job: " + jobName + " (" + jobId + ")");
    }

    @Override
    public String map(String value) throws Exception {
        // The context may also be consulted per record, e.g. to tag output
        // with the producing subtask.
        final int subtask = getRuntimeContext().getIndexOfThisSubtask();
        return "[Subtask-" + subtask + "] " + value;
    }

    private void loadConfigurationFromFile(File configFile) {
        // Load configuration from cached file
    }
}

import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
/**
 * Combines Flink accumulators (job-level aggregates reported with the final
 * {@code JobExecutionResult}) with operator metrics (counters, histograms,
 * meters, gauges) exposed through the metric group.
 */
public class MetricsAndAccumulatorsFunction extends RichMapFunction<String, String> {
    // Accumulators for job-level statistics
    private IntCounter processedRecords;
    private LongCounter totalBytes;
    // Metrics for runtime monitoring
    private Counter metricsCounter;
    private Histogram processingLatency;
    private Meter throughput;

    @Override
    public void open(OpenContext openContext) throws Exception {
        RuntimeContext ctx = getRuntimeContext();
        // Initialize accumulators; results are merged across subtasks and
        // reported under these names when the job finishes.
        processedRecords = new IntCounter();
        totalBytes = new LongCounter();
        ctx.addAccumulator("processed-records", processedRecords);
        ctx.addAccumulator("total-bytes", totalBytes);
        // Initialize metrics under an operator/subtask-scoped group.
        MetricGroup metricGroup = ctx.getMetricGroup()
            .addGroup("my-operator")
            .addGroup("subtask", String.valueOf(ctx.getIndexOfThisSubtask()));
        metricsCounter = metricGroup.counter("records-processed");
        processingLatency = metricGroup.histogram("processing-latency");
        throughput = metricGroup.meter("throughput");
        // Custom gauge, sampled by the metrics system on demand.
        metricGroup.gauge("queue-size", () -> getCurrentQueueSize());
    }

    @Override
    public String map(String value) throws Exception {
        long startTime = System.nanoTime();
        try {
            // Process the value
            String result = processValue(value);
            // Update accumulators.
            processedRecords.add(1);
            // BUGFIX: getBytes() without a charset uses the platform default,
            // so the byte count could differ between machines; pin UTF-8.
            totalBytes.add(value.getBytes(java.nio.charset.StandardCharsets.UTF_8).length);
            // Update metrics
            metricsCounter.inc();
            throughput.markEvent();
            return result;
        } finally {
            // Record processing latency, including the failure path.
            long latency = System.nanoTime() - startTime;
            processingLatency.update(latency / 1_000_000); // Convert to milliseconds
        }
    }

    private String processValue(String value) {
        // Simulate processing
        return value.toUpperCase();
    }

    private int getCurrentQueueSize() {
        // Return current queue size for gauge metric
        return 0; // Implementation specific
    }
}

import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.JobStatusChangedListener;
// Job listener for execution events
// Job listener for execution events: notified on submission and on completion.
public class CustomJobListener implements JobListener {

    /**
     * Called after job submission; exactly one of {@code jobClient} /
     * {@code throwable} is expected to be non-null.
     */
    @Override
    public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        if (throwable == null) {
            System.out.println("Job submitted successfully: " + jobClient.getJobID());
            // Start monitoring job
            startJobMonitoring(jobClient);
        } else {
            System.err.println("Job submission failed: " + throwable.getMessage());
            handleJobSubmissionFailure(throwable);
        }
    }

    /**
     * Called when the job has finished; exactly one of
     * {@code jobExecutionResult} / {@code throwable} is expected to be non-null.
     */
    @Override
    public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
        if (throwable == null) {
            System.out.println("Job executed successfully: " + jobExecutionResult.getJobName());
            System.out.println("Runtime: " + jobExecutionResult.getNetRuntime() + "ms");
            // Process job results
            processJobResults(jobExecutionResult);
        } else {
            System.err.println("Job execution failed: " + throwable.getMessage());
            handleJobExecutionFailure(throwable);
        }
    }

    /** Polls the job status every 5 seconds until it reaches a terminal state. */
    private void startJobMonitoring(JobClient jobClient) {
        // NOTE(review): runs on the ForkJoin common pool; a dedicated executor
        // would be preferable for long-lived blocking work.
        CompletableFuture.runAsync(() -> {
            try {
                while (true) {
                    JobStatus status = jobClient.getJobStatus().get();
                    System.out.println("Job status: " + status);
                    if (status.isTerminalState()) {
                        break;
                    }
                    Thread.sleep(5000); // Check every 5 seconds
                }
            } catch (InterruptedException e) {
                // BUGFIX: restore the interrupt flag instead of swallowing it,
                // so the pool thread still observes the interruption.
                Thread.currentThread().interrupt();
                System.err.println("Job monitoring failed: " + e.getMessage());
            } catch (Exception e) {
                System.err.println("Job monitoring failed: " + e.getMessage());
            }
        });
    }

    private void handleJobSubmissionFailure(Throwable throwable) {
        // Handle submission failure (e.g., notify admin, retry, etc.)
    }

    /** Prints every accumulator value reported with the final result. */
    private void processJobResults(JobExecutionResult result) {
        Map<String, Object> accumulators = result.getAllAccumulatorResults();
        for (Map.Entry<String, Object> entry : accumulators.entrySet()) {
            System.out.println("Final " + entry.getKey() + ": " + entry.getValue());
        }
    }

    private void handleJobExecutionFailure(Throwable throwable) {
        // Handle execution failure
    }
}
// Job status change listener
// Listens for job status transitions published by the runtime.
public class JobStatusChangeListener implements JobStatusChangedListener {

    /** Logs every transition and dispatches to a status-specific handler. */
    @Override
    public void onEvent(JobStatusChangedEvent event) {
        final JobID jobId = event.getJobId();
        final JobStatus previous = event.getOldJobStatus();
        final JobStatus current = event.getNewJobStatus();
        System.out.println("Job " + jobId + " status changed: " + previous + " -> " + current);
        // Only transitions into these four states trigger a handler; all
        // other states are logged above and otherwise ignored.
        if (current == JobStatus.RUNNING) {
            handleJobStarted(jobId);
        } else if (current == JobStatus.FINISHED) {
            handleJobCompleted(jobId);
        } else if (current == JobStatus.FAILED) {
            handleJobFailed(jobId, event.getThrowable());
        } else if (current == JobStatus.CANCELED) {
            handleJobCancelled(jobId);
        }
    }

    private void handleJobStarted(JobID jobId) {
        System.out.println("Job " + jobId + " has started execution");
        // Start monitoring, notifications, etc.
    }

    private void handleJobCompleted(JobID jobId) {
        System.out.println("Job " + jobId + " completed successfully");
        // Clean up resources, send notifications, etc.
    }

    private void handleJobFailed(JobID jobId, Throwable throwable) {
        final String reason = throwable != null ? throwable.getMessage() : "Unknown error";
        System.err.println("Job " + jobId + " failed: " + reason);
        // Send alerts, trigger retries, etc.
    }

    private void handleJobCancelled(JobID jobId) {
        System.out.println("Job " + jobId + " was cancelled");
        // Clean up resources, update status, etc.
    }
}

public class JobWithListeners {
/**
 * Runs a simple pipeline with a {@code CustomJobListener} registered, so the
 * listener is notified on submission and on completion.
 */
public static void executeWithListeners() throws Exception {
    final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // Listener callbacks fire around the execute() call below.
    environment.registerJobListener(new CustomJobListener());
    // Assemble the pipeline.
    environment
            .fromElements("hello", "world", "flink")
            .map(new MetricsAndAccumulatorsFunction())
            .print();
    environment.execute("Job with Listeners");
}
/**
 * Drives a job's lifecycle by hand: submit asynchronously, poll status on a
 * scheduler, wait for the result, and cancel the job on management failure.
 */
public static void manualJobLifecycleManagement() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Build pipeline
    DataStream<String> stream = env.fromElements("data1", "data2", "data3")
        .map(new ProcessingFunction());
    stream.print();
    // Execute asynchronously
    JobClient jobClient = env.executeAsync("Manual Lifecycle Job");
    // Manual lifecycle management
    try {
        // Monitor job progress
        CompletableFuture<JobStatus> statusFuture = jobClient.getJobStatus();
        JobStatus initialStatus = statusFuture.get();
        if (initialStatus == JobStatus.RUNNING) {
            System.out.println("Job is running, monitoring progress...");
            // Set up periodic status checks
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            try {
                ScheduledFuture<?> monitoring = scheduler.scheduleWithFixedDelay(() -> {
                    try {
                        JobStatus currentStatus = jobClient.getJobStatus().get();
                        System.out.println("Current status: " + currentStatus);
                    } catch (Exception e) {
                        System.err.println("Failed to get job status: " + e.getMessage());
                    }
                }, 0, 5, TimeUnit.SECONDS);
                // Wait for completion
                JobExecutionResult result = jobClient.getJobExecutionResult().get();
                monitoring.cancel(true);
                System.out.println("Job completed: " + result.getJobName());
            } finally {
                // BUGFIX: previously the scheduler was only shut down on the
                // happy path; an exception from get() leaked its worker thread.
                scheduler.shutdownNow();
            }
        } else {
            System.err.println("Job failed to start, status: " + initialStatus);
        }
    } catch (Exception e) {
        System.err.println("Job management failed: " + e.getMessage());
        // Attempt to cancel job on error
        try {
            jobClient.cancel().get(30, TimeUnit.SECONDS);
            System.out.println("Job cancelled due to management failure");
        } catch (Exception cancelException) {
            System.err.println("Failed to cancel job: " + cancelException.getMessage());
        }
    }
}
/** Map function that simulates a slow, per-record computation. */
private static class ProcessingFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String value) throws Exception {
        // Simulate processing time
        Thread.sleep(1000);
        final String output = "Processed: " + value;
        return output;
    }
}
}

import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
// Custom pipeline executor factory
// Custom pipeline executor factory: selected when the deployment target
// configured under DeploymentOptions.TARGET matches this executor's name.
public class CustomPipelineExecutorFactory implements PipelineExecutorFactory {

    public static final String CUSTOM_EXECUTOR_NAME = "custom";

    /** Returns the name under which this executor is registered. */
    @Override
    public String getName() {
        return CUSTOM_EXECUTOR_NAME;
    }

    /** Matches when the configured deployment target equals this executor's name. */
    @Override
    public boolean isCompatibleWith(Configuration configuration) {
        final String target = configuration.getString(DeploymentOptions.TARGET);
        return CUSTOM_EXECUTOR_NAME.equals(target);
    }

    /** Creates a new executor bound to the given configuration. */
    @Override
    public PipelineExecutor getExecutor(Configuration configuration) {
        return new CustomPipelineExecutor(configuration);
    }
}
// Custom pipeline executor implementation
// Custom pipeline executor implementation: converts a Pipeline to a JobGraph
// and submits it to a (simulated) runtime.
public class CustomPipelineExecutor implements PipelineExecutor {

    private final Configuration configuration;

    public CustomPipelineExecutor(Configuration configuration) {
        this.configuration = configuration;
    }

    /**
     * Translates the pipeline and submits it asynchronously.
     *
     * @return a future completing with a {@link JobClient} for the submitted job
     */
    @Override
    public CompletableFuture<JobClient> execute(Pipeline pipeline,
                                                Configuration configuration,
                                                ClassLoader userClassloader) throws Exception {
        System.out.println("Executing pipeline with custom executor");
        // Custom execution logic
        JobGraph jobGraph = buildJobGraph(pipeline, configuration);
        // Submit job to custom runtime
        return submitJob(jobGraph);
    }

    private JobGraph buildJobGraph(Pipeline pipeline, Configuration config) {
        // Convert pipeline to JobGraph
        // This would typically involve more complex logic
        return new JobGraph("Custom Job");
    }

    private CompletableFuture<JobClient> submitJob(JobGraph jobGraph) {
        // Submit job to execution runtime
        return CompletableFuture.supplyAsync(() -> {
            // Simulate job submission
            JobID jobId = JobID.generate();
            return new CustomJobClient(jobId);
        });
    }

    /** Minimal {@link JobClient} stub over the simulated runtime. */
    private static class CustomJobClient implements JobClient {
        private final JobID jobId;

        public CustomJobClient(JobID jobId) {
            this.jobId = jobId;
        }

        @Override
        public JobID getJobID() {
            return jobId;
        }

        @Override
        public CompletableFuture<JobStatus> getJobStatus() {
            return CompletableFuture.completedFuture(JobStatus.RUNNING);
        }

        @Override
        public CompletableFuture<Void> cancel() {
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public CompletableFuture<String> stopWithSavepoint(boolean advanceToEndOfEventTime,
                                                           String savepointDirectory,
                                                           SavepointFormatType formatType) {
            return CompletableFuture.completedFuture("savepoint-path");
        }

        @Override
        public CompletableFuture<String> triggerSavepoint(String savepointDirectory,
                                                          SavepointFormatType formatType) {
            return CompletableFuture.completedFuture("savepoint-path");
        }

        @Override
        public CompletableFuture<Map<String, Object>> getAccumulators() {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }

        @Override
        public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
            // Simulate job completion after a short delay.
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000); // Simulate execution time
                    return new JobExecutionResult(jobId, 5000, Collections.emptyMap());
                } catch (InterruptedException e) {
                    // BUGFIX: restore the interrupt flag before rethrowing so
                    // the pool thread still observes the interruption.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            });
        }
    }
}

import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.OpenContext;
/**
 * Reads artifacts registered with Flink's distributed cache in {@code open()}
 * and consults them per record in {@code map()}.
 */
public class DistributedCacheExample extends RichMapFunction<String, String> {

    // Populated in open(); initialized to safe defaults there so map()
    // never dereferences null when the cached artifacts are missing.
    private Properties configProperties;
    private List<String> referenceData;

    @Override
    public void open(OpenContext openContext) throws Exception {
        RuntimeContext ctx = getRuntimeContext();
        DistributedCache cache = ctx.getDistributedCache();
        // BUGFIX: start from empty defaults; previously these fields stayed
        // null when the cache entries were absent and map() threw NPE.
        configProperties = new Properties();
        referenceData = new ArrayList<>();
        // Access cached files
        File configFile = cache.getFile("config.properties");
        if (configFile != null && configFile.exists()) {
            try (FileInputStream fis = new FileInputStream(configFile)) {
                configProperties.load(fis);
            }
        }
        // Access cached directory
        File dataDir = cache.getFile("reference-data");
        if (dataDir != null && dataDir.isDirectory()) {
            referenceData = loadReferenceData(dataDir);
        }
    }

    @Override
    public String map(String value) throws Exception {
        // Use cached configuration and data
        String prefix = configProperties.getProperty("output.prefix", "");
        // Lookup reference data
        boolean isValid = referenceData.contains(value);
        return prefix + value + (isValid ? " [VALID]" : " [INVALID]");
    }

    /** Reads every {@code .txt} file in the directory, one trimmed entry per line. */
    private List<String> loadReferenceData(File dataDir) {
        List<String> data = new ArrayList<>();
        File[] files = dataDir.listFiles((dir, name) -> name.endsWith(".txt"));
        if (files != null) {
            for (File file : files) {
                try (BufferedReader reader = Files.newBufferedReader(file.toPath())) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        data.add(line.trim());
                    }
                } catch (IOException e) {
                    System.err.println("Error reading reference data file: " + e.getMessage());
                }
            }
        }
        return data;
    }

    /** Registers the cached artifacts and runs a pipeline that consumes them. */
    public static void registerCachedFiles() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register files with distributed cache
        env.registerCachedFile("/path/to/config.properties", "config.properties");
        // NOTE: the third argument marks the file as executable — it is NOT a
        // "directory" flag; directories are supported via the path itself.
        env.registerCachedFile("/path/to/reference-data/", "reference-data", true);
        // Use the function that accesses cached files
        env.fromElements("item1", "item2", "item3")
            .map(new DistributedCacheExample())
            .print();
        env.execute("Distributed Cache Example");
    }
}

Apache Flink's execution and job management APIs provide comprehensive control over job lifecycle, runtime information access, and execution monitoring. By leveraging these capabilities, you can build robust applications with proper monitoring, error handling, and resource management.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-core