CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-clients

Apache Flink client library providing programmatic APIs and command-line interfaces for submitting, managing, and monitoring Flink jobs.

Pending
Overview
Eval results
Files

application-mode.mddocs/

Application Mode Deployment

Specialized deployment mode for long-running applications with dedicated cluster resources, lifecycle management, and optimized resource utilization for application-centric workloads.

Capabilities

Application Runner Interface

Core interface for running applications in application mode with full lifecycle management.

/**
 * Interface for running applications in application mode
 */
public interface ApplicationRunner {
    /**
     * Run application with dispatcher gateway and configuration
     * @param dispatcherGateway Gateway to the dispatcher
     * @param scheduledExecutor Executor for scheduled operations
     * @param applicationConfiguration Application configuration
     * @return Future completing when application finishes
     */
    CompletableFuture<Void> run(
        DispatcherGateway dispatcherGateway,
        ScheduledExecutor scheduledExecutor,
        ApplicationConfiguration applicationConfiguration
    );
}

Detached Application Runner

Application runner implementation for detached execution where the client doesn't wait for job completion.

/**
 * Application runner for detached execution
 */
public class DetachedApplicationRunner implements ApplicationRunner {
    /**
     * Create detached application runner
     * @param highAvailabilityServices High availability services
     * @param configuration Flink configuration
     * @param mainThreadExecutor Main thread executor
     */
    public DetachedApplicationRunner(
        HighAvailabilityServices highAvailabilityServices,
        Configuration configuration,
        Executor mainThreadExecutor
    );
    
    @Override
    public CompletableFuture<Void> run(
        DispatcherGateway dispatcherGateway,
        ScheduledExecutor scheduledExecutor,
        ApplicationConfiguration applicationConfiguration
    );
}

Application Configuration

Configuration class for application deployments containing program arguments, entry point information, and execution settings.

/**
 * Configuration for application deployments
 */
public class ApplicationConfiguration {
    /**
     * Create configuration builder from Flink configuration
     * @param configuration Flink configuration
     * @return Configuration builder
     */
    public static Builder fromConfiguration(Configuration configuration);
    
    /**
     * Get program arguments
     * @return Array of program arguments
     */
    public String[] getProgramArguments();
    
    /**
     * Get application arguments
     * @return Array of application arguments
     */
    public String[] getApplicationArgs();
    
    /**
     * Get parallelism setting
     * @return Parallelism or null if not set
     */
    @Nullable
    public Integer getParallelism();
    
    /**
     * Get savepoint restore settings
     * @return Savepoint restore settings
     */
    public SavepointRestoreSettings getSavepointRestoreSettings();
    
    /**
     * Builder for application configuration
     */
    public static class Builder {
        /**
         * Set program arguments
         * @param programArguments Array of arguments
         * @return Builder instance
         */
        public Builder setProgramArguments(String... programArguments);
        
        /**
         * Set application arguments
         * @param applicationArgs Array of application arguments
         * @return Builder instance
         */
        public Builder setApplicationArgs(String... applicationArgs);
        
        /**
         * Set parallelism
         * @param parallelism Parallelism setting
         * @return Builder instance
         */
        public Builder setParallelism(Integer parallelism);
        
        /**
         * Set savepoint restore settings
         * @param savepointRestoreSettings Savepoint settings
         * @return Builder instance
         */
        public Builder setSavepointRestoreSettings(
            SavepointRestoreSettings savepointRestoreSettings
        );
        
        /**
         * Build application configuration
         * @return Configured application configuration
         */
        public ApplicationConfiguration build();
    }
}

Usage Examples:

import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

// Create basic application configuration
ApplicationConfiguration config = ApplicationConfiguration
    .fromConfiguration(flinkConfig)
    .setProgramArguments("--input", "input.txt", "--output", "output.txt")
    .setParallelism(4)
    .build();

// Create configuration with savepoint restore
SavepointRestoreSettings savepointSettings = SavepointRestoreSettings
    .forPath("/path/to/savepoint", false);

ApplicationConfiguration configWithSavepoint = ApplicationConfiguration
    .fromConfiguration(flinkConfig)
    .setProgramArguments("--mode", "batch")
    .setSavepointRestoreSettings(savepointSettings)
    .build();

Application Cluster Entry Point

Entry point for application cluster mode providing cluster initialization and application execution coordination.

/**
 * Entry point for application cluster mode
 */
public class ApplicationClusterEntryPoint extends ClusterEntrypoint {
    /**
     * Main method for application cluster entry point
     * @param args Command-line arguments
     */
    public static void main(String[] args);
    
    /**
     * Create application cluster entry point
     * @param configuration Cluster configuration
     * @param pluginManager Plugin manager
     */
    protected ApplicationClusterEntryPoint(
        Configuration configuration,
        PluginManager pluginManager
    );
    
    @Override
    protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(
        Configuration configuration
    );
}

Application Job Clients

Specialized job client implementations for application mode execution.

/**
 * Job client for embedded application execution
 */
public class EmbeddedJobClient implements JobClient {
    /**
     * Create embedded job client
     * @param jobID Job identifier
     * @param dispatcher Dispatcher gateway
     * @param executorService Executor service
     * @param classLoader User code class loader
     */
    public EmbeddedJobClient(
        JobID jobID,
        DispatcherGateway dispatcher,
        ScheduledExecutorService executorService,
        ClassLoader classLoader
    );
    
    @Override
    public JobID getJobID();
    
    @Override
    public CompletableFuture<JobStatus> getJobStatus();
    
    @Override
    public CompletableFuture<Void> cancel();
    
    @Override
    public CompletableFuture<String> stopWithSavepoint(
        boolean advanceToEndOfEventTime,
        @Nullable String savepointDir,
        SavepointFormatType formatType
    );
    
    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult();
}

/**
 * Job client for web submission application execution
 */
public class WebSubmissionJobClient implements JobClient {
    /**
     * Create web submission job client
     * @param jobSubmissionResult Job submission result
     * @param jobStatusSupplier Supplier for job status
     * @param jobResultSupplier Supplier for job result
     * @param classLoader User code class loader
     */
    public WebSubmissionJobClient(
        JobSubmissionResult jobSubmissionResult,
        Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,
        Supplier<CompletableFuture<JobResult>> jobResultSupplier,
        ClassLoader classLoader
    );
    
    @Override
    public JobID getJobID();
    
    @Override
    public CompletableFuture<JobStatus> getJobStatus();
    
    @Override
    public CompletableFuture<JobExecutionResult> getJobExecutionResult();
}

Entry Class Information Providers

System for discovering and providing entry class information from JARs and classpaths.

/**
 * Provider for entry class information
 */
public interface EntryClassInformationProvider {
    /**
     * Get program entry point class name
     * @return Entry point class name or null
     */
    @Nullable
    String getJobClassName();
    
    /**
     * Get JAR file location
     * @return JAR file or null
     */
    @Nullable
    File getJarFile();
}

/**
 * Entry class provider from JAR manifest
 */
public class FromJarEntryClassInformationProvider implements EntryClassInformationProvider {
    /**
     * Create provider from JAR file
     * @param jarFile JAR file to analyze
     * @param programArguments Program arguments
     */
    public FromJarEntryClassInformationProvider(File jarFile, String[] programArguments);
    
    @Override
    public String getJobClassName();
    
    @Override
    public File getJarFile();
}

/**
 * Entry class provider from classpath scanning
 */
public class FromClasspathEntryClassInformationProvider implements EntryClassInformationProvider {
    /**
     * Create provider from classpath
     * @param jobClassName Job class name
     * @param programArguments Program arguments
     */
    public FromClasspathEntryClassInformationProvider(
        String jobClassName,
        String[] programArguments
    );
    
    @Override
    public String getJobClassName();
    
    @Override
    public File getJarFile();
}

Application Executors

Executor implementations specifically designed for application mode execution.

/**
 * Executor for embedded application execution
 */
public class EmbeddedExecutor implements PipelineExecutor {
    /**
     * Create embedded executor
     * @param dispatcherGateway Dispatcher gateway
     * @param executorService Executor service
     * @param configuration Flink configuration
     * @param userCodeClassLoader User code class loader
     */
    public EmbeddedExecutor(
        DispatcherGateway dispatcherGateway,
        ScheduledExecutorService executorService,
        Configuration configuration,
        ClassLoader userCodeClassLoader
    );
    
    @Override
    public CompletableFuture<JobClient> execute(
        Pipeline pipeline,
        Configuration configuration,
        ClassLoader userCodeClassloader
    ) throws Exception;
}

/**
 * Factory for embedded executors
 */
public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
    @Override
    public String getName();
    
    @Override
    public boolean isCompatibleWith(Configuration configuration);
    
    @Override
    public PipelineExecutor getExecutor(Configuration configuration);
}

Utility Classes

Utility classes for application mode operations and status monitoring.

/**
 * Utilities for polling job status in application mode
 */
public class JobStatusPollingUtils {
    /**
     * Poll job status until completion
     * @param jobClient Job client
     * @param scheduledExecutor Scheduled executor
     * @param timeout Polling timeout
     * @return Future with final job result
     */
    public static CompletableFuture<JobExecutionResult> pollJobStatusUntilFinished(
        JobClient jobClient,
        ScheduledExecutorService scheduledExecutor,
        Duration timeout
    );
    
    /**
     * Poll job status with custom polling interval
     * @param jobStatusSupplier Job status supplier
     * @param scheduledExecutor Scheduled executor
     * @param pollingInterval Polling interval
     * @param timeout Total timeout
     * @return Future with final job status
     */
    public static CompletableFuture<JobStatus> pollJobStatus(
        Supplier<CompletableFuture<JobStatus>> jobStatusSupplier,
        ScheduledExecutorService scheduledExecutor,
        Duration pollingInterval,
        Duration timeout
    );
}

/**
 * Parser for JAR manifest files
 */
public class JarManifestParser {
    /**
     * Find entry class from JAR manifest
     * @param jarFile JAR file to analyze
     * @return Entry class name or null
     */
    @Nullable
    public static String findEntryClass(File jarFile);
    
    /**
     * Check if JAR contains main class
     * @param jarFile JAR file to check
     * @return true if main class found
     */
    public static boolean hasMainClass(File jarFile);
}

Types

/**
 * Job submission result for application mode
 */
public class JobSubmissionResult {
    /**
     * Get submitted job ID
     * @return Job identifier
     */
    public JobID getJobID();
    
    /**
     * Check if submission was successful
     * @return true if successful
     */
    public boolean isSuccess();
}

/**
 * Embedded job client creator
 */
public class EmbeddedJobClientCreator {
    /**
     * Create embedded job client
     * @param dispatcherGateway Dispatcher gateway
     * @param executorService Executor service
     * @param archivedExecutionGraph Archived execution graph
     * @param userCodeClassLoader User code class loader
     * @return Embedded job client
     */
    public static EmbeddedJobClient create(
        DispatcherGateway dispatcherGateway,
        ScheduledExecutorService executorService,
        ArchivedExecutionGraph archivedExecutionGraph,
        ClassLoader userCodeClassLoader
    );
}

Exception Handling

Application mode operations handle specific error conditions:

  • Application Execution Errors: ApplicationExecutionException for application runtime failures
  • Unsuccessful Execution: UnsuccessfulExecutionException for failed job executions
  • Configuration Errors: Invalid application configurations or missing parameters
  • Resource Errors: Insufficient resources for application cluster deployment
  • Job Client Errors: Communication failures with job management systems

Error Handling Examples:

try {
    ApplicationConfiguration config = ApplicationConfiguration
        .fromConfiguration(flinkConfig)
        .setProgramArguments("--input", "data.txt")
        .build();
    
    CompletableFuture<Void> applicationResult = runner.run(
        dispatcherGateway,
        scheduledExecutor,
        config
    );
    
    applicationResult.get(); // Wait for completion
    
} catch (ApplicationExecutionException e) {
    System.err.println("Application execution failed: " + e.getMessage());
} catch (UnsuccessfulExecutionException e) {
    System.err.println("Job execution unsuccessful: " + e.getMessage());
}

Application mode provides dedicated cluster resources for long-running applications, enabling better resource isolation, simplified deployment, and optimized lifecycle management compared to session mode deployments.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-clients

docs

application-mode.md

artifact-management.md

cli-frontend.md

cluster-client.md

deployment-management.md

index.md

program-packaging.md

tile.json