# Apache Flink Storm Examples

A comprehensive collection of examples demonstrating Apache Flink's Storm compatibility layer. The library lets developers run Apache Storm topologies on Flink clusters through three main integration approaches: embedded mode (using individual Storm components within Flink streaming programs), full topology mode (running complete Storm topologies), and a utility framework for building custom Storm-Flink integrations.

## Package Information

- **Package Name**: flink-storm-examples_2.10
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add the dependency to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm-examples_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Core Imports

```java
// Core Storm interfaces
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Utility classes for Storm-Flink integration
import org.apache.flink.storm.util.*;

// Wrapper classes for Storm components
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.storm.wrappers.BoltWrapper;

// Storm API integration
import org.apache.flink.storm.api.*;

// Example classes (choose based on use case)
import org.apache.flink.storm.wordcount.SpoutSourceWordCount;
import org.apache.flink.storm.wordcount.BoltTokenizerWordCount;
import org.apache.flink.storm.wordcount.WordCountTopology;

// Flink streaming API
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.java.tuple.Tuple2;
```

## Basic Usage

```java
import org.apache.flink.storm.wordcount.SpoutSourceWordCount;
import org.apache.flink.storm.wordcount.BoltTokenizerWordCount;
import org.apache.flink.storm.wordcount.WordCountLocal;

public class FlinkStormExample {
    public static void main(String[] args) throws Exception {
        // Option 1: use a Storm spout as the data source in a Flink streaming job
        SpoutSourceWordCount.main(new String[]{"input.txt", "output"});

        // Option 2: use a Storm bolt for processing in a Flink streaming job
        BoltTokenizerWordCount.main(new String[]{"input.txt", "output"});

        // Option 3: run a complete Storm topology on Flink
        WordCountLocal.main(new String[]{"input.txt", "output"});
    }
}
```

Each call executes a separate Flink job; in practice you would run one option at a time.

## Architecture

The library is organized around three key integration patterns:

- **Embedded Mode**: Integrate individual Storm components (Spouts/Bolts) within Flink streaming programs using wrapper classes
- **Full Topology Mode**: Execute complete Storm topologies on Flink using topology builders and execution environments
- **Utility Framework**: Base classes and formatters for creating custom Storm-Flink integrations

Key components include:

- **Base Classes**: `AbstractLineSpout` and `AbstractBoltSink` provide foundations for custom components
- **Data Sources**: File-based and in-memory spouts for various input scenarios
- **Output Sinks**: Configurable output formatting with console and file writing capabilities
- **Storm Operators**: Ready-to-use Spouts and Bolts for common data processing tasks
- **Topology Builders**: Utilities for constructing complete Storm topologies for Flink execution
## Capabilities

### Utility Classes and Base Components

Core utility classes providing the foundation for Storm-Flink integration, including abstract base classes, data sources, output sinks, and formatting utilities.

```java { .api }
// Base classes
public abstract class AbstractLineSpout implements IRichSpout {
    public static final String ATTRIBUTE_LINE = "line";
    public abstract void nextTuple();
}

public abstract class AbstractBoltSink implements IRichBolt {
    public AbstractBoltSink(OutputFormatter formatter);
    protected abstract void writeExternal(String line);
}

// Data sources
public class InMemorySpout<T> extends AbstractLineSpout;
public class FileSpout extends AbstractLineSpout;

// Output formatters
public interface OutputFormatter extends Serializable {
    String format(Tuple input);
}
```

[Utility Classes](./utility-classes.md)
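As an illustration of the `OutputFormatter` extension point, the sketch below implements a hypothetical `CsvOutputFormatter` that joins all tuple fields with commas. The class name and the CSV formatting choice are illustrative, not part of the library; only the `OutputFormatter` and `Tuple` types come from flink-storm and storm-core.

```java
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.storm.tuple.Tuple;

// Hypothetical formatter: renders each tuple as one comma-separated line.
public class CsvOutputFormatter implements OutputFormatter {
    private static final long serialVersionUID = 1L;

    @Override
    public String format(final Tuple input) {
        final StringBuilder line = new StringBuilder();
        for (int i = 0; i < input.size(); ++i) {
            if (i > 0) {
                line.append(',');
            }
            line.append(input.getValue(i));
        }
        return line.toString();
    }
}
```

An instance could then be handed to an `AbstractBoltSink` subclass, which receives the formatter through its constructor.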

### Word Count Examples

Comprehensive word count examples demonstrating various Storm-Flink integration patterns, including spout-based sources, bolt-based processing, and complete topology execution.

```java { .api }
// Main example classes
public class SpoutSourceWordCount {
    public static void main(String[] args) throws Exception;
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>>;
}

public class BoltTokenizerWordCount {
    public static void main(String[] args) throws Exception;
}

public class WordCountTopology {
    public static TopologyBuilder buildTopology();
    public static TopologyBuilder buildTopology(boolean indexOrName);
}
```

[Word Count Examples](./wordcount-examples.md)

### Storm Operators

Reusable Storm operators, including Spouts for data ingestion and Bolts for data processing, with support for both index-based and field-name-based tuple access patterns.

```java { .api }
// Spouts
public class WordCountFileSpout extends FileSpout {
    public WordCountFileSpout(String path);
}

public class WordCountInMemorySpout extends FiniteInMemorySpout {
    public WordCountInMemorySpout();
}

// Bolts
public class BoltTokenizer implements IRichBolt {
    public static final String ATTRIBUTE_WORD = "word";
    public static final String ATTRIBUTE_COUNT = "count";
    public void execute(Tuple input);
}

public class BoltCounter implements IRichBolt {
    public void execute(Tuple input);
}
```

[Storm Operators](./storm-operators.md)

### Topology Builders and Remote Execution

Core topology construction utilities and remote cluster execution patterns for deploying Storm topologies on Flink clusters.

```java { .api }
// Topology builders
public class WordCountTopology {
    public static final String spoutId = "source";
    public static final String tokenierzerId = "tokenizer";
    public static final String counterId = "counter";
    public static final String sinkId = "sink";

    public static TopologyBuilder buildTopology();
    public static TopologyBuilder buildTopology(boolean indexOrName);
}

public class ExclamationTopology {
    public static final String spoutId = "source";
    public static final String firstBoltId = "exclamation1";
    public static final String secondBoltId = "exclamation2";
    public static final String sinkId = "sink";

    public static TopologyBuilder buildTopology();
}

// Remote execution
public class WordCountRemoteByClient {
    public static final String topologyId = "Storm WordCount";
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, NotAliveException;
}

public class WordCountRemoteBySubmitter {
    public static final String topologyId = "Storm WordCount";
    public static void main(String[] args) throws Exception;
}
```

### Additional Examples

Extended examples demonstrating advanced Storm-Flink integration patterns, including stream splitting, joins, exclamation processing, and real-time data printing.

```java { .api }
// Stream processing examples
public class ExclamationLocal {
    public static final String topologyId = "Streaming Exclamation";
    public static void main(String[] args) throws Exception;
}

public class SpoutSplitExample {
    public static void main(String[] args) throws Exception;
}

public class SingleJoinExample {
    public static void main(String[] args) throws Exception;
}
```

[Additional Examples](./additional-examples.md)

## Deployment

The library includes three pre-built JAR files for cluster deployment:

1. **WordCount-SpoutSource.jar** - Spout-based word count example
2. **WordCount-BoltTokenizer.jar** - Bolt-based word count example
3. **WordCount-StormTopology.jar** - Complete Storm topology example

**Usage**: `bin/flink run <jar-file> [input-path] [output-path]`

## Integration Patterns

### Embedded Mode

Use Storm components within Flink streaming programs:

- Wrap Spouts with `SpoutWrapper<T>` for data sources
- Wrap Bolts with `BoltWrapper<IN, OUT>` for data processing
- Combine with native Flink operations in streaming pipelines
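The steps above can be sketched as a single pipeline. This is a minimal sketch in the spirit of `SpoutSourceWordCount` and `BoltTokenizerWordCount`, assuming the bundled `WordCountFileSpout` and `BoltTokenizer` operators and a local `input.txt`; details such as parallelism and finite-spout handling are omitted.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.wordcount.operators.BoltTokenizer;
import org.apache.flink.storm.wordcount.operators.WordCountFileSpout;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmbeddedModeSketch {
    public static void main(final String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Storm spout wrapped as a Flink source emitting raw text lines
        final DataStream<String> lines = env.addSource(
                new SpoutWrapper<String>(new WordCountFileSpout("input.txt")),
                TypeExtractor.getForClass(String.class));

        // Storm bolt wrapped as a Flink operator via transform()
        final DataStream<Tuple2<String, Integer>> tokens = lines.transform(
                "BoltTokenizer",
                TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
                new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer()));

        tokens.print();
        env.execute("Embedded Storm components");
    }
}
```

Downstream of the wrapped bolt, `tokens` is an ordinary `DataStream` and can be combined freely with native Flink operations such as `keyBy` and `sum`.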

### Full Topology Mode

Execute complete Storm topologies on Flink:

- Use `TopologyBuilder` to construct Storm topologies
- Submit via `FlinkLocalCluster` for local testing
- Deploy to remote clusters using `FlinkClient` or Storm submitter patterns
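A minimal local-testing sketch, modeled on `WordCountLocal` and assuming the `WordCountTopology` builder from this package; the topology name and sleep duration are arbitrary choices, not library defaults.

```java
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.wordcount.WordCountTopology;
import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class FullTopologySketch {
    public static void main(final String[] args) throws Exception {
        // Build a regular Storm topology (true selects index-based tuple access)
        final TopologyBuilder builder = WordCountTopology.buildTopology(true);

        // Submit it to an in-process Flink "local cluster" instead of Storm
        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology("WordCount", new Config(),
                FlinkTopology.createTopology(builder));

        Utils.sleep(10 * 1000); // let the streaming job run for a while
        cluster.shutdown();
    }
}
```

Remote deployment follows the same shape with `FlinkClient` (see `WordCountRemoteByClient`) or the Storm submitter pattern (see `WordCountRemoteBySubmitter`).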

### Data Access Patterns

- **Index-based**: Access tuple fields by position (e.g., `tuple.getValue(0)`)
- **Name-based**: Access tuple fields by name (e.g., `tuple.getValueByField("word")`)
- **POJO-based**: Use Plain Old Java Objects with field name mapping
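The index-based and name-based styles can be contrasted in a small helper sketch; the helper class itself is illustrative, and the field name `"word"` assumes the `BoltTokenizer` output schema listed above.

```java
import org.apache.storm.tuple.Tuple;

// Sketch of the two tuple-access styles inside a bolt's execute() method.
public final class TupleAccess {
    private TupleAccess() {}

    // Index-based: the position must match the declared output field order
    public static String wordByIndex(final Tuple input) {
        return input.getString(0);
    }

    // Name-based: robust against field reordering, at a small lookup cost
    public static String wordByName(final Tuple input) {
        return input.getStringByField("word");
    }
}
```

The `buildTopology(boolean indexOrName)` overloads above exist precisely to exercise both styles against the same topology.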