# Storm Operators

Production-ready Storm operators, including spouts for data ingestion and bolts for data processing. These operators support both index-based and field-name-based tuple access patterns, providing flexibility for different integration approaches and data processing requirements.

## Capabilities

### Data Source Spouts

Spouts for ingesting data from various sources, including files and memory.

#### WordCountFileSpout

File-based spout that reads text data from a local file for word counting.

```java { .api }
/**
 * Spout reading text data from files for word counting
 */
public class WordCountFileSpout extends FileSpout {
    /**
     * Create file spout for word count data
     * @param path Path to input text file
     */
    public WordCountFileSpout(String path);

    /**
     * Declare output fields for word count processing
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
}
```

#### WordCountInMemorySpout

Memory-based spout providing built-in text data for word counting examples and testing.

```java { .api }
/**
 * Spout providing built-in text data for word counting
 */
public class WordCountInMemorySpout extends FiniteInMemorySpout {
    /**
     * Create in-memory spout with built-in word count data
     */
    public WordCountInMemorySpout();

    /**
     * Declare output fields for word count processing
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
}
```

### Processing Bolts

Bolts for tokenizing text and counting word occurrences with different tuple access patterns.

#### Index-Based Processing Bolts

Bolts that access tuple fields by index position.

##### BoltTokenizer

Tokenizes sentences into words using tuple index access for high-performance processing.

```java { .api }
/**
 * Tokenizes sentences into words using tuple index access
 */
public class BoltTokenizer implements IRichBolt {
    public static final String ATTRIBUTE_WORD = "word";
    public static final String ATTRIBUTE_COUNT = "count";
    public static final int ATTRIBUTE_WORD_INDEX = 0;
    public static final int ATTRIBUTE_COUNT_INDEX = 1;

    /**
     * Prepare bolt for execution
     * @param stormConf Storm configuration
     * @param context Topology context
     * @param collector Output collector
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * Execute tokenization on input tuple
     * @param input Input tuple containing text to tokenize
     */
    public void execute(Tuple input);

    /**
     * Cleanup bolt resources
     */
    public void cleanup();

    /**
     * Declare output fields
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * Get component configuration
     * @return Configuration map
     */
    public Map<String, Object> getComponentConfiguration();
}
```
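
The API above leaves the tokenization rules implicit. As a plain-Java sketch with no Storm dependencies, the core of what a tokenizer bolt does might look like the following; the class name and the exact splitting rule (lower-casing, splitting on non-word characters) are assumptions for illustration, not the confirmed behavior of `BoltTokenizer`.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of a tokenizer bolt's core logic (no Storm dependencies). */
public class TokenizerSketch {

    /**
     * Splits a sentence into lower-case words, mirroring what a tokenizer
     * bolt is assumed to do before emitting (word, 1) pairs downstream.
     */
    public static List<String> tokenize(String sentence) {
        List<String> words = new ArrayList<>();
        for (String token : sentence.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello Storm, hello Flink"));
    }
}
```

In the real bolt, each word would be emitted as a tuple via the `OutputCollector` rather than collected into a list.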

##### BoltCounter

Counts word occurrences using tuple index access for efficient aggregation.

```java { .api }
/**
 * Counts word occurrences using tuple index access
 */
public class BoltCounter implements IRichBolt {
    public static final String ATTRIBUTE_WORD = "word";
    public static final String ATTRIBUTE_COUNT = "count";

    /**
     * Prepare bolt for execution
     * @param stormConf Storm configuration
     * @param context Topology context
     * @param collector Output collector
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * Execute counting on input tuple
     * @param input Input tuple containing word and count
     */
    public void execute(Tuple input);

    /**
     * Cleanup bolt resources
     */
    public void cleanup();

    /**
     * Declare output fields
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * Get component configuration
     * @return Configuration map
     */
    public Map<String, Object> getComponentConfiguration();
}
```
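
The counter's aggregation logic can likewise be sketched in plain Java. The assumption here is that the bolt keeps a per-word running total in an in-memory map and emits the updated count for each incoming word; the class name and internals are illustrative, not the confirmed implementation of `BoltCounter`.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of a counter bolt's core logic (no Storm dependencies). */
public class CounterSketch {
    // Running per-word totals, as the bolt is assumed to maintain internally
    private final Map<String, Integer> counts = new HashMap<>();

    /** Increments the total for a word and returns the updated count. */
    public int count(String word) {
        int updated = counts.getOrDefault(word, 0) + 1;
        counts.put(word, updated);
        return updated;
    }

    public static void main(String[] args) {
        CounterSketch counter = new CounterSketch();
        for (String w : new String[] {"storm", "flink", "storm"}) {
            System.out.println(w + " -> " + counter.count(w));
        }
    }
}
```

In the real bolt, each updated `(word, count)` pair would be emitted downstream via the `OutputCollector`.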

#### Field Name-Based Processing Bolts

Bolts that access tuple fields by field name for better readability and maintainability.

##### BoltTokenizerByName

Tokenizes sentences using tuple field name access.

```java { .api }
/**
 * Tokenizes sentences using tuple field name access
 */
public class BoltTokenizerByName implements IRichBolt {
    public static final String ATTRIBUTE_WORD = "word";

    /**
     * Prepare bolt for execution
     * @param stormConf Storm configuration
     * @param context Topology context
     * @param collector Output collector
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * Execute tokenization using field name access
     * @param input Input tuple containing text field
     */
    public void execute(Tuple input);

    /**
     * Cleanup bolt resources
     */
    public void cleanup();

    /**
     * Declare output fields
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * Get component configuration
     * @return Configuration map
     */
    public Map<String, Object> getComponentConfiguration();
}
```
202
203
##### BoltCounterByName
204
205
Counts word occurrences using tuple field name access.
206
207
```java { .api }
208
/**
209
* Counts word occurrences using tuple field name access
210
*/
211
public class BoltCounterByName implements IRichBolt {
212
/**
213
* Prepare bolt for execution
214
* @param stormConf Storm configuration
215
* @param context Topology context
216
* @param collector Output collector
217
*/
218
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
219
220
/**
221
* Execute counting using field name access
222
* @param input Input tuple with named fields
223
*/
224
public void execute(Tuple input);
225
226
/**
227
* Cleanup bolt resources
228
*/
229
public void cleanup();
230
231
/**
232
* Declare output fields
233
* @param declarer Output field declarer
234
*/
235
public void declareOutputFields(OutputFieldsDeclarer declarer);
236
237
/**
238
* Get component configuration
239
* @return Configuration map
240
*/
241
public Map<String, Object> getComponentConfiguration();
242
}
243
```

### Data Wrapper Classes

Utility classes for wrapping test data in different formats.

#### WordCountDataTuple

Tuple wrapper for WordCount test data providing typed access to test sentences.

```java { .api }
/**
 * Tuple wrapper for WordCount test data
 */
public class WordCountDataTuple {
    /**
     * Array of tuples containing test sentences
     */
    public static Tuple1<String>[] TUPLES;
}
```

#### WordCountDataPojos

POJO wrapper for WordCount test data enabling object-oriented data access.

```java { .api }
/**
 * POJO wrapper for WordCount test data
 */
public class WordCountDataPojos {
    /**
     * Array of sentence POJOs for testing
     */
    public static Sentence[] SENTENCES;

    /**
     * POJO representing a sentence for word count processing
     */
    public static class Sentence implements Serializable {
        /**
         * Default constructor
         */
        public Sentence();

        /**
         * Constructor with sentence text
         * @param sentence The sentence text
         */
        public Sentence(String sentence);

        /**
         * Get sentence text
         * @return The sentence text
         */
        public String getSentence();

        /**
         * Set sentence text
         * @param sentence The sentence text
         */
        public void setSentence(String sentence);

        /**
         * String representation of sentence
         * @return The sentence text
         */
        public String toString();
    }
}
```

## Usage Examples

### Creating Topology with Index-Based Access

```java
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.flink.storm.wordcount.operators.*;

TopologyBuilder builder = new TopologyBuilder();

// Add spout
builder.setSpout("source", new WordCountInMemorySpout());

// Add tokenizer bolt (index-based)
builder.setBolt("tokenizer", new BoltTokenizer(), 4)
    .shuffleGrouping("source");

// Add counter bolt (index-based)
builder.setBolt("counter", new BoltCounter(), 4)
    .fieldsGrouping("tokenizer", new Fields(BoltTokenizer.ATTRIBUTE_WORD));
```

### Creating Topology with Field Name-Based Access

```java
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.flink.storm.wordcount.operators.*;

TopologyBuilder builder = new TopologyBuilder();

// Add spout
builder.setSpout("source", new WordCountFileSpout("input.txt"));

// Add tokenizer bolt (name-based)
builder.setBolt("tokenizer", new BoltTokenizerByName(), 4)
    .shuffleGrouping("source");

// Add counter bolt (name-based)
builder.setBolt("counter", new BoltCounterByName(), 4)
    .fieldsGrouping("tokenizer", new Fields(BoltTokenizerByName.ATTRIBUTE_WORD));
```

### Using Data Sources

```java
import org.apache.flink.storm.wordcount.operators.*;

// File-based data source
WordCountFileSpout fileSpout = new WordCountFileSpout("/path/to/input.txt");

// Memory-based data source with built-in data
WordCountInMemorySpout memorySpout = new WordCountInMemorySpout();
```
370
371
### Custom Processing Pipeline
372
373
```java
374
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
375
import org.apache.flink.storm.wrappers.*;
376
import org.apache.flink.storm.wordcount.operators.*;
377
378
public class CustomStormPipeline {
379
public static void main(String[] args) throws Exception {
380
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
381
382
// Create spout wrapper
383
SpoutWrapper<String> spoutWrapper = new SpoutWrapper<>(
384
new WordCountInMemorySpout(),
385
new String[]{ Utils.DEFAULT_STREAM_ID },
386
-1
387
);
388
389
// Create bolt wrapper
390
BoltWrapper<String, Tuple2<String, Integer>> boltWrapper =
391
new BoltWrapper<>(new BoltTokenizer());
392
393
// Build processing pipeline
394
DataStream<String> source = env.addSource(spoutWrapper);
395
DataStream<Tuple2<String, Integer>> processed = source.transform(
396
"Tokenizer",
397
TypeExtractor.getForObject(new Tuple2<>("", 0)),
398
boltWrapper
399
);
400
401
// Aggregate and output
402
processed.keyBy(0).sum(1).print();
403
env.execute("Storm Processing Pipeline");
404
}
405
}
406
```

### Working with POJO Data

```java
import org.apache.flink.storm.wordcount.operators.WordCountDataPojos;
import org.apache.flink.storm.wordcount.operators.WordCountDataPojos.*;

// Create sentence POJOs
Sentence sentence1 = new Sentence("Hello world from Storm");
Sentence sentence2 = new Sentence("Apache Flink processes streams");

// Access built-in test data
Sentence[] testSentences = WordCountDataPojos.SENTENCES;
for (Sentence sentence : testSentences) {
    System.out.println("Processing: " + sentence.getSentence());
}
```
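
Putting the two stages together, the tokenize-then-count pipeline can be sketched end-to-end in plain Java. The sample sentences below are illustrative only; the actual contents of `WordCountDataPojos.SENTENCES` may differ.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * End-to-end sketch: tokenize sample sentences and aggregate word counts,
 * mirroring the tokenizer -> counter pipeline without Storm or Flink.
 */
public class WordCountSketch {
    public static Map<String, Integer> countWords(String[] sentences) {
        Map<String, Integer> counts = new HashMap<>();
        for (String sentence : sentences) {
            // Tokenize: lower-case and split on non-word characters (assumed rule)
            for (String word : sentence.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    // Count: increment the running total for this word
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample data standing in for the built-in test sentences
        String[] sentences = {"Hello world from Storm", "Apache Flink processes streams"};
        System.out.println(countWords(sentences));
    }
}
```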

### Bolt Implementation Patterns

#### Index-Based Processing

```java
public class CustomIndexBolt implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // Access by index (fast)
        String word = input.getString(0);
        Integer count = input.getInteger(1);

        // Process and emit
        collector.emit(new Values(word.toLowerCase(), count));
        collector.ack(input);
    }

    // cleanup(), declareOutputFields(), and getComponentConfiguration()
    // omitted for brevity; IRichBolt requires them as well
}
```

#### Field Name-Based Processing

```java
public class CustomNameBolt implements IRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // Access by field name (readable)
        String word = input.getStringByField("word");
        Integer count = input.getIntegerByField("count");

        // Process and emit
        collector.emit(new Values(word.toLowerCase(), count));
        collector.ack(input);
    }

    // cleanup(), declareOutputFields(), and getComponentConfiguration()
    // omitted for brevity; IRichBolt requires them as well
}
```