# Flink Walkthrough Table Java

A Maven archetype that generates complete projects for developing Flink batch processing applications with the Apache Flink Table API. The generated project provides a template structure with all necessary dependencies, build configuration, and a working example application that processes data using table operations.

## Package Information

- **Package Name**: flink-walkthrough-table-java
- **Package Type**: maven
- **Language**: Java
- **Installation**: Available through Maven Central as an archetype
- **Coordinates**: `org.apache.flink:flink-walkthrough-table-java:1.11.1`
## Core Usage

Generate a new project using the Maven archetype:

```bash
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-walkthrough-table-java \
  -DarchetypeVersion=1.11.1 \
  -DgroupId=com.example \
  -DartifactId=my-flink-table-app \
  -Dversion=1.0-SNAPSHOT \
  -Dpackage=com.example.flink
```

## Basic Usage

The archetype generates a complete Flink project with the following structure and example:

```java
package ${package};

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;
import org.apache.flink.walkthrough.common.table.TruncateDateToHour;

/**
 * Skeleton code for the table walkthrough generated by the archetype.
 * Demonstrates basic Flink Table API usage with batch processing.
 */
public class SpendReport {

    public static void main(String[] args) throws Exception {
        // Create the batch execution and table environments
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Register table source and sink (using internal APIs, as in the generated code)
        ((TableEnvironmentInternal) tEnv).registerTableSourceInternal(
                "transactions", new BoundedTransactionTableSource());
        ((TableEnvironmentInternal) tEnv).registerTableSinkInternal(
                "spend_report", new SpendReportTableSink());

        // Register the custom scalar function
        tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());

        // Scan the source table and write it to the sink
        tEnv
            .scan("transactions")
            .insertInto("spend_report");

        tEnv.execute("Spend Report");
    }
}
```
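The skeleton above only copies `transactions` into `spend_report`; the walkthrough's goal is a report that sums spending per account and hour. That target computation can be sketched in plain Java streams (class and method names here are illustrative, not part of the generated project):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SpendReportSketch {
    static final long MILLIS_PER_HOUR = 60 * 60 * 1000L;

    /** A transaction row: account, epoch-millis timestamp, amount. */
    public static class Transaction {
        final long accountId;
        final long timestamp;
        final double amount;

        public Transaction(long accountId, long timestamp, double amount) {
            this.accountId = accountId;
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    /** Group by (accountId, hour) and sum amounts -- the report the walkthrough builds toward. */
    public static Map<SimpleEntry<Long, Long>, Double> spendReport(List<Transaction> txns) {
        return txns.stream().collect(Collectors.groupingBy(
                t -> new SimpleEntry<>(t.accountId, t.timestamp - t.timestamp % MILLIS_PER_HOUR),
                Collectors.summingDouble(t -> t.amount)));
    }

    public static void main(String[] args) {
        List<Transaction> txns = List.of(
                new Transaction(1, 3_600_010L, 100.0),   // falls in hour 1
                new Transaction(1, 3_700_000L, 50.0),    // also hour 1
                new Transaction(2, 10L, 25.0));          // hour 0
        Map<SimpleEntry<Long, Long>, Double> report = spendReport(txns);
        System.out.println(report.get(new SimpleEntry<>(1L, 3_600_000L)));  // prints 150.0
        System.out.println(report.get(new SimpleEntry<>(2L, 0L)));          // prints 25.0
    }
}
```

In the actual walkthrough, the same grouping is expressed with Table API operations (`groupBy`, `select` with a sum aggregate, and the `truncateDateToHour` function) rather than Java streams.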

## Architecture

The archetype generates projects with these key components:

- **Maven Project Structure**: Standard Maven directory layout with proper Java packaging
- **Flink Dependencies**: Pre-configured POM with all necessary Flink Table API dependencies
- **Build Configuration**: Maven Shade plugin setup for creating deployable fat JARs
- **Example Application**: A working SpendReport class demonstrating table operations
- **Logging Setup**: Log4j2 configuration tuned for Flink applications

## Capabilities

### Project Generation

Creates a complete Maven project structure for Flink Table API applications.

```xml { .api }
<!-- Maven archetype coordinates -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-table-java</artifactId>
<version>1.11.1</version>
<packaging>maven-archetype</packaging>
```

**Generated Project Structure:**

```
my-flink-table-app/
├── pom.xml                      # Maven configuration with Flink dependencies
└── src/main/
    ├── java/${package}/
    │   └── SpendReport.java     # Main application class
    └── resources/
        └── log4j2.properties    # Logging configuration
```

### Maven Dependencies Configuration

The generated POM includes these key Flink dependencies:

```xml { .api }
<!-- Generated POM dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Provided-scope dependencies (supplied by the runtime environment) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
```

**Properties:**

- `flink.version`: `@project.version@` (replaced during archetype processing)
- `java.version`: 1.8
- `scala.binary.version`: 2.11
- `log4j.version`: `@log4j.version@` (replaced during archetype processing)

### Build Configuration

Includes the Maven Shade plugin for creating deployable fat JARs:

```xml { .api }
<!-- Maven Shade Plugin configuration -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>${package}.SpendReport</mainClass>
            </transformer>
        </transformers>
    </configuration>
</plugin>
```
179
180
### Example Application Template
181
182
Generates a working Flink Table API application class:
183
184
```java { .api }
185
// Generated SpendReport class structure
186
public class SpendReport {
187
/**
188
* Main entry point for the Flink Table API application
189
* @param args Command line arguments
190
* @throws Exception If execution fails
191
*/
192
public static void main(String[] args) throws Exception;
193
}
194
```

**Key APIs demonstrated in the generated code:**

```java { .api }
// Core execution environment setup
static ExecutionEnvironment getExecutionEnvironment()
static BatchTableEnvironment create(ExecutionEnvironment env)

// Table registration (internal APIs used in the generated template)
void registerTableSourceInternal(String name, TableSource<?> tableSource)
void registerTableSinkInternal(String name, TableSink<?> tableSink)

// Function registration
void registerFunction(String name, ScalarFunction function)

// Table operations
Table scan(String tableName)
void insertInto(String tableName)
JobExecutionResult execute(String jobName)
```

### Logging Configuration

Provides a Log4j2 configuration tuned for Flink applications:

```properties { .api }
# Generated log4j2.properties
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender

# Dedicated logger for walkthrough output
logger.sink.name = org.apache.flink.walkthrough.common.sink.LoggerOutputFormat
logger.sink.level = INFO

# Console appender configuration
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

## Archetype Descriptor

The archetype is defined by a descriptor that specifies how files are generated:

```xml { .api }
<!-- archetype-metadata.xml -->
<archetype-descriptor name="flink-walkthrough-table-java">
    <fileSets>
        <!-- Java source files with package substitution -->
        <fileSet filtered="true" packaged="true" encoding="UTF-8">
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.java</include>
            </includes>
        </fileSet>
        <!-- Resource files -->
        <fileSet encoding="UTF-8">
            <directory>src/main/resources</directory>
        </fileSet>
    </fileSets>
</archetype-descriptor>
```

## Maven Generation Parameters

When generating projects, the archetype accepts the standard Maven parameters:

```bash { .api }
# Required parameters
-DgroupId=<project-group-id>        # Maven group ID for the generated project
-DartifactId=<project-artifact-id>  # Maven artifact ID for the generated project
-Dversion=<project-version>         # Version for the generated project
-Dpackage=<java-package>            # Java package name for the generated classes

# Archetype coordinates
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-walkthrough-table-java
-DarchetypeVersion=1.11.1
```

**Template Variable Substitution:**

- `${groupId}` - replaced with the provided groupId in the POM
- `${artifactId}` - replaced with the provided artifactId in the POM
- `${version}` - replaced with the provided version in the POM
- `${package}` - replaced with the provided package name in Java files
- `@project.version@` - replaced with the Flink version during the archetype build
- `@log4j.version@` - replaced with the Log4j version during the archetype build
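Conceptually, this filtering is plain token replacement. A minimal sketch in Java (Maven actually runs `${…}` properties through the Velocity template engine, so this is an illustration of the idea, not the real implementation):

```java
import java.util.Map;

public class TemplateFilterSketch {
    // Replace each ${key} token with its value -- a simplified stand-in for
    // the archetype's property filtering (the real engine is Velocity).
    static String filter(String template, Map<String, String> props) {
        String out = template;
        for (Map.Entry<String, String> e : props.entrySet()) {
            out = out.replace("${" + e.getKey() + "}", e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        String pom = "<groupId>${groupId}</groupId>";
        System.out.println(filter(pom, Map.of("groupId", "com.example")));
        // prints <groupId>com.example</groupId>
    }
}
```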

## Integration Points

### Flink Common Walkthrough

Generated projects depend on the Flink walkthrough common library:

```xml { .api }
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```

**Provides access to key classes:**

```java { .api }
// Table source for bounded transaction data
class BoundedTransactionTableSource extends InputFormatTableSource<Row> {
    InputFormat<Row, ?> getInputFormat()  // returns a TransactionRowInputFormat
    TableSchema getTableSchema()          // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
    DataType getProducedDataType()
}

// Table sink for spend report output
class SpendReportTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    DataSink<?> consumeDataSet(DataSet<Row> dataSet)
    DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream)
    TableSchema getTableSchema()          // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
}

// Custom scalar function for date truncation
class TruncateDateToHour extends ScalarFunction {
    long eval(long timestamp)             // truncates an epoch-millis timestamp down to the hour
    TypeInformation<?> getResultType(Class<?>[] signature)  // declares a SQL TIMESTAMP result
}
```
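The truncation performed by `TruncateDateToHour` is simple modular arithmetic on epoch milliseconds. A self-contained sketch of the same logic (an illustration, not the walkthrough's actual source):

```java
public class TruncateToHourSketch {
    static final long MILLIS_PER_HOUR = 60 * 60 * 1000L;

    // Drop the minutes/seconds/millis component of an epoch-millis timestamp,
    // mirroring the eval(long) contract described above.
    static long truncateToHour(long timestampMillis) {
        return timestampMillis - (timestampMillis % MILLIS_PER_HOUR);
    }

    public static void main(String[] args) {
        long t = 3_600_000L + 123_456L;            // 01:02:03.456 on day zero
        System.out.println(truncateToHour(t));     // prints 3600000
    }
}
```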

### Flink Runtime Environment

Generated projects are configured to run in Flink environments:

- **Local Development**: Can run directly in the IDE using the provided-scope dependencies
- **Cluster Deployment**: The Shade plugin creates fat JARs suitable for cluster submission
- **Dependency Scopes**: Core Flink libraries are marked as `provided` to avoid conflicts with the cluster's own libraries

### Build and Deployment

The generated project supports:

```bash { .api }
# Build the project
mvn clean compile

# Create a deployable JAR
mvn clean package

# Run locally (the provided-scope Flink dependencies must be put on the
# classpath, e.g. via exec-maven-plugin's classpathScope parameter)
mvn exec:java -Dexec.mainClass="${package}.SpendReport" -Dexec.classpathScope=compile
```

The shaded JAR can be submitted to a Flink cluster with:

```bash
flink run target/${artifactId}-${version}.jar
```