# Flink Walkthrough Common

Shared entities and utilities for the Apache Flink walkthrough examples. This package provides the `Transaction` and `Alert` data classes, batch input/output formats, and source/sink implementations that support the educational walkthroughs and example applications demonstrating Flink's stream processing capabilities.

## Package Information

- **Package Name**: flink-walkthrough-common_2.12
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-walkthrough-common_2.12
- **Installation**: Add to pom.xml:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.walkthrough.common.source.TransactionRowInputFormat;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.sink.LoggerOutputFormat;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.walkthrough.common.sink.AlertSink;

// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create transaction stream with built-in sample data
DataStream<Transaction> transactions = env.addSource(new TransactionSource())
    .name("transactions");

// Process transactions to generate alerts
DataStream<Alert> alerts = transactions
    .filter(transaction -> transaction.getAmount() > 1000.0)
    .map(transaction -> {
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());
        return alert;
    });

// Output alerts using provided sink
alerts.addSink(new AlertSink()).name("alerts");

env.execute("Fraud Detection");
```

## Capabilities

### Entity Classes

Core data classes representing transaction and alert events for streaming applications.

#### Transaction Entity

Represents a financial transaction with account ID, timestamp, and amount.

```java { .api }
public final class Transaction {
    public Transaction();
    public Transaction(long accountId, long timestamp, double amount);

    public long getAccountId();
    public void setAccountId(long accountId);

    public long getTimestamp();
    public void setTimestamp(long timestamp);

    public double getAmount();
    public void setAmount(double amount);

    @Override
    public boolean equals(Object o);

    @Override
    public int hashCode();

    @Override
    public String toString();
}
```

#### Alert Entity

Represents an alert event with an identifier.

```java { .api }
public final class Alert {
    public Alert();

    public long getId();
    public void setId(long id);

    @Override
    public boolean equals(Object o);

    @Override
    public int hashCode();

    @Override
    public String toString();
}
```

### Data Sources

Source functions for generating transaction streams in Flink applications.

#### Transaction Source

Streaming source that generates transactions with rate limiting for realistic simulation.

```java { .api }
public class TransactionSource extends FromIteratorFunction<Transaction> {
    public TransactionSource();
}
```

**Usage:**
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env.addSource(new TransactionSource());
```

`TransactionSource` emits an endless stream of sample transactions. It waits 100 ms before emitting each record, and the records come from a predefined data set whose amounts are spread across 5 account IDs.
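
The pacing described above can be pictured with a plain-Java iterator that sleeps before handing out each element. This is a minimal stand-alone sketch and not the library's code: `RateLimitedCycle` is a hypothetical name, the sample values are placeholders, and the wrap-around replay of the list is an assumption made to keep the iterator endless. It also shows how an interruption during the pause can be turned into a `RuntimeException`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Stand-alone sketch: an endless iterator that pauses before each element. */
final class RateLimitedCycle<T> implements Iterator<T> {
    private final List<T> samples;   // predefined records, replayed in order (assumption)
    private final long delayMillis;  // pause before each element, e.g. 100 ms
    private int position = 0;

    RateLimitedCycle(List<T> samples, long delayMillis) {
        if (samples.isEmpty()) {
            throw new IllegalArgumentException("samples must not be empty");
        }
        this.samples = samples;
        this.delayMillis = delayMillis;
    }

    @Override
    public boolean hasNext() {
        return true; // endless: the position wraps around
    }

    @Override
    public T next() {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new RuntimeException(e);
        }
        T value = samples.get(position);
        position = (position + 1) % samples.size();
        return value;
    }

    public static void main(String[] args) {
        // Placeholder amounts, not the library's data set.
        Iterator<Double> amounts = new RateLimitedCycle<>(Arrays.asList(10.0, 20.0, 30.0), 100L);
        for (int i = 0; i < 5; i++) {
            System.out.println(amounts.next());
        }
    }
}
```

Keeping the delay inside `next()` means any consumer of the iterator is throttled without knowing about it, which is why a source built on such an iterator needs no rate-limiting logic of its own.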

#### Transaction Row Input Format

Batch input format for reading transactions as Flink Row objects.

```java { .api }
public class TransactionRowInputFormat extends GenericInputFormat<Row>
        implements NonParallelInput {
    @Override
    public void open(GenericInputSplit split);

    @Override
    public boolean reachedEnd();

    @Override
    public Row nextRecord(Row reuse);
}
```

**Usage:**
```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;
import org.apache.flink.walkthrough.common.source.TransactionRowInputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> transactionRows = env.createInput(new TransactionRowInputFormat());
```

### Data Sinks

Sink functions for outputting processed data from Flink streams.

#### Alert Sink

Streaming sink that logs each alert event at INFO level through SLF4J instead of writing it to external storage.

```java { .api }
public class AlertSink implements SinkFunction<Alert> {
    @Override
    public void invoke(Alert value, Context context);
}
```

**Usage:**
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.sink.AlertSink;

// Assuming you have a DataStream<Alert>
DataStream<Alert> alerts = ...; // your alert stream
alerts.addSink(new AlertSink());
```

#### Logger Output Format

Batch output format that logs string records at INFO level.

```java { .api }
public class LoggerOutputFormat implements OutputFormat<String> {
    @Override
    public void configure(Configuration parameters);

    @Override
    public void open(int taskNumber, int numTasks);

    @Override
    public void writeRecord(String record);

    @Override
    public void close();
}
```

**Usage:**
```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.walkthrough.common.sink.LoggerOutputFormat;

// Assuming you have a DataSet<String>
DataSet<String> dataSet = ...; // your data
dataSet.output(new LoggerOutputFormat());
```

## Architecture

The library follows Flink's standard patterns and integrates with core Flink interfaces:

- **Entity Classes**: POJOs with a public no-argument constructor and getters/setters, which Flink's type system can serialize
- **Source Functions**: `TransactionSource` extends `FromIteratorFunction` for streaming; `TransactionRowInputFormat` extends `GenericInputFormat` for batch
- **Sink Functions**: `AlertSink` implements `SinkFunction` for streaming; `LoggerOutputFormat` implements `OutputFormat` for batch
- **Rate Limiting**: A built-in 100 ms delay between records in `TransactionSource` for realistic data simulation
- **Sample Data**: Predefined transaction dataset with varied amounts for testing fraud detection scenarios
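
The "Entity Classes" point can be made concrete with a small reflection check. The sketch below is a stand-alone approximation of the main POJO conventions (public class, public no-argument constructor, non-public fields reachable through `getX`/`setX`). `PojoRuleSketch`, `looksLikePojo`, and the nested `Account` and `NoSetter` classes are hypothetical names, and the check is deliberately simpler than Flink's own type analysis: it ignores `isX` boolean getters, inherited fields, and whether each field type is itself supported.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/** Stand-alone approximation of POJO conventions; not Flink's actual type extraction. */
final class PojoRuleSketch {

    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false; // the class itself must be public
        }
        try {
            type.getConstructor(); // throws if there is no public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue; // only non-public instance fields need accessors
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                type.getMethod("get" + suffix);
                type.getMethod("set" + suffix, field.getType());
            } catch (NoSuchMethodException e) {
                return false; // a non-public field without both getter and setter
            }
        }
        return true;
    }

    /** Hypothetical class shaped like the entities in this package. */
    public static final class Account {
        private long id;
        public Account() {}
        public long getId() { return id; }
        public void setId(long id) { this.id = id; }
    }

    /** Hypothetical counter-example: the field has no setter. */
    public static final class NoSetter {
        private long id;
        public NoSetter() {}
        public long getId() { return id; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(Account.class));  // true
        System.out.println(looksLikePojo(NoSetter.class)); // false
    }
}
```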

## Dependencies

- **Apache Flink**: flink-streaming-java_2.12 (provided scope)
- **SLF4J**: Logging API through which records are written at INFO level
- **Java Standard Library**: Core Java APIs for collections, serialization, and SQL timestamp handling

## Common Patterns

### Fraud Detection Pipeline

```java
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// env is the StreamExecutionEnvironment created as in Basic Usage
DataStream<Transaction> transactions = env.addSource(new TransactionSource());

DataStream<Alert> alerts = transactions
    // Group transactions by account so each window sees one account's records
    .keyBy(Transaction::getAccountId)
    // Evaluate each account in one-minute, non-overlapping windows
    .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
    // Custom window function, assumed here to emit the suspicious Transaction records
    .process(new FraudDetectionFunction())
    .map(transaction -> {
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());
        return alert;
    });

alerts.addSink(new AlertSink());
```
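
To make the keyed one-minute tumbling window concrete, the following stand-alone sketch reproduces the grouping in plain Java, without Flink. It is only an illustration under stated assumptions: `TumblingWindowSketch`, `Txn`, `flagAccounts`, and the rule "flag an account when its total within one window exceeds a threshold" are hypothetical stand-ins for the custom `FraudDetectionFunction`. It also buckets by each record's own timestamp to stay deterministic, whereas the pipeline above windows by processing time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone sketch of keyed tumbling windows; does not use Flink. */
final class TumblingWindowSketch {

    /** Minimal stand-in for a transaction; not the library's Transaction class. */
    static final class Txn {
        final long accountId;
        final long timestampMillis;
        final double amount;

        Txn(long accountId, long timestampMillis, double amount) {
            this.accountId = accountId;
            this.timestampMillis = timestampMillis;
            this.amount = amount;
        }
    }

    /** Returns one account ID per (account, window) pair whose total exceeds the threshold. */
    static List<Long> flagAccounts(List<Txn> txns, long windowMillis, double threshold) {
        // accountId -> (window start -> running total); insertion order is preserved
        Map<Long, Map<Long, Double>> totals = new LinkedHashMap<>();
        for (Txn t : txns) {
            // Align the timestamp down to the start of its window
            long windowStart = t.timestampMillis - Math.floorMod(t.timestampMillis, windowMillis);
            totals.computeIfAbsent(t.accountId, k -> new LinkedHashMap<>())
                  .merge(windowStart, t.amount, Double::sum);
        }
        List<Long> flagged = new ArrayList<>();
        for (Map.Entry<Long, Map<Long, Double>> perAccount : totals.entrySet()) {
            for (double total : perAccount.getValue().values()) {
                if (total > threshold) {
                    flagged.add(perAccount.getKey());
                }
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        List<Txn> txns = Arrays.asList(
            new Txn(1L, 0L, 600.0),
            new Txn(1L, 30_000L, 500.0),   // same window as the first record
            new Txn(2L, 10_000L, 900.0),
            new Txn(2L, 70_000L, 900.0));  // falls into the next window
        System.out.println(flagAccounts(txns, 60_000L, 1000.0)); // [1]
    }
}
```

Account 2 spends more in total than account 1 here, but its two records land in different windows, so neither window crosses the threshold. That is the practical effect of choosing tumbling windows over a running total.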

### Batch Transaction Processing

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;
import org.apache.flink.walkthrough.common.source.TransactionRowInputFormat;
import org.apache.flink.walkthrough.common.sink.LoggerOutputFormat;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Row> transactions = env.createInput(new TransactionRowInputFormat());

DataSet<String> processedData = transactions
    .map(new ProcessTransactionFunction())  // Custom function, assumed to map Row to String
    .reduce(new SummarizeFunction());       // Custom function, assumed to combine two Strings

processedData.output(new LoggerOutputFormat());

// output() only registers the sink; the job runs when execute() is called
env.execute("Batch Transaction Processing");
```

## Error Handling

The library follows standard Java exception patterns:

- **RuntimeException**: Thrown by `TransactionSource` when its thread is interrupted during the delay between records
- **Entity Classes**: The `Transaction` and `Alert` constructors, getters, and setters declare no exceptions
- **Flink Exceptions**: Flink-specific failures (serialization, checkpointing) are raised and handled by the Flink runtime, not by this library