Additional examples demonstrating Flink patterns: Pi estimation with the Monte Carlo method, collection-based execution, and POJO usage.
Monte Carlo method implementation for estimating the value of Pi using random sampling and parallel computation.
/**
* Pi estimation using Monte Carlo method.
* Usage: PiEstimation --samples <n>
*/
public class PiEstimation {
public static void main(String[] args) throws Exception;
/**
* Monte Carlo sampler that generates random points and counts those inside unit circle
*/
public static class Sampler implements MapFunction<Long, Long> {
/**
* Generates random samples and counts hits inside unit circle
* @param numSamples Number of samples to generate
* @return Number of samples that fall inside unit circle
*/
@Override
public Long map(Long numSamples) throws Exception;
}
/**
* Reducer for summing sample counts
*/
public static final class SumReducer implements ReduceFunction<Long> {
/**
* Sums two long values
* @param value1 First value
* @param value2 Second value
* @return Sum of the two values
*/
@Override
public Long reduce(Long value1, Long value2) throws Exception;
}
}

Usage Examples:
// Run Pi estimation with default sample size
String[] emptyArgs = {};
PiEstimation.main(emptyArgs);
// Run with custom sample size
String[] args = {"--samples", "1000000"};
PiEstimation.main(args);
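// Use Pi estimation components in custom computation
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Four sampling tasks of 10000 samples each
DataSet<Long> sampleCounts = env.fromElements(10000L, 10000L, 10000L, 10000L);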
DataSet<Long> hits = sampleCounts.map(new PiEstimation.Sampler());
DataSet<Long> totalHits = hits.reduce(new PiEstimation.SumReducer());
// Calculate Pi estimate
DataSet<Double> piEstimate = totalHits.map(new MapFunction<Long, Double>() {
@Override
public Double map(Long hits) {
long totalSamples = 40000L; // 4 tasks * 10000 samples each
return 4.0 * hits / totalSamples;
}
});

Demonstrates local collection-based execution patterns and POJO usage in Flink programs.
/**
* Collection-based execution example demonstrating local processing with POJOs.
* Usage: CollectionExecutionExample
*/
public class CollectionExecutionExample {
public static void main(String[] args) throws Exception;
/**
* User POJO with identifier and name fields
*/
public static class User {
public int userIdentifier;
public String name;
public User();
public User(int userIdentifier, String name);
@Override
public String toString();
}
/**
* Email POJO with user ID, subject, and body fields
*/
public static class EMail {
public int userId;
public String subject;
public String body;
public EMail();
public EMail(int userId, String subject, String body);
@Override
public String toString();
}
}

Usage Examples:
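// Imports for the POJOs and the Flink and Java types used in this example
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.misc.CollectionExecutionExample.User;
import org.apache.flink.examples.java.misc.CollectionExecutionExample.EMail;
import java.util.Arrays;
import java.util.List;
// Run collection execution example
String[] emptyArgs = {};
CollectionExecutionExample.main(emptyArgs);
// Use POJOs in custom collection-based processing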
// Create sample data
List<User> users = Arrays.asList(
new User(1, "Alice"),
new User(2, "Bob"),
new User(3, "Charlie")
);
List<EMail> emails = Arrays.asList(
new EMail(1, "Welcome", "Welcome to our service"),
new EMail(2, "Newsletter", "Monthly newsletter"),
new EMail(1, "Reminder", "Don't forget to...")
);
// Process with Flink
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<User> userDataSet = env.fromCollection(users);
DataSet<EMail> emailDataSet = env.fromCollection(emails);
// Join users with their emails
DataSet<Tuple2<User, EMail>> userEmails = userDataSet
.join(emailDataSet)
.where("userIdentifier")
.equalTo("userId");

The Pi estimation algorithm draws points uniformly from the unit square. The fraction of points that fall inside the quarter of the unit circle lying in that square approximates π/4:
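The π/4 figure comes from comparing areas. As a short derivation, assuming x and y are drawn independently and uniformly from [0, 1), with H the number of hits and N the total number of samples (H, N, and p are symbols introduced here for illustration):

```latex
p = P\left(x^2 + y^2 \le 1\right)
  = \frac{\text{area of quarter disk}}{\text{area of unit square}}
  = \frac{\pi/4}{1}
  = \frac{\pi}{4},
\qquad
\hat{\pi} = 4 \cdot \frac{H}{N},
\qquad
\operatorname{SE}(\hat{\pi}) = 4\sqrt{\frac{p\,(1-p)}{N}} \approx \frac{1.64}{\sqrt{N}}
```

Under these assumptions, one million samples give a standard error of roughly 0.0016, so the estimate is typically good to about two decimal places.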
// Monte Carlo sampling logic
public Long map(Long numSamples) throws Exception {
long count = 0;
Random random = new Random();
for (long i = 0; i < numSamples; i++) {
double x = random.nextDouble();
double y = random.nextDouble();
// Check if point is inside unit circle
if (x * x + y * y <= 1) {
count++;
}
}
return count;
}
// Pi calculation from sample results
double pi = 4.0 * totalHitsInsideCircle / totalSamples;

Demonstrates local execution with in-memory collections:
// Create execution environment for local processing
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create DataSets from Java collections
DataSet<User> users = env.fromCollection(userList);
DataSet<EMail> emails = env.fromCollection(emailList);
// Process data locally
DataSet<String> userNames = users.map(user -> user.name);
List<String> results = userNames.collect(); // Execute locally and collect results

The examples demonstrate proper POJO (Plain Old Java Object) usage in Flink:
POJO Requirements:
// Correct POJO structure
public static class User {
public int userIdentifier; // Public field
public String name; // Public field
public User() {} // No-argument constructor
public User(int userIdentifier, String name) { // Optional constructor
this.userIdentifier = userIdentifier;
this.name = name;
}
@Override
public String toString() { // Optional but recommended
return "User{id=" + userIdentifier + ", name='" + name + "'}";
}
}

Pi estimation demonstrates thread-safe random number generation in parallel execution by giving each map call its own Random instance:
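As a point of comparison outside Flink, the same "no shared generator" idea can be sketched in plain Java. A shared `java.util.Random` is safe to call from several threads but can become a contention point, so this sketch uses `ThreadLocalRandom` instead. The class name `ParallelSamplerSketch` and its methods are hypothetical and are not part of the Flink examples.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.LongStream;

/** Hypothetical plain-Java sketch (no Flink): parallel sampling with per-thread generators. */
final class ParallelSamplerSketch {

    /** Counts points of [0,1) x [0,1) that land inside the unit circle. */
    static long countHits(long numSamples) {
        // current() returns the generator owned by the calling thread, so nothing is shared
        ThreadLocalRandom random = ThreadLocalRandom.current();
        long count = 0;
        for (long i = 0; i < numSamples; i++) {
            double x = random.nextDouble();
            double y = random.nextDouble();
            if (x * x + y * y <= 1) {
                count++;
            }
        }
        return count;
    }

    /** Runs numTasks sampling tasks on the common pool and combines their hit counts. */
    static double estimatePi(int numTasks, long samplesPerTask) {
        long totalHits = LongStream.range(0, numTasks)
                .parallel()
                .map(task -> countHits(samplesPerTask))
                .sum();
        return 4.0 * totalHits / (numTasks * samplesPerTask);
    }

    public static void main(String[] args) {
        System.out.println("Pi is roughly " + estimatePi(8, 100_000L));
    }
}
```

The Flink `Sampler` that follows takes the other common route: a fresh `Random` created inside each `map` call.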
public static class Sampler implements MapFunction<Long, Long> {
@Override
public Long map(Long numSamples) throws Exception {
// Create local Random instance for thread safety
Random random = new Random();
long count = 0;
for (long i = 0; i < numSamples; i++) {
// Generate random coordinates
double x = random.nextDouble(); // [0.0, 1.0)
double y = random.nextDouble(); // [0.0, 1.0)
// Mathematical test for unit circle
if (x * x + y * y <= 1) {
count++;
}
}
return count;
}
}

Collection execution example shows how to run Flink programs locally with in-memory data:
// Set up local execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create data from Java collections
List<User> userData = createUserList();
DataSet<User> users = env.fromCollection(userData);
// Process data
DataSet<String> result = users.map(user -> "Hello " + user.name);
// Execute and collect results locally
List<String> output = result.collect();
for (String greeting : output) {
System.out.println(greeting);
}

Pi estimation demonstrates parallel sample generation across multiple workers:
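One detail to watch when dividing a sample budget across tasks: integer division drops any remainder, so the number of samples actually drawn can be smaller than the requested total, and the final estimate should divide by the number actually drawn. A minimal sketch of a split that preserves the total is shown here. `SampleSplitter` and `split` are hypothetical names and are not part of the Flink examples.

```java
import java.util.Arrays;

/** Hypothetical helper (not part of Flink): splits a sample budget across tasks. */
final class SampleSplitter {

    /** Returns one sample count per task; the counts always sum to totalSamples. */
    static long[] split(long totalSamples, int numTasks) {
        if (numTasks <= 0 || totalSamples < 0) {
            throw new IllegalArgumentException("need numTasks > 0 and totalSamples >= 0");
        }
        long base = totalSamples / numTasks;
        long remainder = totalSamples % numTasks;
        long[] perTask = new long[numTasks];
        for (int i = 0; i < numTasks; i++) {
            // The first `remainder` tasks each take one extra sample
            perTask[i] = base + (i < remainder ? 1 : 0);
        }
        return perTask;
    }

    public static void main(String[] args) {
        // prints [250001, 250001, 250001, 250000]
        System.out.println(Arrays.toString(split(1_000_003L, 4)));
    }
}
```

The resulting counts could then be passed to `env.fromCollection(...)` or `env.fromElements(...)` in place of repeating a single `samplesPerTask` value.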
// Create sample tasks for parallel execution
int numParallelSamples = 4;
long samplesPerTask = totalSamples / numParallelSamples;
DataSet<Long> sampleTasks = env.fromElements(
samplesPerTask, samplesPerTask, samplesPerTask, samplesPerTask
);
// Execute sampling in parallel
DataSet<Long> hitCounts = sampleTasks.map(new PiEstimation.Sampler());
// Reduce results
DataSet<Long> totalHits = hitCounts.reduce(new PiEstimation.SumReducer());

Both examples demonstrate different parameter handling approaches:
// Pi estimation parameter handling
ParameterTool params = ParameterTool.fromArgs(args);
long numSamples = params.getLong("samples", 1000000L); // Default 1M samples
// Collection example (no parameters needed)
// Demonstrates self-contained execution with embedded data

// Field-based access (public fields)
User user = new User();
user.userIdentifier = 123;
user.name = "Alice";
EMail email = new EMail();
email.userId = user.userIdentifier;
email.subject = "Welcome";
email.body = "Hello " + user.name;

// Join POJOs using field names
DataSet<User> users = env.fromCollection(userList);
DataSet<EMail> emails = env.fromCollection(emailList);
DataSet<Tuple2<User, EMail>> joined = users
.join(emails)
.where("userIdentifier") // Field name from User POJO
.equalTo("userId"); // Field name from EMail POJO

// Calculate statistics from samples
DataSet<Long> samples = generateSamples();
DataSet<Long> totalCount = samples.reduce(new PiEstimation.SumReducer());
// Convert to statistical result
DataSet<Double> statistics = totalCount.map(new MapFunction<Long, Double>() {
@Override
public Double map(Long count) {
return calculateStatistic(count);
}
});

// Pi estimation types
Long sampleCount = 1000000L;
Long hitCount = 785398L;
Double piEstimate = 3.141592;
// Collection execution POJOs
CollectionExecutionExample.User user = new CollectionExecutionExample.User(1, "Alice");
CollectionExecutionExample.EMail email = new CollectionExecutionExample.EMail(1, "Subject", "Body");
// Standard Java types
Random random = new Random();
List<User> userList = new ArrayList<>();
Tuple2<User, EMail> userEmail = new Tuple2<>(user, email);
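
The Pi estimation values listed above (sample count, hit count, estimate) are related by estimate = 4 × hits / samples. To make that data flow concrete without a Flink dependency, here is a plain-Java sketch of the same map, sum, and scale steps. `PiSketch`, `countHits`, and `estimatePi` are hypothetical names, and the fixed seeds are an assumption added only to make runs reproducible; the documented `Sampler` uses an unseeded `Random`.

```java
import java.util.Random;

/** Hypothetical plain-Java sketch of the Pi estimation data flow (not part of Flink). */
final class PiSketch {

    /** Mirrors the documented Sampler logic: counts points of [0,1)^2 inside the unit circle. */
    static long countHits(long numSamples, long seed) {
        Random random = new Random(seed); // seeded only so the sketch is reproducible
        long count = 0;
        for (long i = 0; i < numSamples; i++) {
            double x = random.nextDouble();
            double y = random.nextDouble();
            if (x * x + y * y <= 1) {
                count++;
            }
        }
        return count;
    }

    /** Maps each task to its hit count, sums the counts, then scales by 4 / total samples. */
    static double estimatePi(long[] samplesPerTask, long baseSeed) {
        long totalHits = 0;
        long totalSamples = 0;
        for (int task = 0; task < samplesPerTask.length; task++) {
            totalHits += countHits(samplesPerTask[task], baseSeed + task); // the map step
            totalSamples += samplesPerTask[task];                          // actual samples drawn
        }
        return 4.0 * totalHits / totalSamples;
    }

    public static void main(String[] args) {
        long[] tasks = {100_000L, 100_000L, 100_000L, 100_000L};
        System.out.println("Pi is roughly " + estimatePi(tasks, 42L));
    }
}
```

In the Flink version, the loop over tasks corresponds to `map(new PiEstimation.Sampler())` and the running sum to `reduce(new PiEstimation.SumReducer())`.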