# Additional Examples

Extended examples demonstrating advanced Storm-Flink integration patterns beyond basic word counting, including stream splitting with multiple outputs, text processing with exclamation marks, stream joins, and real-time data printing capabilities.

## Capabilities
### Exclamation Processing Examples

Examples demonstrating text processing with Storm topologies that add exclamation marks to text streams.

#### ExclamationLocal

Local execution example for exclamation processing topology.

```java { .api }
/**
 * Local execution example for exclamation processing topology
 */
public class ExclamationLocal {
    public static final String topologyId = "Streaming Exclamation";

    /**
     * Main entry point for local exclamation topology
     * @param args Command line arguments for configuration
     */
    public static void main(String[] args) throws Exception;
}
```
#### ExclamationWithBolt

Example showing Storm bolt usage within Flink streaming for text processing.

```java { .api }
/**
 * Shows using Storm bolt within Flink streaming program for exclamation processing
 */
public class ExclamationWithBolt {
    /**
     * Main entry point for bolt-based exclamation processing
     * @param args Command line arguments
     */
    public static void main(String[] args) throws Exception;

    /**
     * Map function for adding exclamation marks to text
     */
    public static class ExclamationMap implements MapFunction<String, String> {
        public String map(String value) throws Exception;
    }
}
```
52
53
#### ExclamationWithSpout
54
55
Example showing Storm spout usage within Flink streaming for data sourcing.
56
57
```java { .api }
58
/**
59
* Shows using Storm spout within Flink streaming program
60
*/
61
public class ExclamationWithSpout {
62
/**
63
* Main entry point for spout-based exclamation processing
64
* @param args Command line arguments
65
*/
66
public static void main(String[] args) throws Exception;
67
68
/**
69
* Map function for processing spout data
70
*/
71
public static class ExclamationMap implements MapFunction<String, String> {
72
public String map(String value) throws Exception;
73
}
74
}
75
```
76
77
#### ExclamationTopology
78
79
Topology builder for exclamation processing with configurable parameters.
80
81
```java { .api }
82
/**
83
* Builder for exclamation processing topology
84
*/
85
public class ExclamationTopology {
86
public static final String spoutId = "source";
87
public static final String firstBoltId = "exclamation1";
88
public static final String secondBoltId = "exclamation2";
89
public static final String sinkId = "sink";
90
91
/**
92
* Build exclamation processing topology
93
* @return Configured TopologyBuilder
94
*/
95
public static TopologyBuilder buildTopology();
96
97
/**
98
* Get exclamation count configuration
99
* @return Number of exclamation marks to add
100
*/
101
public static int getExclamation();
102
103
/**
104
* Parse command line parameters
105
* @param args Command line arguments
106
* @return true if parameters valid, false otherwise
107
*/
108
public static boolean parseParameters(String[] args);
109
}
110
```
111
112
#### ExclamationBolt
113
114
Bolt operator that adds configurable exclamation marks to text.
115
116
```java { .api }
117
/**
118
* Bolt that adds configurable exclamation marks to text
119
*/
120
public class ExclamationBolt implements IRichBolt {
121
public static final String EXCLAMATION_COUNT = "exclamation.count";
122
123
/**
124
* Prepare bolt for execution
125
* @param conf Storm configuration
126
* @param context Topology context
127
* @param collector Output collector
128
*/
129
public void prepare(Map conf, TopologyContext context, OutputCollector collector);
130
131
/**
132
* Cleanup bolt resources
133
*/
134
public void cleanup();
135
136
/**
137
* Execute exclamation processing on tuple
138
* @param tuple Input tuple containing text
139
*/
140
public void execute(Tuple tuple);
141
142
/**
143
* Declare output fields
144
* @param declarer Output field declarer
145
*/
146
public void declareOutputFields(OutputFieldsDeclarer declarer);
147
148
/**
149
* Get component configuration
150
* @return Configuration map
151
*/
152
public Map<String, Object> getComponentConfiguration();
153
}
154
```
155
156
### Stream Splitting Examples
157
158
Examples demonstrating multiple output streams from Storm components with even/odd number splitting.
159
160
#### SpoutSplitExample
161
162
Example demonstrating spouts with multiple output streams and stream-specific processing.
163
164
```java { .api }
165
/**
166
* Demonstrates spouts with multiple output streams
167
*/
168
public class SpoutSplitExample {
169
/**
170
* Main entry point for stream splitting example
171
* @param args Command line arguments
172
*/
173
public static void main(String[] args) throws Exception;
174
175
/**
176
* Map function for enriching split stream data
177
*/
178
public static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
179
/**
180
* Error tracking for stream processing
181
*/
182
public static boolean errorOccured = false;
183
184
public Tuple2<String, Integer> map(Integer value) throws Exception;
185
}
186
}
187
```
188
189
#### RandomSpout
190
191
Spout generating random numbers with separate even and odd output streams.
192
193
```java { .api }
194
/**
195
* Spout generating random numbers with separate even/odd streams
196
*/
197
public class RandomSpout extends BaseRichSpout {
198
public static final String EVEN_STREAM = "even";
199
public static final String ODD_STREAM = "odd";
200
201
/**
202
* Create random spout with stream splitting
203
* @param split true to enable stream splitting, false for single stream
204
* @param seed Random seed for reproducible results
205
*/
206
public RandomSpout(boolean split, long seed);
207
208
/**
209
* Initialize spout
210
* @param conf Storm configuration
211
* @param context Topology context
212
* @param collector Spout output collector
213
*/
214
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
215
216
/**
217
* Emit next tuple to appropriate stream
218
*/
219
public void nextTuple();
220
221
/**
222
* Declare output fields for both streams
223
* @param declarer Output field declarer
224
*/
225
public void declareOutputFields(OutputFieldsDeclarer declarer);
226
}
227
```
228
229
#### VerifyAndEnrichBolt
230
231
Bolt for verifying and enriching data from split streams with error tracking.
232
233
```java { .api }
234
/**
235
* Bolt for verifying and enriching data from split streams
236
*/
237
public class VerifyAndEnrichBolt extends BaseRichBolt {
238
/**
239
* Global error tracking across bolt instances
240
*/
241
public static boolean errorOccured = false;
242
243
/**
244
* Create verification bolt for specific stream type
245
* @param evenOrOdd true for even stream processing, false for odd stream
246
*/
247
public VerifyAndEnrichBolt(boolean evenOrOdd);
248
249
/**
250
* Prepare bolt for execution
251
* @param stormConf Storm configuration
252
* @param context Topology context
253
* @param collector Output collector
254
*/
255
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
256
257
/**
258
* Execute verification and enrichment
259
* @param input Input tuple from split stream
260
*/
261
public void execute(Tuple input);
262
263
/**
264
* Declare output fields
265
* @param declarer Output field declarer
266
*/
267
public void declareOutputFields(OutputFieldsDeclarer declarer);
268
}
269
```
### Join Examples

Examples demonstrating stream joins in Storm topologies.

#### SingleJoinExample

Example demonstrating joins between multiple data streams in Storm.

```java { .api }
/**
 * Example demonstrating joins in Storm topologies
 */
public class SingleJoinExample {
    /**
     * Main entry point for join example
     * @param args Command line arguments
     */
    public static void main(String[] args) throws Exception;
}
```
### Print Examples

Examples for printing and displaying real-time data streams.

#### PrintSampleStream

Example for printing real-time streams with Twitter integration support.

```java { .api }
/**
 * Example for printing real-time streams (requires Twitter API credentials)
 */
public class PrintSampleStream {
    /**
     * Main entry point for stream printing example
     * @param args Command line arguments with Twitter credentials
     */
    public static void main(String[] args) throws Exception;
}
```
### Remote Execution Examples

Examples demonstrating remote Storm topology execution on Flink clusters.

#### WordCountRemoteByClient

Example showing remote topology submission using FlinkClient.

```java { .api }
/**
 * Remote topology submission using FlinkClient
 */
public class WordCountRemoteByClient {
    public static final String topologyId = "Storm WordCount";

    /**
     * Main entry point for remote client submission
     * @param args Command line arguments containing cluster configuration
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, NotAliveException;
}
```
#### WordCountRemoteBySubmitter

Example showing remote topology submission using FlinkSubmitter.

```java { .api }
/**
 * Remote topology submission using FlinkSubmitter
 */
public class WordCountRemoteBySubmitter {
    public static final String topologyId = "Storm WordCount";

    /**
     * Main entry point for remote submitter
     * @param args Command line arguments
     */
    public static void main(String[] args) throws Exception;
}
```
## Usage Examples

### Running Exclamation Examples

```java
// Local exclamation processing
ExclamationLocal.main(new String[]{});

// Embedded bolt usage
ExclamationWithBolt.main(new String[]{});

// Embedded spout usage
ExclamationWithSpout.main(new String[]{});
```
### Remote Topology Execution

```java
import org.apache.flink.storm.wordcount.*;

// Submit topology to remote Flink cluster using client
WordCountRemoteByClient.main(new String[]{
    "localhost", "6123", "input.txt", "output.txt"
});

// Submit topology using FlinkSubmitter pattern
WordCountRemoteBySubmitter.main(new String[]{
    "input.txt", "output.txt"
});
```
## Types

```java { .api }
// Storm exception types for remote execution
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NotAliveException;

// Core Storm tuple and field types
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Flink tuple types
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Map and function interfaces
import java.util.Map;
import java.io.Serializable;

// POJO classes for data exchange
public static class Sentence implements Serializable {
    private String sentence;

    public Sentence();
    public Sentence(String sentence);
    public String getSentence();
    public void setSentence(String sentence);
    public String toString();
}
```
### Building Exclamation Topology

```java
import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.exclamation.ExclamationTopology;

// Build exclamation topology
TopologyBuilder builder = ExclamationTopology.buildTopology();

// Configure exclamation count
int count = ExclamationTopology.getExclamation();
```
### Stream Splitting with RandomSpout

```java
import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.split.operators.*;

TopologyBuilder builder = new TopologyBuilder();

// Add random spout with stream splitting
RandomSpout spout = new RandomSpout(true, 12345L);
builder.setSpout("random", spout);

// Process even stream
VerifyAndEnrichBolt evenBolt = new VerifyAndEnrichBolt(true);
builder.setBolt("even-processor", evenBolt)
    .shuffleGrouping("random", RandomSpout.EVEN_STREAM);

// Process odd stream
VerifyAndEnrichBolt oddBolt = new VerifyAndEnrichBolt(false);
builder.setBolt("odd-processor", oddBolt)
    .shuffleGrouping("random", RandomSpout.ODD_STREAM);
```
### Custom Exclamation Processing

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
import org.apache.flink.storm.wrappers.BoltWrapper;

public class CustomExclamation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create data source
        DataStream<String> source = env.fromElements(
            "Hello", "World", "Storm", "Flink"
        );

        // Apply exclamation bolt
        DataStream<String> processed = source.transform(
            "ExclamationBolt",
            BasicTypeInfo.STRING_TYPE_INFO,
            new BoltWrapper<String, String>(new ExclamationBolt())
        );

        processed.print();
        env.execute("Custom Exclamation");
    }
}
```
### Multi-Stream Processing

```java
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.storm.split.SpoutSplitExample;

public class MultiStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create random number source
        DataStream<Integer> numbers = env.addSource(
            new SpoutWrapper<>(new RandomSpout(false, 42),
                new String[]{ Utils.DEFAULT_STREAM_ID }, -1)
        );

        // Split stream based on even/odd
        SplitStream<Integer> split = numbers.split(new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                return value % 2 == 0 ?
                    Collections.singletonList("even") :
                    Collections.singletonList("odd");
            }
        });

        // Process even numbers
        DataStream<Integer> evenStream = split.select("even");
        evenStream.map(x -> "Even: " + x).print();

        // Process odd numbers
        DataStream<Integer> oddStream = split.select("odd");
        oddStream.map(x -> "Odd: " + x).print();

        env.execute("Multi-Stream Processing");
    }
}
```
### Join Processing

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class StreamJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create two data streams
        DataStream<Tuple2<String, Integer>> stream1 = env.fromElements(
            new Tuple2<>("a", 1), new Tuple2<>("b", 2)
        );

        DataStream<Tuple2<String, String>> stream2 = env.fromElements(
            new Tuple2<>("a", "apple"), new Tuple2<>("b", "banana")
        );

        // Join streams by key within time window
        DataStream<Tuple3<String, Integer, String>> joined = stream1
            .join(stream2)
            .where(t -> t.f0)
            .equalTo(t -> t.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .apply((t1, t2) -> new Tuple3<>(t1.f0, t1.f1, t2.f1));

        joined.print();
        env.execute("Stream Join");
    }
}
```