tessl/maven-org-apache-flink--flink-walkthrough-table-java

Apache Flink Table API walkthrough archetype for creating Java batch processing applications

Flink Walkthrough Table Java

An Apache Flink Table API walkthrough archetype that generates complete Maven projects for developing Flink batch processing applications with the Table API. The archetype provides a template project structure with all necessary dependencies and configuration, plus a working example that processes data using table operations.

Package Information

  • Package Name: flink-walkthrough-table-java
  • Package Type: maven
  • Language: Java
  • Installation: Available through Maven Central as an archetype
  • Coordinates: org.apache.flink:flink-walkthrough-table-java:1.11.1

Core Usage

Generate a new project using Maven archetype:

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-walkthrough-table-java \
  -DarchetypeVersion=1.11.1 \
  -DgroupId=com.example \
  -DartifactId=my-flink-table-app \
  -Dversion=1.0-SNAPSHOT \
  -Dpackage=com.example.flink

Basic Usage

The archetype generates a complete Flink project with the following structure and example:

package ${package};

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;
import org.apache.flink.walkthrough.common.table.TruncateDateToHour;

/**
 * Skeleton code for the table walkthrough generated by the archetype.
 * This demonstrates basic Flink Table API usage with batch processing.
 */
public class SpendReport {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Register table source and sink (using internal API as shown in generated code)
        ((TableEnvironmentInternal) tEnv).registerTableSourceInternal(
            "transactions", new BoundedTransactionTableSource());
        ((TableEnvironmentInternal) tEnv).registerTableSinkInternal(
            "spend_report", new SpendReportTableSink());
        
        // Register custom function
        tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());

        // Execute table operation
        tEnv
            .scan("transactions")
            .insertInto("spend_report");
        
        tEnv.execute("Spend Report");
    }
}
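The skeleton above only copies transactions into the report; the walkthrough's goal is a per-account, per-hour spend total. As a plain-Java illustration of that aggregation (no Flink dependencies; the `Transaction` class, its field names, and the helper are hypothetical stand-ins for the walkthrough's schema):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink) of the aggregation the walkthrough builds toward:
// group transactions by (accountId, hour) and sum the amounts. Transaction is a
// hypothetical stand-in for a row of the "transactions" table.
public class SpendReportSketch {

    static class Transaction {
        final long accountId;
        final long timestampMillis;
        final double amount;

        Transaction(long accountId, long timestampMillis, double amount) {
            this.accountId = accountId;
            this.timestampMillis = timestampMillis;
            this.amount = amount;
        }
    }

    static final long MILLIS_PER_HOUR = 60 * 60 * 1000L;

    static Map<String, Double> spendReport(List<Transaction> transactions) {
        Map<String, Double> totals = new LinkedHashMap<>();
        for (Transaction t : transactions) {
            // Truncate the timestamp to the start of its hour, as TruncateDateToHour does.
            long hour = t.timestampMillis - (t.timestampMillis % MILLIS_PER_HOUR);
            totals.merge(t.accountId + "@" + hour, t.amount, Double::sum);
        }
        return totals;
    }
}
```

In the actual Table API program, the same grouping and summing is expressed declaratively over the registered `transactions` table rather than with an explicit loop.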

Architecture

The archetype generates projects with these key components:

  • Maven Project Structure: Standard Maven directory layout with proper Java packaging
  • Flink Dependencies: Pre-configured POM with all necessary Flink Table API dependencies
  • Build Configuration: Maven Shade plugin setup for creating deployable fat JARs
  • Example Application: Working SpendReport class demonstrating table operations
  • Logging Setup: Log4j2 configuration optimized for Flink applications

Capabilities

Project Generation

Creates a complete Maven project structure for Flink Table API applications.

<!-- Maven archetype coordinates -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-table-java</artifactId>
<version>1.11.1</version>
<packaging>maven-archetype</packaging>

Generated Project Structure:

my-flink-table-app/
├── pom.xml                          # Maven configuration with Flink dependencies
└── src/main/
    ├── java/${package}/
    │   └── SpendReport.java         # Main application class
    └── resources/
        └── log4j2.properties        # Logging configuration

Maven Dependencies Configuration

The generated POM includes these key Flink dependencies:

<!-- Generated POM dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Provided scope dependencies (runtime environment) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

Properties:

  • flink.version: @project.version@ (replaced during archetype processing)
  • java.version: 1.8
  • scala.binary.version: 2.11
  • log4j.version: @log4j.version@ (replaced during archetype processing)

Build Configuration

Includes Maven Shade plugin for creating deployable JAR files:

<!-- Maven Shade Plugin Configuration -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>${package}.SpendReport</mainClass>
            </transformer>
        </transformers>
    </configuration>
</plugin>

Example Application Template

Generates a working Flink Table API application class:

// Generated SpendReport class structure
public class SpendReport {
    /**
     * Main entry point for the Flink Table API application
     * @param args Command line arguments
     * @throws Exception If execution fails
     */
    public static void main(String[] args) throws Exception;
}

Key APIs demonstrated in the generated code:

// Core execution environment setup
static ExecutionEnvironment getExecutionEnvironment()
static BatchTableEnvironment create(ExecutionEnvironment env)

// Table registration (internal APIs used in the generated template)
void registerTableSourceInternal(String name, TableSource<?> tableSource)
void registerTableSinkInternal(String name, TableSink<?> tableSink)

// Function registration
void registerFunction(String name, ScalarFunction function)

// Table operations
Table scan(String tableName)
void insertInto(String tableName)
JobExecutionResult execute(String jobName)

Logging Configuration

Provides Log4j2 configuration optimized for Flink applications:

# Generated log4j2.properties
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender

# Specific logger for walkthrough output
logger.sink.name = org.apache.flink.walkthrough.common.sink.LoggerOutputFormat
logger.sink.level = INFO

# Console appender configuration
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
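If more runtime detail is wanted during local runs, one possible (hypothetical) adjustment is to raise the root level while leaving the walkthrough sink logger as-is:

```properties
# Hypothetical local-debugging tweak: raise the root level from WARN to INFO
# so Flink runtime messages appear alongside the walkthrough sink output.
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
```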

Archetype Descriptor

The archetype is defined by a descriptor that specifies file generation:

<!-- archetype-metadata.xml -->
<archetype-descriptor name="flink-walkthrough-table-java">
  <fileSets>
    <!-- Java source files with package substitution -->
    <fileSet filtered="true" packaged="true" encoding="UTF-8">
      <directory>src/main/java</directory>
      <includes>
        <include>**/*.java</include>
      </includes>
    </fileSet>
    <!-- Resource files -->
    <fileSet encoding="UTF-8">
      <directory>src/main/resources</directory>
    </fileSet>
  </fileSets>
</archetype-descriptor>

Maven Generation Parameters

When generating projects, the archetype accepts standard Maven parameters:

# Required parameters
-DgroupId=<project-group-id>         # Maven group ID for generated project
-DartifactId=<project-artifact-id>   # Maven artifact ID for generated project
-Dversion=<project-version>          # Version for generated project
-Dpackage=<java-package>             # Java package name for generated classes

# Archetype coordinates
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-walkthrough-table-java
-DarchetypeVersion=1.11.1

Template Variable Substitution:

  • ${groupId} - Replaced with provided groupId in POM
  • ${artifactId} - Replaced with provided artifactId in POM
  • ${version} - Replaced with provided version in POM
  • ${package} - Replaced with provided package name in Java files
  • @project.version@ - Replaced with Flink version during archetype build
  • @log4j.version@ - Replaced with Log4j version during archetype build

Integration Points

Flink Common Walkthrough

Generated projects depend on the Flink walkthrough common library:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Provides access to key classes:

// Table source for bounded transaction data
class BoundedTransactionTableSource extends InputFormatTableSource<Row> {
    InputFormat<Row, ?> getInputFormat()   // returns a TransactionRowInputFormat
    TableSchema getTableSchema()           // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
    DataType getProducedDataType()
}

// Table sink for spend report output
class SpendReportTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    DataSink<?> consumeDataSet(DataSet<Row> dataSet)
    DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream)
    TableSchema getTableSchema()           // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
}

// Custom scalar function for date truncation
class TruncateDateToHour extends ScalarFunction {
    long eval(long timestamp)              // floors the timestamp to the start of its hour
    TypeInformation<Timestamp> getResultType(Class<?>[] signature)
}
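The truncation itself is simple epoch-millisecond arithmetic. A dependency-free sketch of what `eval` effectively computes (the class and method names here are hypothetical, not the library's):

```java
// Dependency-free sketch of the flooring arithmetic behind TruncateDateToHour:
// drop the remainder of the epoch-millisecond timestamp modulo one hour.
public class TruncateDateToHourSketch {
    static final long MILLIS_PER_HOUR = 60 * 60 * 1000L;

    static long truncateToHour(long timestampMillis) {
        return timestampMillis - (timestampMillis % MILLIS_PER_HOUR);
    }
}
```

For example, a timestamp at 01:30:00.000 (5,400,000 ms) floors to 01:00:00.000 (3,600,000 ms).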

Flink Runtime Environment

Generated projects are configured to run in Flink environments:

  • Local Development: Can run directly in IDE using provided dependencies
  • Cluster Deployment: Shade plugin creates fat JARs suitable for cluster submission
  • Dependency Scopes: Core Flink libraries marked as 'provided' to avoid conflicts

Build and Deployment

The generated project supports:

# Build the project
mvn clean compile

# Create deployable JAR
mvn clean package

# Run locally
mvn exec:java -Dexec.mainClass="${package}.SpendReport"

The shaded JAR can be submitted to Flink clusters using:

flink run target/${artifactId}-${version}.jar

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-walkthrough-table-java
Describes: mavenpkg:maven/org.apache.flink/flink-walkthrough-table-java@1.11.x