# Miscellaneous Examples

Additional examples demonstrating various Flink patterns: Pi estimation with the Monte Carlo method, collection-based execution, and POJO usage.

## Capabilities

### Pi Estimation

A Monte Carlo implementation that estimates the value of Pi through random sampling and parallel computation.

```java { .api }
/**
 * Pi estimation using the Monte Carlo method.
 * Usage: PiEstimation --samples <n>
 */
public class PiEstimation {
    public static void main(String[] args) throws Exception;

    /**
     * Monte Carlo sampler that generates random points and counts those inside the unit circle.
     */
    public static class Sampler implements MapFunction<Long, Long> {
        /**
         * Generates random samples and counts the hits inside the unit circle.
         * @param numSamples Number of samples to generate
         * @return Number of samples that fall inside the unit circle
         */
        @Override
        public Long map(Long numSamples) throws Exception;
    }

    /**
     * Reducer that sums sample counts.
     */
    public static final class SumReducer implements ReduceFunction<Long> {
        /**
         * Sums two long values.
         * @param value1 First value
         * @param value2 Second value
         * @return Sum of the two values
         */
        @Override
        public Long reduce(Long value1, Long value2) throws Exception;
    }
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.examples.java.misc.PiEstimation;

// Run Pi estimation with the default sample size
String[] emptyArgs = {};
PiEstimation.main(emptyArgs);

// Run with a custom sample size
String[] args = {"--samples", "1000000"};
PiEstimation.main(args);

// Use the Pi estimation components in a custom computation
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> sampleCounts = env.fromElements(10000L, 10000L, 10000L, 10000L);
DataSet<Long> hits = sampleCounts.map(new PiEstimation.Sampler());
DataSet<Long> totalHits = hits.reduce(new PiEstimation.SumReducer());

// Calculate the Pi estimate: 4 * hits / total samples
DataSet<Double> piEstimate = totalHits.map(new MapFunction<Long, Double>() {
    @Override
    public Double map(Long hitCount) {
        long totalSamples = 40000L; // 4 tasks * 10000 samples each
        return 4.0 * hitCount / totalSamples;
    }
});
piEstimate.print(); // Triggers execution and prints the estimate
```
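
To try the sampling logic without a Flink cluster, the Sampler/SumReducer structure can be mirrored with plain Java parallel streams. This is a minimal sketch under stated assumptions, not the Flink example itself: the class `LocalPiEstimate`, its method names, and the use of `ThreadLocalRandom` are illustrative choices.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.LongStream;

// Hypothetical plain-Java mirror of the Sampler -> SumReducer pipeline (no Flink involved).
class LocalPiEstimate {

    // Same role as Sampler.map: draw numSamples points in the unit square
    // and count how many fall inside the quarter circle x^2 + y^2 <= 1.
    static long sample(long numSamples) {
        ThreadLocalRandom random = ThreadLocalRandom.current(); // per-thread generator
        long count = 0;
        for (long i = 0; i < numSamples; i++) {
            double x = random.nextDouble();
            double y = random.nextDouble();
            if (x * x + y * y <= 1) {
                count++;
            }
        }
        return count;
    }

    // Run one sampler per task in parallel, add up the hits (the SumReducer step),
    // and scale the hit ratio by 4.
    static double estimate(int tasks, long samplesPerTask) {
        long totalHits = LongStream.range(0, tasks)
                .parallel()
                .map(task -> sample(samplesPerTask))
                .sum();
        return 4.0 * totalHits / (tasks * samplesPerTask);
    }

    public static void main(String[] args) {
        System.out.println("Pi is roughly " + estimate(4, 10_000L));
    }
}
```

Calling `estimate(4, 10_000L)` corresponds to the four 10,000-sample tasks in the usage example above; the result varies from run to run because the generators are unseeded.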

### Collection Execution Example

Demonstrates local collection-based execution patterns and POJO usage in Flink programs.

```java { .api }
/**
 * Collection-based execution example demonstrating local processing with POJOs.
 * Usage: CollectionExecutionExample
 */
public class CollectionExecutionExample {
    public static void main(String[] args) throws Exception;

    /**
     * User POJO with identifier and name fields
     */
    public static class User {
        public int userIdentifier;
        public String name;

        public User();
        public User(int userIdentifier, String name);

        @Override
        public String toString();
    }

    /**
     * Email POJO with user ID, subject, and body fields
     */
    public static class EMail {
        public int userId;
        public String subject;
        public String body;

        public EMail();
        public EMail(int userId, String subject, String body);

        @Override
        public String toString();
    }
}
```

**Usage Examples:**

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.misc.CollectionExecutionExample;
import org.apache.flink.examples.java.misc.CollectionExecutionExample.EMail;
import org.apache.flink.examples.java.misc.CollectionExecutionExample.User;

// Run the collection execution example
String[] emptyArgs = {};
CollectionExecutionExample.main(emptyArgs);

// Use the POJOs in custom collection-based processing: create sample data
List<User> users = Arrays.asList(
    new User(1, "Alice"),
    new User(2, "Bob"),
    new User(3, "Charlie")
);

List<EMail> emails = Arrays.asList(
    new EMail(1, "Welcome", "Welcome to our service"),
    new EMail(2, "Newsletter", "Monthly newsletter"),
    new EMail(1, "Reminder", "Don't forget to...")
);

// Process with Flink
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<User> userDataSet = env.fromCollection(users);
DataSet<EMail> emailDataSet = env.fromCollection(emails);

// Join users with their emails (equi-join on User.userIdentifier == EMail.userId)
DataSet<Tuple2<User, EMail>> userEmails = userDataSet
    .join(emailDataSet)
    .where("userIdentifier")
    .equalTo("userId");
userEmails.print(); // Triggers execution and prints the joined pairs
```
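
The equi-join pairs every `User` with each `EMail` whose `userId` equals its `userIdentifier`; users without mail produce no pair. The nested-loop sketch below reproduces that inner-join result in plain Java for the same sample data. It assumes simplified local copies of the two POJOs and uses `Map.Entry` as a stand-in for Flink's `Tuple2`; it illustrates the join semantics only, not how Flink physically executes a join.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of an inner equi-join on User.userIdentifier == EMail.userId.
class JoinSketch {

    // Simplified local copies of the example POJOs.
    static class User {
        public int userIdentifier;
        public String name;
        public User() {}
        public User(int userIdentifier, String name) {
            this.userIdentifier = userIdentifier;
            this.name = name;
        }
    }

    static class EMail {
        public int userId;
        public String subject;
        public String body;
        public EMail() {}
        public EMail(int userId, String subject, String body) {
            this.userId = userId;
            this.subject = subject;
            this.body = body;
        }
    }

    // Nested-loop inner join: emit one (user, email) pair per matching key.
    static List<Map.Entry<User, EMail>> join(List<User> users, List<EMail> emails) {
        List<Map.Entry<User, EMail>> result = new ArrayList<>();
        for (User user : users) {
            for (EMail email : emails) {
                if (user.userIdentifier == email.userId) {
                    result.add(new SimpleEntry<>(user, email));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<User> users = Arrays.asList(
                new User(1, "Alice"), new User(2, "Bob"), new User(3, "Charlie"));
        List<EMail> emails = Arrays.asList(
                new EMail(1, "Welcome", "Welcome to our service"),
                new EMail(2, "Newsletter", "Monthly newsletter"),
                new EMail(1, "Reminder", "Don't forget to..."));
        for (Map.Entry<User, EMail> pair : join(users, emails)) {
            System.out.println(pair.getKey().name + " <- " + pair.getValue().subject);
        }
    }
}
```

For the sample data this yields three pairs (Alice with Welcome and Reminder, Bob with Newsletter); Charlie has no mail and is dropped, as in any inner join. The sketch's output order follows its loops, whereas Flink does not guarantee an output order for joins.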

## Algorithm Implementations

### Monte Carlo Pi Estimation

The Pi estimation algorithm samples points uniformly from the unit square [0, 1) × [0, 1). The quarter of the unit circle that lies inside that square has area π/4, so the fraction of points with x² + y² ≤ 1 approximates π/4, and multiplying that fraction by 4 yields an estimate of π:

```java
// Monte Carlo sampling logic (body of Sampler.map; requires java.util.Random)
public Long map(Long numSamples) throws Exception {
    long count = 0;
    Random random = new Random();

    for (long i = 0; i < numSamples; i++) {
        double x = random.nextDouble();
        double y = random.nextDouble();

        // Check whether the point lies inside the unit circle
        if (x * x + y * y <= 1) {
            count++;
        }
    }

    return count;
}

// Pi calculation from the reduced results:
// totalHitsInsideCircle is the SumReducer output, totalSamples the number of points drawn
double pi = 4.0 * totalHitsInsideCircle / totalSamples;
```
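
Treating each sample as an independent Bernoulli trial explains both the factor of 4 and how quickly the estimate converges. With N total samples, H hits inside the quarter circle, and x, y drawn independently and uniformly from [0, 1):

$$
\begin{aligned}
p &= \Pr\left[x^2 + y^2 \le 1\right] = \frac{\pi}{4} \\
\hat{\pi} &= 4\,\frac{H}{N}, \qquad \mathbb{E}[\hat{\pi}] = 4p = \pi \\
\operatorname{SE}(\hat{\pi}) &= 4\sqrt{\frac{p(1-p)}{N}} \approx \frac{1.64}{\sqrt{N}}
\end{aligned}
$$

With 10^6 samples the standard error is about 0.0016. Because the error shrinks as 1/√N, each additional correct decimal digit costs roughly 100 times as many samples.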

### Collection-based Processing Pattern

The following pattern runs locally on in-memory collections:

```java
// Create an execution environment; getExecutionEnvironment() returns a local
// environment when the program is started from an IDE or as a plain Java program
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create DataSets from Java collections (userList: List<User>, emailList: List<EMail>)
DataSet<User> users = env.fromCollection(userList);
DataSet<EMail> emails = env.fromCollection(emailList);

// Process data locally
DataSet<String> userNames = users.map(user -> user.name);
List<String> results = userNames.collect(); // Executes the program and returns the results to the client
```

## Data Patterns

### POJO Usage Guidelines

The examples demonstrate proper POJO (Plain Old Java Object) usage in Flink.

**POJO Requirements:**
- The class is public and either top-level or a `static` nested class
- Public no-argument constructor
- Public fields, or non-public fields with public getter/setter methods
- Field types that Flink can serialize (implementing `java.io.Serializable` is not required)

```java
// Correct POJO structure
public static class User {
    public int userIdentifier; // Public field
    public String name;        // Public field

    public User() {} // No-argument constructor

    public User(int userIdentifier, String name) { // Optional constructor
        this.userIdentifier = userIdentifier;
        this.name = name;
    }

    @Override
    public String toString() { // Optional but recommended
        return "User{id=" + userIdentifier + ", name='" + name + "'}";
    }
}
```
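
Flink decides during type extraction whether a class qualifies as a POJO and falls back to generic serialization when it does not; its own verdict can be inspected with `TypeInformation.of(User.class)`, which yields a `PojoTypeInfo` for accepted classes. For a quick Flink-free sanity check, the reflection sketch below approximates the rules listed above. `PojoCheck.looksLikePojo` is a hypothetical helper, not Flink's analysis, and it ignores finer points such as supported field types, inherited fields, and `is`-style boolean getters.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical approximation of Flink's POJO rules; not Flink's TypeExtractor.
class PojoCheck {

    static boolean looksLikePojo(Class<?> clazz) {
        // Rule 1: public, and either top-level or a static nested class.
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) return false;

        // Rule 2: a public no-argument constructor (getConstructor only finds public ones).
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }

        // Rule 3: every non-static, non-transient field is public or has a getter and setter.
        for (Field field : clazz.getDeclaredFields()) {
            int fm = field.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || Modifier.isPublic(fm)) continue;
            if (!hasAccessors(clazz, field)) return false;
        }
        return true;
    }

    private static boolean hasAccessors(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Sample classes: one that follows the rules and one that breaks them.
    public static class GoodUser {
        public int userIdentifier;
        public String name;
        public GoodUser() {}
    }

    public static class NoDefaultCtor {
        public int id;
        public NoDefaultCtor(int id) { this.id = id; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(GoodUser.class));      // true
        System.out.println(looksLikePojo(NoDefaultCtor.class)); // false
    }
}
```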

### Random Number Generation

Pi estimation creates its random number generator inside `map`, so no `Random` instance is shared between parallel tasks:

```java
// Nested in PiEstimation; requires java.util.Random and
// org.apache.flink.api.common.functions.MapFunction
public static class Sampler implements MapFunction<Long, Long> {
    @Override
    public Long map(Long numSamples) throws Exception {
        // Local Random instance: no generator state is shared between parallel tasks
        Random random = new Random();

        long count = 0;
        for (long i = 0; i < numSamples; i++) {
            // Generate random coordinates
            double x = random.nextDouble(); // [0.0, 1.0)
            double y = random.nextDouble(); // [0.0, 1.0)

            // Mathematical test for the unit circle
            if (x * x + y * y <= 1) {
                count++;
            }
        }

        return count;
    }
}
```
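
Because the `Random` in the Sampler is unseeded, two runs of the same job produce slightly different estimates. If repeatable results matter, for example in tests, one option is to give each task its own seeded generator. The sketch below assumes a hypothetical `SeededSampler` class and a simple base-seed-plus-task-index scheme; neither is part of the Flink example.

```java
import java.util.SplittableRandom;

// Hypothetical reproducible variant of the Sampler logic: each task gets its own seeded generator.
class SeededSampler {

    // Derive a per-task seed from a job-wide base seed (assumed scheme).
    static long seedFor(long baseSeed, int taskIndex) {
        return baseSeed + taskIndex;
    }

    // Count points inside the quarter circle using a generator seeded for this task only.
    static long sample(long numSamples, long seed) {
        SplittableRandom random = new SplittableRandom(seed);
        long count = 0;
        for (long i = 0; i < numSamples; i++) {
            double x = random.nextDouble();
            double y = random.nextDouble();
            if (x * x + y * y <= 1) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        long first = sample(100_000, seedFor(42L, 0));
        long second = sample(100_000, seedFor(42L, 0));
        System.out.println(first == second); // prints true: same seed, same sequence, same count
    }
}
```

The trade-off is that the seed scheme must guarantee distinct seeds per task; reusing one seed across tasks would make every task draw the same points.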

## Execution Patterns

### Local Collection Execution

The collection execution example shows how to run Flink programs locally on in-memory data:

```java
// Set up an environment that executes directly on Java collections without
// starting a Flink runtime (getExecutionEnvironment() also runs locally from an IDE)
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

// Create data from Java collections (createUserList() is a hypothetical helper returning List<User>)
List<User> userData = createUserList();
DataSet<User> users = env.fromCollection(userData);

// Process data
DataSet<String> result = users.map(user -> "Hello " + user.name);

// Execute and collect the results locally
List<String> output = result.collect();
for (String greeting : output) {
    System.out.println(greeting);
}
```

### Parallel Sample Generation

Pi estimation splits the work into independent sampling tasks that parallel `map` instances can process:

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create sample tasks for parallel execution
long totalSamples = 1_000_000L; // e.g. the default sample size
int numParallelSamples = 4;
long samplesPerTask = totalSamples / numParallelSamples; // Integer division: any remainder is dropped

DataSet<Long> sampleTasks = env.fromElements(
    samplesPerTask, samplesPerTask, samplesPerTask, samplesPerTask
);

// Execute sampling in parallel
DataSet<Long> hitCounts = sampleTasks.map(new PiEstimation.Sampler());

// Reduce the per-task hit counts to a single total
DataSet<Long> totalHits = hitCounts.reduce(new PiEstimation.SumReducer());
```
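
Because `totalSamples / numParallelSamples` uses integer division, a total that is not a multiple of the task count silently loses samples, and the final estimate should divide by the samples actually drawn. A small helper that spreads the remainder over the first tasks avoids this. `SampleSplit.split` is a hypothetical name; its result could be boxed into a `Long[]` and passed to `env.fromElements(...)` in place of the repeated values.

```java
import java.util.Arrays;

// Hypothetical helper: split a total sample count into per-task counts without losing the remainder.
class SampleSplit {

    static long[] split(long totalSamples, int tasks) {
        if (tasks <= 0) {
            throw new IllegalArgumentException("tasks must be positive");
        }
        long[] counts = new long[tasks];
        long base = totalSamples / tasks;      // every task gets at least this many
        long remainder = totalSamples % tasks; // the first `remainder` tasks get one extra
        for (int i = 0; i < tasks; i++) {
            counts[i] = base + (i < remainder ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] counts = split(1_000_003L, 4);
        System.out.println(Arrays.toString(counts));     // [250001, 250001, 250001, 250000]
        System.out.println(Arrays.stream(counts).sum()); // 1000003
    }
}
```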

### Parameter Handling

The two examples handle parameters differently:

```java
import org.apache.flink.api.java.utils.ParameterTool;

// Pi estimation parameter handling
ParameterTool params = ParameterTool.fromArgs(args);
long numSamples = params.getLong("samples", 1000000L); // Default: 1M samples

// Collection example: no parameters needed;
// it is self-contained and runs on embedded data
```
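
`ParameterTool.fromArgs` parses `--key value` pairs, and `getLong` returns the supplied default when the key is absent. For code that should not depend on Flink, a dependency-free approximation of that lookup might look like the sketch below. `SamplesArg.parseSamples` is hypothetical, and its rejection of non-positive values is an added assumption rather than `ParameterTool` behavior.

```java
// Hypothetical dependency-free stand-in for ParameterTool.fromArgs(args).getLong("samples", default).
class SamplesArg {

    static long parseSamples(String[] args, long defaultValue) {
        for (int i = 0; i < args.length - 1; i++) {
            if (args[i].equals("--samples")) {
                long value = Long.parseLong(args[i + 1]);
                if (value <= 0) {
                    throw new IllegalArgumentException("--samples must be positive, got " + value);
                }
                return value;
            }
        }
        return defaultValue; // Flag absent (or missing its value): fall back to the default
    }

    public static void main(String[] args) {
        System.out.println(parseSamples(new String[] {"--samples", "5000"}, 1_000_000L)); // 5000
        System.out.println(parseSamples(new String[] {}, 1_000_000L));                    // 1000000
    }
}
```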

## Common Usage Patterns

### POJO Field Access

```java
// Field-based access (public fields)
User user = new User();
user.userIdentifier = 123;
user.name = "Alice";

EMail email = new EMail();
email.userId = user.userIdentifier;
email.subject = "Welcome";
email.body = "Hello " + user.name;
```

### Join Operations with POJOs

```java
// Join POJOs using field names
DataSet<User> users = env.fromCollection(userList);
DataSet<EMail> emails = env.fromCollection(emailList);

DataSet<Tuple2<User, EMail>> joined = users
    .join(emails)
    .where("userIdentifier") // Field name from User POJO
    .equalTo("userId");      // Field name from EMail POJO
```

### Statistical Computation

```java
// Calculate statistics from samples
// (generateSamples() and calculateStatistic() are hypothetical placeholders)
DataSet<Long> samples = generateSamples();
DataSet<Long> totalCount = samples.reduce(new PiEstimation.SumReducer());

// Convert to a statistical result
DataSet<Double> statistics = totalCount.map(new MapFunction<Long, Double>() {
    @Override
    public Double map(Long count) {
        return calculateStatistic(count);
    }
});
```
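
`calculateStatistic` is left abstract in the snippet above. For the Pi case, one possible implementation turns the reduced hit count into the estimate together with its binomial standard error, 4·√(p(1 − p)/N) with p the observed hit ratio. The class and method names are hypothetical, and the function must know the total sample count, which this sketch takes as a constructor argument.

```java
// Hypothetical statistic for the Pi example: estimate plus binomial standard error.
class PiStatistic {

    private final long totalSamples;

    PiStatistic(long totalSamples) {
        this.totalSamples = totalSamples;
    }

    // 4 * hits / N: the Monte Carlo estimate of pi.
    double estimate(long hits) {
        return 4.0 * hits / totalSamples;
    }

    // 4 * sqrt(p * (1 - p) / N), where p = hits / N is the observed hit ratio.
    double standardError(long hits) {
        double p = (double) hits / totalSamples;
        return 4.0 * Math.sqrt(p * (1 - p) / totalSamples);
    }

    public static void main(String[] args) {
        PiStatistic stat = new PiStatistic(1_000_000L);
        long hits = 785_398L; // The illustrative count used in the Types section
        System.out.printf("pi ~ %.6f +/- %.4f%n", stat.estimate(hits), stat.standardError(hits));
    }
}
```

Inside a Flink job, the same two formulas could back the anonymous `MapFunction<Long, Double>` shown above, returning either value as the `Double` result.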

## Types

### Miscellaneous Data Types

```java { .api }
// Pi estimation types
Long sampleCount = 1000000L;
Long hitCount = 785398L;
Double piEstimate = 3.141592; // 4.0 * hitCount / sampleCount

// Collection execution POJOs
CollectionExecutionExample.User user = new CollectionExecutionExample.User(1, "Alice");
CollectionExecutionExample.EMail email = new CollectionExecutionExample.EMail(1, "Subject", "Body");

// Standard Java and Flink types (java.util.Random, java.util.List, java.util.ArrayList,
// org.apache.flink.api.java.tuple.Tuple2; User and EMail imported from CollectionExecutionExample)
Random random = new Random();
List<User> userList = new ArrayList<>();
Tuple2<User, EMail> userEmail = new Tuple2<>(user, email);
```