Core client utilities and translators for Flink pipelines, providing essential functionality for program execution, pipeline translation, and classloader management.
Utility functions for Flink client operations including classloader creation, program execution, and job result handling.
/**
* Utility functions for Flink client operations
*/
public enum ClientUtils {
/**
* Builds the class loader used to load user code from the given JAR and classpath URLs.
* <p>
* NOTE(review): how {@code configuration} influences the resulting class loader is not
* visible from this signature; confirm against the implementation.
*
* @param jars URLs of the JAR files to include
* @param classpaths URLs of additional classpath entries to include
* @param parent parent class loader of the returned class loader
* @param configuration Flink configuration
* @return a {@link URLClassLoader} for user code execution
*/
public static URLClassLoader buildUserCodeClassLoader(
List<URL> jars,
List<URL> classpaths,
ClassLoader parent,
Configuration configuration);
/**
* Executes a packaged program using an executor obtained from the given service loader.
* <p>
* The method returns nothing, and the signature shown here declares no checked exceptions.
* NOTE(review): confirm against the actual source whether a checked exception is declared.
*
* @param executorServiceLoader service loader for pipeline executors
* @param configuration Flink configuration
* @param program packaged program to execute
* @param enforceSingleJobExecution whether to enforce single job execution
* @param suppressSysout whether to suppress system output during execution
*/
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout);
/**
* Waits until job initialization has finished.
* <p>
* Blocks until the status returned by {@code jobStatusSupplier} is no longer INITIALIZING.
*
* @param jobStatusSupplier supplier returning the current job status
* @param jobResultSupplier supplier returning the job result (called only if the job reaches the FAILED state)
* @param userCodeClassloader user code class loader used for exception deserialization
* @throws JobInitializationException if the initialization failed
*/
public static void waitUntilJobInitializationFinished(
SupplierWithException<JobStatus, Exception> jobStatusSupplier,
SupplierWithException<JobResult, Exception> jobResultSupplier,
ClassLoader userCodeClassloader) throws JobInitializationException;
}

Usage Example:
import org.apache.flink.client.ClientUtils;
import org.apache.flink.configuration.Configuration;
// NOTE: this snippet also uses java.io.File, java.net.URL, java.net.URLClassLoader,
// java.util.Arrays and java.util.List, plus the Flink types PackagedProgram,
// PipelineExecutorServiceLoader and DefaultExecutorServiceLoader; their imports are not shown.

// Create a user code class loader from one job JAR and one classpath directory.
// File.toURI().toURL() declares the checked MalformedURLException, so the enclosing
// method must catch or declare it.
List<URL> jars = Arrays.asList(new File("user-job.jar").toURI().toURL());
List<URL> classpaths = Arrays.asList(new File("lib/").toURI().toURL());
URLClassLoader userClassLoader = ClientUtils.buildUserCodeClassLoader(
jars, classpaths, Thread.currentThread().getContextClassLoader(), new Configuration());
// Execute a program: package the job JAR, pick the default executor service loader, then
// call executeProgram with enforceSingleJobExecution and suppressSysout both set to false.
Configuration config = new Configuration();
PackagedProgram program = PackagedProgram.newBuilder()
.setJarFile(new File("user-job.jar"))
.build();
PipelineExecutorServiceLoader executorLoader =
DefaultExecutorServiceLoader.INSTANCE;
ClientUtils.executeProgram(executorLoader, config, program, false, false);

Interface and utilities for translating Flink pipelines into executable job graphs.
/**
* Interface for translating Flink pipelines into executable job graphs.
* <p>
* Implementations shown in this document are {@code PlanTranslator} (DataSet plans) and
* {@code StreamGraphTranslator} (stream graphs).
*/
public interface FlinkPipelineTranslator {
/**
* Translates a pipeline into a {@code JobGraph}.
*
* @param pipeline the pipeline to translate (DataStream or DataSet)
* @param optimizerConfiguration configuration for optimization
* @param defaultParallelism default parallelism for operations
* @return a JobGraph ready for execution
*/
JobGraph translateToJobGraph(
Pipeline pipeline,
Configuration optimizerConfiguration,
int defaultParallelism);
}
/**
* Utility methods for pipeline translation
*/
public class FlinkPipelineTranslationUtil {
/**
* Translates the given pipeline into a {@code JobGraph} using the given configuration.
* <p>
* NOTE(review): presumably delegates to a matching {@code FlinkPipelineTranslator};
* the selection logic is not visible from this signature.
*
* @param pipeline pipeline to translate
* @param configuration Flink configuration
* @param defaultParallelism default parallelism
* @return the translated JobGraph
*/
public static JobGraph getJobGraph(
Pipeline pipeline,
Configuration configuration,
int defaultParallelism);
}

Usage Example:
import org.apache.flink.client.FlinkPipelineTranslationUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
// NOTE: Pipeline, Configuration and JobGraph are also used below; their imports are not shown.

// Create a DataStream pipeline that reads text from a socket on localhost:9999 and prints it.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.socketTextStream("localhost", 9999);
stream.print();
// Translate the stream graph to a JobGraph with a default parallelism of 1.
Pipeline pipeline = env.getStreamGraph();
Configuration config = new Configuration();
JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, config, 1);

Translator for DataSet API plans to job graphs for batch processing workloads.
/**
* Translates DataSet API plans to job graphs
*/
public class PlanTranslator implements FlinkPipelineTranslator {
/**
* Translates a DataSet plan into a {@code JobGraph}.
* <p>
* Implements {@code FlinkPipelineTranslator.translateToJobGraph} for DataSet pipelines.
* NOTE(review): behavior for a pipeline that is not a DataSet plan is not shown here.
*
* @param pipeline DataSet pipeline to translate
* @param optimizerConfiguration optimizer configuration
* @param defaultParallelism default parallelism for operations
* @return a JobGraph for DataSet execution
*/
@Override
public JobGraph translateToJobGraph(
Pipeline pipeline,
Configuration optimizerConfiguration,
int defaultParallelism);
}

Translator for stream graphs to job graphs for streaming workloads.
/**
* Translates stream graphs to job graphs
*/
public class StreamGraphTranslator implements FlinkPipelineTranslator {
/**
* Translates a StreamGraph into a {@code JobGraph}.
* <p>
* Implements {@code FlinkPipelineTranslator.translateToJobGraph} for streaming pipelines.
* NOTE(review): behavior for a pipeline that is not a StreamGraph is not shown here.
*
* @param pipeline StreamGraph to translate
* @param optimizerConfiguration optimizer configuration
* @param defaultParallelism default parallelism for operations
* @return a JobGraph for streaming execution
*/
@Override
public JobGraph translateToJobGraph(
Pipeline pipeline,
Configuration optimizerConfiguration,
int defaultParallelism);
}

public interface Pipeline {
// Base interface for DataStream and DataSet pipelines
}
/**
* A supplier whose {@code get()} may throw a checked exception.
*
* @param <T> type of the supplied value
* @param <E> type of exception that {@code get()} may throw
*/
public interface SupplierWithException<T, E extends Exception> {
/**
* Returns the supplied value.
*
* @return the value
* @throws E if the value cannot be supplied
*/
T get() throws E;
}
/**
* Strategy that maps an attempt count to a sleep time.
* <p>
* NOTE(review): not referenced by any signature shown in this section; presumably used
* internally when polling for job status — confirm.
*/
public interface WaitStrategy {
/**
* Returns the time to sleep for the given attempt.
* NOTE(review): the time unit is not stated here — presumably milliseconds; confirm.
*
* @param attemptCount the attempt count
* @return the sleep time
*/
long sleepTime(long attemptCount);
}
/**
* Loader that exposes the available pipeline executor factories.
* Passed to {@code ClientUtils.executeProgram} as its first argument.
*/
public interface PipelineExecutorServiceLoader {
/**
* Returns the executor factories known to this loader.
*
* @return a stream of {@code PipelineExecutorFactory} instances
*/
Stream<PipelineExecutorFactory> getExecutorFactories();
}
/**
* Default implementation of {@code PipelineExecutorServiceLoader}.
* NOTE(review): how factories are discovered is not shown in this stub.
*/
public class DefaultExecutorServiceLoader implements PipelineExecutorServiceLoader {
/** Shared instance; the usage example passes it to {@code ClientUtils.executeProgram}. */
public static final DefaultExecutorServiceLoader INSTANCE;
/**
* Returns the executor factories known to this loader.
*
* @return a stream of {@code PipelineExecutorFactory} instances
*/
@Override
public Stream<PipelineExecutorFactory> getExecutorFactories();
}
/**
* Client handle for a single job. Apart from {@code getJobId()}, every operation returns a
* {@link CompletableFuture}. The interface extends {@link AutoCloseable}.
*/
public interface JobClient extends AutoCloseable {
/** Returns the ID of the job this client refers to. */
JobID getJobId();
/** Returns a future that completes with the job's status. */
CompletableFuture<JobStatus> getJobStatus();
/** Returns a future that completes with the job's result. */
CompletableFuture<JobResult> getJobExecutionResult();
/** Requests cancellation of the job; the returned future carries no value. */
CompletableFuture<Void> cancel();
}
/**
* Checked exception declared by {@code ClientUtils.waitUntilJobInitializationFinished}
* to signal that job initialization failed.
* NOTE(review): this stub shows it extending {@link Exception} directly; confirm the
* superclass and constructors against the actual source.
*/
public class JobInitializationException extends Exception {
/**
* @param message description of the failure
*/
public JobInitializationException(String message);
/**
* @param message description of the failure
* @param cause underlying cause of the failure
*/
public JobInitializationException(String message, Throwable cause);
}