Apache Flink walkthrough common library containing shared entities and utilities for Flink streaming examples
This package provides Transaction and Alert data classes, input/output formatters, and source/sink implementations designed to support educational walkthroughs and example applications demonstrating Flink's stream processing capabilities.

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-walkthrough-common_2.12</artifactId>
  <version>1.14.6</version>
</dependency>
```

The public API consists of the following classes:

```java
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.walkthrough.common.source.TransactionRowInputFormat;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.sink.LoggerOutputFormat;
```

A minimal streaming job that reads transactions and emits alerts:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.walkthrough.common.sink.AlertSink;

// Create the streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create a transaction stream backed by the built-in sample data
DataStream<Transaction> transactions = env
        .addSource(new TransactionSource())
        .name("transactions");

// Flag large transactions as alerts
DataStream<Alert> alerts = transactions
        .filter(transaction -> transaction.getAmount() > 1000.0)
        .map(transaction -> {
            Alert alert = new Alert();
            alert.setId(transaction.getAccountId());
            return alert;
        });

// Output alerts using the provided sink
alerts.addSink(new AlertSink()).name("alerts");

env.execute("Fraud Detection");
```

Core data classes representing transaction and alert events for streaming applications.
Represents a financial transaction with an account ID, timestamp, and amount.

```java
public final class Transaction {
    public Transaction();
    public Transaction(long accountId, long timestamp, double amount);

    public long getAccountId();
    public void setAccountId(long accountId);
    public long getTimestamp();
    public void setTimestamp(long timestamp);
    public double getAmount();
    public void setAmount(double amount);

    @Override
    public boolean equals(Object o);
    @Override
    public int hashCode();
    @Override
    public String toString();
}
```

Represents an alert event with an identifier.
```java
public final class Alert {
    public Alert();

    public long getId();
    public void setId(long id);

    @Override
    public boolean equals(Object o);
    @Override
    public int hashCode();
    @Override
    public String toString();
}
```

Source functions for generating transaction streams in Flink applications.
Streaming source that generates transactions with rate limiting for realistic simulation.

```java
public class TransactionSource extends FromIteratorFunction<Transaction> {
    public TransactionSource();
}
```

Usage:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env.addSource(new TransactionSource());
```

The TransactionSource generates an endless stream of sample transaction data with a built-in rate limit (a 100 ms delay between transactions) and predefined transaction amounts spread across five different account IDs.
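The rate-limiting idea behind TransactionSource can be approximated in plain Java by wrapping any iterator with a fixed delay. The following is a minimal, self-contained sketch of that pattern; RateLimitedIterator is a hypothetical helper for illustration, not the library's actual implementation:

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: wrap any iterator and pause between elements,
// throttling how fast downstream code can consume records.
public class RateLimitedIterator<T> implements Iterator<T> {
    private final Iterator<T> inner;
    private final long delayMillis;

    public RateLimitedIterator(Iterator<T> inner, long delayMillis) {
        this.inner = inner;
        this.delayMillis = delayMillis;
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public T next() {
        try {
            Thread.sleep(delayMillis); // throttle the emission rate
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return inner.next();
    }

    public static void main(String[] args) {
        Iterator<Integer> throttled =
                new RateLimitedIterator<>(List.of(1, 2, 3).iterator(), 10);
        while (throttled.hasNext()) {
            System.out.println(throttled.next());
        }
    }
}
```

Wrapping a finite iterator this way is also a convenient trick for slowing down local demos without touching the pipeline itself.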
Batch input format for reading transactions as Flink Row objects.

```java
public class TransactionRowInputFormat extends GenericInputFormat<Row>
        implements NonParallelInput {
    @Override
    public void open(GenericInputSplit split);
    @Override
    public boolean reachedEnd();
    @Override
    public Row nextRecord(Row reuse);
}
```

Usage:

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> transactionRows = env.createInput(new TransactionRowInputFormat());
```

Sink functions for outputting processed data from Flink streams.
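Conceptually, a sink is a per-record callback: Flink hands each element to the sink, which performs a side effect such as logging. As a stand-alone illustration of that pattern in plain Java (no Flink APIs; CollectingSink is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a sink: a per-record callback that
// remembers everything it receives, handy for tests and demos.
public class CollectingSink implements Consumer<String> {
    private final List<String> received = new ArrayList<>();

    @Override
    public void accept(String record) {
        received.add(record); // a real sink would log or write the record out
    }

    public List<String> getReceived() {
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        List.of("alert-1", "alert-2").forEach(sink);
        System.out.println(sink.getReceived()); // prints [alert-1, alert-2]
    }
}
```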
Streaming sink that logs alert events at INFO level.

```java
public class AlertSink implements SinkFunction<Alert> {
    @Override
    public void invoke(Alert value, Context context);
}
```

Usage:

```java
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Alert> alerts = ...; // your alert stream
alerts.addSink(new AlertSink());
```

Batch output format that logs string records at INFO level.
```java
public class LoggerOutputFormat implements OutputFormat<String> {
    @Override
    public void configure(Configuration parameters);
    @Override
    public void open(int taskNumber, int numTasks);
    @Override
    public void writeRecord(String record);
    @Override
    public void close();
}
```

Usage:

```java
import org.apache.flink.api.java.DataSet;

DataSet<String> dataSet = ...; // your data
dataSet.output(new LoggerOutputFormat());
```

The library follows Flink's standard patterns and integrates with core Flink interfaces:
```java
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Transaction> transactions = env.addSource(new TransactionSource());

DataStream<Alert> alerts = transactions
        .keyBy(Transaction::getAccountId)
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .process(new FraudDetectionFunction()) // custom ProcessWindowFunction emitting suspicious transactions
        .map(transaction -> {
            Alert alert = new Alert();
            alert.setId(transaction.getAccountId());
            return alert;
        });

alerts.addSink(new AlertSink());
```

The batch formats combine the same way with the DataSet API:

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Row> transactions = env.createInput(new TransactionRowInputFormat());
DataSet<String> processedData = transactions
        .map(new ProcessTransactionFunction()) // custom function
        .reduce(new SummarizeFunction());      // custom function

processedData.output(new LoggerOutputFormat());
```

The library follows standard Java exception patterns: