# Flink Walkthrough Table Java

Apache Flink Table API walkthrough archetype that generates complete Maven projects for developing Flink batch processing applications with the Table API. The archetype provides a template structure with all necessary dependencies and build configuration, plus a working example that processes data using table operations.

## Package Information

- **Package Name**: flink-walkthrough-table-java
- **Package Type**: maven
- **Language**: Java
- **Installation**: Available through Maven Central as an archetype
- **Coordinates**: `org.apache.flink:flink-walkthrough-table-java:1.11.1`

## Core Usage

Generate a new project using the Maven archetype:

```bash
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-table-java \
    -DarchetypeVersion=1.11.1 \
    -DgroupId=com.example \
    -DartifactId=my-flink-table-app \
    -Dversion=1.0-SNAPSHOT \
    -Dpackage=com.example.flink
```

## Basic Usage

The archetype generates a complete Flink project with the following structure and example:

```java
package ${package};

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.walkthrough.common.table.SpendReportTableSink;
import org.apache.flink.walkthrough.common.table.BoundedTransactionTableSource;
import org.apache.flink.walkthrough.common.table.TruncateDateToHour;

/**
 * Skeleton code for the table walkthrough generated by the archetype.
 * This demonstrates basic Flink Table API usage with batch processing.
 */
public class SpendReport {

    public static void main(String[] args) throws Exception {
        // Create the batch execution and table environments
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Register table source and sink (using internal APIs as shown in the generated code)
        ((TableEnvironmentInternal) tEnv).registerTableSourceInternal(
                "transactions", new BoundedTransactionTableSource());
        ((TableEnvironmentInternal) tEnv).registerTableSinkInternal(
                "spend_report", new SpendReportTableSink());

        // Register the custom scalar function
        tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());

        // Read from the source table and write to the sink
        tEnv
            .scan("transactions")
            .insertInto("spend_report");

        tEnv.execute("Spend Report");
    }
}
```

## Architecture

The archetype generates projects with these key components:

- **Maven Project Structure**: Standard Maven directory layout with proper Java packaging
- **Flink Dependencies**: Pre-configured POM with all necessary Flink Table API dependencies
- **Build Configuration**: Maven Shade plugin setup for creating deployable fat JARs
- **Example Application**: Working SpendReport class demonstrating table operations
- **Logging Setup**: Log4j2 configuration tailored for Flink applications

## Capabilities

### Project Generation

Creates a complete Maven project structure for Flink Table API applications.

```xml { .api }
<!-- Maven archetype coordinates -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-table-java</artifactId>
<version>1.11.1</version>
<packaging>maven-archetype</packaging>
```

**Generated Project Structure:**

```
my-flink-table-app/
├── pom.xml                       # Maven configuration with Flink dependencies
└── src/main/
    ├── java/${package}/
    │   └── SpendReport.java      # Main application class
    └── resources/
        └── log4j2.properties     # Logging configuration
```

### Maven Dependencies Configuration

The generated POM includes these key Flink dependencies:

```xml { .api }
<!-- Generated POM dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Provided scope dependencies (supplied by the runtime environment) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
```

**Properties:**

- `flink.version`: `@project.version@` (replaced during archetype processing)
- `java.version`: 1.8
- `scala.binary.version`: 2.11
- `log4j.version`: `@log4j.version@` (replaced during archetype processing)

### Build Configuration

Includes the Maven Shade plugin for creating deployable JAR files:

```xml { .api }
<!-- Maven Shade Plugin configuration -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <transformers>
            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>${package}.SpendReport</mainClass>
            </transformer>
        </transformers>
    </configuration>
</plugin>
```

### Example Application Template

Generates a working Flink Table API application class:

```java { .api }
// Generated SpendReport class structure
public class SpendReport {

    /**
     * Main entry point for the Flink Table API application.
     *
     * @param args command line arguments
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;
}
```

**Key APIs demonstrated in the generated code:**

```java { .api }
// Core execution environment setup
static ExecutionEnvironment getExecutionEnvironment()          // on ExecutionEnvironment
static BatchTableEnvironment create(ExecutionEnvironment env)  // on BatchTableEnvironment

// Table registration (internal APIs used in the generated template)
void registerTableSourceInternal(String name, TableSource<?> tableSource)
void registerTableSinkInternal(String name, TableSink<?> tableSink)

// Function registration
void registerFunction(String name, ScalarFunction function)

// Table operations
Table scan(String tableName)                // on TableEnvironment
void insertInto(String tableName)           // on Table
JobExecutionResult execute(String jobName)  // on TableEnvironment
```

### Logging Configuration

Provides a Log4j2 configuration tailored for Flink applications:

```properties { .api }
# Generated log4j2.properties
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender

# Dedicated logger for the walkthrough output
logger.sink.name = org.apache.flink.walkthrough.common.sink.LoggerOutputFormat
logger.sink.level = INFO

# Console appender configuration
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```
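
As a rough illustration of what that pattern layout produces, the sketch below approximates the `%d{HH:mm:ss,SSS} %-5p %-60c - %m` portion of the line format. It is not Log4j code: the `%x` context field is omitted, and the class and message are hypothetical.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PatternSketch {

    // Approximates "%d{HH:mm:ss,SSS} %-5p %-60c - %m" for a single log event.
    // %-5s left-pads the level to 5 chars; %-60s pads the logger name to 60.
    public static String render(long epochMillis, String level, String logger, String message) {
        SimpleDateFormat time = new SimpleDateFormat("HH:mm:ss,SSS");
        time.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed zone so output is deterministic
        return String.format("%s %-5s %-60s - %s",
                time.format(new Date(epochMillis)), level, logger, message);
    }

    public static void main(String[] args) {
        System.out.println(render(0L, "INFO",
                "org.apache.flink.walkthrough.common.sink.LoggerOutputFormat",
                "example message"));
    }
}
```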

## Archetype Descriptor

The archetype is defined by a descriptor that specifies file generation:

```xml { .api }
<!-- archetype-metadata.xml -->
<archetype-descriptor name="flink-walkthrough-table-java">
    <fileSets>
        <!-- Java source files with package substitution -->
        <fileSet filtered="true" packaged="true" encoding="UTF-8">
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.java</include>
            </includes>
        </fileSet>
        <!-- Resource files -->
        <fileSet encoding="UTF-8">
            <directory>src/main/resources</directory>
        </fileSet>
    </fileSets>
</archetype-descriptor>
```

## Maven Generation Parameters

When generating projects, the archetype accepts standard Maven parameters:

```bash { .api }
# Required parameters
-DgroupId=<project-group-id>        # Maven group ID for the generated project
-DartifactId=<project-artifact-id>  # Maven artifact ID for the generated project
-Dversion=<project-version>         # Version for the generated project
-Dpackage=<java-package>            # Java package name for generated classes

# Archetype coordinates
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-walkthrough-table-java
-DarchetypeVersion=1.11.1
```

**Template Variable Substitution:**

- `${groupId}` - Replaced with the provided groupId in the POM
- `${artifactId}` - Replaced with the provided artifactId in the POM
- `${version}` - Replaced with the provided version in the POM
- `${package}` - Replaced with the provided package name in Java files
- `@project.version@` - Replaced with the Flink version during the archetype build
- `@log4j.version@` - Replaced with the Log4j version during the archetype build
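
At its core, this filtering is token replacement over the template files. The sketch below is illustrative only, not Maven's actual implementation; the method name and map contents are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FilterSketch {

    // Replaces ${...} tokens in a template string with supplied values,
    // mimicking how archetype filtering rewrites generated files.
    public static String filter(String template, Map<String, String> values) {
        String result = template;
        for (Map.Entry<String, String> e : values.entrySet()) {
            result = result.replace("${" + e.getKey() + "}", e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("package", "com.example.flink");
        params.put("artifactId", "my-flink-table-app");

        // A Java source template becomes a concrete package declaration.
        System.out.println(filter("package ${package};", params)); // package com.example.flink;
    }
}
```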

## Integration Points

### Flink Common Walkthrough

Generated projects depend on the Flink walkthrough common library:

```xml { .api }
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```

**Provides access to key classes:**

```java { .api }
// Table source for bounded transaction data
class BoundedTransactionTableSource extends InputFormatTableSource<Row> {
    InputFormat<Row, ?> getInputFormat()  // returns a TransactionRowInputFormat
    TableSchema getTableSchema()          // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
    DataType getProducedDataType()
}

// Table sink for spend report output
class SpendReportTableSink implements AppendStreamTableSink<Row>, BatchTableSink<Row> {
    DataSink<?> consumeDataSet(DataSet<Row> dataSet)
    DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream)
    TableSchema getTableSchema()          // (accountId: BIGINT, timestamp: TIMESTAMP, amount: DOUBLE)
}

// Custom scalar function for date truncation
class TruncateDateToHour extends ScalarFunction {
    long eval(long timestamp)                               // truncates a timestamp to the start of its hour
    TypeInformation<?> getResultType(Class<?>[] signature)  // TypeInformation<Timestamp>
}
```
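
The truncation that `TruncateDateToHour` performs can be illustrated with plain epoch-millisecond arithmetic. This is a simplified stand-in for non-negative timestamps, not the walkthrough library's actual code:

```java
public class HourTruncationSketch {

    private static final long MILLIS_PER_HOUR = 60 * 60 * 1000L;

    // Drops the sub-hour part of an epoch-millisecond timestamp, i.e. rounds
    // down to the start of the containing hour (for non-negative inputs).
    public static long truncateToHour(long timestampMillis) {
        return timestampMillis - (timestampMillis % MILLIS_PER_HOUR);
    }

    public static void main(String[] args) {
        // 01:01:40.000 after the epoch truncates to 01:00:00.000
        System.out.println(truncateToHour(3_700_000L)); // 3600000
    }
}
```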

### Flink Runtime Environment

Generated projects are configured to run in Flink environments:

- **Local Development**: Can run directly in an IDE using the provided dependencies
- **Cluster Deployment**: The Shade plugin creates fat JARs suitable for cluster submission
- **Dependency Scopes**: Core Flink libraries are marked as `provided` to avoid conflicts

### Build and Deployment

The generated project supports:

```bash { .api }
# Build the project
mvn clean compile

# Create a deployable JAR
mvn clean package

# Run locally
mvn exec:java -Dexec.mainClass="${package}.SpendReport"
```

The shaded JAR can be submitted to a Flink cluster using:

```bash
flink run target/${artifactId}-${version}.jar
```