Apache Flink Table API walkthrough archetype for creating Java batch processing applications
This Maven archetype generates complete projects for developing Flink batch processing applications with the Table API. It provides a template structure with all necessary dependencies, configuration, and a working example for building Flink applications that process data using table operations.
Archetype coordinates: org.apache.flink:flink-walkthrough-table-java:1.11.1

Generate a new project using the Maven archetype:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-table-java \
-DarchetypeVersion=1.11.1 \
-DgroupId=com.example \
-DartifactId=my-flink-table-app \
-Dversion=1.0-SNAPSHOT \
-Dpackage=com.example.flink

The archetype generates a complete Flink project with the following structure and example:
package ${package};

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;
import org.apache.flink.walkthrough.common.table.TruncateDateToHour;

/**
 * Skeleton code for the table walkthrough generated by the archetype.
 * This demonstrates basic Flink Table API usage with batch processing.
 */
public class SpendReport {
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment and table environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Register table source and sink (using internal APIs as shown in the generated code)
        ((TableEnvironmentInternal) tEnv).registerTableSourceInternal(
                "transactions", new BoundedTransactionTableSource());
        ((TableEnvironmentInternal) tEnv).registerTableSinkInternal(
                "spend_report", new SpendReportTableSink());

        // Register the custom scalar function
        tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());

        // Copy every row from the source table into the sink table
        tEnv
            .scan("transactions")
            .insertInto("spend_report");

        tEnv.execute("Spend Report");
    }
}

The archetype generates projects with the key components described below.
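For intuition, the report this walkthrough builds toward, total spend per account per hour, can be simulated with plain Java collections and no Flink dependencies. The class name, helper method, and sample rows below are illustrative only, not part of the generated project:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SpendReportSketch {
    static final long MILLIS_PER_HOUR = 60 * 60 * 1000L;

    // Sum amounts per (accountId, hour-truncated timestamp), mirroring the
    // groupBy + sum the Table API pipeline performs over the transactions table
    static Map<String, Double> buildReport(long[] accountIds, long[] timestamps, double[] amounts) {
        Map<String, Double> report = new LinkedHashMap<>();
        for (int i = 0; i < accountIds.length; i++) {
            long hour = timestamps[i] - (timestamps[i] % MILLIS_PER_HOUR);
            report.merge(accountIds[i] + "@" + hour, amounts[i], Double::sum);
        }
        return report;
    }

    public static void main(String[] args) {
        long[] ids = {1, 1, 1, 2};
        long[] ts = {500L, MILLIS_PER_HOUR + 1, 600L, 700L};
        double[] amts = {10.0, 5.0, 2.5, 7.0};
        System.out.println(buildReport(ids, ts, amts));
        // prints {1@0=12.5, 1@3600000=5.0, 2@0=7.0}
    }
}
```

In the real pipeline, the grouping key comes from applying truncateDateToHour to the timestamp column before grouping and summing.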
Creates a complete Maven project structure for Flink Table API applications.
<!-- Maven archetype coordinates -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-table-java</artifactId>
<version>1.11.1</version>
<packaging>maven-archetype</packaging>

Generated Project Structure:
my-flink-table-app/
├── pom.xml                      # Maven configuration with Flink dependencies
└── src/main/
    ├── java/${package}/
    │   └── SpendReport.java     # Main application class
    └── resources/
        └── log4j2.properties    # Logging configuration

The generated POM includes these key Flink dependencies:
<!-- Generated POM dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Provided-scope dependencies (supplied by the runtime environment) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

Properties:
flink.version: @project.version@ (replaced during archetype processing)
java.version: 1.8
scala.binary.version: 2.11
log4j.version: @log4j.version@ (replaced during archetype processing)

Includes the Maven Shade plugin for creating deployable JAR files:
<!-- Maven Shade plugin configuration -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>${package}.SpendReport</mainClass>
            </transformer>
        </transformers>
    </configuration>
</plugin>

Generates a working Flink Table API application class:
// Generated SpendReport class structure
public class SpendReport {
    /**
     * Main entry point for the Flink Table API application.
     * @param args command line arguments
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;
}

Key APIs demonstrated in the generated code:
// Core execution environment setup
static ExecutionEnvironment getExecutionEnvironment()
static BatchTableEnvironment create(ExecutionEnvironment env)

// Table registration (internal APIs used in the generated template)
void registerTableSourceInternal(String name, TableSource<?> tableSource)
void registerTableSinkInternal(String name, TableSink<?> tableSink)

// Function registration
void registerFunction(String name, ScalarFunction function)

// Table operations
Table scan(String tableName)
void insertInto(String tableName)
JobExecutionResult execute(String jobName)

Provides a Log4j2 configuration optimized for Flink applications:
# Generated log4j2.properties
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender
# Specific logger for walkthrough output
logger.sink.name = org.apache.flink.walkthrough.common.sink.LoggerOutputFormat
logger.sink.level = INFO
# Console appender configuration
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

The archetype is defined by a descriptor that specifies file generation:
<!-- archetype-metadata.xml -->
<archetype-descriptor name="flink-walkthrough-table-java">
    <fileSets>
        <!-- Java source files with package substitution -->
        <fileSet filtered="true" packaged="true" encoding="UTF-8">
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.java</include>
            </includes>
        </fileSet>
        <!-- Resource files -->
        <fileSet encoding="UTF-8">
            <directory>src/main/resources</directory>
        </fileSet>
    </fileSets>
</archetype-descriptor>

When generating projects, the archetype accepts standard Maven parameters:
# Required parameters
-DgroupId=<project-group-id> # Maven group ID for generated project
-DartifactId=<project-artifact-id> # Maven artifact ID for generated project
-Dversion=<project-version> # Version for generated project
-Dpackage=<java-package> # Java package name for generated classes
# Archetype coordinates
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-walkthrough-table-java
-DarchetypeVersion=1.11.1

Template Variable Substitution:
${groupId} - replaced with the provided groupId in the POM
${artifactId} - replaced with the provided artifactId in the POM
${version} - replaced with the provided version in the POM
${package} - replaced with the provided package name in Java files
@project.version@ - replaced with the Flink version during the archetype build
@log4j.version@ - replaced with the Log4j version during the archetype build

Generated projects depend on the Flink walkthrough common library:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Provides access to key classes:
// Table source for bounded transaction data
class BoundedTransactionTableSource extends InputFormatTableSource<Row> {
    InputFormat<Row, ?> getInputFormat()    // returns a TransactionRowInputFormat
    TableSchema getTableSchema()            // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
    DataType getProducedDataType()
}

// Table sink for spend report output
class SpendReportTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    DataSink<?> consumeDataSet(DataSet<Row> dataSet)
    DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream)
    TableSchema getTableSchema()            // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
}

// Custom scalar function for date truncation
class TruncateDateToHour extends ScalarFunction {
    long eval(long timestamp)               // truncates a millisecond timestamp down to the hour
    TypeInformation<?> getResultType(Class<?>[] signature)    // returns TypeInformation<Timestamp>
}

Generated projects are configured to build and run in standard Flink environments.
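The hour truncation that TruncateDateToHour.eval performs is simple millisecond arithmetic. A Flink-free sketch of that arithmetic (the TruncateSketch class and method names here are illustrative, not part of the walkthrough library):

```java
public class TruncateSketch {
    static final long MILLIS_PER_HOUR = 60 * 60 * 1000L;

    // Round a millisecond timestamp down to the start of its hour,
    // matching the behavior described for TruncateDateToHour.eval(long)
    static long truncateToHour(long timestamp) {
        return timestamp - (timestamp % MILLIS_PER_HOUR);
    }

    public static void main(String[] args) {
        System.out.println(truncateToHour(3725000L));  // 3600000 (01:02:05 -> 01:00:00)
        System.out.println(truncateToHour(3599999L));  // 0 (still within hour 0)
    }
}
```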
The generated project supports the following build and run commands:
# Build the project
mvn clean compile
# Create deployable JAR
mvn clean package
# Run locally
mvn exec:java -Dexec.mainClass="${package}.SpendReport"

The shaded JAR can be submitted to a Flink cluster using:
flink run target/${artifactId}-${version}.jar
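Both java -jar and flink run (when no explicit entry class is passed) resolve the application entry point from the Main-Class attribute that the Shade plugin's ManifestResourceTransformer writes into the JAR's MANIFEST.MF. A JDK-only sketch of reading that attribute, with the manifest text constructed in memory for illustration and a hypothetical class name:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.jar.Manifest;

public class ManifestSketch {
    // Parse manifest text and return its Main-Class attribute, the value
    // a launcher uses to locate the entry point of a shaded JAR
    static String mainClassOf(String manifestText) throws IOException {
        Manifest mf = new Manifest(
                new ByteArrayInputStream(manifestText.getBytes(StandardCharsets.UTF_8)));
        return mf.getMainAttributes().getValue("Main-Class");
    }

    public static void main(String[] args) throws IOException {
        String text = "Manifest-Version: 1.0\r\n"
                + "Main-Class: com.example.flink.SpendReport\r\n\r\n";
        System.out.println(mainClassOf(text));  // com.example.flink.SpendReport
    }
}
```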