or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

clustering.md distributed-copy.md graph-processing.md index.md misc-examples.md relational-processing.md word-count.md

misc-examples.md docs/

0

# Miscellaneous Examples

1

2

Additional examples demonstrating various Flink patterns, including Pi estimation using the Monte Carlo method, collection-based execution, and POJO usage.

3

4

## Capabilities

5

6

### Pi Estimation

7

8

Monte Carlo method implementation for estimating the value of Pi using random sampling and parallel computation.

9

10

```java { .api }

11

/**

12

* Pi estimation using Monte Carlo method.

13

* Usage: PiEstimation --samples <n>

14

*/

15

public class PiEstimation {

16

public static void main(String[] args) throws Exception;

17

18

/**

19

* Monte Carlo sampler that generates random points and counts those inside unit circle

20

*/

21

public static class Sampler implements MapFunction<Long, Long> {

22

/**

23

* Generates random samples and counts hits inside unit circle

24

* @param numSamples Number of samples to generate

25

* @return Number of samples that fall inside unit circle

26

*/

27

@Override

28

public Long map(Long numSamples) throws Exception;

29

}

30

31

/**

32

* Reducer for summing sample counts

33

*/

34

public static final class SumReducer implements ReduceFunction<Long> {

35

/**

36

* Sums two long values

37

* @param value1 First value

38

* @param value2 Second value

39

* @return Sum of the two values

40

*/

41

@Override

42

public Long reduce(Long value1, Long value2) throws Exception;

43

}

44

}

45

```

46

47

**Usage Examples:**

48

49

```java

50

// Run Pi estimation with default sample size

51

String[] emptyArgs = {};

52

PiEstimation.main(emptyArgs);

53

54

// Run with custom sample size

55

String[] args = {"--samples", "1000000"};

56

PiEstimation.main(args);

57

58

// Use Pi estimation components in custom computation

59

DataSet<Long> sampleCounts = env.fromElements(10000L, 10000L, 10000L, 10000L);

60

DataSet<Long> hits = sampleCounts.map(new PiEstimation.Sampler());

61

DataSet<Long> totalHits = hits.reduce(new PiEstimation.SumReducer());

62

63

// Calculate Pi estimate

64

DataSet<Double> piEstimate = totalHits.map(new MapFunction<Long, Double>() {

65

@Override

66

public Double map(Long hits) {

67

long totalSamples = 40000L; // 4 * 10000

68

return 4.0 * hits / totalSamples;

69

}

70

});

71

```

72

73

### Collection Execution Example

74

75

This example demonstrates local, collection-based execution and POJO usage in Flink programs.

76

77

```java { .api }

78

/**

79

* Collection-based execution example demonstrating local processing with POJOs.

80

* Usage: CollectionExecutionExample

81

*/

82

public class CollectionExecutionExample {

83

public static void main(String[] args) throws Exception;

84

85

/**

86

* User POJO with identifier and name fields

87

*/

88

public static class User {

89

public int userIdentifier;

90

public String name;

91

92

public User();

93

public User(int userIdentifier, String name);

94

95

@Override

96

public String toString();

97

}

98

99

/**

100

* Email POJO with user ID, subject, and body fields

101

*/

102

public static class EMail {

103

public int userId;

104

public String subject;

105

public String body;

106

107

public EMail();

108

public EMail(int userId, String subject, String body);

109

110

@Override

111

public String toString();

112

}

113

}

114

```

115

116

**Usage Examples:**

117

118

```java

119

// Run collection execution example

120

String[] emptyArgs = {};

121

CollectionExecutionExample.main(emptyArgs);

122

123

// Use POJOs in custom collection-based processing

124

import org.apache.flink.examples.java.misc.CollectionExecutionExample.User;

125

import org.apache.flink.examples.java.misc.CollectionExecutionExample.EMail;

126

127

// Create sample data

128

List<User> users = Arrays.asList(

129

new User(1, "Alice"),

130

new User(2, "Bob"),

131

new User(3, "Charlie")

132

);

133

134

List<EMail> emails = Arrays.asList(

135

new EMail(1, "Welcome", "Welcome to our service"),

136

new EMail(2, "Newsletter", "Monthly newsletter"),

137

new EMail(1, "Reminder", "Don't forget to...")

138

);

139

140

// Process with Flink

141

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

142

DataSet<User> userDataSet = env.fromCollection(users);

143

DataSet<EMail> emailDataSet = env.fromCollection(emails);

144

145

// Join users with their emails

146

DataSet<Tuple2<User, EMail>> userEmails = userDataSet

147

.join(emailDataSet)

148

.where("userIdentifier")

149

.equalTo("userId");

150

```

151

152

## Algorithm Implementations

153

154

### Monte Carlo Pi Estimation

155

156

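The Pi estimation algorithm relies on the fact that, for points drawn uniformly at random from the unit square [0, 1) × [0, 1), the fraction that falls inside the unit circle (the quarter disc x² + y² ≤ 1) approximates π/4, so multiplying that fraction by 4 gives an estimate of π: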

157

158

```java

159

// Monte Carlo sampling logic

160

public Long map(Long numSamples) throws Exception {

161

long count = 0;

162

Random random = new Random();

163

164

for (long i = 0; i < numSamples; i++) {

165

double x = random.nextDouble();

166

double y = random.nextDouble();

167

168

// Check if point is inside unit circle

169

if (x * x + y * y <= 1) {

170

count++;

171

}

172

}

173

174

return count;

175

}

176

177

// Pi calculation from sample results

178

double pi = 4.0 * totalHitsInsideCircle / totalSamples;

179

```

180

181

### Collection-based Processing Pattern

182

183

This pattern runs a Flink program locally on in-memory collections:

184

185

```java

186

// Create execution environment for local processing

187

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

188

189

// Create DataSets from Java collections

190

DataSet<User> users = env.fromCollection(userList);

191

DataSet<EMail> emails = env.fromCollection(emailList);

192

193

// Process data locally

194

DataSet<String> userNames = users.map(user -> user.name);

195

List<String> results = userNames.collect(); // Execute locally and collect results

196

```

197

198

## Data Patterns

199

200

### POJO Usage Guidelines

201

202

The examples demonstrate proper POJO (Plain Old Java Object) usage in Flink:

203

204

**POJO Requirements:**

205

- Public no-argument constructor

206

- Every field is either public or accessible through public getter and setter methods

207

- No need to implement `java.io.Serializable`: Flink serializes POJOs with its own serializer

208

209

```java

210

// Correct POJO structure

211

public static class User {

212

public int userIdentifier; // Public field

213

public String name; // Public field

214

215

public User() {} // No-argument constructor

216

217

public User(int userIdentifier, String name) { // Optional constructor

218

this.userIdentifier = userIdentifier;

219

this.name = name;

220

}

221

222

@Override

223

public String toString() { // Optional but recommended

224

return "User{id=" + userIdentifier + ", name='" + name + "'}";

225

}

226

}

227

```

228

229

### Random Number Generation

230

231

The Pi estimation sampler creates its own `Random` instance inside each `map` call, so no random-number generator is shared between parallel tasks:

232

233

```java

234

public static class Sampler implements MapFunction<Long, Long> {

235

@Override

236

public Long map(Long numSamples) throws Exception {

237

// Create local Random instance for thread safety

238

Random random = new Random();

239

240

long count = 0;

241

for (long i = 0; i < numSamples; i++) {

242

// Generate random coordinates

243

double x = random.nextDouble(); // [0.0, 1.0)

244

double y = random.nextDouble(); // [0.0, 1.0)

245

246

// Mathematical test for unit circle

247

if (x * x + y * y <= 1) {

248

count++;

249

}

250

}

251

252

return count;

253

}

254

}

255

```

256

257

## Execution Patterns

258

259

### Local Collection Execution

260

261

The collection execution example shows how to run Flink programs locally with in-memory data:

262

263

```java

264

// Set up local execution environment

265

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

266

267

// Create data from Java collections

268

List<User> userData = createUserList();

269

DataSet<User> users = env.fromCollection(userData);

270

271

// Process data

272

DataSet<String> result = users.map(user -> "Hello " + user.name);

273

274

// Execute and collect results locally

275

List<String> output = result.collect();

276

for (String greeting : output) {

277

System.out.println(greeting);

278

}

279

```

280

281

### Parallel Sample Generation

282

283

The Pi estimation example demonstrates parallel sample generation across multiple workers:

284

285

```java

286

// Create sample tasks for parallel execution

287

int numParallelSamples = 4;

288

long samplesPerTask = totalSamples / numParallelSamples;

289

290

DataSet<Long> sampleTasks = env.fromElements(

291

samplesPerTask, samplesPerTask, samplesPerTask, samplesPerTask

292

);

293

294

// Execute sampling in parallel

295

DataSet<Long> hitCounts = sampleTasks.map(new PiEstimation.Sampler());

296

297

// Reduce results

298

DataSet<Long> totalHits = hitCounts.reduce(new PiEstimation.SumReducer());

299

```

300

301

### Parameter Handling

302

303

The two examples handle parameters differently:

304

305

```java

306

// Pi estimation parameter handling

307

ParameterTool params = ParameterTool.fromArgs(args);

308

long numSamples = params.getLong("samples", 1000000L); // Default 1M samples

309

310

// Collection example (no parameters needed)

311

// Demonstrates self-contained execution with embedded data

312

```

313

314

## Common Usage Patterns

315

316

### POJO Field Access

317

318

```java

319

// Field-based access (public fields)

320

User user = new User();

321

user.userIdentifier = 123;

322

user.name = "Alice";

323

324

EMail email = new EMail();

325

email.userId = user.userIdentifier;

326

email.subject = "Welcome";

327

email.body = "Hello " + user.name;

328

```

329

330

### Join Operations with POJOs

331

332

```java

333

// Join POJOs using field names

334

DataSet<User> users = env.fromCollection(userList);

335

DataSet<EMail> emails = env.fromCollection(emailList);

336

337

DataSet<Tuple2<User, EMail>> joined = users

338

.join(emails)

339

.where("userIdentifier") // Field name from User POJO

340

.equalTo("userId"); // Field name from EMail POJO

341

```

342

343

### Statistical Computation

344

345

```java

346

// Calculate statistics from samples

347

DataSet<Long> samples = generateSamples();

348

DataSet<Long> totalCount = samples.reduce(new PiEstimation.SumReducer());

349

350

// Convert to statistical result

351

DataSet<Double> statistics = totalCount.map(new MapFunction<Long, Double>() {

352

@Override

353

public Double map(Long count) {

354

return calculateStatistic(count);

355

}

356

});

357

```

358

359

## Types

360

361

### Miscellaneous Data Types

362

363

```java { .api }

364

// Pi estimation types

365

Long sampleCount = 1000000L;

366

Long hitCount = 785398L;

367

Double piEstimate = 3.141592;

368

369

// Collection execution POJOs

370

CollectionExecutionExample.User user = new CollectionExecutionExample.User(1, "Alice");

371

CollectionExecutionExample.EMail email = new CollectionExecutionExample.EMail(1, "Subject", "Body");

372

373

// Standard Java types

374

Random random = new Random();

375

List<User> userList = new ArrayList<>();

376

Tuple2<User, EMail> userEmail = new Tuple2<>(user, email);

377

```