Comprehensive word count examples demonstrating various Storm-Flink integration patterns. These examples show embedded mode usage (Storm components within Flink streaming), complete topology execution, and different data access patterns including index-based, field name-based, and POJO-based approaches.
Examples integrating individual Storm components within Flink streaming programs.
Word count example using Storm spout as data source within Flink streaming program.
/**
* Word count using Storm spout as data source in Flink streaming
*/
public class SpoutSourceWordCount {
/**
* Main entry point for spout-based word count
* @param args Command line arguments: [input-path] [output-path]
*/
public static void main(String[] args) throws Exception;
/**
* Tokenizer that splits sentences into words
*/
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception;
}
}

Usage Example:
// Run with default data
SpoutSourceWordCount.main(new String[]{});
// Run with file input/output
SpoutSourceWordCount.main(new String[]{"input.txt", "output.txt"});

Word count example using Storm bolt for tokenization within Flink streaming program.
/**
* Word count using Storm bolt for tokenization in Flink streaming
*/
public class BoltTokenizerWordCount {
/**
* Main entry point for bolt-based word count
* @param args Command line arguments: [input-path] [output-path]
*/
public static void main(String[] args) throws Exception;
}

Word count examples demonstrating field name-based tuple access patterns.
/**
* Word count accessing tuple fields by name rather than index
*/
public class BoltTokenizerWordCountWithNames {
public static void main(String[] args) throws Exception;
}
/**
* Word count using POJO input types with field name access
*/
public class BoltTokenizerWordCountPojo {
public static void main(String[] args) throws Exception;
}

Examples executing complete Storm topologies on Flink clusters.
Examples for local testing and development.
/**
* Local execution of complete Storm topology for word counting
*/
public class WordCountLocal {
public static final String topologyId = "Storm WordCount";
/**
* Main entry point for local topology execution
* @param args Command line arguments: [input-path] [output-path]
*/
public static void main(String[] args) throws Exception;
}
/**
* Local topology execution using field names instead of indices
*/
public class WordCountLocalByName {
/**
* Main entry point for local execution with field names
* @param args Command line arguments: [input-path] [output-path]
*/
public static void main(String[] args) throws Exception;
}

Examples for submitting topologies to remote Flink clusters.
/**
* Remote cluster submission using FlinkClient
*/
public class WordCountRemoteByClient {
public static final String topologyId = "Storm WordCount";
/**
* Submit topology to remote cluster via client
* @param args Command line arguments: [input-path] [output-path]
*/
public static void main(String[] args) throws Exception;
}
/**
* Remote cluster submission using Storm submitter pattern
*/
public class WordCountRemoteBySubmitter {
/**
* Submit topology to remote cluster via submitter
* @param args Command line arguments: [input-path] [output-path]
*/
public static void main(String[] args) throws Exception;
}

Utility for constructing word count Storm topologies with various configuration options.
Builder for word count Storm topologies supporting both index-based and field name-based access patterns.
/**
* Builder for word count Storm topologies
*/
public class WordCountTopology {
public static final String spoutId = "source";
public static final String tokenierzerId = "tokenizer"; // (sic) misspelling matches the constant name in the upstream API — do not "correct" it here
public static final String counterId = "counter";
public static final String sinkId = "sink";
/**
* Build topology with default index-based access
* @return Configured TopologyBuilder
*/
public static TopologyBuilder buildTopology();
/**
* Build topology with configurable access pattern
* @param indexOrName true for index-based access, false for field name-based
* @return Configured TopologyBuilder
*/
public static TopologyBuilder buildTopology(boolean indexOrName);
/**
* Parse command line parameters
* @param args Command line arguments
* @return true if parameters valid, false otherwise
*/
static boolean parseParameters(String[] args);
}

Usage Example:
import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.wordcount.WordCountTopology;
// Build topology with index-based access
TopologyBuilder builder = WordCountTopology.buildTopology();
// Build topology with field name-based access
TopologyBuilder builderByName = WordCountTopology.buildTopology(false);

Integrating Storm components within Flink streaming programs:
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.storm.utils.Utils;
// Using Storm spout in Flink streaming
DataStream<String> source = env.addSource(
new SpoutWrapper<String>(new WordCountFileSpout("input.txt"),
new String[] { Utils.DEFAULT_STREAM_ID }, -1),
TypeExtractor.getForClass(String.class)
).setParallelism(1);
// Using Storm bolt in Flink streaming
DataStream<Tuple2<String, Integer>> tokenized = source.transform(
"BoltTokenizer",
TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())
);

Building and executing complete Storm topologies:
import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.api.FlinkLocalCluster;
// Build topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("source", new WordCountInMemorySpout());
builder.setBolt("tokenizer", new BoltTokenizer(), 4).shuffleGrouping("source");
builder.setBolt("counter", new BoltCounter(), 4).fieldsGrouping("tokenizer", new Fields("word"));
builder.setBolt("sink", new BoltPrintSink(new TupleOutputFormatter())).shuffleGrouping("counter");
// Submit to local cluster
FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("WordCount", new Config(), FlinkTopology.createTopology(builder));

Accessing tuple fields by position:
public class IndexBasedBolt implements IRichBolt {
public void execute(Tuple input) {
String word = input.getString(0); // First field
Integer count = input.getInteger(1); // Second field
// Process data...
}
}

Accessing tuple fields by name:
public class NameBasedBolt implements IRichBolt {
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = input.getIntegerByField("count");
// Process data...
}
}

Using Plain Old Java Objects:
public class WordCountPojo {
public static class Sentence implements Serializable {
private String sentence;
public Sentence() {}
public Sentence(String sentence) { this.sentence = sentence; }
public String getSentence() { return sentence; }
public void setSentence(String sentence) { this.sentence = sentence; }
}
}

// Embedded mode examples
SpoutSourceWordCount.main(new String[]{"input.txt", "output.txt"});
BoltTokenizerWordCount.main(new String[]{"input.txt", "output.txt"});
// Full topology examples
WordCountLocal.main(new String[]{"input.txt", "output.txt"});
WordCountRemoteByClient.main(new String[]{"input.txt", "output.txt"});
// Field name variants
BoltTokenizerWordCountWithNames.main(new String[]{"input.txt", "output.txt"});
WordCountLocalByName.main(new String[]{"input.txt", "output.txt"});

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.storm.wordcount.operators.*;
public class CustomWordCount {
    /**
     * Runs a Flink streaming job that embeds Storm components:
     * a Storm spout as the data source and a Storm bolt as the tokenizer.
     *
     * @param args command line arguments (unused)
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Wrap the Storm spout so Flink can drive it as a source.
        // -1 lets the spout run until it finishes on its own.
        final String[] rawOutputStreams = new String[] { Utils.DEFAULT_STREAM_ID };
        DataStream<String> sentences = env.addSource(
                new SpoutWrapper<>(new WordCountInMemorySpout(), rawOutputStreams, -1));

        // Wrap the Storm tokenizer bolt as a Flink one-input operator.
        DataStream<Tuple2<String, Integer>> words = sentences.transform(
                "Tokenizer",
                TypeExtractor.getForObject(new Tuple2<>("", 0)),
                new BoltWrapper<>(new BoltTokenizer()));

        // Key by the word (field 0), sum the counts (field 1), print to stdout.
        words.keyBy(0).sum(1).print();

        env.execute("Custom Word Count");
    }
}