The Flink Table Planner connects the Table/SQL API to the runtime; it is responsible for translating and optimizing table programs into Flink pipelines.
Factory classes provide the entry points for creating planner components through the Service Provider Interface (SPI). These factories are automatically discovered and instantiated by Flink's factory system.
```java
import org.apache.flink.table.planner.delegation.DefaultPlannerFactory;
import org.apache.flink.table.planner.delegation.DefaultParserFactory;
import org.apache.flink.table.planner.delegation.DefaultExecutorFactory;
import org.apache.flink.table.factories.PlannerFactory;
import org.apache.flink.table.factories.ParserFactory;
import org.apache.flink.table.factories.ExecutorFactory;
```

Creates the default Planner implementation based on runtime mode (streaming or batch).
```java
public final class DefaultPlannerFactory implements PlannerFactory {
    public String factoryIdentifier();
    public Planner create(Context context);
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

The DefaultPlannerFactory is the primary factory for creating planner instances. It automatically determines whether to create a StreamPlanner or BatchPlanner based on the runtime mode specified in the configuration.
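The mode dispatch described above can be sketched as follows. This is a minimal, self-contained illustration: `RuntimeMode`, `Planner`, `StreamPlanner`, and `BatchPlanner` are local stand-ins, not the real Flink classes.

```java
public class PlannerDispatchSketch {

    enum RuntimeMode { STREAMING, BATCH }

    interface Planner { String name(); }

    static final class StreamPlanner implements Planner {
        public String name() { return "StreamPlanner"; }
    }

    static final class BatchPlanner implements Planner {
        public String name() { return "BatchPlanner"; }
    }

    // Mirrors the factory's decision: pick the planner from the runtime mode.
    static Planner create(RuntimeMode mode) {
        switch (mode) {
            case STREAMING: return new StreamPlanner();
            case BATCH:     return new BatchPlanner();
            default: throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(create(RuntimeMode.STREAMING).name()); // StreamPlanner
        System.out.println(create(RuntimeMode.BATCH).name());    // BatchPlanner
    }
}
```

In the real factory the mode comes from the configuration passed in through the Context, rather than as a direct argument.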
Key Methods:
- factoryIdentifier(): Returns PlannerFactory.DEFAULT_IDENTIFIER to identify this factory
- create(Context context): Creates either StreamPlanner (streaming mode) or BatchPlanner (batch mode)
- requiredOptions(): Returns an empty set; there are no required configuration options
- optionalOptions(): Returns an empty set; there are no optional configuration options

Usage Example:
```java
import org.apache.flink.table.planner.delegation.DefaultPlannerFactory;
import org.apache.flink.table.factories.PlannerFactory;

// Factory is typically discovered automatically via SPI
PlannerFactory factory = new DefaultPlannerFactory();
String identifier = factory.factoryIdentifier(); // Returns "default"

// Create planner with context
Planner planner = factory.create(context);
```

Creates ParserImpl instances for SQL parsing using Apache Calcite.
```java
public class DefaultParserFactory implements ParserFactory {
    public String factoryIdentifier();
    public Parser create(Context context);
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

The DefaultParserFactory creates instances of ParserImpl, which handle SQL statement parsing, identifier parsing, and SQL expression parsing.
Key Methods:
- factoryIdentifier(): Returns the default SQL dialect name in lowercase
- create(Context context): Creates a new ParserImpl instance
- requiredOptions(): Returns an empty set; there are no required configuration options
- optionalOptions(): Returns an empty set; there are no optional configuration options

Usage Example:
```java
import org.apache.flink.table.planner.delegation.DefaultParserFactory;
import org.apache.flink.table.factories.ParserFactory;

// Create parser factory
ParserFactory parserFactory = new DefaultParserFactory();
String identifier = parserFactory.factoryIdentifier(); // Returns "default"

// Create parser with context
Parser parser = parserFactory.create(context);

// Parse SQL statements
List<Operation> operations = parser.parse("SELECT * FROM my_table");
```

Creates DefaultExecutor instances for executing table programs.
```java
public final class DefaultExecutorFactory implements ExecutorFactory {
    public Executor create(Context context);
    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
```

The DefaultExecutorFactory creates instances of DefaultExecutor, which handle the execution of table programs produced by the planner.
Key Methods:
- factoryIdentifier(): Returns the factory identifier for this executor
- create(Context context): Creates a new DefaultExecutor instance
- requiredOptions(): Returns an empty set; there are no required configuration options
- optionalOptions(): Returns an empty set; there are no optional configuration options

Usage Example:
```java
import org.apache.flink.table.planner.delegation.DefaultExecutorFactory;
import org.apache.flink.table.factories.ExecutorFactory;

// Create executor factory
ExecutorFactory executorFactory = new DefaultExecutorFactory();
String identifier = executorFactory.factoryIdentifier();

// Create executor with context
Executor executor = executorFactory.create(context);

// Execute table programs
JobExecutionResult result = executor.execute(transformations);
```

These factories are automatically registered through Java's Service Provider Interface (SPI) mechanism. The registration is defined in:
```
META-INF/services/org.apache.flink.table.factories.Factory
```

This file contains the fully qualified class names:

```
org.apache.flink.table.planner.delegation.DefaultPlannerFactory
org.apache.flink.table.planner.delegation.DefaultParserFactory
org.apache.flink.table.planner.delegation.DefaultExecutorFactory
```

All factories receive a Context object that provides access to:
```java
public interface Context {
    Configuration getConfiguration();
    ClassLoader getClassLoader();
    TableEnvironment getTableEnvironment();
}
```

The context allows factories to:

- Read the current configuration via getConfiguration()
- Load user classes with the correct class loader via getClassLoader()
- Access the enclosing table environment via getTableEnvironment()
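A minimal, self-contained sketch of a factory consuming its context is shown below. The `Context` interface here is a local stand-in that exposes the configuration as a plain `Map`, not Flink's real interface; `table.sql-dialect` is used purely as an illustrative option key.

```java
import java.util.Map;

public class ContextUsageSketch {

    // Stand-in for the planner factory Context described above.
    interface Context {
        Map<String, String> getConfiguration(); // stand-in for Flink's Configuration
        ClassLoader getClassLoader();
    }

    // A toy factory method that derives its behavior from the context's configuration.
    static String describe(Context context) {
        String dialect = context.getConfiguration()
                .getOrDefault("table.sql-dialect", "default");
        return "parser-for-" + dialect;
    }

    public static void main(String[] args) {
        Context ctx = new Context() {
            public Map<String, String> getConfiguration() {
                return Map.of("table.sql-dialect", "hive");
            }
            public ClassLoader getClassLoader() {
                return ContextUsageSketch.class.getClassLoader();
            }
        };
        System.out.println(describe(ctx)); // parser-for-hive
    }
}
```

Passing all environment access through the context keeps factories stateless and testable: a caller can substitute any configuration or class loader without global setup.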
The factories are typically used indirectly when creating a TableEnvironment:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// The planner factory is used internally when creating the environment
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .useBlinkPlanner() // Uses DefaultPlannerFactory
        .build()
);
```

Flink's factory system uses the following discovery process:
1. Scan the classpath for META-INF/services/org.apache.flink.table.factories.Factory files
2. Instantiate each listed factory class via Java's ServiceLoader
3. Select the factory whose factoryIdentifier() matches the requested identifier

This design enables pluggable architectures where different planner implementations can be provided by different modules or third-party libraries.
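The identifier-based lookup that follows ServiceLoader instantiation can be sketched as below. The `Factory` interface and the two sample factories are local stand-ins; in Flink the candidate list would come from `ServiceLoader.load(...)`, fed by the META-INF/services entries.

```java
import java.util.List;
import java.util.NoSuchElementException;

public class FactoryDiscoverySketch {

    interface Factory { String factoryIdentifier(); }

    static final class DefaultFactory implements Factory {
        public String factoryIdentifier() { return "default"; }
    }

    static final class CustomFactory implements Factory {
        public String factoryIdentifier() { return "custom"; }
    }

    // Pick the unique factory matching the requested identifier,
    // failing loudly when none is registered.
    static Factory discover(List<Factory> candidates, String identifier) {
        return candidates.stream()
                .filter(f -> f.factoryIdentifier().equals(identifier))
                .findFirst()
                .orElseThrow(() -> new NoSuchElementException(
                        "No factory with identifier: " + identifier));
    }

    public static void main(String[] args) {
        List<Factory> candidates = List.of(new DefaultFactory(), new CustomFactory());
        System.out.println(discover(candidates, "default").getClass().getSimpleName()); // DefaultFactory
    }
}
```

A third-party module can join this lookup simply by shipping its own factory class and services file, with no changes to Flink itself.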
Install with Tessl CLI:

```
npx tessl i tessl/maven-org-apache-flink--flink-table-planner-2-11
```