Classloader mechanism for loading flink-table-planner through isolated classloader for Scala version compatibility
npx @tessl/cli install tessl/maven-org-apache-flink--flink-table-planner-loader@2.1.0Flink Table Planner Loader provides a sophisticated classloader mechanism for loading Flink table planner components while isolating Scala version dependencies. This module enables the use of arbitrary Scala versions in the classpath by hiding the specific Scala version used by the planner implementation through delegation patterns.
pom.xml:<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>2.1.0</version>
</dependency>import org.apache.flink.table.planner.loader.DelegatePlannerFactory;
import org.apache.flink.table.planner.loader.DelegateExecutorFactory;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.factories.FactoryUtil;The factories are automatically discovered via Java SPI (Service Provider Interface). The typical usage involves factory discovery through Flink's factory utility:
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.factories.FactoryUtil;
// Discover planner factory
PlannerFactory plannerFactory = FactoryUtil.discoverFactory(
classLoader,
PlannerFactory.class,
PlannerFactory.DEFAULT_IDENTIFIER
);
// Discover executor factory
ExecutorFactory executorFactory = FactoryUtil.discoverFactory(
classLoader,
ExecutorFactory.class,
ExecutorFactory.DEFAULT_IDENTIFIER
);
// Create planner instance
Planner planner = plannerFactory.create(context);
// Create executor instance
Executor executor = executorFactory.create(configuration);The module implements a delegation pattern with classloader isolation:
BaseDelegateFactory and delegate operations to dynamically loaded implementationsPlannerModule for isolated classloading to hide Scala version dependenciesDelegatePlannerFactory dynamically loads JAR files for different SQL dialectsMETA-INF/services/org.apache.flink.table.factories.FactoryCreates planner instances with SQL dialect support and classloader isolation.
/**
* Delegate implementation of PlannerFactory that loads planner through isolated classloader
*/
@Internal
public class DelegatePlannerFactory extends BaseDelegateFactory<PlannerFactory>
implements PlannerFactory {
/**
* Default constructor that loads planner factory via PlannerModule
*/
public DelegatePlannerFactory();
/**
* Creates planner instance with SQL dialect support
* @param context - Planner context containing configuration and environment
* @return Planner instance configured for the specified SQL dialect
*/
@Override
public Planner create(Context context);
}Creates executor instances for stream processing environments.
/**
* Delegate implementation of ExecutorFactory that loads executor through isolated classloader
*/
@Internal
public class DelegateExecutorFactory extends BaseDelegateFactory<StreamExecutorFactory>
implements StreamExecutorFactory {
/**
* Default constructor that loads executor factory via PlannerModule
*/
public DelegateExecutorFactory();
/**
* Creates executor from configuration
* @param configuration - Flink configuration settings
* @return Executor instance configured with provided settings
*/
@Override
public Executor create(Configuration configuration);
/**
* Creates executor from stream execution environment
* @param streamExecutionEnvironment - Stream execution environment
* @return Executor instance for the stream environment
*/
public Executor create(StreamExecutionEnvironment streamExecutionEnvironment);
}Abstract base class providing common delegation functionality.
/**
* Base class for all factory delegates
* @param <DELEGATE> The type of factory being delegated to
*/
abstract class BaseDelegateFactory<DELEGATE extends Factory> implements Factory {
/** The delegated factory instance */
final DELEGATE delegate;
/**
* Constructor accepting delegate factory
* @param delegate - The factory instance to delegate to
*/
protected BaseDelegateFactory(DELEGATE delegate);
/**
* Returns factory identifier from delegate
* @return String identifier for this factory type
*/
@Override
public String factoryIdentifier();
/**
* Returns required configuration options from delegate
* @return Set of required ConfigOption instances
*/
@Override
public Set<ConfigOption<?>> requiredOptions();
/**
* Returns optional configuration options from delegate
* @return Set of optional ConfigOption instances
*/
@Override
public Set<ConfigOption<?>> optionalOptions();
}These types are provided by the Flink framework and used by the planner loader:
/**
* Factory interface for creating planner instances
*/
interface PlannerFactory extends Factory {
String DEFAULT_IDENTIFIER = "default";
/**
* Context for planner creation containing configuration and environment
*/
interface Context {
TableConfig getTableConfig();
// Additional context methods...
}
/**
* Creates a planner instance
* @param context - Creation context
* @return Planner instance
*/
Planner create(Context context);
}
/**
* Factory interface for creating executor instances
*/
interface ExecutorFactory extends Factory {
String DEFAULT_IDENTIFIER = "default";
/**
* Creates executor from configuration
* @param configuration - Flink configuration
* @return Executor instance
*/
Executor create(Configuration configuration);
}
/**
* Factory interface for creating stream executor instances
*/
interface StreamExecutorFactory extends ExecutorFactory {
/**
* Creates executor from stream execution environment
* @param streamExecutionEnvironment - Stream execution environment
* @return Executor instance
*/
Executor create(StreamExecutionEnvironment streamExecutionEnvironment);
}
/**
* Base factory interface providing common factory methods
*/
interface Factory {
/**
* Returns unique identifier for this factory
* @return String identifier
*/
String factoryIdentifier();
/**
* Returns required configuration options
* @return Set of required options
*/
Set<ConfigOption<?>> requiredOptions();
/**
* Returns optional configuration options
* @return Set of optional options
*/
Set<ConfigOption<?>> optionalOptions();
}
/**
* SQL dialect enumeration for different SQL flavors
*/
enum SqlDialect {
DEFAULT,
HIVE
}
/**
* Configuration option type for Flink settings
* @param <T> The type of the configuration value
*/
class ConfigOption<T> {
// ConfigOption implementation details...
}
/**
* Flink configuration container
*/
class Configuration {
// Configuration implementation details...
}
/**
* Stream execution environment for Flink streaming jobs
*/
class StreamExecutionEnvironment {
// StreamExecutionEnvironment implementation details...
}
/**
* Table configuration containing SQL dialect and other table settings
*/
class TableConfig {
/**
* Returns the current SQL dialect
* @return SqlDialect instance
*/
SqlDialect getSqlDialect();
// Additional table configuration methods...
}
/**
* Planner interface for table query planning
*/
interface Planner {
// Planner implementation details...
}
/**
* Executor interface for query execution
*/
interface Executor {
// Executor implementation details...
}/**
* Annotation marking internal APIs not intended for public use
*/
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@Retention(RetentionPolicy.RUNTIME)
@interface Internal {
}The module registers factories via Java SPI in META-INF/services/org.apache.flink.table.factories.Factory:
org.apache.flink.table.planner.loader.DelegateExecutorFactoryorg.apache.flink.table.planner.loader.DelegatePlannerFactoryThis enables automatic discovery by Flink's FactoryUtil.discoverFactory() method.
The factories may throw the following types of exceptions:
@Internal, indicating they are not intended for direct public use