Extended examples demonstrating advanced Storm-on-Flink integration patterns beyond basic word counting: stream splitting with multiple outputs, text processing that appends exclamation marks, stream joins, real-time stream printing, and remote topology submission to a Flink cluster.
Examples demonstrating text processing with Storm topologies that add exclamation marks to text streams.
Local execution example for exclamation processing topology.
/**
* Local execution example for exclamation processing topology
*/
public class ExclamationLocal {
public static final String topologyId = "Streaming Exclamation";
/**
* Main entry point for local exclamation topology
* @param args Command line arguments for configuration
*/
public static void main(String[] args) throws Exception;
}
Example showing Storm bolt usage within Flink streaming for text processing.
/**
* Shows using Storm bolt within Flink streaming program for exclamation processing
*/
public class ExclamationWithBolt {
/**
* Main entry point for bolt-based exclamation processing
* @param args Command line arguments
*/
public static void main(String[] args) throws Exception;
/**
* Map function for adding exclamation marks to text
*/
public static class ExclamationMap implements MapFunction<String, String> {
public String map(String value) throws Exception;
}
}
Example showing Storm spout usage within Flink streaming for data sourcing.
/**
* Shows using Storm spout within Flink streaming program
*/
public class ExclamationWithSpout {
/**
* Main entry point for spout-based exclamation processing
* @param args Command line arguments
*/
public static void main(String[] args) throws Exception;
/**
* Map function for processing spout data
*/
public static class ExclamationMap implements MapFunction<String, String> {
public String map(String value) throws Exception;
}
}
Topology builder for exclamation processing with configurable parameters.
/**
* Builder for exclamation processing topology
*/
public class ExclamationTopology {
public static final String spoutId = "source";
public static final String firstBoltId = "exclamation1";
public static final String secondBoltId = "exclamation2";
public static final String sinkId = "sink";
/**
* Build exclamation processing topology
* @return Configured TopologyBuilder
*/
public static TopologyBuilder buildTopology();
/**
* Get exclamation count configuration
* @return Number of exclamation marks to add
*/
public static int getExclamation();
/**
* Parse command line parameters
* @param args Command line arguments
* @return true if parameters valid, false otherwise
*/
public static boolean parseParameters(String[] args);
}
Bolt operator that adds configurable exclamation marks to text.
/**
* Bolt that adds configurable exclamation marks to text
*/
public class ExclamationBolt implements IRichBolt {
public static final String EXCLAMATION_COUNT = "exclamation.count";
/**
* Prepare bolt for execution
* @param conf Storm configuration
* @param context Topology context
* @param collector Output collector
*/
public void prepare(Map conf, TopologyContext context, OutputCollector collector);
/**
* Cleanup bolt resources
*/
public void cleanup();
/**
* Execute exclamation processing on tuple
* @param tuple Input tuple containing text
*/
public void execute(Tuple tuple);
/**
* Declare output fields
* @param declarer Output field declarer
*/
public void declareOutputFields(OutputFieldsDeclarer declarer);
/**
* Get component configuration
* @return Configuration map
*/
public Map<String, Object> getComponentConfiguration();
}
Examples demonstrating multiple output streams from Storm components with even/odd number splitting.
Example demonstrating spouts with multiple output streams and stream-specific processing.
/**
* Demonstrates spouts with multiple output streams
*/
public class SpoutSplitExample {
/**
* Main entry point for stream splitting example
* @param args Command line arguments
*/
public static void main(String[] args) throws Exception;
/**
* Map function for enriching split stream data
*/
public static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
/**
* Error tracking for stream processing
*/
public static boolean errorOccured = false;
public Tuple2<String, Integer> map(Integer value) throws Exception;
}
}
Spout generating random numbers with separate even and odd output streams.
/**
* Spout generating random numbers with separate even/odd streams
*/
public class RandomSpout extends BaseRichSpout {
public static final String EVEN_STREAM = "even";
public static final String ODD_STREAM = "odd";
/**
* Create random spout with stream splitting
* @param split true to enable stream splitting, false for single stream
* @param seed Random seed for reproducible results
*/
public RandomSpout(boolean split, long seed);
/**
* Initialize spout
* @param conf Storm configuration
* @param context Topology context
* @param collector Spout output collector
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
/**
* Emit next tuple to appropriate stream
*/
public void nextTuple();
/**
* Declare output fields for both streams
* @param declarer Output field declarer
*/
public void declareOutputFields(OutputFieldsDeclarer declarer);
}
Bolt for verifying and enriching data from split streams with error tracking.
/**
* Bolt for verifying and enriching data from split streams
*/
public class VerifyAndEnrichBolt extends BaseRichBolt {
/**
* Global error tracking across bolt instances
*/
public static boolean errorOccured = false;
/**
* Create verification bolt for specific stream type
* @param evenOrOdd true for even stream processing, false for odd stream
*/
public VerifyAndEnrichBolt(boolean evenOrOdd);
/**
* Prepare bolt for execution
* @param stormConf Storm configuration
* @param context Topology context
* @param collector Output collector
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
/**
* Execute verification and enrichment
* @param input Input tuple from split stream
*/
public void execute(Tuple input);
/**
* Declare output fields
* @param declarer Output field declarer
*/
public void declareOutputFields(OutputFieldsDeclarer declarer);
}
Examples demonstrating stream joins in Storm topologies.
Example demonstrating joins between multiple data streams in Storm.
/**
* Example demonstrating joins in Storm topologies
*/
public class SingleJoinExample {
/**
* Main entry point for join example
* @param args Command line arguments
*/
public static void main(String[] args) throws Exception;
}
Examples for printing and displaying real-time data streams.
Example for printing real-time streams with Twitter integration support.
/**
* Example for printing real-time streams (requires Twitter API credentials)
*/
public class PrintSampleStream {
/**
* Main entry point for stream printing example
* @param args Command line arguments with Twitter credentials
*/
public static void main(String[] args) throws Exception;
}
Examples demonstrating remote Storm topology execution on Flink clusters.
Example showing remote topology submission using FlinkClient.
/**
* Remote topology submission using FlinkClient
*/
public class WordCountRemoteByClient {
public static final String topologyId = "Storm WordCount";
/**
* Main entry point for remote client submission
* @param args Command line arguments containing cluster configuration
*/
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, NotAliveException;
}
Example showing remote topology submission using FlinkSubmitter.
/**
* Remote topology submission using FlinkSubmitter
*/
public class WordCountRemoteBySubmitter {
public static final String topologyId = "Storm WordCount";
/**
* Main entry point for remote submitter
* @param args Command line arguments
*/
public static void main(String[] args) throws Exception;
}
// Local exclamation processing
ExclamationLocal.main(new String[]{});
// Embedded bolt usage
ExclamationWithBolt.main(new String[]{});
// Embedded spout usage
ExclamationWithSpout.main(new String[]{});
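The local examples above delegate to a FlinkLocalCluster internally. A minimal sketch of that submission pattern, assuming the FlinkLocalCluster and FlinkTopology API of the flink-storm compatibility layer:
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.exclamation.ExclamationLocal;
import org.apache.flink.storm.exclamation.ExclamationTopology;
// Translate the Storm topology and run it on an embedded Flink mini cluster
TopologyBuilder builder = ExclamationTopology.buildTopology();
FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(ExclamationLocal.topologyId, new Config(), FlinkTopology.createTopology(builder));
// ... let the topology run for a while, then shut the local cluster down
cluster.shutdown();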
import org.apache.flink.storm.wordcount.*;
// Submit topology to remote Flink cluster using client
WordCountRemoteByClient.main(new String[]{
"localhost", "6123", "input.txt", "output.txt"
});
// Submit topology using FlinkSubmitter pattern
WordCountRemoteBySubmitter.main(new String[]{
"input.txt", "output.txt"
});
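Internally, WordCountRemoteBySubmitter hands the translated topology to FlinkSubmitter, which mirrors Storm's StormSubmitter API. A minimal sketch, shown here with the exclamation topology from this section; treating Storm's nimbus host/port keys as the way to locate the Flink JobManager is an assumption:
import org.apache.storm.Config;
import org.apache.flink.storm.api.FlinkSubmitter;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.exclamation.ExclamationTopology;
Config conf = new Config();
// Assumption: Storm's nimbus host/port settings are reused to address the Flink JobManager
conf.put(Config.NIMBUS_HOST, "localhost");
conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
FlinkSubmitter.submitTopology("Streaming Exclamation", conf,
FlinkTopology.createTopology(ExclamationTopology.buildTopology()));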
// Storm exception types for remote execution
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NotAliveException;
// Core Storm tuple and field types
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
// Flink tuple types
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
// Java utility and serialization interfaces
import java.util.Map;
import java.io.Serializable;
// POJO classes for data exchange
public static class Sentence implements Serializable {
private String sentence;
public Sentence();
public Sentence(String sentence);
public String getSentence();
public void setSentence(String sentence);
public String toString();
}
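A hedged sketch of how such a POJO travels through Storm tuples: a bolt reads the Sentence by field name and re-emits it as a single-field tuple. The SentenceForwardBolt class and the "sentence" field name are illustrative, not part of the examples above:
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class SentenceForwardBolt extends BaseRichBolt {
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple input) {
// Read the Sentence POJO (defined above) out of the tuple by field name
Sentence sentence = (Sentence) input.getValueByField("sentence");
collector.emit(new Values(sentence));
collector.ack(input);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}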
import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.exclamation.ExclamationTopology;
// Build exclamation topology
TopologyBuilder builder = ExclamationTopology.buildTopology();
// Configure exclamation count
int count = ExclamationTopology.getExclamation();
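The number of exclamation marks that ExclamationBolt appends is read from its configuration under the EXCLAMATION_COUNT key. A hedged sketch of attaching that key when declaring the bolt by hand; the component ids mirror the constants above, and relying on component-level configuration to reach prepare() is an assumption:
import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
TopologyBuilder builder = new TopologyBuilder();
// Attach the exclamation count to the bolt's component configuration;
// assumes a spout was registered under the id "source"
builder.setBolt("exclamation1", new ExclamationBolt())
.addConfiguration(ExclamationBolt.EXCLAMATION_COUNT, 3)
.shuffleGrouping("source");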
import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.split.operators.*;
TopologyBuilder builder = new TopologyBuilder();
// Add random spout with stream splitting
RandomSpout spout = new RandomSpout(true, 12345L);
builder.setSpout("random", spout);
// Process even stream
VerifyAndEnrichBolt evenBolt = new VerifyAndEnrichBolt(true);
builder.setBolt("even-processor", evenBolt)
.shuffleGrouping("random", RandomSpout.EVEN_STREAM);
// Process odd stream
VerifyAndEnrichBolt oddBolt = new VerifyAndEnrichBolt(false);
builder.setBolt("odd-processor", oddBolt)
.shuffleGrouping("random", RandomSpout.ODD_STREAM);import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CustomExclamation {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create data source
DataStream<String> source = env.fromElements(
"Hello", "World", "Storm", "Flink"
);
// Apply exclamation bolt
DataStream<String> processed = source.transform(
"ExclamationBolt",
BasicTypeInfo.STRING_TYPE_INFO,
new BoltWrapper<String, String>(new ExclamationBolt())
);
processed.print();
env.execute("Custom Exclamation");
}
}
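When an embedded bolt needs its Storm configuration (for example ExclamationBolt's EXCLAMATION_COUNT), the flink-storm layer can pick it up from the global job parameters. A sketch of the setup that would precede the transform call above, assuming the StormConfig helper from org.apache.flink.storm.util:
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Register the Storm configuration as global job parameters so the wrapper
// can pass it to ExclamationBolt.prepare(...)
StormConfig config = new StormConfig();
config.put(ExclamationBolt.EXCLAMATION_COUNT, 2);
env.getConfig().setGlobalJobParameters(config);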
import java.util.Collections;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.storm.split.operators.RandomSpout;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.storm.utils.Utils;
public class MultiStreamExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create random number source
// The explicit type information is needed because the spout wrapper's
// output type cannot be extracted automatically
DataStream<Integer> numbers = env.addSource(
new SpoutWrapper<Integer>(new RandomSpout(false, 42),
new String[]{ Utils.DEFAULT_STREAM_ID }, -1),
BasicTypeInfo.INT_TYPE_INFO);
// Split stream based on even/odd
SplitStream<Integer> split = numbers.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
return value % 2 == 0 ?
Collections.singletonList("even") :
Collections.singletonList("odd");
}
});
// Process even numbers
DataStream<Integer> evenStream = split.select("even");
evenStream.map(x -> "Even: " + x).print();
// Process odd numbers
DataStream<Integer> oddStream = split.select("odd");
oddStream.map(x -> "Odd: " + x).print();
env.execute("Multi-Stream Processing");
}
}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class StreamJoinExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Ingestion time gives every record an automatic timestamp and watermark,
// so the event-time window below can fire without a custom assigner
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// Create two data streams
DataStream<Tuple2<String, Integer>> stream1 = env.fromElements(
new Tuple2<>("a", 1), new Tuple2<>("b", 2)
);
DataStream<Tuple2<String, String>> stream2 = env.fromElements(
new Tuple2<>("a", "apple"), new Tuple2<>("b", "banana")
);
// Join streams by key within a tumbling time window; the explicit type
// information is needed because lambdas erase the generic tuple types
DataStream<Tuple3<String, Integer, String>> joined = stream1
.join(stream2)
.where(t -> t.f0, Types.STRING)
.equalTo(t -> t.f0, Types.STRING)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply((t1, t2) -> new Tuple3<>(t1.f0, t1.f1, t2.f1),
Types.TUPLE(Types.STRING, Types.INT, Types.STRING));
joined.print();
env.execute("Stream Join");
}
}