or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

application-deployment.md, cli-interface.md, client-core.md, cluster-management.md, index.md, program-execution.md, rest-client.md
tile.json

docs/rest-client.md

REST Client

REST client implementations for communicating with Flink clusters through HTTP APIs, including retry strategies, configuration management, and comprehensive cluster operation support.

Capabilities

REST Cluster Client

REST-based implementation of the ClusterClient interface for HTTP communication with Flink clusters.

/**
 * REST-based implementation of {@link ClusterClient} that talks to a Flink cluster over its HTTP
 * REST API.
 *
 * <p>Application code creates instances through the two public constructors below. Neither of them
 * accepts a {@code RestClusterClientConfiguration} or a {@code WaitStrategy}: connection, timeout
 * and retry settings are taken from the supplied {@link Configuration} (the {@code rest.*} options).
 *
 * <p>{@link #getClusterId()}, {@link #getFlinkConfiguration()}, {@link #getWebInterfaceURL()},
 * {@link #shutDownCluster()} and {@link #close()} return plain values (or {@code void}); all
 * job-level operations return a {@link CompletableFuture}.
 *
 * @param <T> Type of cluster identifier
 */
public class RestClusterClient<T> implements ClusterClient<T> {
    /**
     * Creates a REST cluster client with basic configuration.
     *
     * @param config Flink configuration (REST address/port, timeouts, retry options)
     * @param clusterId Cluster identifier
     * @throws Exception If the client cannot be initialized
     */
    public RestClusterClient(Configuration config, T clusterId) throws Exception;

    /**
     * Creates a REST cluster client with a custom high availability services factory.
     *
     * @param config Flink configuration (REST address/port, timeouts, retry options)
     * @param clusterId Cluster identifier
     * @param factory High availability services factory used to locate the cluster leader
     * @throws Exception If the client cannot be initialized
     */
    public RestClusterClient(
        Configuration config,
        T clusterId,
        ClientHighAvailabilityServicesFactory factory) throws Exception;

    // ---- Synchronous accessors / lifecycle ----

    @Override
    public T getClusterId();

    @Override
    public Configuration getFlinkConfiguration();

    @Override
    public void shutDownCluster();

    @Override
    public String getWebInterfaceURL();

    // ---- Asynchronous job operations (REST calls) ----

    @Override
    public CompletableFuture<Collection<JobStatusMessage>> listJobs() throws Exception;

    @Override
    public CompletableFuture<JobID> submitJob(JobGraph jobGraph);

    @Override
    public CompletableFuture<JobStatus> getJobStatus(JobID jobId);

    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId);

    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID);

    @Override
    public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader);

    @Override
    public CompletableFuture<Acknowledge> cancel(JobID jobId);

    @Override
    public CompletableFuture<String> cancelWithSavepoint(JobID jobId, String savepointDirectory);

    @Override
    public CompletableFuture<String> stopWithSavepoint(
        JobID jobId,
        boolean advanceToEndOfEventTime,
        String savepointDirectory);

    @Override
    public CompletableFuture<String> triggerSavepoint(JobID jobId, String savepointDirectory);

    @Override
    public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) throws FlinkException;

    @Override
    public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
        JobID jobId,
        OperatorID operatorId,
        CoordinationRequest request);

    /** Releases the client's resources; inherited from {@code AutoCloseable} via ClusterClient. */
    @Override
    public void close();
}

Usage Example:

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Configure REST client
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 6123);
config.setInteger(RestOptions.PORT, 8081);

// Retry and leader-wait settings are ordinary configuration options; the client derives its
// RestClusterClientConfiguration from them (there is no public builder for it).
config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10);
config.setLong(RestOptions.RETRY_DELAY, 1000L);            // ms
config.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 30000L);  // ms

// Create REST cluster client.
// StandaloneClusterId is a singleton (use getInstance()), and the constructor declares
// `throws Exception`, so call this from a method that handles or propagates it.
try (RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(config, StandaloneClusterId.getInstance())) {

    // Submit job via REST (createJobGraph() is assumed to be defined by the application)
    JobGraph jobGraph = createJobGraph();
    JobID jobId = client.submitJob(jobGraph).get();
    System.out.println("Job submitted via REST: " + jobId);

    // Query the current job status once
    JobStatus status = client.getJobStatus(jobId).get();
    System.out.println("Job status: " + status);

    // Create savepoint via REST; the future completes with the savepoint path
    String savepointPath = client.triggerSavepoint(jobId, "/path/to/savepoints").get();
    System.out.println("Savepoint created: " + savepointPath);
}

REST Client Configuration

Configuration class for REST cluster clients with connection and retry settings.

/**
 * Connection and retry settings consumed by REST cluster clients.
 *
 * <p>Instances are derived from a Flink {@link Configuration} through
 * {@link #fromConfiguration(Configuration)}; this class exposes no public constructor or builder.
 */
public final class RestClusterClientConfiguration {

    /**
     * Derives a REST cluster client configuration from the REST options of a Flink configuration.
     *
     * @param config Flink configuration carrying the REST options
     * @return the resulting RestClusterClientConfiguration
     * @throws ConfigurationException if the supplied configuration is invalid
     */
    public static RestClusterClientConfiguration fromConfiguration(Configuration config)
        throws ConfigurationException;

    /** @return the low-level REST client (HTTP) configuration */
    public RestClientConfiguration getRestClientConfiguration();

    /** @return how long to wait for the cluster leader, in milliseconds */
    public long getAwaitLeaderTimeout();

    /** @return the maximum number of retry attempts */
    public int getRetryMaxAttempts();

    /** @return the pause between two retry attempts, in milliseconds */
    public long getRetryDelay();
}

Usage Example:

import org.apache.flink.client.program.rest.RestClusterClientConfiguration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

// Create Flink configuration with REST options
Configuration flinkConfig = new Configuration();
flinkConfig.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 60000L);  // 60 seconds
flinkConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 5);       // 5 retry attempts
flinkConfig.setLong(RestOptions.RETRY_DELAY, 2000L);            // 2 second delay

// Create REST client configuration from Flink configuration.
// fromConfiguration declares the checked ConfigurationException; handle or propagate it.
RestClusterClientConfiguration config = RestClusterClientConfiguration.fromConfiguration(flinkConfig);

System.out.println("Leader timeout: " + config.getAwaitLeaderTimeout() + "ms");
System.out.println("Max retries: " + config.getRetryMaxAttempts());
System.out.println("Retry delay: " + config.getRetryDelay() + "ms");

Retry Strategies

Strategy implementations for handling retry logic in REST client operations.

/**
 * Decides how long to pause before the next retry attempt.
 */
public interface WaitStrategy {

    /**
     * Computes the pause that precedes the next attempt.
     *
     * @param attemptCount number of attempts made so far, counting from 0
     * @return time to sleep, in milliseconds
     */
    long sleepTime(long attemptCount);
}

/**
 * Configuration options for REST client settings
 * ({@code org.apache.flink.configuration.RestOptions}).
 *
 * <p>Only the options referenced in this document are listed here.
 */
public class RestOptions {
    /** {@code rest.address}: address of the REST server. */
    public static final ConfigOption<String> ADDRESS;

    /** {@code rest.port}: port of the REST server. */
    public static final ConfigOption<Integer> PORT;

    /** {@code rest.await-leader-timeout}: time to wait for the cluster leader, in milliseconds. */
    public static final ConfigOption<Long> AWAIT_LEADER_TIMEOUT;

    /** {@code rest.retry.max-attempts}: maximum number of retry attempts. */
    public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS;

    /** {@code rest.retry.delay}: delay between retry attempts, in milliseconds. */
    public static final ConfigOption<Long> RETRY_DELAY;
}

/**
 * Exponential backoff wait strategy: the wait doubles with each attempt, starting at
 * {@code initialDelay} and bounded by {@code maxDelay}.
 */
public class ExponentialWaitStrategy implements WaitStrategy {

    /**
     * Builds an exponential backoff strategy.
     *
     * @param initialDelay wait used for the first attempt, in milliseconds
     * @param maxDelay upper bound for any computed wait, in milliseconds
     */
    public ExponentialWaitStrategy(long initialDelay, long maxDelay);

    /**
     * Computes the backoff for the given attempt.
     *
     * @param attemptCount number of attempts made so far
     * @return the exponentially growing wait, in milliseconds, never above {@code maxDelay}
     */
    @Override
    public long sleepTime(long attemptCount);
}

Usage Example:

import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;

// Create exponential backoff strategy
WaitStrategy waitStrategy = new ExponentialWaitStrategy(1000, 30000); // 1s initial, 30s max

// Calculate wait times for different attempts; attempts 5 and 6 show the 30s cap taking effect
for (int attempt = 0; attempt < 7; attempt++) {
    long waitTime = waitStrategy.sleepTime(attempt);
    System.out.println("Attempt " + attempt + ": wait " + waitTime + "ms");
}
// Output:
// Attempt 0: wait 1000ms
// Attempt 1: wait 2000ms
// Attempt 2: wait 4000ms
// Attempt 3: wait 8000ms
// Attempt 4: wait 16000ms
// Attempt 5: wait 30000ms   (32000ms capped at maxDelay)
// Attempt 6: wait 30000ms

REST Client Integration

The REST client integrates with the broader Flink client ecosystem through standard interfaces and provides HTTP-based communication with Flink clusters.

Key Integration Points:

  1. ClusterClient Interface: RestClusterClient implements the standard ClusterClient interface, making it interchangeable with other client implementations
  2. Configuration Integration: Uses standard Flink Configuration class for setup
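  3. Retry Behavior: Retries for network failures and temporary unavailability are controlled by the rest.retry.max-attempts and rest.retry.delay options; the public RestClusterClient constructors do not accept a custom WaitStrategy
  4. Async Operations: Job-level operations (listing, submission, status, results, cancellation, savepoints, accumulators, coordination requests) return CompletableFuture for non-blocking execution; getClusterId(), getFlinkConfiguration(), getWebInterfaceURL(), shutDownCluster() and close() do not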
  5. Resource Management: Implements AutoCloseable for proper resource cleanup

REST Endpoints Used:

// Common REST endpoints accessed by RestClusterClient (for reference):
// GET    /jobs/overview                  - List jobs
// POST   /jobs                           - Submit job
// GET    /jobs/{jobid}                   - Get job details
// GET    /jobs/{jobid}/execution-result  - Get job result (requestJobResult)
// PATCH  /jobs/{jobid}                   - Cancel job
// POST   /jobs/{jobid}/savepoints        - Trigger savepoint
// POST   /savepoint-disposal             - Dispose savepoint (path passed in the request body)
// POST   /jobs/{jobid}/stop              - Stop job with savepoint
// GET    /jobs/{jobid}/accumulators      - Get job accumulators
// POST   /jobs/{jobid}/coordination/{operatorId} - Send coordination request

Configuration Options:

// Important configuration keys for REST client (from Flink configuration):
// rest.address                    - REST server address
// rest.port                       - REST server port
// rest.connection-timeout         - Connection timeout
// rest.idleness-timeout           - Idleness timeout
// rest.await-leader-timeout       - Timeout for awaiting the cluster leader (ms)
// rest.retry.max-attempts         - Maximum retry attempts
// rest.retry.delay                - Delay between retries (ms)

Usage in Different Deployment Modes:

import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;

// The RestClusterClient constructors declare `throws Exception`; call this code from a method
// that handles or propagates it. Clients are AutoCloseable, so scope them with try-with-resources.

// For standalone cluster
Configuration standaloneConfig = new Configuration();
standaloneConfig.setString("rest.address", "flink-master");
standaloneConfig.setInteger("rest.port", 8081);

// StandaloneClusterId is a singleton: obtain it via getInstance()
try (RestClusterClient<StandaloneClusterId> standaloneClient =
        new RestClusterClient<>(standaloneConfig, StandaloneClusterId.getInstance())) {
    // ... use standaloneClient
}

// For session cluster (e.g., YARN session): the cluster id is the YARN ApplicationId
Configuration sessionConfig = new Configuration();
sessionConfig.setString("yarn.application.id", "application_123456_0001");
sessionConfig.setString("rest.address", "yarn-session-master");

ApplicationId yarnClusterId = ApplicationId.fromString("application_123456_0001");
try (RestClusterClient<ApplicationId> yarnClient =
        new RestClusterClient<>(sessionConfig, yarnClusterId)) {
    // ... use yarnClient
}

Types

/**
 * Low-level HTTP settings used by the REST client.
 */
public class RestClientConfiguration {
    /** @return the connection timeout */
    public long getConnectionTimeout();

    /** @return the idleness timeout */
    public long getIdlenessTimeout();

    /** @return the maximum content length */
    public int getMaxContentLength();

    /**
     * @return the SSL handler factory for secured connections
     *     (NOTE(review): presumably null when SSL is disabled — verify)
     */
    public SSLHandlerFactory getSslHandlerFactory();
}

/**
 * Retry strategy interface.
 *
 * <p>NOTE(review): this signature could not be matched to Flink's public API. The closest Flink
 * type appears to be {@code org.apache.flink.util.concurrent.RetryStrategy}, which (as recalled)
 * exposes {@code getNumRemainingRetries()}, {@code getRetryDelay()} returning a
 * {@code java.time.Duration}, and {@code getNextRetryStrategy()}. Confirm against the targeted
 * Flink version before relying on the methods below.
 */
public interface RetryStrategy {
    /**
     * @param attemptCount number of attempts made so far
     * @return presumably whether another attempt is permitted
     */
    boolean canRetry(int attemptCount);

    /**
     * @param attemptCount number of attempts made so far
     * @return the delay before the next attempt
     */
    long getRetryDelay(int attemptCount);
}

/**
 * HTTP-specific exception (typically from the underlying REST framework).
 *
 * <p>NOTE(review): in Flink this appears to be
 * {@code org.apache.flink.runtime.rest.util.RestClientException}, which extends
 * {@code FlinkException} and whose constructors also take the HTTP response status (exposed via
 * {@code getHttpResponseStatus()}). Verify the constructors below against the targeted Flink
 * version. Failures from RestClusterClient futures may wrap this exception several levels deep,
 * so search the whole cause chain rather than only the first {@code getCause()}.
 */
public class RestClientException extends Exception {
    public RestClientException(String message);
    public RestClientException(String message, Throwable cause);
}

/**
 * Connection timeout exception.
 *
 * <p>NOTE(review): no public Flink class with this name could be matched; Flink's REST client
 * appears to report connection problems through other exception types. Confirm this class exists
 * in the targeted Flink version before writing catch blocks against it.
 */
public class ConnectionTimeoutException extends RestClientException {
    public ConnectionTimeoutException(String message);
}

/**
 * Leader retrieval exception for REST clients.
 *
 * <p>NOTE(review): Flink's {@code org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException}
 * appears to extend {@code FlinkException} rather than {@code RestClientException}. If so, a catch
 * clause or {@code instanceof} check on RestClientException will not intercept it — verify against
 * the targeted Flink version.
 */
public class LeaderRetrievalException extends RestClientException {
    public LeaderRetrievalException(String message);
    public LeaderRetrievalException(String message, Throwable cause);
}

Advanced Usage Patterns

Custom Retry Strategy

import org.apache.flink.client.program.rest.retry.WaitStrategy;

// Implement custom retry strategy: linear backoff of baseDelay + attemptCount * increment
public class LinearWaitStrategy implements WaitStrategy {
    /** Wait returned for attempt 0, in milliseconds. */
    private final long baseDelay;
    /** Extra wait added for every further attempt, in milliseconds. */
    private final long increment;

    /**
     * @param baseDelay wait for the first attempt, in milliseconds
     * @param increment additional wait per attempt, in milliseconds
     */
    public LinearWaitStrategy(long baseDelay, long increment) {
        this.increment = increment;
        this.baseDelay = baseDelay;
    }

    @Override
    public long sleepTime(long attemptCount) {
        final long growth = increment * attemptCount;
        return growth + baseDelay;
    }
}

// Use custom strategy
WaitStrategy customStrategy = new LinearWaitStrategy(1000, 500);
RestClusterClient<T> client = new RestClusterClient<>(
    config, clientConfig, clusterId, customStrategy);

Monitoring and Error Handling

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.util.ExceptionUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Assumes createRestClient() and jobGraph are defined by the application.
try (RestClusterClient<StandaloneClusterId> client = createRestClient()) {
    // Submit job with error handling
    CompletableFuture<JobID> submitFuture = client.submitJob(jobGraph);

    JobID jobId = submitFuture.handle((result, throwable) -> {
        if (throwable == null) {
            return result;
        }
        // The failure may arrive bare or wrapped one or more times (e.g. in a CompletionException),
        // so search the whole cause chain instead of inspecting only the first getCause().
        Optional<RestClientException> restFailure =
            ExceptionUtils.findThrowable(throwable, RestClientException.class);
        if (restFailure.isPresent()) {
            System.err.println("REST communication failed: " + restFailure.get().getMessage());
        } else {
            Throwable cause = ExceptionUtils.stripCompletionException(throwable);
            System.err.println("Job submission failed: " + cause.getMessage());
        }
        return null;
    }).get();

    if (jobId != null) {
        System.out.println("Job submitted successfully: " + jobId);

        // Block until the job has finished and its result is available
        CompletableFuture<JobResult> resultFuture = client.requestJobResult(jobId);
        JobResult result = resultFuture.get();
        System.out.println("Job completed with status: " + result.getApplicationStatus());
    }
}