Apache Flink Table API walkthrough archetype for creating Java batch processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-walkthrough-table-java@1.11.0

Apache Flink Table API walkthrough archetype that generates complete Maven projects for developing Flink batch processing applications with the Table API. The archetype provides a template structure with all necessary dependencies, configuration, and a working example for building Flink applications that process data using table operations.
org.apache.flink:flink-walkthrough-table-java:1.11.1

Generate a new project using the Maven archetype:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-table-java \
-DarchetypeVersion=1.11.1 \
-DgroupId=com.example \
-DartifactId=my-flink-table-app \
-Dversion=1.0-SNAPSHOT \
-Dpackage=com.example.flink

The archetype generates a complete Flink project with the following structure and example:
package ${package};
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;
import org.apache.flink.walkthrough.common.table.TruncateDateToHour;
/**
* Skeleton code for the table walkthrough generated by the archetype.
* This demonstrates basic Flink Table API usage with batch processing.
*/
public class SpendReport {
public static void main(String[] args) throws Exception {
// Create execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
// Register table source and sink (using internal API as shown in generated code)
((TableEnvironmentInternal) tEnv).registerTableSourceInternal(
"transactions", new BoundedTransactionTableSource());
((TableEnvironmentInternal) tEnv).registerTableSinkInternal(
"spend_report", new SpendReportTableSink());
// Register custom function
tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
// Execute table operation
tEnv
.scan("transactions")
.insertInto("spend_report");
tEnv.execute("Spend Report");
}
}
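The skeleton simply copies the transactions table into the sink; the walkthrough exercise is to turn it into an hourly spend report per account. A minimal sketch of one possible completed query, assuming the environment and registrations from the skeleton above and the string-based expression syntax of the Flink 1.11 Table API:

// Sketch only: replaces the plain scan(...).insertInto(...) pair in the skeleton.
// Assumes "transactions", "spend_report", and "truncateDateToHour" are registered
// exactly as in the generated code above.
tEnv
    .scan("transactions")
    .select("accountId, truncateDateToHour(timestamp) as timestamp, amount")
    .groupBy("accountId, timestamp")
    .select("accountId, timestamp, amount.sum as amount")
    .insertInto("spend_report");

tEnv.execute("Spend Report");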
The archetype generates projects with these key components:

Creates a complete Maven project structure for Flink Table API applications.
<!-- Maven archetype coordinates -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-table-java</artifactId>
<version>1.11.1</version>
<packaging>maven-archetype</packaging>

Generated Project Structure:
my-flink-table-app/
├── pom.xml # Maven configuration with Flink dependencies
└── src/main/
├── java/${package}/
│ └── SpendReport.java # Main application class
└── resources/
└── log4j2.properties # Logging configuration

The generated POM includes these key Flink dependencies:
<!-- Generated POM dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Provided scope dependencies (runtime environment) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

Properties:
- flink.version: @project.version@ (replaced during archetype processing)
- java.version: 1.8
- scala.binary.version: 2.11
- log4j.version: @log4j.version@ (replaced during archetype processing)

The POM also includes the Maven Shade plugin for creating deployable JAR files:
<!-- Maven Shade Plugin Configuration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${package}.SpendReport</mainClass>
</transformer>
</transformers>
</configuration>
</plugin>

Generates a working Flink Table API application class:
// Generated SpendReport class structure
public class SpendReport {
/**
* Main entry point for the Flink Table API application
* @param args Command line arguments
* @throws Exception If execution fails
*/
public static void main(String[] args) throws Exception;
}

Key APIs demonstrated in the generated code:
// Core execution environment setup
static ExecutionEnvironment getExecutionEnvironment()
static BatchTableEnvironment create(ExecutionEnvironment env)
// Table registration (internal APIs used in generated template)
void registerTableSourceInternal(String name, TableSource<?> tableSource)
void registerTableSinkInternal(String name, TableSink<?> tableSink)
// Function registration
void registerFunction(String name, UserDefinedFunction function)
// Table operations
Table scan(String tableName)
void insertInto(String tableName)
JobExecutionResult execute(String jobName)
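Note that scan, insertInto, and execute are deprecated in Flink 1.11. A minimal sketch of the replacement calls, assuming the same tEnv and registered tables as above:

// Sketch: non-deprecated 1.11 equivalents of the calls listed above, assuming
// the same tEnv and registered "transactions"/"spend_report" tables.
org.apache.flink.table.api.Table transactions = tEnv.from("transactions");
transactions.executeInsert("spend_report"); // submits the job eagerly; no tEnv.execute(...) needed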
Provides Log4j2 configuration optimized for Flink applications:

# Generated log4j2.properties
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender
# Specific logger for walkthrough output
logger.sink.name = org.apache.flink.walkthrough.common.sink.LoggerOutputFormat
logger.sink.level = INFO
# Console appender configuration
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

The archetype is defined by a descriptor that specifies file generation:
<!-- archetype-metadata.xml -->
<archetype-descriptor name="flink-walkthrough-table-java">
<fileSets>
<!-- Java source files with package substitution -->
<fileSet filtered="true" packaged="true" encoding="UTF-8">
<directory>src/main/java</directory>
<includes>
<include>**/*.java</include>
</includes>
</fileSet>
<!-- Resource files -->
<fileSet encoding="UTF-8">
<directory>src/main/resources</directory>
</fileSet>
</fileSets>
</archetype-descriptor>

When generating projects, the archetype accepts standard Maven parameters:
# Required parameters
-DgroupId=<project-group-id> # Maven group ID for generated project
-DartifactId=<project-artifact-id> # Maven artifact ID for generated project
-Dversion=<project-version> # Version for generated project
-Dpackage=<java-package> # Java package name for generated classes
# Archetype coordinates
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-walkthrough-table-java
-DarchetypeVersion=1.11.1

Template Variable Substitution:
- ${groupId} - Replaced with provided groupId in POM
- ${artifactId} - Replaced with provided artifactId in POM
- ${version} - Replaced with provided version in POM
- ${package} - Replaced with provided package name in Java files
- @project.version@ - Replaced with Flink version during archetype build
- @log4j.version@ - Replaced with Log4j version during archetype build

Generated projects depend on the Flink walkthrough common library:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

This library provides access to these key classes:
// Table source for bounded transaction data
class BoundedTransactionTableSource extends InputFormatTableSource<Row> {
    InputFormat<Row, ?> getInputFormat()    // returns a TransactionRowInputFormat
    TableSchema getTableSchema()            // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
    DataType getProducedDataType()
}
// Table sink for spend report output
class SpendReportTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    DataSink<?> consumeDataSet(DataSet<Row> dataSet)
    DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream)
    TableSchema getTableSchema()            // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
}
// Custom scalar function for date truncation
class TruncateDateToHour extends ScalarFunction {
    long eval(long timestamp)                              // truncates an epoch-millis timestamp down to the hour
    TypeInformation<?> getResultType(Class<?>[] signature) // returns TypeInformation<Timestamp>
}
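As a quick illustration, the scalar function can also be exercised outside a Flink job. A minimal sketch (the hour-boundary check assumes epoch-millisecond timestamps, per the eval signature above):

import org.apache.flink.walkthrough.common.table.TruncateDateToHour;

public class TruncateDemo {
    public static void main(String[] args) {
        TruncateDateToHour truncate = new TruncateDateToHour();
        long ts = 1_600_000_123_456L;  // an arbitrary epoch-millis timestamp
        long hour = truncate.eval(ts); // floored to the enclosing hour
        // A truncated value should land exactly on an hour boundary:
        System.out.println(hour % (60 * 60 * 1000) == 0); // expected: true
    }
}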
Generated projects are configured to run in standard Flink environments and support the following workflows:
# Build the project
mvn clean compile
# Create deployable JAR
mvn clean package
# Run locally
mvn exec:java -Dexec.mainClass="${package}.SpendReport"

The shaded JAR can be submitted to Flink clusters using:
flink run target/${artifactId}-${version}.jar