Maven archetype for building fraud detection applications using Apache Flink DataStream API with Scala
npx @tessl/cli install tessl/maven-org-apache-flink--flink-walkthrough-datastream-scala@1.16.00
# Flink Walkthrough DataStream Scala

The Apache Flink DataStream Scala walkthrough is a Maven archetype that provides a project template for building fraud detection applications with Flink's DataStream API in Scala. The archetype generates skeleton code that developers customize to implement real-time stream processing for fraud detection and similar use cases.

## Package Information

- **Package Name**: flink-walkthrough-datastream-scala
- **Package Type**: Maven archetype
- **Group ID**: org.apache.flink
- **Language**: Scala
- **Installation**: `mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-scala -DarchetypeVersion=1.16.3`

## Core Imports

The generated fraud detection application uses the following core imports:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.walkthrough.common.sink.AlertSink
import org.apache.flink.walkthrough.common.entity.{Alert, Transaction}
import org.apache.flink.walkthrough.common.source.TransactionSource
```

## Core Usage

Generate a new fraud detection project from the archetype:

```bash
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
  -DarchetypeVersion=1.16.3 \
  -DgroupId=com.example \
  -DartifactId=my-fraud-detection \
  -Dversion=1.0-SNAPSHOT \
  -Dpackage=com.example.fraud
```

## Basic Usage

The generated project contains a runnable skeleton job. With `-Dpackage=com.example.fraud`, `FraudDetectionJob.scala` looks like this:

```scala
// Generated FraudDetectionJob.scala (package name comes from -Dpackage)
package com.example.fraud

import org.apache.flink.streaming.api.scala._
import org.apache.flink.walkthrough.common.entity.{Alert, Transaction}
import org.apache.flink.walkthrough.common.sink.AlertSink
import org.apache.flink.walkthrough.common.source.TransactionSource

object FraudDetectionJob {

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: a stream of sample transactions from the walkthrough library
    val transactions: DataStream[Transaction] = env
      .addSource(new TransactionSource)
      .name("transactions")

    // Partition by account so FraudDetector processes each account's transactions together
    val alerts: DataStream[Alert] = transactions
      .keyBy(transaction => transaction.getAccountId)
      .process(new FraudDetector)
      .name("fraud-detector")

    // Sink: AlertSink writes each alert to the log
    alerts
      .addSink(new AlertSink)
      .name("send-alerts")

    env.execute("Fraud Detection")
  }
}
```
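
To make the data flow concrete without a Flink cluster, the following plain-Scala sketch models what the job does: it groups transactions by account id (the role of `keyBy`) and then runs a per-account function over each account's transactions in arrival order (the role of `FraudDetector`). `Txn`, `AlertOut`, and `KeyedPipelineModel` are hypothetical stand-ins, not Flink or walkthrough classes, and the model ignores parallelism, time, and unbounded input. `skeletonDetector` mirrors the generated skeleton, which, as in the Flink fraud detection tutorial, emits one alert per transaction.

```scala
object KeyedPipelineModel {
  // Hypothetical stand-ins for the walkthrough's Transaction and Alert entities
  final case class Txn(accountId: Long, amount: Double)
  final case class AlertOut(id: Long)

  /** Groups events by key (like keyBy) and applies `process` to each key's events in arrival order. */
  def run[K](events: Seq[Txn], key: Txn => K)(process: Seq[Txn] => Seq[AlertOut]): Map[K, Seq[AlertOut]] =
    events.groupBy(key).map { case (k, perKey) => k -> process(perKey) }

  /** Mirrors the generated skeleton: one alert, carrying the account id, per transaction. */
  def skeletonDetector(perKey: Seq[Txn]): Seq[AlertOut] =
    perKey.map(t => AlertOut(t.accountId))
}
```

For example, `run(Seq(Txn(1, 10.0), Txn(2, 5.0), Txn(1, 3.0)), (t: Txn) => t.accountId)(skeletonDetector)` yields two alerts for account 1 and one for account 2. Real detection logic replaces `skeletonDetector` with a function that keeps per-account state.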

## Architecture

The archetype generates a complete fraud detection application structure:

- **Main Application**: `FraudDetectionJob` object serving as the entry point
- **Stream Processing Logic**: `FraudDetector` class implementing the fraud detection algorithm
- **Data Sources**: Integration with `TransactionSource` for transaction data streams
- **Data Sinks**: Integration with `AlertSink` for outputting fraud alerts
- **Maven Configuration**: Complete POM with Flink dependencies and build configuration

## Capabilities

### Main Application Entry Point

The primary application object, which sets up the Flink streaming environment and defines the data processing pipeline.

```scala { .api }
object FraudDetectionJob {
  /**
   * Main entry point for the fraud detection streaming application
   * @param args Command line arguments passed to the application
   * @throws Exception if the streaming job fails to execute
   */
  @throws[Exception]
  def main(args: Array[String]): Unit
}
```

### Fraud Detection Processing Function

A keyed stream processing function that Flink invokes once per transaction, scoped to the transaction's account id. In the generated skeleton, `processElement` emits an alert for every transaction; this is where real, typically stateful, detection logic goes.

```scala { .api }
/**
 * Fraud detection processor extending KeyedProcessFunction
 * Processes transactions keyed by account ID to detect suspicious patterns
 */
@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {

  /**
   * Processes each transaction event and emits alerts for suspicious activity
   * @param transaction The input transaction to analyze
   * @param context Processing context providing the current key and access to timers
   * @param collector Output collector for emitting fraud alerts
   * @throws Exception if processing fails
   */
  @throws[Exception]
  def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit
}
```

### Fraud Detection Constants

Constants defined in the `FraudDetector` companion object for use in the detection logic. The generated skeleton does not reference them yet.

```scala { .api }
object FraudDetector {
  /** Threshold for small transaction amounts */
  val SMALL_AMOUNT: Double = 1.00

  /** Threshold for large transaction amounts */
  val LARGE_AMOUNT: Double = 500.00

  /** Time constant representing one minute in milliseconds */
  val ONE_MINUTE: Long = 60 * 1000L
}
```
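
These constants support the rule used in the Flink fraud detection tutorial: alert when an account makes a transaction below `SMALL_AMOUNT` that is immediately followed, within `ONE_MINUTE`, by a transaction above `LARGE_AMOUNT`. The sketch below models that rule in plain Scala over an in-memory, time-ordered sequence so the logic can be checked without a cluster. A per-account map stands in for Flink keyed state and an expiry timestamp stands in for a processing-time timer; `Txn` and `FraudRuleModel` are hypothetical names, and treating a transaction at exactly the expiry time as too late is an assumption.

```scala
import scala.collection.mutable

object FraudRuleModel {
  // Same values as the generated FraudDetector companion object
  val SmallAmount: Double = 1.00
  val LargeAmount: Double = 500.00
  val OneMinute: Long = 60 * 1000L

  // Hypothetical stand-in for the walkthrough Transaction (timestamp in milliseconds)
  final case class Txn(accountId: Long, timestamp: Long, amount: Double)

  /** Returns the account id of every alert, in input order; `txns` must be sorted by timestamp. */
  def detect(txns: Seq[Txn]): Seq[Long] = {
    // Per-account flag "last transaction was small", stored as the time the flag expires
    val flagExpiry = mutable.Map.empty[Long, Long]
    val alerts = Seq.newBuilder[Long]

    for (t <- txns) {
      // Expire a stale flag (the job a processing-time timer does in Flink)
      if (flagExpiry.get(t.accountId).exists(expiry => t.timestamp >= expiry))
        flagExpiry.remove(t.accountId)

      // A live flag means the previous transaction for this account was small
      if (flagExpiry.contains(t.accountId)) {
        if (t.amount > LargeAmount) alerts += t.accountId
        flagExpiry.remove(t.accountId) // any follow-up transaction clears the flag
      }

      // A small transaction (re)arms the flag for one minute
      if (t.amount < SmallAmount) flagExpiry(t.accountId) = t.timestamp + OneMinute
    }
    alerts.result()
  }
}
```

In a Flink implementation, the map entry would typically become a `ValueState` inside `FraudDetector` and the expiry check a timer callback, so the state is cleared even if no further transaction arrives for that account.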

## Generated Project Structure

When instantiated with the parameters shown under Core Usage, the archetype creates the following project structure, with the Scala sources placed in the directory that matches `-Dpackage`:

```
my-fraud-detection/
├── pom.xml                                  # Maven build configuration with Flink dependencies
└── src/
    └── main/
        ├── scala/
        │   └── com/example/fraud/
        │       ├── FraudDetectionJob.scala  # Main application entry point
        │       └── FraudDetector.scala      # Fraud detection logic
        └── resources/
            └── log4j2.properties            # Preconfigured logging for Flink applications
```

## Dependencies and Integration

The generated project includes the following key dependencies:

### Flink Framework Dependencies
- **flink-streaming-scala**: Core Flink streaming API for Scala (provided scope)
- **flink-clients**: Flink client libraries (provided scope)
- **flink-walkthrough-common**: Common entities and utilities for walkthrough examples
169
170
### External Entity Types
171
The archetype integrates with common Flink walkthrough entities:
172
173
```scala { .api }
174
// From flink-walkthrough-common dependency
175
import org.apache.flink.walkthrough.common.entity.Transaction
176
import org.apache.flink.walkthrough.common.entity.Alert
177
import org.apache.flink.walkthrough.common.source.TransactionSource
178
import org.apache.flink.walkthrough.common.sink.AlertSink
179
180
// From Flink framework
181
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
182
import org.apache.flink.util.Collector
183
```

### Build Configuration

The generated project includes:
- **Maven Shade Plugin**: Builds a fat JAR containing the application and its non-`provided` dependencies; the Flink framework libraries are expected to come from the Flink runtime
- **Scala Maven Plugin**: Compiles Scala source code
- **Java 8 Target**: Compatible with Flink runtime requirements
- **Main Class Configuration**: `${package}.FraudDetectionJob` set as the JAR's entry point

### Logging Configuration

The archetype includes a preconfigured `log4j2.properties` file with settings suited to Flink applications:

```properties
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender

logger.sink.name = org.apache.flink.walkthrough.common.sink.AlertSink
logger.sink.level = INFO

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

This configuration enables INFO-level logging for `AlertSink` so that fraud alerts appear on the console, while keeping all other loggers at WARN to reduce noise.
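
## Usage Examples

### Customizing Fraud Detection Logic

Developers can customize the `FraudDetector.processElement` method to implement specific fraud detection rules. The version below alerts on any single transaction above `LARGE_AMOUNT`; keep the `FraudDetector` companion object in the same file so the constant resolves. The walkthrough's `Alert` entity carries only an `id`, so the account id is the only detail an alert reports.

```scala
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.walkthrough.common.entity.{Alert, Transaction}

@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {

  @throws[Exception]
  override def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit = {

    // Custom rule: alert on any single transaction above LARGE_AMOUNT
    if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
      val alert = new Alert
      // Alert only has an id field; use the account id to identify the flagged account
      alert.setId(transaction.getAccountId)
      collector.collect(alert)
    }
  }
}
```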
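
Testing a `KeyedProcessFunction` directly requires Flink's operator test harnesses, so one option is to keep the decision itself in a plain function that `processElement` calls and unit-test that function on its own. The sketch below assumes a hypothetical `LargeTransactionRule` object that reuses the 500.00 threshold from `FraudDetector.LARGE_AMOUNT` and returns the id an `Alert` should carry, or `None` when no alert is due.

```scala
object LargeTransactionRule {
  // Mirrors FraudDetector.LARGE_AMOUNT in the generated project
  val LargeAmount: Double = 500.00

  /** Returns the id to put on an Alert when the amount is strictly above the threshold. */
  def evaluate(accountId: Long, amount: Double): Option[Long] =
    if (amount > LargeAmount) Some(accountId) else None
}
```

Inside `processElement`, the call could look like `LargeTransactionRule.evaluate(transaction.getAccountId, transaction.getAmount).foreach { id => val alert = new Alert; alert.setId(id); collector.collect(alert) }`, which keeps the Flink-specific code limited to building and emitting the alert.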
236
237
### Running the Generated Application
238
239
```bash
240
# Build the project
241
mvn clean package
242
243
# Run locally
244
mvn exec:java -Dexec.mainClass="com.example.fraud.FraudDetectionJob"
245
246
# Or run the fat JAR
247
java -jar target/my-fraud-detection-1.0-SNAPSHOT.jar
248
```
249
250
## Template Variables
251
252
The archetype uses the following Maven template variables that are replaced during project generation:
253
254
- **${package}**: Package name for generated classes
255
- **${groupId}**: Maven group ID for the project
256
- **${artifactId}**: Maven artifact ID for the project
257
- **${version}**: Version for the generated project
258
259
These variables are automatically substituted when running `mvn archetype:generate` with the corresponding `-D` parameters.
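
Maven archetypes render their templates with Apache Velocity. The plain-Scala sketch below only illustrates the effect of substituting `${name}` placeholders with values from the `-D` properties; `TemplateModel` and `render` are hypothetical, and real Velocity templates support far more syntax than this single regex.

```scala
import scala.util.matching.Regex

object TemplateModel {
  // Matches ${name} placeholders such as ${package} or ${artifactId}
  private val Placeholder: Regex = """\$\{([A-Za-z]+)\}""".r

  /** Replaces known placeholders with their property values; unknown ones are left untouched. */
  def render(template: String, props: Map[String, String]): String =
    Placeholder.replaceAllIn(template, (m: Regex.Match) =>
      // quoteReplacement keeps '$' and '\' in values from being read as group references
      Regex.quoteReplacement(props.getOrElse(m.group(1), m.matched)))
}
```

With `-Dpackage=com.example.fraud`, rendering the main class setting `${package}.FraudDetectionJob` gives `com.example.fraud.FraudDetectionJob`, the value used by `flink run` and `exec:java` above.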