Production-ready Storm operators including Spouts for data ingestion and Bolts for data processing. These operators support both index-based and field name-based tuple access patterns, providing flexibility for different integration approaches and data processing requirements.
Spouts for ingesting data from various sources including files and memory.
File-based spout for reading text data from local files for word counting operations.
/**
 * Spout reading text data from files for word counting.
 *
 * <p>Extends {@code FileSpout}; reads text from a local file and emits it
 * for downstream tokenizer bolts.
 */
public class WordCountFileSpout extends FileSpout {
/**
 * Create a file spout for word count data.
 *
 * @param path Path to input text file
 */
public WordCountFileSpout(String path);
/**
 * Declare the output fields emitted by this spout for word count processing.
 *
 * @param declarer Output field declarer
 */
public void declareOutputFields(OutputFieldsDeclarer declarer);
}
Memory-based spout providing built-in text data for word counting examples and testing.
/**
 * Spout providing built-in text data for word counting.
 *
 * <p>Extends {@code FiniteInMemorySpout}, so it emits a bounded, hard-coded
 * data set — useful for examples and tests that must terminate.
 */
public class WordCountInMemorySpout extends FiniteInMemorySpout {
/**
 * Create an in-memory spout backed by the built-in word count data.
 */
public WordCountInMemorySpout();
/**
 * Declare the output fields emitted by this spout for word count processing.
 *
 * @param declarer Output field declarer
 */
public void declareOutputFields(OutputFieldsDeclarer declarer);
}
Bolts for tokenizing text and counting word occurrences with different tuple access patterns.
Bolts that access tuple fields using index positions.
Tokenizes sentences into words using tuple index access for high-performance processing.
/**
 * Tokenizes sentences into words using tuple index access.
 *
 * <p>Input fields are read by position rather than by name; output tuples
 * carry the fields {@code "word"} and {@code "count"} at the positions given
 * by {@link #ATTRIBUTE_WORD_INDEX} and {@link #ATTRIBUTE_COUNT_INDEX}.
 */
public class BoltTokenizer implements IRichBolt {
// Name of the emitted word field ("word").
public static final String ATTRIBUTE_WORD = "word";
// Name of the emitted count field ("count").
public static final String ATTRIBUTE_COUNT = "count";
// Positional index of the word field in emitted tuples.
public static final int ATTRIBUTE_WORD_INDEX = 0;
// Positional index of the count field in emitted tuples.
public static final int ATTRIBUTE_COUNT_INDEX = 1;
/**
 * Prepare the bolt for execution.
 *
 * @param stormConf Storm configuration (raw {@code Map} per the Storm API)
 * @param context Topology context
 * @param collector Output collector used to emit result tuples
 */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
/**
 * Tokenize the sentence carried by the input tuple and emit word tuples.
 *
 * @param input Input tuple containing the text to tokenize (accessed by index)
 */
public void execute(Tuple input);
/**
 * Release bolt resources on shutdown.
 */
public void cleanup();
/**
 * Declare the output fields ({@code word}, {@code count}).
 *
 * @param declarer Output field declarer
 */
public void declareOutputFields(OutputFieldsDeclarer declarer);
/**
 * Get the component-specific configuration.
 *
 * @return Configuration map
 */
public Map<String, Object> getComponentConfiguration();
}
Counts word occurrences using tuple index access for efficient aggregation.
/**
 * Counts word occurrences using tuple index access.
 *
 * <p>Consumes {@code (word, count)} tuples by position and emits updated
 * counts under the field names below. The internal count state is not shown
 * in this listing.
 */
public class BoltCounter implements IRichBolt {
// Name of the emitted word field ("word").
public static final String ATTRIBUTE_WORD = "word";
// Name of the emitted count field ("count").
public static final String ATTRIBUTE_COUNT = "count";
/**
 * Prepare the bolt for execution.
 *
 * @param stormConf Storm configuration (raw {@code Map} per the Storm API)
 * @param context Topology context
 * @param collector Output collector used to emit result tuples
 */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
/**
 * Update the running count for the word in the input tuple.
 *
 * @param input Input tuple containing word and count (accessed by index)
 */
public void execute(Tuple input);
/**
 * Release bolt resources on shutdown.
 */
public void cleanup();
/**
 * Declare the output fields ({@code word}, {@code count}).
 *
 * @param declarer Output field declarer
 */
public void declareOutputFields(OutputFieldsDeclarer declarer);
/**
 * Get the component-specific configuration.
 *
 * @return Configuration map
 */
public Map<String, Object> getComponentConfiguration();
}
Bolts that access tuple fields using field names for better readability and maintainability.
Tokenizes sentences using tuple field name access.
/**
 * Tokenizes sentences using tuple field name access.
 *
 * <p>Same contract as {@code BoltTokenizer}, but input fields are looked up
 * by name instead of by position, trading a little speed for readability.
 */
public class BoltTokenizerByName implements IRichBolt {
// Name of the emitted word field ("word").
public static final String ATTRIBUTE_WORD = "word";
/**
 * Prepare the bolt for execution.
 *
 * @param stormConf Storm configuration (raw {@code Map} per the Storm API)
 * @param context Topology context
 * @param collector Output collector used to emit result tuples
 */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
/**
 * Tokenize the sentence carried by the named text field of the input tuple.
 *
 * @param input Input tuple containing the text field (accessed by name)
 */
public void execute(Tuple input);
/**
 * Release bolt resources on shutdown.
 */
public void cleanup();
/**
 * Declare the output fields (including {@code word}).
 *
 * @param declarer Output field declarer
 */
public void declareOutputFields(OutputFieldsDeclarer declarer);
/**
 * Get the component-specific configuration.
 *
 * @return Configuration map
 */
public Map<String, Object> getComponentConfiguration();
}
Counts word occurrences using tuple field name access.
/**
 * Counts word occurrences using tuple field name access.
 *
 * <p>Same contract as {@code BoltCounter}, but input fields are looked up by
 * name instead of by position.
 */
public class BoltCounterByName implements IRichBolt {
/**
 * Prepare the bolt for execution.
 *
 * @param stormConf Storm configuration (raw {@code Map} per the Storm API)
 * @param context Topology context
 * @param collector Output collector used to emit result tuples
 */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
/**
 * Update the running count for the word carried by the named fields.
 *
 * @param input Input tuple with named fields
 */
public void execute(Tuple input);
/**
 * Release bolt resources on shutdown.
 */
public void cleanup();
/**
 * Declare the output fields.
 *
 * @param declarer Output field declarer
 */
public void declareOutputFields(OutputFieldsDeclarer declarer);
/**
 * Get the component-specific configuration.
 *
 * @return Configuration map
 */
public Map<String, Object> getComponentConfiguration();
}
Utility classes for wrapping test data in different formats.
Tuple wrapper for WordCount test data providing typed access to test sentences.
/**
 * Tuple wrapper for WordCount test data.
 */
public class WordCountDataTuple {
/**
 * Array of test tuples; each {@code Tuple1} holds one test sentence string.
 */
public static Tuple1<String>[] TUPLES;
}
POJO wrapper for WordCount test data enabling object-oriented data access.
/**
 * POJO wrapper for WordCount test data.
 */
public class WordCountDataPojos {
/**
 * Array of sentence POJOs for testing.
 */
public static Sentence[] SENTENCES;
/**
 * POJO representing a sentence for word count processing.
 *
 * <p>Has a public no-arg constructor, getter/setter, and implements
 * {@link Serializable} — the shape expected by POJO-based serializers.
 */
public static class Sentence implements Serializable {
/**
 * Default constructor (required for POJO-style serialization).
 */
public Sentence();
/**
 * Constructor with sentence text.
 *
 * @param sentence The sentence text
 */
public Sentence(String sentence);
/**
 * Get the sentence text.
 *
 * @return The sentence text
 */
public String getSentence();
/**
 * Set the sentence text.
 *
 * @param sentence The sentence text
 */
public void setSentence(String sentence);
/**
 * String representation of the sentence.
 *
 * @return The sentence text
 */
public String toString();
}
}
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.flink.storm.wordcount.operators.*;
// Assemble a Storm topology wiring the index-based operators together.
TopologyBuilder builder = new TopologyBuilder();
// Add spout emitting the built-in in-memory test data
builder.setSpout("source", new WordCountInMemorySpout());
// Add tokenizer bolt (index-based) with a parallelism hint of 4
builder.setBolt("tokenizer", new BoltTokenizer(), 4)
.shuffleGrouping("source");
// Add counter bolt (index-based); fieldsGrouping on "word" keys
// all tuples for the same word to the same counter instance
builder.setBolt("counter", new BoltCounter(), 4)
.fieldsGrouping("tokenizer", new Fields(BoltTokenizer.ATTRIBUTE_WORD));
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.flink.storm.wordcount.operators.*;
// Assemble a Storm topology wiring the name-based operators together.
TopologyBuilder builder = new TopologyBuilder();
// Add spout reading text data from a local file
builder.setSpout("source", new WordCountFileSpout("input.txt"));
// Add tokenizer bolt (name-based) with a parallelism hint of 4
builder.setBolt("tokenizer", new BoltTokenizerByName(), 4)
.shuffleGrouping("source");
// Add counter bolt (name-based); fieldsGrouping on "word" keys
// all tuples for the same word to the same counter instance
builder.setBolt("counter", new BoltCounterByName(), 4)
.fieldsGrouping("tokenizer", new Fields(BoltTokenizerByName.ATTRIBUTE_WORD));
import org.apache.flink.storm.wordcount.operators.*;
// File-based data source: reads text from the given path
WordCountFileSpout fileSpout = new WordCountFileSpout("/path/to/input.txt");
// Memory-based data source with built-in (finite) test data
WordCountInMemorySpout memorySpout = new WordCountInMemorySpout();
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.storm.wrappers.*;
import org.apache.flink.storm.wordcount.operators.*;
/**
 * Example: embedding Storm spouts/bolts in a Flink streaming job via the
 * flink-storm compatibility wrappers.
 */
public class CustomStormPipeline {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Wrap the Storm spout so Flink can run it as a source.
// NOTE(review): the -1 argument presumably means "no emit limit /
// run until the finite spout is exhausted" — confirm against the
// SpoutWrapper javadoc before relying on it.
SpoutWrapper<String> spoutWrapper = new SpoutWrapper<>(
new WordCountInMemorySpout(),
new String[]{ Utils.DEFAULT_STREAM_ID },
-1
);
// Wrap the Storm bolt as a Flink one-input operator
// (input: String sentences, output: (word, count) pairs).
BoltWrapper<String, Tuple2<String, Integer>> boltWrapper =
new BoltWrapper<>(new BoltTokenizer());
// Build processing pipeline: source -> tokenizer operator
DataStream<String> source = env.addSource(spoutWrapper);
DataStream<Tuple2<String, Integer>> processed = source.transform(
"Tokenizer",
// Derive the output TypeInformation from a sample Tuple2 instance
TypeExtractor.getForObject(new Tuple2<>("", 0)),
boltWrapper
);
// Aggregate per word (field 0) and print the running sums (field 1)
processed.keyBy(0).sum(1).print();
env.execute("Storm Processing Pipeline");
}
}
import org.apache.flink.storm.wordcount.operators.WordCountDataPojos.*;
// Create sentence POJOs directly
Sentence sentence1 = new Sentence("Hello world from Storm");
Sentence sentence2 = new Sentence("Apache Flink processes streams");
// Access the built-in test data shipped with WordCountDataPojos
Sentence[] testSentences = WordCountDataPojos.SENTENCES;
for (Sentence sentence : testSentences) {
System.out.println("Processing: " + sentence.getSentence());
}
public class CustomIndexBolt implements IRichBolt {
private OutputCollector collector;
public void execute(Tuple input) {
// Access fields by index (fast: no name lookup)
String text = input.getString(0);
Integer count = input.getInteger(1);
// Process and emit.
// NOTE(review): processedText/newCount are illustrative placeholders —
// they are not defined in this snippet.
collector.emit(new Values(processedText, newCount));
// Ack so Storm's at-least-once tracking marks the tuple as processed
collector.ack(input);
}
}
public class CustomNameBolt implements IRichBolt {
private OutputCollector collector;
public void execute(Tuple input) {
// Access fields by name (more readable, robust to field reordering)
String word = input.getStringByField("word");
Integer count = input.getIntegerByField("count");
// Process and emit.
// NOTE(review): processedWord/newCount are illustrative placeholders —
// they are not defined in this snippet.
collector.emit(new Values(processedWord, newCount));
// Ack so Storm's at-least-once tracking marks the tuple as processed
collector.ack(input);
}
}