Maven archetype for building fraud detection applications using Apache Flink DataStream API with Scala
npx @tessl/cli install tessl/maven-org-apache-flink--flink-walkthrough-datastream-scala@1.16.0

Apache Flink DataStream Scala walkthrough is a Maven archetype that provides a complete template for building fraud detection applications using Apache Flink's DataStream API with Scala. This archetype generates skeleton code that developers can customize to implement real-time stream processing applications for fraud detection and similar use cases.
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-scala -DarchetypeVersion=1.16.3

The generated fraud detection application uses the following core imports:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.walkthrough.common.sink.AlertSink
import org.apache.flink.walkthrough.common.entity.{Alert, Transaction}
import org.apache.flink.walkthrough.common.source.TransactionSource

Generate a new fraud detection project from the archetype:
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
  -DarchetypeVersion=1.16.3 \
  -DgroupId=com.example \
  -DartifactId=my-fraud-detection \
  -Dversion=1.0-SNAPSHOT \
  -Dpackage=com.example.fraud

After generating the project, the archetype provides a complete skeleton for a fraud detection application:
// Generated FraudDetectionJob.scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.walkthrough.common.sink.AlertSink
import org.apache.flink.walkthrough.common.entity.{Alert, Transaction}
import org.apache.flink.walkthrough.common.source.TransactionSource

object FraudDetectionJob {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val transactions: DataStream[Transaction] = env
      .addSource(new TransactionSource)
      .name("transactions")

    val alerts: DataStream[Alert] = transactions
      .keyBy(transaction => transaction.getAccountId)
      .process(new FraudDetector)
      .name("fraud-detector")

    alerts
      .addSink(new AlertSink)
      .name("send-alerts")

    env.execute("Fraud Detection")
  }
}

The archetype generates a complete fraud detection application structure:
- FraudDetectionJob object serving as the entry point
- FraudDetector class implementing the fraud detection algorithm
- TransactionSource for transaction data streams
- AlertSink for outputting fraud alerts

The primary application class that sets up the Flink streaming environment and defines the data processing pipeline.
object FraudDetectionJob {
  /**
   * Main entry point for the fraud detection streaming application
   * @param args Command line arguments passed to the application
   * @throws Exception if the streaming job fails to execute
   */
  @throws[Exception]
  def main(args: Array[String]): Unit
}

A stateful stream processing function that analyzes transaction patterns to detect fraudulent activity.
/**
 * Stateful fraud detection processor extending KeyedProcessFunction
 * Processes transactions keyed by account ID to detect suspicious patterns
 */
@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
  /**
   * Processes each transaction event and emits alerts for suspicious activity
   * @param transaction The input transaction to analyze
   * @param context Processing context providing access to timers and state
   * @param collector Output collector for emitting fraud alerts
   * @throws Exception if processing fails
   */
  @throws[Exception]
  def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit
}

Configuration constants used in fraud detection logic.
object FraudDetector {
  /** Threshold for small transaction amounts */
  val SMALL_AMOUNT: Double = 1.00
  /** Threshold for large transaction amounts */
  val LARGE_AMOUNT: Double = 500.00
  /** Time constant representing one minute in milliseconds */
  val ONE_MINUTE: Long = 60 * 1000L
}

When instantiated, the archetype creates the following project structure:
my-fraud-detection/
├── pom.xml                              # Maven build configuration with Flink dependencies
└── src/
    └── main/
        ├── scala/
        │   ├── FraudDetectionJob.scala  # Main application entry point
        │   └── FraudDetector.scala      # Fraud detection logic
        └── resources/
            └── log4j2.properties        # Preconfigured logging for Flink applications

The generated project includes the following key dependencies:
The archetype integrates with common Flink walkthrough entities:
// From flink-walkthrough-common dependency
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.entity.Alert
import org.apache.flink.walkthrough.common.source.TransactionSource
import org.apache.flink.walkthrough.common.sink.AlertSink
// From Flink framework
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

The generated project includes:
- ${package}.FraudDetectionJob as entry point

The archetype includes a preconfigured log4j2.properties file with optimized settings for Flink applications:
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender
logger.sink.name = org.apache.flink.walkthrough.common.sink.AlertSink
logger.sink.level = INFO
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

This configuration enables INFO-level logging for the AlertSink to display fraud alerts in the console while keeping other components at WARN level to reduce noise.
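The effect of the per-logger override can be illustrated with a small model in plain Scala. This is only a sketch of the filtering rule, not Log4j's implementation: `LogLevelSketch` and `isEmitted` are hypothetical names, the lookup uses an exact logger-name match (real Log4j resolves logger names hierarchically), and `com.example.fraud.FraudDetector` assumes the package chosen in the generation command.

```scala
object LogLevelSketch {
  // Severity ordering: a message is emitted when its level is at or above the threshold.
  private val severity = Map("DEBUG" -> 0, "INFO" -> 1, "WARN" -> 2, "ERROR" -> 3)

  // Values taken from the log4j2.properties file shown above.
  private val rootLevel = "WARN"
  private val loggerLevels = Map(
    "org.apache.flink.walkthrough.common.sink.AlertSink" -> "INFO"
  )

  /** Simplified check: exact logger-name match, falling back to the root level. */
  def isEmitted(loggerName: String, level: String): Boolean = {
    val threshold = loggerLevels.getOrElse(loggerName, rootLevel)
    severity(level) >= severity(threshold)
  }

  def main(args: Array[String]): Unit = {
    val alertSink = "org.apache.flink.walkthrough.common.sink.AlertSink"
    println(isEmitted(alertSink, "INFO"))                         // true: AlertSink override is INFO
    println(isEmitted("com.example.fraud.FraudDetector", "INFO")) // false: falls back to root WARN
    println(isEmitted("com.example.fraud.FraudDetector", "WARN")) // true: WARN passes the root level
  }
}
```

Under this model, INFO messages from any other class stay hidden unless a matching logger entry is added to the properties file.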
Developers can customize the FraudDetector.processElement method to implement specific fraud detection rules:
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
  def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit = {
    // Custom fraud detection logic
    if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
      val alert = new Alert
      alert.setId(transaction.getAccountId)
      alert.setMessage(s"Large transaction detected: ${transaction.getAmount}")
      collector.collect(alert)
    }
  }
}

# Build the project
mvn clean package
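# Run locally (if the generated pom.xml marks Flink dependencies as provided, add -Dexec.classpathScope=compile)
mvn exec:java -Dexec.mainClass="com.example.fraud.FraudDetectionJob"
# Or run the fat JAR (if Flink itself is not bundled in the JAR, submit it to a Flink installation with `flink run` instead)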
java -jar target/my-fraud-detection-1.0-SNAPSHOT.jar

The archetype uses the following Maven template variables that are replaced during project generation:
These variables are automatically substituted when running mvn archetype:generate with the corresponding -D parameters.
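As a rough illustration of what this substitution amounts to, the sketch below replaces `${name}` placeholders in a template string with values like those passed via `-D` in the generation command. It is a simplified model in plain Scala, not how Maven's archetype plugin is implemented; `TemplateSubstitutionSketch` and `substitute` are hypothetical names introduced only for this example.

```scala
object TemplateSubstitutionSketch {
  /** Replaces every `${name}` placeholder in `template` with its value from `vars`. */
  def substitute(template: String, vars: Map[String, String]): String =
    vars.foldLeft(template) { case (text, (name, value)) =>
      text.replace("${" + name + "}", value)
    }

  def main(args: Array[String]): Unit = {
    // Values mirror the -D parameters used in the generation command above.
    val vars = Map(
      "groupId"    -> "com.example",
      "artifactId" -> "my-fraud-detection",
      "version"    -> "1.0-SNAPSHOT",
      "package"    -> "com.example.fraud"
    )
    println(substitute("${package}.FraudDetectionJob", vars))
    // prints "com.example.fraud.FraudDetectionJob"
  }
}
```

Placeholders with no matching entry in the map are left untouched in this sketch.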