or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

clustering.md · distributed-copy.md · graph-processing.md · index.md · misc-examples.md · relational-processing.md · word-count.md
tile.json

misc-examples.mddocs/

Miscellaneous Examples

Additional examples demonstrating various Flink patterns including Pi estimation using Monte Carlo method, collection-based execution, and POJO usage patterns.

Capabilities

Pi Estimation

Monte Carlo method implementation for estimating the value of Pi using random sampling and parallel computation.

/**
 * Pi estimation using Monte Carlo method.
 * Usage: PiEstimation --samples <n>
 *
 * API outline only: method bodies are omitted from this listing.
 */
public class PiEstimation {
    /**
     * Program entry point.
     * @param args Command-line arguments (see the usage line in the class comment)
     * @throws Exception Declared by the signature; failure conditions are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * Monte Carlo sampler that generates random points and counts those inside unit circle.
     * Maps a requested sample count (Long) to a hit count (Long).
     */
    public static class Sampler implements MapFunction<Long, Long> {
        /**
         * Generates random samples and counts hits inside unit circle
         * @param numSamples Number of samples to generate
         * @return Number of samples that fall inside unit circle
         * @throws Exception Declared by the signature
         */
        @Override
        public Long map(Long numSamples) throws Exception;
    }
    
    /**
     * Reducer for summing sample counts
     */
    public static final class SumReducer implements ReduceFunction<Long> {
        /**
         * Sums two long values
         * @param value1 First value
         * @param value2 Second value
         * @return Sum of the two values
         * @throws Exception Declared by the signature
         */
        @Override
        public Long reduce(Long value1, Long value2) throws Exception;
    }
}

Usage Examples:

// Run Pi estimation with default sample size
String[] emptyArgs = {};
PiEstimation.main(emptyArgs);

// Run with custom sample size
String[] args = {"--samples", "1000000"};
PiEstimation.main(args);

// Use Pi estimation components in custom computation.
// Each element is the number of samples a single Sampler.map call will draw.
final long samplesPerTask = 10000L;
DataSet<Long> sampleCounts = env.fromElements(
    samplesPerTask, samplesPerTask, samplesPerTask, samplesPerTask);
DataSet<Long> hits = sampleCounts.map(new PiEstimation.Sampler());
DataSet<Long> totalHits = hits.reduce(new PiEstimation.SumReducer());

// Calculate Pi estimate.
// The divisor is derived from the inputs above so it cannot drift out of sync with them.
final long totalSamples = 4 * samplesPerTask;
DataSet<Double> piEstimate = totalHits.map(new MapFunction<Long, Double>() {
    @Override
    public Double map(Long hitCount) {
        // 4.0 forces floating-point arithmetic before the division
        return 4.0 * hitCount / totalSamples;
    }
});

Collection Execution Example

Demonstrates local collection-based execution patterns and POJO usage in Flink programs.

/**
 * Collection-based execution example demonstrating local processing with POJOs.
 * Usage: CollectionExecutionExample
 *
 * API outline only: method and constructor bodies are omitted from this listing.
 */
public class CollectionExecutionExample {
    /**
     * Program entry point; the usage line above lists no arguments.
     * @param args Command-line arguments
     * @throws Exception Declared by the signature; failure conditions are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * User POJO with identifier and name fields
     */
    public static class User {
        /** Identifier of the user. */
        public int userIdentifier;
        /** Name of the user. */
        public String name;
        
        /** No-argument constructor. */
        public User();
        /**
         * @param userIdentifier Identifier of the user
         * @param name Name of the user
         */
        public User(int userIdentifier, String name);
        
        /** String form of this user; the exact format is not shown in this listing. */
        @Override
        public String toString();
    }
    
    /**
     * Email POJO with user ID, subject, and body fields
     */
    public static class EMail {
        /** User ID associated with this email. */
        public int userId;
        /** Subject of the email. */
        public String subject;
        /** Body of the email. */
        public String body;
        
        /** No-argument constructor. */
        public EMail();
        /**
         * @param userId User ID associated with the email
         * @param subject Subject of the email
         * @param body Body of the email
         */
        public EMail(int userId, String subject, String body);
        
        /** String form of this email; the exact format is not shown in this listing. */
        @Override
        public String toString();
    }
}

Usage Examples:

// Imports for the POJOs used below (import declarations must precede all other code)
import org.apache.flink.examples.java.misc.CollectionExecutionExample.User;
import org.apache.flink.examples.java.misc.CollectionExecutionExample.EMail;

// Run collection execution example
String[] emptyArgs = {};
CollectionExecutionExample.main(emptyArgs);

// Use POJOs in custom collection-based processing

// Create sample data
List<User> users = Arrays.asList(
    new User(1, "Alice"),
    new User(2, "Bob"),
    new User(3, "Charlie")
);

List<EMail> emails = Arrays.asList(
    new EMail(1, "Welcome", "Welcome to our service"),
    new EMail(2, "Newsletter", "Monthly newsletter"),
    new EMail(1, "Reminder", "Don't forget to...")
);

// Process with Flink
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<User> userDataSet = env.fromCollection(users);
DataSet<EMail> emailDataSet = env.fromCollection(emails);

// Join users with their emails.
// The key strings are POJO field names: User.userIdentifier is matched against EMail.userId.
DataSet<Tuple2<User, EMail>> userEmails = userDataSet
    .join(emailDataSet)
    .where("userIdentifier")
    .equalTo("userId");

Algorithm Implementations

Monte Carlo Pi Estimation

The Pi estimation algorithm samples points uniformly from the unit square [0, 1) × [0, 1). The fraction of those points that falls inside the unit circle (x² + y² ≤ 1, i.e. the quarter-disc covered by the square) approximates π/4:

// Monte Carlo sampling logic
/**
 * Draws {@code numSamples} random points and counts those with x*x + y*y <= 1.
 *
 * @param numSamples how many points to draw
 * @return how many of the drawn points passed the circle test
 */
public Long map(Long numSamples) throws Exception {
    final Random rng = new Random();
    long insideCircle = 0;
    long drawn = 0;

    while (drawn < numSamples) {
        // Both coordinates come from nextDouble(), x first and then y
        final double px = rng.nextDouble();
        final double py = rng.nextDouble();

        // Squared distance from the origin decides whether the point is a hit
        final double distanceSquared = px * px + py * py;
        if (distanceSquared <= 1) {
            insideCircle++;
        }
        drawn++;
    }

    return insideCircle;
}

// Pi calculation from sample results: four times the hit ratio
double pi = (4.0 * totalHitsInsideCircle) / totalSamples;

Collection-based Processing Pattern

Demonstrates local execution with in-memory collections:

// Create a collection-backed environment for local processing.
// getExecutionEnvironment() would instead return whatever environment the program
// is launched in, which is not necessarily local.
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

// Create DataSets from Java collections
DataSet<User> users = env.fromCollection(userList);
DataSet<EMail> emails = env.fromCollection(emailList);

// Process data locally
DataSet<String> userNames = users.map(user -> user.name);
List<String> results = userNames.collect(); // Execute locally and collect results

Data Patterns

POJO Usage Guidelines

The examples demonstrate proper POJO (Plain Old Java Object) usage in Flink:

POJO Requirements:

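  • Public class (a nested class must also be static, as in the example below)
  • Public no-argument constructor
  • Public fields or public getter/setter methods
  • No need to implement java.io.Serializable: Flink serializes POJOs with its own serializers, provided every field type is itself a type Flink supports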
// Correct POJO structure
public static class User {
    /** Public field. */
    public int userIdentifier;
    /** Public field. */
    public String name;

    /** No-argument constructor. */
    public User() {
    }

    /** Optional constructor that fills both fields. */
    public User(int userIdentifier, String name) {
        this.name = name;
        this.userIdentifier = userIdentifier;
    }

    /** Optional but recommended: renders both fields. */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("User{id=");
        text.append(userIdentifier);
        text.append(", name='").append(name).append("'}");
        return text.toString();
    }
}

Random Number Generation

Pi estimation demonstrates thread-safe random number generation in parallel execution:

public static class Sampler implements MapFunction<Long, Long> {
    /**
     * Draws {@code numSamples} random points and reports how many satisfy
     * x*x + y*y <= 1.
     */
    @Override
    public Long map(Long numSamples) throws Exception {
        // A fresh generator per invocation, so no Random instance is shared between calls
        final Random generator = new Random();

        long inside = 0;
        // Count down from the requested amount; a non-positive request draws nothing
        for (long remaining = numSamples; remaining > 0; remaining--) {
            final double px = generator.nextDouble();  // [0.0, 1.0)
            final double py = generator.nextDouble();  // [0.0, 1.0)

            // Unit-circle membership test on the squared distance from the origin
            final double distanceSquared = px * px + py * py;
            if (distanceSquared <= 1) {
                inside++;
            }
        }

        return inside;
    }
}

Execution Patterns

Local Collection Execution

Collection execution example shows how to run Flink programs locally with in-memory data:

// Set up a local, collection-backed execution environment.
// getExecutionEnvironment() is context-dependent and is only local when the
// program happens to be started standalone.
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

// Create data from Java collections
List<User> userData = createUserList();
DataSet<User> users = env.fromCollection(userData);

// Process data
DataSet<String> result = users.map(user -> "Hello " + user.name);

// Execute and collect results locally
List<String> output = result.collect();
for (String greeting : output) {
    System.out.println(greeting);
}

Parallel Sample Generation

Pi estimation demonstrates parallel sample generation across multiple workers:

// Create sample tasks for parallel execution
int numParallelSamples = 4;
long samplesPerTask = totalSamples / numParallelSamples;
// Integer division truncates; give the leftover samples to the first task so the
// tasks together draw exactly totalSamples points
long leftoverSamples = totalSamples % numParallelSamples;

// One element per task: keep the element count equal to numParallelSamples
DataSet<Long> sampleTasks = env.fromElements(
    samplesPerTask + leftoverSamples, samplesPerTask, samplesPerTask, samplesPerTask
);

// Execute sampling in parallel
DataSet<Long> hitCounts = sampleTasks.map(new PiEstimation.Sampler());

// Reduce results
DataSet<Long> totalHits = hitCounts.reduce(new PiEstimation.SumReducer());

Parameter Handling

The two examples handle parameters differently:

// Pi estimation: read the "samples" argument, falling back to a default
final long defaultSamples = 1000000L;  // 1M samples when the argument is absent
ParameterTool params = ParameterTool.fromArgs(args);
long numSamples = params.getLong("samples", defaultSamples);

// Collection example: takes no parameters.
// It is self-contained and runs on data embedded in the program.

Common Usage Patterns

POJO Field Access

// Populate a User through its public fields
User user = new User();
user.name = "Alice";
user.userIdentifier = 123;

// Populate an EMail the same way, reading values back from the User's fields
EMail email = new EMail();
email.subject = "Welcome";
email.body = "Hello " + user.name;
email.userId = user.userIdentifier;

Join Operations with POJOs

// Build the two inputs from in-memory collections
DataSet<User> users = env.fromCollection(userList);
DataSet<EMail> emails = env.fromCollection(emailList);

// Join on POJO field names:
//   where(...)   names the key field of the left input (User)
//   equalTo(...) names the key field of the right input (EMail)
DataSet<Tuple2<User, EMail>> joined =
    users.join(emails)
        .where("userIdentifier")
        .equalTo("userId");

Statistical Computation

// Sum all sample values into a single total
DataSet<Long> samples = generateSamples();
final PiEstimation.SumReducer summer = new PiEstimation.SumReducer();
DataSet<Long> totalCount = samples.reduce(summer);

// Turn the total into the final statistic
final MapFunction<Long, Double> toStatistic = new MapFunction<Long, Double>() {
    @Override
    public Double map(Long total) {
        return calculateStatistic(total);
    }
};
DataSet<Double> statistics = totalCount.map(toStatistic);

Types

Miscellaneous Data Types

// Pi estimation types
Long sampleCount = 1000000L;
Long hitCount = 785398L;
Double piEstimate = 3.141592;

// Collection execution POJOs (nested classes, referenced through their outer class)
CollectionExecutionExample.User user = new CollectionExecutionExample.User(1, "Alice");
CollectionExecutionExample.EMail email = new CollectionExecutionExample.EMail(1, "Subject", "Body");

// Standard Java types; the POJOs are qualified the same way as above so the
// snippet does not depend on separate imports of User and EMail
Random random = new Random();
List<CollectionExecutionExample.User> userList = new ArrayList<>();
Tuple2<CollectionExecutionExample.User, CollectionExecutionExample.EMail> userEmail =
    new Tuple2<>(user, email);