# Flink Walkthrough DataStream Scala

`flink-walkthrough-datastream-scala` is a Maven archetype that generates a skeleton fraud detection application built on Apache Flink's DataStream API for Scala. Developers can customize the generated code to implement real-time stream processing for fraud detection and similar use cases.

## Package Information

- **Package Name**: flink-walkthrough-datastream-scala
- **Package Type**: Maven archetype
- **Group ID**: org.apache.flink
- **Language**: Scala
- **Installation**: `mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-walkthrough-datastream-scala -DarchetypeVersion=1.16.3`

## Core Imports

The generated fraud detection application uses the following core imports:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.walkthrough.common.sink.AlertSink
import org.apache.flink.walkthrough.common.entity.{Alert, Transaction}
import org.apache.flink.walkthrough.common.source.TransactionSource
```

## Core Usage

Generate a new fraud detection project from the archetype:

```bash
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
  -DarchetypeVersion=1.16.3 \
  -DgroupId=com.example \
  -DartifactId=my-fraud-detection \
  -Dversion=1.0-SNAPSHOT \
  -Dpackage=com.example.fraud
```

## Basic Usage

After generating the project, the archetype provides a complete skeleton for a fraud detection application:

```scala
// Generated FraudDetectionJob.scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.walkthrough.common.sink.AlertSink
import org.apache.flink.walkthrough.common.entity.{Alert, Transaction}
import org.apache.flink.walkthrough.common.source.TransactionSource

object FraudDetectionJob {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val transactions: DataStream[Transaction] = env
      .addSource(new TransactionSource)
      .name("transactions")

    val alerts: DataStream[Alert] = transactions
      .keyBy(transaction => transaction.getAccountId)
      .process(new FraudDetector)
      .name("fraud-detector")

    alerts
      .addSink(new AlertSink)
      .name("send-alerts")

    env.execute("Fraud Detection")
  }
}
```

## Architecture

The archetype generates a complete fraud detection application structure:

- **Main Application**: `FraudDetectionJob` object serving as the entry point
- **Stream Processing Logic**: `FraudDetector` class implementing the fraud detection algorithm
- **Data Sources**: Integration with `TransactionSource` for transaction data streams
- **Data Sinks**: Integration with `AlertSink` for outputting fraud alerts
- **Maven Configuration**: Complete POM with Flink dependencies and build configuration

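To make the data flow between the components listed above concrete, here is a minimal plain-Scala sketch, with no Flink dependency, of the same source, key-by, process, and sink shape. The `Txn` and `AlertMsg` case classes and the `PipelineSketch` object are hypothetical stand-ins for the walkthrough's `Transaction`, `Alert`, and job classes. Ordinary collections replace the streaming runtime, and the detector simply flags every transaction (an assumption chosen for simplicity), so this illustrates the structure only, not Flink's execution model.

```scala
// Plain-Scala model of the pipeline shape; every name here is hypothetical.
object PipelineSketch {
  final case class Txn(accountId: Long, timestamp: Long, amount: Double)
  final case class AlertMsg(accountId: Long)

  // Stand-in for TransactionSource: a fixed batch instead of an unbounded stream.
  val source: Seq[Txn] = Seq(
    Txn(1L, 0L, 188.23),
    Txn(2L, 1000L, 374.79),
    Txn(1L, 2000L, 0.50)
  )

  // Stand-in for FraudDetector.processElement: flags every transaction it sees
  // (an assumption for illustration; a real rule would be more selective).
  def detect(txn: Txn): Option[AlertMsg] = Some(AlertMsg(txn.accountId))

  // Stand-in for keyBy + process: group per account, then run the detector
  // over each account's transactions in timestamp order.
  def run(input: Seq[Txn]): Seq[AlertMsg] =
    input
      .groupBy(_.accountId)
      .toSeq
      .sortBy(_._1)
      .flatMap { case (_, txns) =>
        txns.sortBy(_.timestamp).flatMap(t => detect(t).toList)
      }

  // Stand-in for AlertSink: render each alert as a line of text.
  def sink(alerts: Seq[AlertMsg]): Seq[String] =
    alerts.map(a => s"Alert{id=${a.accountId}}")
}
```

Calling `PipelineSketch.sink(PipelineSketch.run(PipelineSketch.source))` walks the three sample transactions through all four stages. In the real job, Flink partitions the stream by key continuously instead of grouping a finished batch.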

## Capabilities

### Main Application Entry Point

The primary application class that sets up the Flink streaming environment and defines the data processing pipeline.

```scala { .api }
object FraudDetectionJob {
  /**
   * Main entry point for the fraud detection streaming application
   * @param args Command line arguments passed to the application
   * @throws Exception if the streaming job fails to execute
   */
  @throws[Exception]
  def main(args: Array[String]): Unit
}
```

### Fraud Detection Processing Function

A stateful stream processing function that analyzes transaction patterns to detect fraudulent activity.

```scala { .api }
/**
 * Stateful fraud detection processor extending KeyedProcessFunction
 * Processes transactions keyed by account ID to detect suspicious patterns
 */
@SerialVersionUID(1L)
class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {

  /**
   * Processes each transaction event and emits alerts for suspicious activity
   * @param transaction The input transaction to analyze
   * @param context Processing context providing access to timers and state
   * @param collector Output collector for emitting fraud alerts
   * @throws Exception if processing fails
   */
  @throws[Exception]
  def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit
}
```

### Fraud Detection Constants

Configuration constants used in fraud detection logic.

```scala { .api }
object FraudDetector {
  /** Threshold for small transaction amounts */
  val SMALL_AMOUNT: Double = 1.00

  /** Threshold for large transaction amounts */
  val LARGE_AMOUNT: Double = 500.00

  /** Time constant representing one minute in milliseconds */
  val ONE_MINUTE: Long = 60 * 1000L
}
```

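This page does not spell out how the three constants are combined. One plausible reading, sketched below in plain Scala without Flink, is a rule that flags a large transaction when it directly follows a small one on the same account within one minute. The `Txn` case class, the `RuleSketch` object, and the rule itself are assumptions for illustration. A Flink implementation would more likely keep the per-account marker in keyed state and clear it with a timer instead of folding over a list.

```scala
// Hypothetical sketch of a "small, then large, within one minute" rule.
object RuleSketch {
  val SmallAmount: Double = 1.00
  val LargeAmount: Double = 500.00
  val OneMinute: Long = 60 * 1000L

  final case class Txn(timestamp: Long, amount: Double)

  /** Returns the timestamps of the transactions the rule would flag for a
    * single account. `txns` must already be sorted by timestamp. */
  def flagged(txns: Seq[Txn]): Seq[Long] = {
    // State carried between events: the time of the immediately preceding
    // transaction, but only if that transaction was a small one.
    val start = (Option.empty[Long], Vector.empty[Long])
    val (_, alerts) = txns.foldLeft(start) { case ((lastSmall, out), txn) =>
      val suspicious = lastSmall.exists { t =>
        txn.amount > LargeAmount && txn.timestamp - t <= OneMinute
      }
      val nextSmall = if (txn.amount < SmallAmount) Some(txn.timestamp) else None
      (nextSmall, if (suspicious) out :+ txn.timestamp else out)
    }
    alerts
  }
}
```

For example, a 0.50 transaction followed 30 seconds later by a 600.00 transaction is flagged, while the same pair 90 seconds apart is not.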

## Generated Project Structure

When instantiated, the archetype creates the following project structure:

```
my-fraud-detection/
├── pom.xml                              # Maven build configuration with Flink dependencies
└── src/
    └── main/
        ├── scala/
        │   ├── FraudDetectionJob.scala  # Main application entry point
        │   └── FraudDetector.scala      # Fraud detection logic
        └── resources/
            └── log4j2.properties        # Preconfigured logging for Flink applications
```

## Dependencies and Integration

The generated project includes the following key dependencies:

### Flink Framework Dependencies

- **flink-streaming-scala**: Core Flink streaming API for Scala (provided scope)
- **flink-clients**: Flink client libraries (provided scope)
- **flink-walkthrough-common**: Common entities and utilities for walkthrough examples

### External Entity Types

The archetype integrates with common Flink walkthrough entities:

```scala { .api }
// From flink-walkthrough-common dependency
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.entity.Alert
import org.apache.flink.walkthrough.common.source.TransactionSource
import org.apache.flink.walkthrough.common.sink.AlertSink

// From Flink framework
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
```

### Build Configuration

The generated project includes:

- **Maven Shade Plugin**: Creates fat JAR with dependencies
- **Scala Maven Plugin**: Compiles Scala source code
- **Java 8 Target**: Compatible with Flink runtime requirements
- **Main Class Configuration**: `${package}.FraudDetectionJob` as entry point

### Logging Configuration

The archetype includes a preconfigured `log4j2.properties` file:

```properties
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = ConsoleAppender

logger.sink.name = org.apache.flink.walkthrough.common.sink.AlertSink
logger.sink.level = INFO

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```

This configuration enables INFO-level logging for `AlertSink`, so fraud alerts appear in the console, while every other logger stays at WARN to reduce noise.

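The way a logger picks up its level from this file can be sketched in plain Scala. The model below is a simplification of Log4j's name-based hierarchy, not its actual implementation: a logger uses the level of its nearest configured ancestor by dotted name and falls back to the root level. The `LogLevelSketch` object and its members are hypothetical names used only for illustration.

```scala
// Simplified model of level lookup by dotted logger name; not the Log4j API.
object LogLevelSketch {
  // Levels taken from the properties file above.
  val rootLevel: String = "WARN"
  val configured: Map[String, String] =
    Map("org.apache.flink.walkthrough.common.sink.AlertSink" -> "INFO")

  /** Walks up the dotted name (a.b.c, then a.b, then a) until a configured
    * logger is found, and falls back to the root level otherwise. */
  def effectiveLevel(logger: String): String = {
    val parts = logger.split('.')
    val candidates = (parts.length to 1 by -1).map(n => parts.take(n).mkString("."))
    candidates
      .collectFirst { case name if configured.contains(name) => configured(name) }
      .getOrElse(rootLevel)
  }
}
```

Under this model, `effectiveLevel("org.apache.flink.walkthrough.common.sink.AlertSink")` resolves to `INFO`, while a name with no configured ancestor, such as `org.apache.flink.runtime.taskmanager.Task`, resolves to the root level `WARN`.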

## Usage Examples

### Customizing Fraud Detection Logic

Developers can customize the `FraudDetector.processElement` method to implement specific fraud detection rules:

```scala
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.walkthrough.common.entity.{Alert, Transaction}

class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {

  override def processElement(
      transaction: Transaction,
      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
      collector: Collector[Alert]): Unit = {

    // Custom rule: flag any transaction above the large-amount threshold
    if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
      // Alert carries only the account ID; it has no message field to set
      val alert = new Alert
      alert.setId(transaction.getAccountId)
      collector.collect(alert)
    }
  }
}
```

### Running the Generated Application

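```bash
# Build the project
mvn clean package

# Run locally; the Flink dependencies are provided-scope, so use the compile classpath
mvn exec:java -Dexec.mainClass="com.example.fraud.FraudDetectionJob" -Dexec.classpathScope=compile

# Or submit the fat JAR with the Flink CLI (requires a Flink distribution, which supplies the provided-scope classes)
flink run target/my-fraud-detection-1.0-SNAPSHOT.jar
```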

## Template Variables

The archetype uses the following Maven template variables that are replaced during project generation:

- **${package}**: Package name for generated classes
- **${groupId}**: Maven group ID for the project
- **${artifactId}**: Maven artifact ID for the project
- **${version}**: Version for the generated project

These variables are automatically substituted when running `mvn archetype:generate` with the corresponding `-D` parameters.
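
The effect of that substitution can be pictured with a few lines of plain Scala. This is only an illustration of placeholder replacement. The Maven Archetype Plugin uses its own template engine, and the `TemplateSketch` object and `substitute` function below are hypothetical names, not part of Maven or Flink.

```scala
// Hypothetical illustration of ${name} placeholder replacement.
object TemplateSketch {
  /** Replaces each `${name}` placeholder that has a value in `vars`.
    * Placeholders without a matching entry are left untouched. */
  def substitute(template: String, vars: Map[String, String]): String =
    vars.foldLeft(template) { case (text, (name, value)) =>
      text.replace("${" + name + "}", value)
    }
}
```

With `Map("package" -> "com.example.fraud")`, the template `${package}.FraudDetectionJob` becomes `com.example.fraud.FraudDetectionJob`, which is the main class name used in the run command earlier.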