Apache Flink walkthrough common library containing shared entities and utilities for Flink streaming examples
npx @tessl/cli install tessl/maven-org-apache-flink--flink-walkthrough-common-2-12@1.14.0

Apache Flink walkthrough common library containing shared entities and utilities for Flink streaming examples. This package provides Transaction and Alert data classes, input/output formatters, and source/sink implementations designed to support educational walkthroughs and example applications demonstrating Flink's stream processing capabilities.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_2.12</artifactId>
<version>1.14.6</version>
</dependency>

import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.walkthrough.common.source.TransactionRowInputFormat;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.sink.LoggerOutputFormat;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.walkthrough.common.sink.AlertSink;
// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create transaction stream with built-in sample data
DataStream<Transaction> transactions = env.addSource(new TransactionSource())
.name("transactions");
// Process transactions to generate alerts
DataStream<Alert> alerts = transactions
.filter(transaction -> transaction.getAmount() > 1000.0)
.map(transaction -> {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
return alert;
});
// Output alerts using provided sink
alerts.addSink(new AlertSink()).name("alerts");
env.execute("Fraud Detection");

Core data classes representing transaction and alert events for streaming applications.
Represents a financial transaction with account ID, timestamp, and amount.
public final class Transaction {
public Transaction();
public Transaction(long accountId, long timestamp, double amount);
public long getAccountId();
public void setAccountId(long accountId);
public long getTimestamp();
public void setTimestamp(long timestamp);
public double getAmount();
public void setAmount(double amount);
@Override
public boolean equals(Object o);
@Override
public int hashCode();
@Override
public String toString();
}

Represents an alert event with an identifier.
public final class Alert {
public Alert();
public long getId();
public void setId(long id);
@Override
public boolean equals(Object o);
@Override
public int hashCode();
@Override
public String toString();
}

Source functions for generating transaction streams in Flink applications.
Streaming source that generates transactions with rate limiting for realistic simulation.
public class TransactionSource extends FromIteratorFunction<Transaction> {
public TransactionSource();
}

Usage:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Transaction> transactions = env.addSource(new TransactionSource());

The TransactionSource generates an endless stream of sample transaction data with a built-in rate limit (100ms delay between transactions) and predefined transaction amounts across 5 different account IDs.
Batch input format for reading transactions as Flink Row objects.
public class TransactionRowInputFormat extends GenericInputFormat<Row>
implements NonParallelInput {
@Override
public void open(GenericInputSplit split);
@Override
public boolean reachedEnd();
@Override
public Row nextRecord(Row reuse);
}

Usage:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> transactionRows = env.createInput(new TransactionRowInputFormat());

Sink functions for outputting processed data from Flink streams.
Streaming sink that logs alert events to standard output.
public class AlertSink implements SinkFunction<Alert> {
@Override
public void invoke(Alert value, Context context);
}

Usage:
import org.apache.flink.streaming.api.datastream.DataStream;
// Assuming you have a DataStream<Alert>
DataStream<Alert> alerts = // ... your alert stream
alerts.addSink(new AlertSink());

Batch output format that logs string records at INFO level.
public class LoggerOutputFormat implements OutputFormat<String> {
@Override
public void configure(Configuration parameters);
@Override
public void open(int taskNumber, int numTasks);
@Override
public void writeRecord(String record);
@Override
public void close();
}

Usage:
import org.apache.flink.api.java.DataSet;
// Assuming you have a DataSet<String>
DataSet<String> dataSet = // ... your data
dataSet.output(new LoggerOutputFormat());

The library follows Flink's standard patterns and integrates with core Flink interfaces:
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
DataStream<Transaction> transactions = env.addSource(new TransactionSource());
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.process(new FraudDetectionFunction()) // Custom function
.map(transaction -> {
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
return alert;
});
alerts.addSink(new AlertSink());

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.types.Row;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Row> transactions = env.createInput(new TransactionRowInputFormat());
DataSet<String> processedData = transactions
.map(new ProcessTransactionFunction()) // Custom function
.reduce(new SummarizeFunction()); // Custom function
processedData.output(new LoggerOutputFormat());

The library follows standard Java exception patterns: