Apache Flink Storm compatibility examples demonstrating Storm topology integration with embedded mode and full topology execution patterns.
```shell
npx @tessl/cli install tessl/maven-org-apache-flink--flink-storm-examples-2-10@1.3.0
```

A comprehensive collection of examples demonstrating Apache Flink's Storm compatibility layer. This library enables developers to run Apache Storm topologies on Flink clusters through three main integration approaches: embedded mode (using Storm components within Flink streaming programs), full topology mode (running complete Storm topologies), and hybrid integration patterns.

Add to `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm-examples_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

Commonly used imports:

```java
// Core Storm interfaces
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Utility classes for Storm-Flink integration
import org.apache.flink.storm.util.*;

// Wrapper classes for Storm components
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.storm.wrappers.BoltWrapper;

// Storm API integration
import org.apache.flink.storm.api.*;

// Example classes (choose based on use case)
import org.apache.flink.storm.wordcount.SpoutSourceWordCount;
import org.apache.flink.storm.wordcount.BoltTokenizerWordCount;
import org.apache.flink.storm.wordcount.WordCountTopology;

// Flink streaming API
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.java.tuple.Tuple2;
```

```java
import org.apache.flink.storm.wordcount.SpoutSourceWordCount;
import org.apache.flink.storm.wordcount.BoltTokenizerWordCount;
import org.apache.flink.storm.wordcount.WordCountLocal;

// Option 1: Run examples with embedded Storm components
public class FlinkStormExample {
    public static void main(String[] args) throws Exception {
        // Using a Storm spout as a data source in Flink streaming
        SpoutSourceWordCount.main(new String[]{"input.txt", "output"});
        // Using a Storm bolt for processing in Flink streaming
        BoltTokenizerWordCount.main(new String[]{"input.txt", "output"});
        // Running a complete Storm topology on Flink
        WordCountLocal.main(new String[]{"input.txt", "output"});
    }
}
```

The library is organized around three key integration patterns: embedded mode, full topology execution, and hybrid integration.
Key components include `AbstractLineSpout` and `AbstractBoltSink`, which provide foundations for custom components. The core utility classes supply abstract base classes, data sources, output sinks, and formatting utilities for Storm-Flink integration:

```java
// Base classes
public abstract class AbstractLineSpout implements IRichSpout {
    public static final String ATTRIBUTE_LINE = "line";
    public abstract void nextTuple();
}

public abstract class AbstractBoltSink implements IRichBolt {
    public AbstractBoltSink(OutputFormatter formatter);
    protected abstract void writeExternal(String line);
}

// Data sources
public class InMemorySpout<T> extends AbstractLineSpout;
public class FileSpout extends AbstractLineSpout;

// Output formatters
public interface OutputFormatter extends Serializable {
    String format(Tuple input);
}
```

Comprehensive word count examples demonstrate the various Storm-Flink integration patterns, including spout-based sources, bolt-based processing, and complete topology execution.
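The embedded-spout pattern these word count examples use can be sketched briefly: a Storm spout is wrapped with `SpoutWrapper` and handed to Flink as an ordinary source. This is a minimal sketch assuming the Flink 1.x Storm compatibility API — the `SpoutWrapper` constructor, the `addSource` overload taking explicit type information, and the `operators` package path for `WordCountFileSpout` are assumptions, not verbatim library code:

```java
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.wordcount.operators.WordCountFileSpout;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmbeddedSpoutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Wrap the Storm spout so Flink can drive it as a regular source.
        // The output type is declared explicitly because Storm spouts
        // declare their fields at runtime, outside Flink's type system.
        DataStream<String> lines = env.addSource(
                new SpoutWrapper<String>(new WordCountFileSpout(args[0])),
                TypeExtractor.getForClass(String.class));

        lines.print();
        env.execute("Embedded spout sketch");
    }
}
```

The explicit `TypeExtractor` argument is the key difference from a native Flink source: Storm's field declarations are not visible to Flink's type inference, so the element type must be stated up front.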
```java
// Main example classes
public class SpoutSourceWordCount {
    public static void main(String[] args) throws Exception;
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>>;
}

public class BoltTokenizerWordCount {
    public static void main(String[] args) throws Exception;
}

public class WordCountTopology {
    public static TopologyBuilder buildTopology();
    public static TopologyBuilder buildTopology(boolean indexOrName);
}
```

Production-ready Storm operators include Spouts for data ingestion and Bolts for data processing, with support for both index-based and field-name-based tuple access patterns.
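Embedding one of these bolts in a Flink pipeline, in the style of `BoltTokenizerWordCount`, can be sketched as follows; the `transform` overload, the `BoltWrapper` type parameters, and the `operators` package path for `BoltTokenizer` are assumed from the 1.x compatibility layer rather than quoted from it:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.wordcount.operators.BoltTokenizer;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmbeddedBoltSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromElements("to be or not to be");

        // The bolt runs as a regular one-input operator; its output type
        // must be supplied explicitly, here via a sample Tuple2 instance.
        DataStream<Tuple2<String, Integer>> tokens = text.transform(
                "storm-bolt-tokenizer",
                TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
                new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer()));

        tokens.keyBy(0).sum(1).print();
        env.execute("Embedded bolt sketch");
    }
}
```

Because the bolt emits `word`/`count` fields, its output maps naturally onto `Tuple2<String, Integer>`, which downstream native Flink operators (`keyBy`, `sum`) consume directly.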
```java
// Spouts
public class WordCountFileSpout extends FileSpout {
    public WordCountFileSpout(String path);
}

public class WordCountInMemorySpout extends FiniteInMemorySpout {
    public WordCountInMemorySpout();
}

// Bolts
public class BoltTokenizer implements IRichBolt {
    public static final String ATTRIBUTE_WORD = "word";
    public static final String ATTRIBUTE_COUNT = "count";
    public void execute(Tuple input);
}

public class BoltCounter implements IRichBolt {
    public void execute(Tuple input);
}
```

Core topology construction utilities and remote-cluster execution patterns support deploying Storm topologies on Flink clusters.
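Running a complete, unmodified topology locally, in the style of `WordCountLocal`, can be sketched like this; `FlinkLocalCluster.getLocalCluster()`, `FlinkTopology.createTopology`, and the submit/shutdown calls are assumed from the 1.x compatibility layer and may differ in detail across versions:

```java
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.wordcount.WordCountTopology;
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class LocalTopologySketch {
    public static void main(String[] args) throws Exception {
        // Build the unmodified Storm topology, then hand it to a
        // Flink-backed "local cluster" instead of Storm's LocalCluster.
        TopologyBuilder builder = WordCountTopology.buildTopology();

        FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("Storm WordCount", new Config(),
                FlinkTopology.createTopology(builder));

        Utils.sleep(10 * 1000); // let the streaming job run for a while
        cluster.shutdown();
    }
}
```

The design mirrors Storm's own `LocalCluster` workflow, so an existing topology's driver code changes only in the cluster and topology classes it references.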
```java
// Topology builders
public class WordCountTopology {
    public static final String spoutId = "source";
    public static final String tokenierzerId = "tokenizer";
    public static final String counterId = "counter";
    public static final String sinkId = "sink";
    public static TopologyBuilder buildTopology();
    public static TopologyBuilder buildTopology(boolean indexOrName);
}

public class ExclamationTopology {
    public static final String spoutId = "source";
    public static final String firstBoltId = "exclamation1";
    public static final String secondBoltId = "exclamation2";
    public static final String sinkId = "sink";
    public static TopologyBuilder buildTopology();
}

// Remote execution
public class WordCountRemoteByClient {
    public static final String topologyId = "Storm WordCount";
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, NotAliveException;
}

public class WordCountRemoteBySubmitter {
    public static final String topologyId = "Storm WordCount";
    public static void main(String[] args) throws Exception;
}
```

Extended examples demonstrate advanced Storm-Flink integration patterns, including stream splitting, joins, exclamation processing, and real-time data printing.
```java
// Stream processing examples
public class ExclamationLocal {
    public static final String topologyId = "Streaming Exclamation";
    public static void main(String[] args) throws Exception;
}

public class SpoutSplitExample {
    public static void main(String[] args) throws Exception;
}

public class SingleJoinExample {
    public static void main(String[] args) throws Exception;
}
```

The library includes three pre-built JAR files for cluster deployment.
Usage:

```shell
bin/flink run <jar-file> [input-path] [output-path]
```
Use Storm components within Flink streaming programs (embedded mode):

- `SpoutWrapper<T>` for data sources
- `BoltWrapper<IN, OUT>` for data processing

Execute complete Storm topologies on Flink:

- `TopologyBuilder` to construct Storm topologies
- `FlinkLocalCluster` for local testing
- `FlinkClient` or Storm submitter patterns

Bolts can read tuple fields either by index (`tuple.getValue(0)`) or by field name (`tuple.getValueByField("word")`).
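For remote submission, the compatibility layer mirrors Storm's `StormSubmitter`. The sketch below is hedged: `FlinkSubmitter`, `FlinkTopology.createTopology`, and the reuse of Storm's nimbus host/port config keys are assumptions based on the 1.x compatibility API, with the Flink JobManager standing in for Nimbus:

```java
import org.apache.flink.storm.api.FlinkSubmitter;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.wordcount.WordCountTopology;
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

public class RemoteSubmitSketch {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = WordCountTopology.buildTopology();

        // Storm's nimbus host/port settings are reinterpreted by the
        // compatibility layer as the Flink JobManager's address.
        Config conf = new Config();
        conf.put(Config.NIMBUS_HOST, "localhost");
        conf.put(Config.NIMBUS_THRIFT_PORT, 6123);

        FlinkSubmitter.submitTopology("Storm WordCount", conf,
                FlinkTopology.createTopology(builder));
    }
}
```

This is the pattern `WordCountRemoteBySubmitter` packages up; `WordCountRemoteByClient` achieves the same with an explicit client object instead of the static submitter.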