or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

additional-examples.md index.md storm-operators.md utility-classes.md wordcount-examples.md

docs/additional-examples.md

0

# Additional Examples

1

2

These extended examples demonstrate advanced Storm-Flink integration patterns beyond basic word counting: text processing with exclamation marks, stream splitting with multiple outputs, stream joins, real-time data printing, and remote topology execution.

3

4

## Capabilities

5

6

### Exclamation Processing Examples

7

8

Examples demonstrating text processing with Storm topologies that add exclamation marks to text streams.

9

10

#### ExclamationLocal

11

12

Local execution example for exclamation processing topology.

13

14

```java { .api }

15

/**

16

* Local execution example for exclamation processing topology

17

*/

18

public class ExclamationLocal {

19

public static final String topologyId = "Streaming Exclamation";

20

21

/**

22

* Main entry point for local exclamation topology

23

* @param args Command line arguments for configuration

24

*/

25

public static void main(String[] args) throws Exception;

26

}

27

```

28

29

#### ExclamationWithBolt

30

31

Example showing Storm bolt usage within Flink streaming for text processing.

32

33

```java { .api }

34

/**

35

* Shows using Storm bolt within Flink streaming program for exclamation processing

36

*/

37

public class ExclamationWithBolt {

38

/**

39

* Main entry point for bolt-based exclamation processing

40

* @param args Command line arguments

41

*/

42

public static void main(String[] args) throws Exception;

43

44

/**

45

* Map function for adding exclamation marks to text

46

*/

47

public static class ExclamationMap implements MapFunction<String, String> {

48

public String map(String value) throws Exception;

49

}

50

}

51

```

52

53

#### ExclamationWithSpout

54

55

Example showing Storm spout usage within Flink streaming for data sourcing.

56

57

```java { .api }

58

/**

59

* Shows using Storm spout within Flink streaming program

60

*/

61

public class ExclamationWithSpout {

62

/**

63

* Main entry point for spout-based exclamation processing

64

* @param args Command line arguments

65

*/

66

public static void main(String[] args) throws Exception;

67

68

/**

69

* Map function for processing spout data

70

*/

71

public static class ExclamationMap implements MapFunction<String, String> {

72

public String map(String value) throws Exception;

73

}

74

}

75

```

76

77

#### ExclamationTopology

78

79

Topology builder for exclamation processing with configurable parameters.

80

81

```java { .api }

82

/**

83

* Builder for exclamation processing topology

84

*/

85

public class ExclamationTopology {

86

public static final String spoutId = "source";

87

public static final String firstBoltId = "exclamation1";

88

public static final String secondBoltId = "exclamation2";

89

public static final String sinkId = "sink";

90

91

/**

92

* Build exclamation processing topology

93

* @return Configured TopologyBuilder

94

*/

95

public static TopologyBuilder buildTopology();

96

97

/**

98

* Get exclamation count configuration

99

* @return Number of exclamation marks to add

100

*/

101

public static int getExclamation();

102

103

/**

104

* Parse command line parameters

105

* @param args Command line arguments

106

* @return true if parameters valid, false otherwise

107

*/

108

public static boolean parseParameters(String[] args);

109

}

110

```

111

112

#### ExclamationBolt

113

114

Bolt operator that adds configurable exclamation marks to text.

115

116

```java { .api }

117

/**

118

* Bolt that adds configurable exclamation marks to text

119

*/

120

public class ExclamationBolt implements IRichBolt {

121

public static final String EXCLAMATION_COUNT = "exclamation.count";

122

123

/**

124

* Prepare bolt for execution

125

* @param conf Storm configuration

126

* @param context Topology context

127

* @param collector Output collector

128

*/

129

public void prepare(Map conf, TopologyContext context, OutputCollector collector);

130

131

/**

132

* Cleanup bolt resources

133

*/

134

public void cleanup();

135

136

/**

137

* Execute exclamation processing on tuple

138

* @param tuple Input tuple containing text

139

*/

140

public void execute(Tuple tuple);

141

142

/**

143

* Declare output fields

144

* @param declarer Output field declarer

145

*/

146

public void declareOutputFields(OutputFieldsDeclarer declarer);

147

148

/**

149

* Get component configuration

150

* @return Configuration map

151

*/

152

public Map<String, Object> getComponentConfiguration();

153

}

154

```

155

156

### Stream Splitting Examples

157

158

Examples demonstrating multiple output streams from Storm components with even/odd number splitting.

159

160

#### SpoutSplitExample

161

162

Example demonstrating spouts with multiple output streams and stream-specific processing.

163

164

```java { .api }

165

/**

166

* Demonstrates spouts with multiple output streams

167

*/

168

public class SpoutSplitExample {

169

/**

170

* Main entry point for stream splitting example

171

* @param args Command line arguments

172

*/

173

public static void main(String[] args) throws Exception;

174

175

/**

176

* Map function for enriching split stream data

177

*/

178

public static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {

179

/**

180

* Error tracking for stream processing

181

*/

182

public static boolean errorOccured = false;

183

184

public Tuple2<String, Integer> map(Integer value) throws Exception;

185

}

186

}

187

```

188

189

#### RandomSpout

190

191

Spout generating random numbers with separate even and odd output streams.

192

193

```java { .api }

194

/**

195

* Spout generating random numbers with separate even/odd streams

196

*/

197

public class RandomSpout extends BaseRichSpout {

198

public static final String EVEN_STREAM = "even";

199

public static final String ODD_STREAM = "odd";

200

201

/**

202

* Create random spout with stream splitting

203

* @param split true to enable stream splitting, false for single stream

204

* @param seed Random seed for reproducible results

205

*/

206

public RandomSpout(boolean split, long seed);

207

208

/**

209

* Initialize spout

210

* @param conf Storm configuration

211

* @param context Topology context

212

* @param collector Spout output collector

213

*/

214

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

215

216

/**

217

* Emit next tuple to appropriate stream

218

*/

219

public void nextTuple();

220

221

/**

222

* Declare output fields for both streams

223

* @param declarer Output field declarer

224

*/

225

public void declareOutputFields(OutputFieldsDeclarer declarer);

226

}

227

```

228

229

#### VerifyAndEnrichBolt

230

231

Bolt for verifying and enriching data from split streams with error tracking.

232

233

```java { .api }

234

/**

235

* Bolt for verifying and enriching data from split streams

236

*/

237

public class VerifyAndEnrichBolt extends BaseRichBolt {

238

/**

239

* Global error tracking across bolt instances

240

*/

241

public static boolean errorOccured = false;

242

243

/**

244

* Create verification bolt for specific stream type

245

* @param evenOrOdd true for even stream processing, false for odd stream

246

*/

247

public VerifyAndEnrichBolt(boolean evenOrOdd);

248

249

/**

250

* Prepare bolt for execution

251

* @param stormConf Storm configuration

252

* @param context Topology context

253

* @param collector Output collector

254

*/

255

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

256

257

/**

258

* Execute verification and enrichment

259

* @param input Input tuple from split stream

260

*/

261

public void execute(Tuple input);

262

263

/**

264

* Declare output fields

265

* @param declarer Output field declarer

266

*/

267

public void declareOutputFields(OutputFieldsDeclarer declarer);

268

}

269

```

270

271

### Join Examples

272

273

Examples demonstrating stream joins in Storm topologies.

274

275

#### SingleJoinExample

276

277

Example demonstrating joins between multiple data streams in Storm.

278

279

```java { .api }

280

/**

281

* Example demonstrating joins in Storm topologies

282

*/

283

public class SingleJoinExample {

284

/**

285

* Main entry point for join example

286

* @param args Command line arguments

287

*/

288

public static void main(String[] args) throws Exception;

289

}

290

```

291

292

### Print Examples

293

294

Examples for printing and displaying real-time data streams.

295

296

#### PrintSampleStream

297

298

Example for printing real-time streams; requires Twitter API credentials, passed as command line arguments.

299

300

```java { .api }

301

/**

302

* Example for printing real-time streams (requires Twitter API credentials)

303

*/

304

public class PrintSampleStream {

305

/**

306

* Main entry point for stream printing example

307

* @param args Command line arguments with Twitter credentials

308

*/

309

public static void main(String[] args) throws Exception;

310

}

311

```

312

313

### Remote Execution Examples

314

315

Examples demonstrating remote Storm topology execution on Flink clusters.

316

317

#### WordCountRemoteByClient

318

319

Example showing remote topology submission using FlinkClient.

320

321

```java { .api }

322

/**

323

* Remote topology submission using FlinkClient

324

*/

325

public class WordCountRemoteByClient {

326

public static final String topologyId = "Storm WordCount";

327

328

/**

329

* Main entry point for remote client submission

330

* @param args Command line arguments containing cluster configuration

331

*/

332

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, NotAliveException;

333

}

334

```

335

336

#### WordCountRemoteBySubmitter

337

338

Example showing remote topology submission using FlinkSubmitter.

339

340

```java { .api }

341

/**

342

* Remote topology submission using FlinkSubmitter

343

*/

344

public class WordCountRemoteBySubmitter {

345

public static final String topologyId = "Storm WordCount";

346

347

/**

348

* Main entry point for remote submitter

349

* @param args Command line arguments

350

*/

351

public static void main(String[] args) throws Exception;

352

}

353

```

354

355

## Usage Examples

356

357

### Running Exclamation Examples

358

359

```java

360

// Local exclamation processing

361

ExclamationLocal.main(new String[]{});

362

363

// Embedded bolt usage

364

ExclamationWithBolt.main(new String[]{});

365

366

// Embedded spout usage

367

ExclamationWithSpout.main(new String[]{});

368

```

369

370

### Remote Topology Execution

371

372

```java

373

import org.apache.flink.storm.wordcount.*;

374

375

// Submit topology to remote Flink cluster using client

376

WordCountRemoteByClient.main(new String[]{

377

"localhost", "6123", "input.txt", "output.txt"

378

});

379

380

// Submit topology using FlinkSubmitter pattern

381

WordCountRemoteBySubmitter.main(new String[]{

382

"input.txt", "output.txt"

383

});

384

```

385

386

## Types

387

388

```java { .api }

389

// Storm exception types for remote execution

390

import org.apache.storm.generated.AlreadyAliveException;

391

import org.apache.storm.generated.InvalidTopologyException;

392

import org.apache.storm.generated.NotAliveException;

393

394

// Core Storm tuple and field types

395

import org.apache.storm.tuple.Tuple;

396

import org.apache.storm.tuple.Fields;

397

import org.apache.storm.tuple.Values;

398

399

// Flink tuple types

400

import org.apache.flink.api.java.tuple.Tuple1;

401

import org.apache.flink.api.java.tuple.Tuple2;

402

import org.apache.flink.api.java.tuple.Tuple3;

403

404

// Standard Java types (configuration maps, serialization marker)

405

import java.util.Map;

406

import java.io.Serializable;

407

408

// POJO classes for data exchange

409

public static class Sentence implements Serializable {

410

private String sentence;

411

412

public Sentence();

413

public Sentence(String sentence);

414

public String getSentence();

415

public void setSentence(String sentence);

416

public String toString();

417

}

418

```

419

420

### Building Exclamation Topology

421

422

```java

423

import org.apache.storm.topology.TopologyBuilder;

424

import org.apache.flink.storm.exclamation.ExclamationTopology;

425

426

// Build exclamation topology

427

TopologyBuilder builder = ExclamationTopology.buildTopology();

428

429

// Read the configured exclamation count

430

int count = ExclamationTopology.getExclamation();

431

```

432

433

### Stream Splitting with RandomSpout

434

435

```java

436

import org.apache.storm.topology.TopologyBuilder;

437

import org.apache.flink.storm.split.operators.*;

438

439

TopologyBuilder builder = new TopologyBuilder();

440

441

// Add random spout with stream splitting

442

RandomSpout spout = new RandomSpout(true, 12345L);

443

builder.setSpout("random", spout);

444

445

// Process even stream

446

VerifyAndEnrichBolt evenBolt = new VerifyAndEnrichBolt(true);

447

builder.setBolt("even-processor", evenBolt)

448

.shuffleGrouping("random", RandomSpout.EVEN_STREAM);

449

450

// Process odd stream

451

VerifyAndEnrichBolt oddBolt = new VerifyAndEnrichBolt(false);

452

builder.setBolt("odd-processor", oddBolt)

453

.shuffleGrouping("random", RandomSpout.ODD_STREAM);

454

```

455

456

### Custom Exclamation Processing

457

458

```java

459

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

460

import org.apache.flink.storm.exclamation.operators.ExclamationBolt;

461

import org.apache.flink.storm.wrappers.BoltWrapper;

462

463

public class CustomExclamation {

464

public static void main(String[] args) throws Exception {

465

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

466

467

// Create data source

468

DataStream<String> source = env.fromElements(

469

"Hello", "World", "Storm", "Flink"

470

);

471

472

// Apply exclamation bolt

473

DataStream<String> processed = source.transform(

474

"ExclamationBolt",

475

BasicTypeInfo.STRING_TYPE_INFO,

476

new BoltWrapper<String, String>(new ExclamationBolt())

477

);

478

479

processed.print();

480

env.execute("Custom Exclamation");

481

}

482

}

483

```

484

485

### Multi-Stream Processing

486

487

```java

488

import org.apache.flink.streaming.api.datastream.SplitStream;

489

import org.apache.flink.storm.split.SpoutSplitExample;

490

491

public class MultiStreamExample {

492

public static void main(String[] args) throws Exception {

493

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

494

495

// Create random number source

496

DataStream<Integer> numbers = env.addSource(

497

new SpoutWrapper<>(new RandomSpout(false, 42),

498

new String[]{ Utils.DEFAULT_STREAM_ID }, -1)

499

);

500

501

// Split stream based on even/odd

502

SplitStream<Integer> split = numbers.split(new OutputSelector<Integer>() {

503

@Override

504

public Iterable<String> select(Integer value) {

505

return value % 2 == 0 ?

506

Collections.singletonList("even") :

507

Collections.singletonList("odd");

508

}

509

});

510

511

// Process even numbers

512

DataStream<Integer> evenStream = split.select("even");

513

evenStream.map(x -> "Even: " + x).print();

514

515

// Process odd numbers

516

DataStream<Integer> oddStream = split.select("odd");

517

oddStream.map(x -> "Odd: " + x).print();

518

519

env.execute("Multi-Stream Processing");

520

}

521

}

522

```

523

524

### Join Processing

525

526

```java

527

import org.apache.flink.streaming.api.datastream.DataStream;

528

import org.apache.flink.streaming.api.windowing.time.Time;

529

530

public class StreamJoinExample {

531

public static void main(String[] args) throws Exception {

532

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

533

534

// Create two data streams

535

DataStream<Tuple2<String, Integer>> stream1 = env.fromElements(

536

new Tuple2<>("a", 1), new Tuple2<>("b", 2)

537

);

538

539

DataStream<Tuple2<String, String>> stream2 = env.fromElements(

540

new Tuple2<>("a", "apple"), new Tuple2<>("b", "banana")

541

);

542

543

// Join streams by key within time window

544

DataStream<Tuple3<String, Integer, String>> joined = stream1

545

.join(stream2)

546

.where(t -> t.f0)

547

.equalTo(t -> t.f0)

548

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

549

.apply((t1, t2) -> new Tuple3<>(t1.f0, t1.f1, t2.f1));

550

551

joined.print();

552

env.execute("Stream Join");

553

}

554

}

555

```