# Flink Table Planner Loader

Flink Table Planner Loader loads the Flink table planner components through a dedicated classloader so that the planner's Scala dependencies stay isolated. Because the Scala version used by the planner implementation is hidden behind delegating factories, applications can place any Scala version on their own classpath.

## Package Information

- **Package Name**: flink-table-planner-loader
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-table-planner-loader
- **Installation**: Add dependency to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.table.planner.loader.DelegatePlannerFactory;
import org.apache.flink.table.planner.loader.DelegateExecutorFactory;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.factories.FactoryUtil;
```

## Basic Usage

The factories are discovered automatically via Java SPI (Service Provider Interface). Typical usage goes through Flink's factory utility:

```java
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.FactoryUtil;

// Assumed to be supplied by the caller: `classLoader` (ClassLoader),
// `context` (PlannerFactory.Context), and `configuration` (Configuration).

// Discover planner factory
PlannerFactory plannerFactory = FactoryUtil.discoverFactory(
    classLoader,
    PlannerFactory.class,
    PlannerFactory.DEFAULT_IDENTIFIER
);

// Discover executor factory
ExecutorFactory executorFactory = FactoryUtil.discoverFactory(
    classLoader,
    ExecutorFactory.class,
    ExecutorFactory.DEFAULT_IDENTIFIER
);

// Create planner instance
Planner planner = plannerFactory.create(context);

// Create executor instance
Executor executor = executorFactory.create(configuration);
```

## Architecture

The module implements a delegation pattern with classloader isolation:

- **Delegation Pattern**: All factories extend `BaseDelegateFactory` and delegate operations to dynamically loaded implementations
- **Classloader Isolation**: Uses `PlannerModule` for isolated classloading to hide Scala version dependencies
- **SQL Dialect Support**: `DelegatePlannerFactory` dynamically loads JAR files for different SQL dialects
- **Service Registration**: Factories are registered via Java SPI in `META-INF/services/org.apache.flink.table.factories.Factory`

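To make the classloader isolation idea more concrete, the following is a minimal, self-contained sketch of a classloader that exposes only allow-listed packages from its parent. It is not the `PlannerModule` implementation; `IsolationSketch`, `FilteringClassLoader`, `isVisible`, and the package prefixes are hypothetical names chosen purely for illustration.

```java
/** Illustrative only: hides every class outside the allow-listed package prefixes. */
final class IsolationSketch {

    /** Hypothetical loader that delegates to its parent only for allowed prefixes. */
    static final class FilteringClassLoader extends ClassLoader {
        private final String[] allowedPrefixes;

        FilteringClassLoader(ClassLoader parent, String... allowedPrefixes) {
            super(parent);
            this.allowedPrefixes = allowedPrefixes.clone();
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve)
                throws ClassNotFoundException {
            for (String prefix : allowedPrefixes) {
                if (name.startsWith(prefix)) {
                    // Allowed: fall back to the normal parent-first lookup.
                    return super.loadClass(name, resolve);
                }
            }
            // Everything else is treated as if it did not exist.
            throw new ClassNotFoundException(name + " is not visible through this loader");
        }
    }

    /** Returns true if the given loader can resolve the class name. */
    static boolean isVisible(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader filtered = new FilteringClassLoader(
                IsolationSketch.class.getClassLoader(), "java.lang.");
        System.out.println("java.lang.String visible: "
                + isVisible(filtered, "java.lang.String"));
        System.out.println("java.util.ArrayList visible: "
                + isVisible(filtered, "java.util.ArrayList"));
    }
}
```

The same filtering principle, applied to Scala packages, is one way a loader could keep a component's Scala version from leaking into code that uses a different one.
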
## Capabilities

### Planner Factory Delegation

Creates planner instances with SQL dialect support and classloader isolation.

```java { .api }
/**
 * Delegate implementation of PlannerFactory that loads planner through isolated classloader
 */
@Internal
public class DelegatePlannerFactory extends BaseDelegateFactory<PlannerFactory>
        implements PlannerFactory {

    /**
     * Default constructor that loads planner factory via PlannerModule
     */
    public DelegatePlannerFactory();

    /**
     * Creates planner instance with SQL dialect support
     * @param context - Planner context containing configuration and environment
     * @return Planner instance configured for the specified SQL dialect
     */
    @Override
    public Planner create(Context context);
}
```

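As a rough illustration of dialect-aware creation, the sketch below selects a planner supplier by dialect. How `DelegatePlannerFactory` actually resolves a dialect is not described here, so this is only one possible structure; `DialectDispatchSketch`, `DialectAwareFactory`, `register`, `named`, and the simplified `Planner` and `SqlDialect` types are all hypothetical stand-ins.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Supplier;

/** Illustrative only: stand-in types, not the Flink classes of the same names. */
final class DialectDispatchSketch {

    enum SqlDialect { DEFAULT, HIVE }

    /** Simplified stand-in for a planner. */
    interface Planner {
        String name();
    }

    /** Convenience helper that builds a named stand-in planner. */
    static Planner named(String name) {
        return () -> name;
    }

    /** Hypothetical factory that maps each dialect to a planner supplier. */
    static final class DialectAwareFactory {
        private final Map<SqlDialect, Supplier<Planner>> suppliers =
                new EnumMap<>(SqlDialect.class);

        void register(SqlDialect dialect, Supplier<Planner> supplier) {
            suppliers.put(dialect, supplier);
        }

        Planner create(SqlDialect dialect) {
            Supplier<Planner> supplier = suppliers.get(dialect);
            if (supplier == null) {
                throw new IllegalStateException(
                        "No planner registered for dialect " + dialect);
            }
            // The supplier runs only now, so unused dialects cost nothing.
            return supplier.get();
        }
    }

    public static void main(String[] args) {
        DialectAwareFactory factory = new DialectAwareFactory();
        factory.register(SqlDialect.DEFAULT, () -> named("default-planner"));
        System.out.println(factory.create(SqlDialect.DEFAULT).name());
    }
}
```

Using suppliers rather than pre-built planners means a dialect's resources are only touched when that dialect is requested.
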
### Executor Factory Delegation

Creates executor instances for stream processing environments.

```java { .api }
/**
 * Delegate implementation of ExecutorFactory that loads executor through isolated classloader
 */
@Internal
public class DelegateExecutorFactory extends BaseDelegateFactory<StreamExecutorFactory>
        implements StreamExecutorFactory {

    /**
     * Default constructor that loads executor factory via PlannerModule
     */
    public DelegateExecutorFactory();

    /**
     * Creates executor from configuration
     * @param configuration - Flink configuration settings
     * @return Executor instance configured with provided settings
     */
    @Override
    public Executor create(Configuration configuration);

    /**
     * Creates executor from stream execution environment
     * @param streamExecutionEnvironment - Stream execution environment
     * @return Executor instance for the stream environment
     */
    public Executor create(StreamExecutionEnvironment streamExecutionEnvironment);
}
```

### Base Factory Delegation

Abstract base class providing common delegation functionality.

```java { .api }
/**
 * Base class for all factory delegates
 * @param <DELEGATE> The type of factory being delegated to
 */
abstract class BaseDelegateFactory<DELEGATE extends Factory> implements Factory {

    /** The delegated factory instance */
    final DELEGATE delegate;

    /**
     * Constructor accepting delegate factory
     * @param delegate - The factory instance to delegate to
     */
    protected BaseDelegateFactory(DELEGATE delegate);

    /**
     * Returns factory identifier from delegate
     * @return String identifier for this factory type
     */
    @Override
    public String factoryIdentifier();

    /**
     * Returns required configuration options from delegate
     * @return Set of required ConfigOption instances
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns optional configuration options from delegate
     * @return Set of optional ConfigOption instances
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions();
}
```

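The signatures above only declare the delegating methods. A minimal sketch of what the forwarding might look like follows, using simplified stand-in types so that it compiles without Flink on the classpath. `DelegationSketch`, `RealFactory`, `DelegateFactory`, and the use of plain strings in place of `ConfigOption` are assumptions made for illustration, not the module's actual code.

```java
import java.util.Collections;
import java.util.Set;

/** Illustrative only: stand-in types, not the Flink classes of the same names. */
final class DelegationSketch {

    /** Stand-in for the Factory interface; options are simplified to strings. */
    interface Factory {
        String factoryIdentifier();
        Set<String> requiredOptions();
        Set<String> optionalOptions();
    }

    /** Forwards every Factory method to the wrapped delegate. */
    abstract static class BaseDelegateFactory<D extends Factory> implements Factory {
        final D delegate;

        protected BaseDelegateFactory(D delegate) {
            this.delegate = delegate;
        }

        @Override
        public String factoryIdentifier() {
            return delegate.factoryIdentifier();
        }

        @Override
        public Set<String> requiredOptions() {
            return delegate.requiredOptions();
        }

        @Override
        public Set<String> optionalOptions() {
            return delegate.optionalOptions();
        }
    }

    /** Hypothetical implementation standing in for a dynamically loaded factory. */
    static final class RealFactory implements Factory {
        @Override
        public String factoryIdentifier() {
            return "default";
        }

        @Override
        public Set<String> requiredOptions() {
            return Collections.emptySet();
        }

        @Override
        public Set<String> optionalOptions() {
            return Collections.singleton("parallelism");
        }
    }

    /** Delegate that exposes RealFactory's behavior without subclassing it. */
    static final class DelegateFactory extends BaseDelegateFactory<RealFactory> {
        DelegateFactory() {
            super(new RealFactory());
        }
    }

    public static void main(String[] args) {
        Factory factory = new DelegateFactory();
        System.out.println(factory.factoryIdentifier());
        System.out.println(factory.optionalOptions());
    }
}
```

Because callers only see the delegate type, the wrapped implementation can come from a different classloader without its class appearing in the caller's signatures.
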
## Types

### Core Flink Types

These types are provided by the Flink framework and used by the planner loader:

```java { .api }
/**
 * Factory interface for creating planner instances
 */
interface PlannerFactory extends Factory {
    String DEFAULT_IDENTIFIER = "default";

    /**
     * Context for planner creation containing configuration and environment
     */
    interface Context {
        TableConfig getTableConfig();
        // Additional context methods...
    }

    /**
     * Creates a planner instance
     * @param context - Creation context
     * @return Planner instance
     */
    Planner create(Context context);
}

/**
 * Factory interface for creating executor instances
 */
interface ExecutorFactory extends Factory {
    String DEFAULT_IDENTIFIER = "default";

    /**
     * Creates executor from configuration
     * @param configuration - Flink configuration
     * @return Executor instance
     */
    Executor create(Configuration configuration);
}

/**
 * Factory interface for creating stream executor instances
 */
interface StreamExecutorFactory extends ExecutorFactory {
    /**
     * Creates executor from stream execution environment
     * @param streamExecutionEnvironment - Stream execution environment
     * @return Executor instance
     */
    Executor create(StreamExecutionEnvironment streamExecutionEnvironment);
}

/**
 * Base factory interface providing common factory methods
 */
interface Factory {
    /**
     * Returns unique identifier for this factory
     * @return String identifier
     */
    String factoryIdentifier();

    /**
     * Returns required configuration options
     * @return Set of required options
     */
    Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns optional configuration options
     * @return Set of optional options
     */
    Set<ConfigOption<?>> optionalOptions();
}

/**
 * SQL dialect enumeration for different SQL flavors
 */
enum SqlDialect {
    DEFAULT,
    HIVE
}

/**
 * Configuration option type for Flink settings
 * @param <T> The type of the configuration value
 */
class ConfigOption<T> {
    // ConfigOption implementation details...
}

/**
 * Flink configuration container
 */
class Configuration {
    // Configuration implementation details...
}

/**
 * Stream execution environment for Flink streaming jobs
 */
class StreamExecutionEnvironment {
    // StreamExecutionEnvironment implementation details...
}

/**
 * Table configuration containing SQL dialect and other table settings
 */
class TableConfig {
    /**
     * Returns the current SQL dialect
     * @return SqlDialect instance
     */
    SqlDialect getSqlDialect();
    // Additional table configuration methods...
}

/**
 * Planner interface for table query planning
 */
interface Planner {
    // Planner implementation details...
}

/**
 * Executor interface for query execution
 */
interface Executor {
    // Executor implementation details...
}
```

### Internal Annotations

```java { .api }
/**
 * Annotation marking internal APIs not intended for public use
 */
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
@Retention(RetentionPolicy.RUNTIME)
@interface Internal {
}
```

## Service Provider Interface

The module registers factories via Java SPI in `META-INF/services/org.apache.flink.table.factories.Factory`:

- `org.apache.flink.table.planner.loader.DelegateExecutorFactory`
- `org.apache.flink.table.planner.loader.DelegatePlannerFactory`

This enables automatic discovery by Flink's `FactoryUtil.discoverFactory()` method.

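The selection step behind such discovery can be sketched as a filter over registered candidates by type and identifier. This is a simplified model, not the `FactoryUtil` implementation: the candidate list is built by hand here, whereas a real SPI setup would typically obtain it from `java.util.ServiceLoader`. The names `DiscoverySketch`, `discover`, `DefaultPlannerFactory`, and `DefaultExecutorFactory`, and the choice of `IllegalStateException`, are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: stand-in types, not the Flink classes of the same names. */
final class DiscoverySketch {

    interface Factory {
        String factoryIdentifier();
    }

    interface PlannerFactory extends Factory {}

    interface ExecutorFactory extends Factory {}

    static final class DefaultPlannerFactory implements PlannerFactory {
        @Override
        public String factoryIdentifier() {
            return "default";
        }
    }

    static final class DefaultExecutorFactory implements ExecutorFactory {
        @Override
        public String factoryIdentifier() {
            return "default";
        }
    }

    /** Picks the single candidate that matches both the requested type and identifier. */
    static <T extends Factory> T discover(
            Iterable<? extends Factory> candidates, Class<T> type, String identifier) {
        List<T> matches = new ArrayList<>();
        for (Factory candidate : candidates) {
            if (type.isInstance(candidate)
                    && candidate.factoryIdentifier().equals(identifier)) {
                matches.add(type.cast(candidate));
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one " + type.getSimpleName() + " with identifier '"
                            + identifier + "' but found " + matches.size());
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        // Hand-built registry; an SPI-based setup would load these from service files.
        List<Factory> registered =
                Arrays.asList(new DefaultPlannerFactory(), new DefaultExecutorFactory());
        PlannerFactory planner = discover(registered, PlannerFactory.class, "default");
        System.out.println(planner.getClass().getSimpleName());
    }
}
```

Both stand-in factories share the identifier `"default"`, which is why the lookup filters by type as well as by identifier.
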
## Error Handling

The factories may throw the following types of exceptions:

- **ClassNotFoundException**: When the delegated factory implementation cannot be loaded
- **IllegalStateException**: When factory initialization fails or required resources are missing
- **ConfigurationException**: When invalid configuration options are provided
- **IOException**: When JAR files for SQL dialects cannot be accessed or loaded

## Usage Notes

- Both factory classes are annotated with `@Internal`, indicating they are not intended for direct public use
- Factories are instantiated automatically via Java SPI; manual instantiation is not recommended
- The module handles Scala version isolation automatically, so no manual classloader management is required
- SQL dialect support is handled transparently by the planner factory
- All delegation is lazy-loaded to minimize initialization overhead

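
Lazy loading of an expensive component is commonly achieved in Java with the initialization-on-demand holder idiom. The sketch below shows that general idiom only; whether the module uses exactly this approach is not stated here, and `LazyLoadingSketch`, `ExpensiveModule`, `Holder`, and `CREATED` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: demonstrates the initialization-on-demand holder idiom. */
final class LazyLoadingSketch {

    /** Counts how many times the expensive component has been constructed. */
    static final AtomicInteger CREATED = new AtomicInteger();

    /** Hypothetical expensive component, for example one that sets up a classloader. */
    static final class ExpensiveModule {
        ExpensiveModule() {
            CREATED.incrementAndGet();
        }

        String describe() {
            return "module ready";
        }
    }

    /** The JVM initializes this class only when INSTANCE is first accessed. */
    private static final class Holder {
        static final ExpensiveModule INSTANCE = new ExpensiveModule();
    }

    /** Returns the shared instance, creating it on the first call. */
    static ExpensiveModule getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println("created before first use: " + CREATED.get());
        getInstance();
        getInstance();
        System.out.println("created after two calls: " + CREATED.get());
    }
}
```

Class initialization is thread-safe under the Java language rules, so this idiom needs no explicit locking.
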