Apache Flink batch-processing examples demonstrating a range of algorithms and use cases, including WordCount, PageRank, KMeans clustering, Connected Components, and graph processing.
This page covers additional examples of common Flink patterns: Pi estimation with the Monte Carlo method, collection-based (local) execution, and POJO usage.
The Monte Carlo implementation estimates the value of Pi using random sampling and parallel computation.
/**
* Pi estimation using Monte Carlo method.
* Usage: PiEstimation --samples <n>
*/
public class PiEstimation {
public static void main(String[] args) throws Exception;
/**
* Monte Carlo sampler that generates random points and counts those inside unit circle
*/
public static class Sampler implements MapFunction<Long, Long> {
/**
* Generates random samples and counts hits inside unit circle
* @param numSamples Number of samples to generate
* @return Number of samples that fall inside unit circle
*/
@Override
public Long map(Long numSamples) throws Exception;
}
/**
* Reducer for summing sample counts
*/
public static final class SumReducer implements ReduceFunction<Long> {
/**
* Sums two long values
* @param value1 First value
* @param value2 Second value
* @return Sum of the two values
*/
@Override
public Long reduce(Long value1, Long value2) throws Exception;
}
}

Usage Examples:
// Run Pi estimation with default sample size
String[] emptyArgs = {};
PiEstimation.main(emptyArgs);
// Run with custom sample size
String[] args = {"--samples", "1000000"};
PiEstimation.main(args);
// Use Pi estimation components in custom computation
DataSet<Long> sampleCounts = env.fromElements(10000L, 10000L, 10000L, 10000L);
DataSet<Long> hits = sampleCounts.map(new PiEstimation.Sampler());
DataSet<Long> totalHits = hits.reduce(new PiEstimation.SumReducer());
// Calculate Pi estimate
DataSet<Double> piEstimate = totalHits.map(new MapFunction<Long, Double>() {
@Override
public Double map(Long hits) {
long totalSamples = 40000L; // 4 * 10000
return 4.0 * hits / totalSamples;
}
});

Demonstrates local collection-based execution patterns and POJO usage in Flink programs.
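The user-to-email join that the Flink example performs later on this page can be sketched first with plain Java collections. The class, record, and method names below are illustrative only, not part of the example:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinSketch {
    // Stand-ins for the example's User and EMail POJOs
    public record User(int userIdentifier, String name) {}
    public record EMail(int userId, String subject) {}

    // Hash join: index users by id, then probe with each email
    public static List<String> joinUserEmails(List<User> users, List<EMail> emails) {
        Map<Integer, User> byId = new HashMap<>();
        for (User u : users) byId.put(u.userIdentifier(), u);
        List<String> pairs = new ArrayList<>();
        for (EMail e : emails) {
            User u = byId.get(e.userId());
            if (u != null) pairs.add(u.name() + " <- " + e.subject());
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<User> users = List.of(new User(1, "Alice"), new User(2, "Bob"));
        List<EMail> emails = List.of(new EMail(1, "Welcome"), new EMail(2, "Newsletter"));
        // prints [Alice <- Welcome, Bob <- Newsletter]
        System.out.println(joinUserEmails(users, emails));
    }
}
```

Flink's `where("userIdentifier").equalTo("userId")` expresses the same keyed match declaratively, letting the runtime pick the join strategy.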
/**
* Collection-based execution example demonstrating local processing with POJOs.
* Usage: CollectionExecutionExample
*/
public class CollectionExecutionExample {
public static void main(String[] args) throws Exception;
/**
* User POJO with identifier and name fields
*/
public static class User {
public int userIdentifier;
public String name;
public User();
public User(int userIdentifier, String name);
@Override
public String toString();
}
/**
* Email POJO with user ID, subject, and body fields
*/
public static class EMail {
public int userId;
public String subject;
public String body;
public EMail();
public EMail(int userId, String subject, String body);
@Override
public String toString();
}
}

Usage Examples:
// Run collection execution example
String[] emptyArgs = {};
CollectionExecutionExample.main(emptyArgs);
// Use POJOs in custom collection-based processing
import org.apache.flink.examples.java.misc.CollectionExecutionExample.User;
import org.apache.flink.examples.java.misc.CollectionExecutionExample.EMail;
// Create sample data
List<User> users = Arrays.asList(
new User(1, "Alice"),
new User(2, "Bob"),
new User(3, "Charlie")
);
List<EMail> emails = Arrays.asList(
new EMail(1, "Welcome", "Welcome to our service"),
new EMail(2, "Newsletter", "Monthly newsletter"),
new EMail(1, "Reminder", "Don't forget to...")
);
// Process with Flink
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<User> userDataSet = env.fromCollection(users);
DataSet<EMail> emailDataSet = env.fromCollection(emails);
// Join users with their emails
DataSet<Tuple2<User, EMail>> userEmails = userDataSet
.join(emailDataSet)
.where("userIdentifier")
.equalTo("userId");

The Pi estimation algorithm relies on the mathematical property that a random point in the unit square falls inside the unit circle with probability π/4 (the area of the quarter disk of radius 1), so the ratio of points inside the circle to total points approximates π/4:
// Monte Carlo sampling logic
public Long map(Long numSamples) throws Exception {
long count = 0;
Random random = new Random();
for (long i = 0; i < numSamples; i++) {
double x = random.nextDouble();
double y = random.nextDouble();
// Check if point is inside unit circle
if (x * x + y * y <= 1) {
count++;
}
}
return count;
}
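The same sampling loop can be exercised outside Flink. A minimal standalone sketch (plain Java, seeded for a repeatable run; the class name is hypothetical, not part of the example):

```java
import java.util.Random;

public class PiSketch {
    // Same loop as the Sampler above, but as a plain static method
    public static double estimatePi(long numSamples, long seed) {
        Random random = new Random(seed); // fixed seed for reproducibility
        long count = 0;
        for (long i = 0; i < numSamples; i++) {
            double x = random.nextDouble();
            double y = random.nextDouble();
            if (x * x + y * y <= 1) {
                count++; // point falls inside the quarter circle
            }
        }
        return 4.0 * count / numSamples;
    }

    public static void main(String[] args) {
        // With 1M samples the estimate typically lands within ~0.005 of pi
        System.out.println("Estimated pi: " + estimatePi(1_000_000L, 42L));
    }
}
```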
// Pi calculation from sample results
double pi = 4.0 * totalHitsInsideCircle / totalSamples;

Demonstrates local execution with in-memory collections:
// Create execution environment for local processing
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create DataSets from Java collections
DataSet<User> users = env.fromCollection(userList);
DataSet<EMail> emails = env.fromCollection(emailList);
// Process data locally
DataSet<String> userNames = users.map(user -> user.name);
List<String> results = userNames.collect(); // Execute locally and collect results

The examples demonstrate proper POJO (Plain Old Java Object) usage in Flink:
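A simplified subset of the POJO contract (public class, public no-arg constructor, public fields) can be verified with plain reflection. This is a hypothetical check using a stand-in `User` class, not Flink's actual type analyzer, which also accepts private fields with getters and setters:

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheck {
    // Stand-in mirroring the example's User POJO
    public static class User {
        public int userIdentifier;
        public String name;
        public User() {}
    }

    // True if the class has a public no-arg constructor and only public fields
    public static boolean looksLikePojo(Class<?> clazz) {
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            if (!Modifier.isPublic(ctor.getModifiers())) return false;
        } catch (NoSuchMethodException e) {
            return false; // no no-arg constructor at all
        }
        for (Field f : clazz.getDeclaredFields()) {
            if (!Modifier.isPublic(f.getModifiers())) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(User.class));   // true
        System.out.println(looksLikePojo(String.class)); // false: private fields
    }
}
```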
POJO Requirements:
// Correct POJO structure
public static class User {
public int userIdentifier; // Public field
public String name; // Public field
public User() {} // No-argument constructor
public User(int userIdentifier, String name) { // Optional constructor
this.userIdentifier = userIdentifier;
this.name = name;
}
@Override
public String toString() { // Optional but recommended
return "User{id=" + userIdentifier + ", name='" + name + "'}";
}
}

Pi estimation demonstrates thread-safe random number generation in parallel execution:
public static class Sampler implements MapFunction<Long, Long> {
@Override
public Long map(Long numSamples) throws Exception {
// Create local Random instance for thread safety
Random random = new Random();
long count = 0;
for (long i = 0; i < numSamples; i++) {
// Generate random coordinates
double x = random.nextDouble(); // [0.0, 1.0)
double y = random.nextDouble(); // [0.0, 1.0)
// Mathematical test for unit circle
if (x * x + y * y <= 1) {
count++;
}
}
return count;
}
}

The collection execution example shows how to run Flink programs locally with in-memory data:
// Set up local execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create data from Java collections
List<User> userData = createUserList();
DataSet<User> users = env.fromCollection(userData);
// Process data
DataSet<String> result = users.map(user -> "Hello " + user.name);
// Execute and collect results locally
List<String> output = result.collect();
for (String greeting : output) {
System.out.println(greeting);
}

Pi estimation demonstrates parallel sample generation across multiple workers:
// Create sample tasks for parallel execution
int numParallelSamples = 4;
long samplesPerTask = totalSamples / numParallelSamples;
DataSet<Long> sampleTasks = env.fromElements(
samplesPerTask, samplesPerTask, samplesPerTask, samplesPerTask
);
// Execute sampling in parallel
DataSet<Long> hitCounts = sampleTasks.map(new PiEstimation.Sampler());
// Reduce results
DataSet<Long> totalHits = hitCounts.reduce(new PiEstimation.SumReducer());

Both examples demonstrate different parameter handling approaches:
// Pi estimation parameter handling
ParameterTool params = ParameterTool.fromArgs(args);
long numSamples = params.getLong("samples", 1000000L); // Default 1M samples
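For intuition, ParameterTool's lookup-with-default behavior can be mimicked in a few lines of plain Java. This is a hypothetical sketch, not Flink's implementation:

```java
public class ArgsSketch {
    // Returns the value following "--name" in args, or defaultValue when absent
    public static long getLong(String[] args, String name, long defaultValue) {
        for (int i = 0; i + 1 < args.length; i++) {
            if (("--" + name).equals(args[i])) {
                return Long.parseLong(args[i + 1]);
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        System.out.println(getLong(new String[] {"--samples", "500"}, "samples", 1_000_000L)); // 500
        System.out.println(getLong(new String[] {}, "samples", 1_000_000L)); // 1000000
    }
}
```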
// Collection example (no parameters needed)
// Demonstrates self-contained execution with embedded data

// Field-based access (public fields)
User user = new User();
user.userIdentifier = 123;
user.name = "Alice";
EMail email = new EMail();
email.userId = user.userIdentifier;
email.subject = "Welcome";
email.body = "Hello " + user.name;

// Join POJOs using field names
DataSet<User> users = env.fromCollection(userList);
DataSet<EMail> emails = env.fromCollection(emailList);
DataSet<Tuple2<User, EMail>> joined = users
.join(emails)
.where("userIdentifier") // Field name from User POJO
.equalTo("userId"); // Field name from EMail POJO

// Calculate statistics from samples
DataSet<Long> samples = generateSamples();
DataSet<Long> totalCount = samples.reduce(new PiEstimation.SumReducer());
// Convert to statistical result
DataSet<Double> statistics = totalCount.map(new MapFunction<Long, Double>() {
@Override
public Double map(Long count) {
return calculateStatistic(count);
}
});

// Pi estimation types
Long sampleCount = 1000000L;
Long hitCount = 785398L;
Double piEstimate = 3.141592;
// Collection execution POJOs
CollectionExecutionExample.User user = new CollectionExecutionExample.User(1, "Alice");
CollectionExecutionExample.EMail email = new CollectionExecutionExample.EMail(1, "Subject", "Body");
// Standard Java types
Random random = new Random();
List<User> userList = new ArrayList<>();
Tuple2<User, EMail> userEmail = new Tuple2<>(user, email);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-batch-2-11