
tessl/maven-org-apache-flink--flink-clients-2-11

Apache Flink Client APIs and utilities for submitting and interacting with Flink jobs


REST Client Communication

The REST client classes in org.apache.flink.client.program.rest.* let a client talk to a remote Flink cluster over HTTP or HTTPS. They cover job submission and control, savepoints, configurable retries with backoff, SSL, and timeout settings read from the Flink Configuration.

Core REST Client Classes

RestClusterClient { .api }

REST-based implementation of the ClusterClient interface for communicating with remote Flink clusters.

public class RestClusterClient<T> implements ClusterClient<T> {
    // Constructors
    public RestClusterClient(Configuration configuration, 
                           RestClusterClientConfiguration restClusterClientConfiguration, 
                           T clusterId) { }
    
    public RestClusterClient(Configuration configuration, 
                           RestClusterClientConfiguration restClusterClientConfiguration, 
                           T clusterId, 
                           WaitStrategy waitStrategy) { }
    
    // ClusterClient interface implementation
    public T getClusterId() { }
    public Configuration getFlinkConfiguration() { }
    public String getWebInterfaceURL() { }
    
    // Job management via REST
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() { }
    public CompletableFuture<JobID> submitJob(JobGraph jobGraph) { }
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { }
    public CompletableFuture<JobResult> requestJobResult(JobID jobId) { }
    
    // Job control operations
    public CompletableFuture<Acknowledge> cancel(JobID jobId) { }
    public CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory) { }
    public CompletableFuture<String> stopWithSavepoint(JobID jobId, 
                                                       boolean advanceToEndOfEventTime, 
                                                       String savepointDirectory) { }
    
    // Savepoint operations
    public CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory) { }
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { }
    
    // Metrics and coordination
    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) { }
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId, 
                                                                          OperatorID operatorId, 
                                                                          CoordinationRequest request) { }
    
    // Cluster management
    public void shutDownCluster() { }
    public void close() { }
}
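Every job operation listed above returns a CompletableFuture, so calls can be chained instead of blocking on each get(). The following is a minimal sketch of that pattern, not Flink code: JobOps, AsyncChaining, and inMemoryFake are hypothetical names, and plain strings stand in for JobID and JobStatus so the example runs without Flink on the classpath.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for two of the asynchronous job methods; not a Flink type. */
interface JobOps {
    CompletableFuture<String> submitJob(String jobName);   // completes with a job id
    CompletableFuture<String> getJobStatus(String jobId);  // completes with a status name
}

final class AsyncChaining {

    /** Submits a job, then queries its status, without blocking between the two calls. */
    static CompletableFuture<String> submitAndDescribe(JobOps ops, String jobName) {
        return ops.submitJob(jobName)
                .thenCompose(jobId -> ops.getJobStatus(jobId)
                        .thenApply(status -> jobId + " is " + status));
    }

    /** In-memory fake (an assumption of this sketch) so the example runs standalone. */
    static JobOps inMemoryFake() {
        return new JobOps() {
            @Override
            public CompletableFuture<String> submitJob(String jobName) {
                return CompletableFuture.completedFuture("job-" + jobName);
            }

            @Override
            public CompletableFuture<String> getJobStatus(String jobId) {
                return CompletableFuture.completedFuture("RUNNING");
            }
        };
    }

    public static void main(String[] args) {
        // Block only once, at the very end of the chain.
        System.out.println(submitAndDescribe(inMemoryFake(), "wordcount").join());
    }
}
```

With a real client, the same thenCompose chain would presumably start from submitJob(jobGraph) and continue with getJobStatus(jobId).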

RestClusterClientConfiguration { .api }

Configuration class for REST cluster client settings including timeouts, retries, and SSL parameters.

public class RestClusterClientConfiguration {
    // Factory method
    public static RestClusterClientConfiguration fromConfiguration(Configuration config) { }
    
    // Connection configuration
    public long getConnectionTimeout() { }
    public long getIdlenessTimeout() { }
    public long getAwaitLeaderTimeout() { }
    
    // Retry configuration
    public int getMaxRetryAttempts() { }
    public long getRetryDelay() { }
    
    // SSL configuration
    public SSLHandlerFactory getSslHandlerFactory() { }
    public String[] getTrustStore() { }
    public String getTrustStorePassword() { }
    public String[] getKeyStore() { }
    public String getKeyStorePassword() { }
    public String getSSLProtocol() { }
    public String[] getSSLAlgorithms() { }
}

Retry Strategy Components

WaitStrategy { .api }

Interface for defining wait strategies in retry mechanisms.

public interface WaitStrategy {
    // Calculate sleep time for a given attempt
    long sleepTime(long attempt);
}

ExponentialWaitStrategy { .api }

Exponential backoff implementation of wait strategy for retry operations.

public class ExponentialWaitStrategy implements WaitStrategy {
    // Constructor
    public ExponentialWaitStrategy(long initialWait, long maxWait) { }
    
    // WaitStrategy implementation
    public long sleepTime(long attempt) { }
}
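To make the idea of exponential backoff concrete, here is a minimal standalone sketch. The formula Flink's ExponentialWaitStrategy actually uses is not shown in this document, so the doubling rule, the 0-based attempt numbering, and the class name BackoffSketch are all assumptions made for illustration.

```java
/** Hypothetical backoff calculator; illustrates the technique, not Flink's implementation. */
final class BackoffSketch {
    private final long initialWaitMillis;
    private final long maxWaitMillis;

    BackoffSketch(long initialWaitMillis, long maxWaitMillis) {
        if (initialWaitMillis <= 0 || maxWaitMillis < initialWaitMillis) {
            throw new IllegalArgumentException("require 0 < initialWait <= maxWait");
        }
        this.initialWaitMillis = initialWaitMillis;
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Wait before retry number {@code attempt} (0-based): doubles each time, capped at max. */
    long sleepTime(long attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be non-negative");
        }
        long wait = initialWaitMillis;
        for (long i = 0; i < attempt && wait < maxWaitMillis; i++) {
            // Double only while that cannot exceed the cap, which also rules out overflow.
            wait = (wait > maxWaitMillis / 2) ? maxWaitMillis : wait * 2;
        }
        return wait;
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(1000, 30000);
        for (long attempt = 0; attempt < 7; attempt++) {
            System.out.println("attempt " + attempt + " -> " + backoff.sleepTime(attempt) + " ms");
        }
    }
}
```

Capping the wait keeps a long outage from producing very long pauses, while the early short waits let brief interruptions recover quickly.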

Configuration Integration

The REST client integrates with Flink's configuration system through several key configuration options:

Connection Configuration

  • rest.address: REST endpoint hostname
  • rest.port: REST endpoint port
  • rest.connection-timeout: Maximum time, in milliseconds, to establish a connection
  • rest.idleness-timeout: Maximum time, in milliseconds, a connection may stay idle before it fails

SSL Configuration

  • security.ssl.rest.enabled: Enable SSL for REST connections
  • security.ssl.rest.keystore: Path to SSL keystore
  • security.ssl.rest.keystore-password: Keystore password
  • security.ssl.rest.truststore: Path to SSL truststore
  • security.ssl.rest.truststore-password: Truststore password
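A quick pre-flight check can catch an incomplete SSL setup before any connection is attempted. The sketch below works on a plain Map of the keys listed above rather than on Flink's Configuration, and RestSslConfigCheck is a hypothetical helper. Treating all four store settings as required is an assumption of this sketch; a given deployment may need only some of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical validator for the SSL keys listed above; not part of Flink. */
final class RestSslConfigCheck {
    static final String ENABLED = "security.ssl.rest.enabled";

    // Assumption: all four settings are expected whenever SSL is enabled.
    static final List<String> STORE_KEYS = Arrays.asList(
            "security.ssl.rest.keystore",
            "security.ssl.rest.keystore-password",
            "security.ssl.rest.truststore",
            "security.ssl.rest.truststore-password");

    /** Returns the store-related keys that are absent or blank; empty if SSL is disabled. */
    static List<String> missingKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        if (!Boolean.parseBoolean(conf.get(ENABLED))) {
            return missing; // SSL is off (or unset), so nothing else is required
        }
        for (String key : STORE_KEYS) {
            String value = conf.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling missingKeys before building the client gives a readable list of what to fix instead of a failure deep inside the SSL handshake.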

Retry Configuration

  • rest.retry.max-attempts: Maximum number of retry attempts before the client gives up
  • rest.retry.delay: Delay between retries, in milliseconds
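The two retry settings interact in a simple way: one bounds how many times a failed call is repeated, the other spaces the repetitions out. The sketch below shows that interaction with a hypothetical helper, RetrySketch.callWithRetries. Whether Flink counts the first call toward the maximum is not stated here, so the sketch assumes the first try is free and the limit applies to retries only.

```java
import java.util.function.Supplier;

/** Hypothetical retry helper; illustrates max-attempts plus delay, not Flink's internals. */
final class RetrySketch {

    /**
     * Runs the operation once, then retries up to maxRetryAttempts more times,
     * sleeping retryDelayMillis between tries. Rethrows the last failure.
     */
    static <T> T callWithRetries(Supplier<T> operation, int maxRetryAttempts, long retryDelayMillis) {
        if (maxRetryAttempts < 0 || retryDelayMillis < 0) {
            throw new IllegalArgumentException("attempts and delay must be non-negative");
        }
        int retriesUsed = 0;
        while (true) {
            try {
                return operation.get();
            } catch (RuntimeException failure) {
                if (retriesUsed++ >= maxRetryAttempts) {
                    throw failure; // retries exhausted, surface the last error
                }
                pause(retryDelayMillis);
            }
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
    }
}
```

Under these assumptions, a limit of 3 and a delay of 1000 ms means a call that keeps failing is tried four times in total and gives up after roughly three seconds of waiting.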

Usage Examples

Basic REST Client Usage

// Configure REST connection
Configuration config = new Configuration();
config.setString("rest.address", "flink-cluster.example.com");
config.setInteger("rest.port", 8081);
config.setLong("rest.connection-timeout", 30_000L); // 30 seconds, in milliseconds
config.setInteger("rest.retry.max-attempts", 3);

// Create REST client configuration
RestClusterClientConfiguration restConfig = 
    RestClusterClientConfiguration.fromConfiguration(config);

// Obtain the cluster ID (StandaloneClusterId is a singleton)
StandaloneClusterId clusterId = StandaloneClusterId.getInstance();

// Create REST cluster client
RestClusterClient<StandaloneClusterId> client = 
    new RestClusterClient<>(config, restConfig, clusterId);

try {
    // Use the client for operations
    String webUrl = client.getWebInterfaceURL();
    System.out.println("Cluster web interface: " + webUrl);
    
    // List running jobs
    CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();
    Collection<JobStatusMessage> jobs = jobsFuture.get();
    
    for (JobStatusMessage job : jobs) {
        System.out.println("Job: " + job.getJobName() + " - " + job.getJobState());
    }
} finally {
    client.close();
}

REST Client with Custom Retry Strategy

// Create custom exponential backoff strategy
WaitStrategy waitStrategy = new ExponentialWaitStrategy(
    1000, // initial wait: 1 second
    30000 // max wait: 30 seconds
);

// Configuration with retry settings
Configuration config = new Configuration();
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);

RestClusterClientConfiguration restConfig = 
    RestClusterClientConfiguration.fromConfiguration(config);

// Create client with custom wait strategy
RestClusterClient<StandaloneClusterId> client = 
    new RestClusterClient<>(config, restConfig, StandaloneClusterId.getInstance(), waitStrategy);

try {
    // Submit job with retry support
    JobGraph jobGraph = /* create job graph */;
    CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);
    JobID jobId = submitFuture.get();
    
    // Monitor job with automatic retries
    CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);
    JobStatus status = statusFuture.get();
    
} finally {
    client.close();
}

SSL-Enabled REST Client

// Configure SSL settings
Configuration config = new Configuration();
config.setString("rest.address", "secure-flink.example.com");
config.setInteger("rest.port", 8443);
config.setBoolean("security.ssl.rest.enabled", true);
config.setString("security.ssl.rest.keystore", "/path/to/keystore.jks");
config.setString("security.ssl.rest.keystore-password", "keystorePassword");
config.setString("security.ssl.rest.truststore", "/path/to/truststore.jks");
config.setString("security.ssl.rest.truststore-password", "truststorePassword");

// Create SSL-enabled REST client
RestClusterClientConfiguration restConfig = 
    RestClusterClientConfiguration.fromConfiguration(config);

RestClusterClient<StandaloneClusterId> client = 
    new RestClusterClient<>(config, restConfig, StandaloneClusterId.getInstance());

try {
    // All operations now use HTTPS
    CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();
    // ... handle response
} finally {
    client.close();
}

Job Management via REST

// REST client setup
RestClusterClient<StandaloneClusterId> client = /* create client */;

try {
    // Submit job
    JobGraph jobGraph = /* your job graph */;
    CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);
    JobID jobId = submitFuture.get();
    
    // Wait for job to start
    CompletableFuture<JobStatus> statusFuture = client.getJobStatus(jobId);
    JobStatus status = statusFuture.get();
    
    // Poll until the job is running or has reached a terminal state (FINISHED, FAILED, or CANCELED)
    while (status != JobStatus.RUNNING && !status.isGloballyTerminalState()) {
        Thread.sleep(1000);
        statusFuture = client.getJobStatus(jobId);
        status = statusFuture.get();
    }
    
    if (status == JobStatus.RUNNING) {
        // Trigger savepoint
        CompletableFuture<String> savepointFuture = 
            client.triggerSavepoint(jobId, "hdfs://namenode:9000/savepoints");
        String savepointPath = savepointFuture.get();
        System.out.println("Savepoint created: " + savepointPath);
        
        // Stop job with savepoint
        CompletableFuture<String> stopFuture = 
            client.stopWithSavepoint(jobId, false, "hdfs://namenode:9000/final-savepoint");
        String finalSavepointPath = stopFuture.get();
        System.out.println("Job stopped with savepoint: " + finalSavepointPath);
    }
    
    // Get final job result
    CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
    JobResult result = resultFuture.get();
    System.out.println("Job finished with status: " + result.getApplicationStatus());
    
} finally {
    client.close();
}
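A status-polling loop is safer with a deadline, so a job that never reaches the awaited state cannot block the caller forever. The following sketch shows one way to structure that. StatusPolling.pollUntil is a hypothetical helper, and it is generic over the status type so it runs without Flink on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical polling helper; not part of Flink. */
final class StatusPolling {

    /**
     * Repeatedly runs the query until the predicate accepts the result.
     * Throws IllegalStateException if the timeout elapses first.
     */
    static <S> S pollUntil(Supplier<CompletableFuture<S>> query,
                           Predicate<S> done,
                           long intervalMillis,
                           long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            S status = query.get().join(); // join() rethrows failures unchecked
            if (done.test(status)) {
                return status;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException("Timed out; last status: " + status);
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
    }
}
```

With a real client, the query argument would presumably be `() -> client.getJobStatus(jobId)` and the predicate a check for RUNNING or a terminal state.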

Coordination Request Example

// Send coordination request to operator
RestClusterClient<StandaloneClusterId> client = /* create client */;

try {
    JobID jobId = /* your job ID */;
    OperatorID operatorId = /* target operator ID */;
    
    // Create custom coordination request
    CoordinationRequest request = new CoordinationRequest() {
        // Implement coordination request logic
    };
    
    // Send request to operator
    CompletableFuture<CoordinationResponse> responseFuture = 
        client.sendCoordinationRequest(jobId, operatorId, request);
    
    CoordinationResponse response = responseFuture.get();
    // Handle coordination response
    
} finally {
    client.close();
}

Error Handling and Timeouts

// Configure timeout and retry settings
Configuration config = new Configuration();
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);
config.setLong("rest.connection-timeout", 10_000L);  // 10 seconds, in milliseconds
config.setLong("rest.idleness-timeout", 300_000L);   // 5 minutes, in milliseconds
config.setInteger("rest.retry.max-attempts", 5);
config.setLong("rest.retry.delay", 2_000L);          // 2 seconds, in milliseconds

RestClusterClientConfiguration restConfig = 
    RestClusterClientConfiguration.fromConfiguration(config);

RestClusterClient<StandaloneClusterId> client = 
    new RestClusterClient<>(config, restConfig, StandaloneClusterId.getInstance());

try {
    // Bound the wait with get(timeout), which works on Java 8 (orTimeout requires Java 9+)
    CompletableFuture<Collection<JobStatusMessage>> jobsFuture = client.listJobs();
    
    Collection<JobStatusMessage> jobs = jobsFuture
        .exceptionally(throwable -> {
            // A failed request falls back to an empty result
            System.err.println("Failed to list jobs: " + throwable.getMessage());
            return Collections.emptyList();
        })
        .get(30, TimeUnit.SECONDS); // throws TimeoutException, handled by the catch block
        
} catch (Exception e) {
    System.err.println("REST operation failed: " + e.getMessage());
} finally {
    client.close();
}
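When a future is awaited with a time limit, three outcomes need separate handling: a timeout, a failure wrapped in ExecutionException, and an interrupt. The sketch below isolates that handling in one place. BoundedWait.getOrFallback is a hypothetical helper, and returning a fallback value rather than rethrowing is a design choice of this sketch, not something Flink prescribes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for time-bounded waits; not part of Flink. */
final class BoundedWait {

    /** Waits up to timeoutMillis for the future; returns fallback on timeout, failure, or interrupt. */
    static <T> T getOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // stop waiting on a result nobody will read
            System.err.println("Operation timed out after " + timeoutMillis + " ms");
            return fallback;
        } catch (ExecutionException e) {
            // The real failure is the cause, not the wrapper.
            System.err.println("Operation failed: " + e.getCause());
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
    }
}
```

A call such as `getOrFallback(client.listJobs(), 30_000L, Collections.emptyList())` would then presumably keep the calling code free of nested try/catch blocks.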

Required Imports

import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverview;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.time.Duration;

Configuration Options Reference

Essential REST Configuration

// Basic connection settings
config.setString("rest.address", "localhost");
config.setInteger("rest.port", 8081);
config.setLong("rest.connection-timeout", 30_000L); // 30 seconds, in milliseconds
config.setLong("rest.idleness-timeout", 300_000L);  // 5 minutes, in milliseconds

// Retry configuration
config.setInteger("rest.retry.max-attempts", 3);
config.setLong("rest.retry.delay", 1_000L);         // 1 second, in milliseconds

// SSL configuration (when needed)
config.setBoolean("security.ssl.rest.enabled", true);
config.setString("security.ssl.rest.keystore", "/path/to/keystore.jks");
config.setString("security.ssl.rest.keystore-password", "password");
config.setString("security.ssl.rest.truststore", "/path/to/truststore.jks");
config.setString("security.ssl.rest.truststore-password", "password");

Together, these options control how the REST client connects to a Flink cluster, how often and how quickly it retries failed requests, and whether its HTTP traffic is encrypted with SSL.
