# Utility Classes and Base Components

Core utility classes that provide the foundation for Storm-Flink integration: abstract base classes for building custom components, spouts that read from in-memory arrays and files, output sinks with configurable formatting, and tuple formatters.

## Capabilities

### Abstract Base Classes

Foundation classes for building custom Storm components compatible with Flink execution.

#### AbstractLineSpout

Base class for creating line-based data source spouts.

```java { .api }
/**
 * Abstract base class for spouts that read line-based text data
 */
public abstract class AbstractLineSpout implements IRichSpout {
    public static final String ATTRIBUTE_LINE = "line";

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    public void close();
    public void activate();
    public void deactivate();
    public void ack(Object msgId);
    public void fail(Object msgId);
    public void declareOutputFields(OutputFieldsDeclarer declarer);
    public Map<String, Object> getComponentConfiguration();

    // Implement in subclasses
    public abstract void nextTuple();
}
```

#### AbstractBoltSink

Base class for creating output sink bolts with configurable formatting.

```java { .api }
/**
 * Abstract base class for bolt sinks that write processed data
 */
public abstract class AbstractBoltSink implements IRichBolt {
    public AbstractBoltSink(OutputFormatter formatter);

    public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
    public final void execute(Tuple input);
    public void cleanup();
    public final void declareOutputFields(OutputFieldsDeclarer declarer);
    public Map<String, Object> getComponentConfiguration();

    // Implement in subclasses
    protected abstract void prepareSimple(Map<?, ?> stormConf, TopologyContext context);
    protected abstract void writeExternal(String line);
}
```
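
The `final` modifiers on `prepare` and `execute` suggest a template-method design: the base class owns the lifecycle and delegates to the two abstract hooks. The sketch below illustrates that pattern with simplified stand-in types so that it runs without Storm or Flink on the classpath. `LineFormatter`, `SketchSink`, `CollectingSink`, and `SinkSketchDemo` are hypothetical names, a `List<Object>` replaces Storm's `Tuple`, and the delegation order shown is an assumption rather than the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for OutputFormatter; a List<Object> stands in for Tuple.
interface LineFormatter {
    String format(List<Object> tuple);
}

// Hypothetical stand-in for AbstractBoltSink (assumed behaviour, not the real class).
abstract class SketchSink {
    private final LineFormatter formatter;

    SketchSink(LineFormatter formatter) {
        this.formatter = formatter;
    }

    // Final lifecycle method: subclasses customise setup only through prepareSimple.
    public final void prepare(Map<?, ?> conf) {
        prepareSimple(conf);
    }

    // Final processing method: format the tuple, then hand the line to the subclass.
    public final void execute(List<Object> tuple) {
        writeExternal(formatter.format(tuple));
    }

    protected abstract void prepareSimple(Map<?, ?> conf);

    protected abstract void writeExternal(String line);
}

// Example subclass that collects formatted lines in memory.
class CollectingSink extends SketchSink {
    final List<String> lines = new ArrayList<>();

    CollectingSink(LineFormatter formatter) {
        super(formatter);
    }

    @Override
    protected void prepareSimple(Map<?, ?> conf) {
        lines.clear(); // nothing external to open in this sketch
    }

    @Override
    protected void writeExternal(String line) {
        lines.add(line);
    }
}

class SinkSketchDemo {
    public static void main(String[] args) {
        // Format each tuple as the string form of its first value.
        CollectingSink sink = new CollectingSink(tuple -> String.valueOf(tuple.get(0)));
        sink.prepare(new HashMap<String, Object>());
        sink.execute(Arrays.<Object>asList("hello"));
        sink.execute(Arrays.<Object>asList(42));
        System.out.println(sink.lines); // prints [hello, 42]
    }
}
```

A subclass written this way only decides where a formatted line goes; how a tuple becomes a line stays with the formatter passed to the constructor.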

### Data Source Spouts

Spouts that read data from in-memory arrays and local files.

#### InMemorySpout

Spout for reading data from in-memory arrays.

```java { .api }
/**
 * Spout for reading data from in-memory arrays
 * @param <T> Type of data elements
 */
public class InMemorySpout<T> extends AbstractLineSpout {
    /**
     * Create spout with data source array
     * @param source Array of data elements to emit
     */
    public InMemorySpout(T[] source);

    public void nextTuple();
}
```

#### FiniteInMemorySpout

Memory-based spout that terminates when data is exhausted.

```java { .api }
/**
 * Memory spout that terminates when data is exhausted
 */
public class FiniteInMemorySpout extends InMemorySpout<String> implements FiniteSpout {
    /**
     * Create finite spout with string data source
     * @param source Array of strings to emit
     */
    public FiniteInMemorySpout(String[] source);

    /**
     * Check if spout has reached end of data
     * @return true if no more data available
     */
    public boolean reachedEnd();
}
```
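
`reachedEnd()` lets a caller stop polling a finite source. Assuming the spout tracks a position in its array, the contract can be sketched without Storm as follows. `FiniteArraySource`, `next()`, and `drain()` are hypothetical names: `next()` stands in for `nextTuple()` plus the collector, and `drain()` for whatever loop drives the spout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a finite in-memory spout (not the library class).
class FiniteArraySource {
    private final String[] source;
    private int counter = 0;

    FiniteArraySource(String[] source) {
        this.source = source;
    }

    // Plays the role of nextTuple(): returns one element per call, or null when exhausted.
    String next() {
        return counter < source.length ? source[counter++] : null;
    }

    // True once every element has been handed out.
    boolean reachedEnd() {
        return counter >= source.length;
    }

    // Drains a source the way a driver loop might: poll until reachedEnd() is true.
    static List<String> drain(FiniteArraySource spout) {
        List<String> emitted = new ArrayList<>();
        while (!spout.reachedEnd()) {
            emitted.add(spout.next());
        }
        return emitted;
    }

    public static void main(String[] args) {
        FiniteArraySource spout = new FiniteArraySource(new String[] {"Hello", "World"});
        System.out.println(drain(spout));       // prints [Hello, World]
        System.out.println(spout.reachedEnd()); // prints true
    }
}
```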

#### FileSpout

Spout for reading lines from local files.

```java { .api }
/**
 * Spout for reading lines from local files
 */
public class FileSpout extends AbstractLineSpout {
    public static final String INPUT_FILE_PATH = "input.path";

    /**
     * Create file spout with path from configuration
     */
    public FileSpout();

    /**
     * Create file spout with specific path
     * @param path Path to input file
     */
    public FileSpout(String path);

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    public void close();
    public void nextTuple();
}
```

#### FiniteFileSpout

File-based spout that terminates at end of file.

```java { .api }
/**
 * File spout that terminates at end of file
 */
public class FiniteFileSpout extends FileSpout implements FiniteSpout {
    public FiniteFileSpout();
    public FiniteFileSpout(String path);

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    public void nextTuple();
    public boolean reachedEnd();
}
```
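
Reporting end of file before the caller asks for another line usually means reading one line ahead. The sketch below shows that look-ahead pattern with a plain `BufferedReader` over an in-memory string. `LookAheadLineReader` and `readAll` are hypothetical names, and whether `FiniteFileSpout` works this way internally is an assumption, not something the API listing states.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating look-ahead end-of-input detection.
class LookAheadLineReader implements AutoCloseable {
    private final BufferedReader reader;
    private String buffered; // line read ahead of the caller; null at end of input

    LookAheadLineReader(BufferedReader reader) throws IOException {
        this.reader = reader;
        this.buffered = reader.readLine(); // prime the buffer, as an open() step might
    }

    // True once the look-ahead read has hit end of input.
    boolean reachedEnd() {
        return buffered == null;
    }

    // Returns the buffered line and reads the following one.
    String nextLine() throws IOException {
        String current = buffered;
        buffered = reader.readLine();
        return current;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }

    // Reads every line of the given text, stopping when reachedEnd() reports true.
    static List<String> readAll(String text) {
        List<String> lines = new ArrayList<>();
        try (LookAheadLineReader in =
                 new LookAheadLineReader(new BufferedReader(new StringReader(text)))) {
            while (!in.reachedEnd()) {
                lines.add(in.nextLine());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(readAll("first\nsecond\n")); // prints [first, second]
    }
}
```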

### Output Sinks

Configurable output sinks for writing processed data to various destinations.

#### BoltPrintSink

Sink that prints tuples to standard output.

```java { .api }
/**
 * Sink that prints tuples to standard output
 */
public class BoltPrintSink extends AbstractBoltSink {
    /**
     * Create print sink with formatter
     * @param formatter Formatter for tuple output
     */
    public BoltPrintSink(OutputFormatter formatter);

    protected void prepareSimple(Map stormConf, TopologyContext context);
    protected void writeExternal(String line);
}
```

#### BoltFileSink

Sink that writes tuples to files.

```java { .api }
/**
 * Sink that writes tuples to files
 */
public class BoltFileSink extends AbstractBoltSink {
    /**
     * Create file sink with path and default formatter
     * @param path Output file path
     */
    public BoltFileSink(String path);

    /**
     * Create file sink with path and custom formatter
     * @param path Output file path
     * @param formatter Formatter for tuple output
     */
    public BoltFileSink(String path, OutputFormatter formatter);

    protected void prepareSimple(Map stormConf, TopologyContext context);
    protected void writeExternal(String line);
    public void cleanup();
}
```

### Output Formatters

Interfaces and implementations for formatting tuples for output.

#### OutputFormatter Interface

Interface for custom tuple formatting.

```java { .api }
/**
 * Interface for custom tuple formatting
 */
public interface OutputFormatter extends Serializable {
    /**
     * Format tuple for output
     * @param input Tuple to format
     * @return Formatted string representation
     */
    String format(Tuple input);
}
```

#### SimpleOutputFormatter

Formats single-field tuples as strings.

```java { .api }
/**
 * Formats single-field tuples as strings
 */
public class SimpleOutputFormatter implements OutputFormatter {
    /**
     * Format single-field tuple as string
     * @param input Tuple with single field
     * @return String representation of first field
     */
    public String format(Tuple input);
}
```

#### TupleOutputFormatter

Formats multi-field tuples in (field1,field2,...) format.

```java { .api }
/**
 * Formats multi-field tuples in (field1,field2,...) format
 */
public class TupleOutputFormatter implements OutputFormatter {
    /**
     * Format multi-field tuple in parentheses format
     * @param input Tuple with multiple fields
     * @return String in format (field1,field2,...)
     */
    public String format(Tuple input);
}
```
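
The two output shapes can be illustrated without Storm on the classpath. In this sketch a `List<Object>` stands in for the tuple's values, and `FormatSketch`, `formatSimple`, and `formatTuple` are hypothetical names. How the real formatters treat edge cases such as empty tuples or null fields is not covered by the descriptions above, so the sketch makes no claim about them.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical illustration of the two output shapes (not the library classes).
class FormatSketch {
    // Single-field shape: the string form of the first value.
    static String formatSimple(List<Object> values) {
        return String.valueOf(values.get(0));
    }

    // Multi-field shape: (field1,field2,...)
    static String formatTuple(List<Object> values) {
        StringJoiner joiner = new StringJoiner(",", "(", ")");
        for (Object value : values) {
            joiner.add(String.valueOf(value));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<Object> values = Arrays.<Object>asList("flink", 3);
        System.out.println(formatSimple(values)); // prints flink
        System.out.println(formatTuple(values));  // prints (flink,3)
    }
}
```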

## Usage Examples

### Creating Custom Data Sources

```java
import org.apache.flink.storm.util.FiniteInMemorySpout;
import org.apache.flink.storm.util.InMemorySpout;

// Create spout with string data
String[] data = {"Hello", "World", "Storm", "Flink"};
InMemorySpout<String> spout = new InMemorySpout<>(data);

// Create finite spout that reports the end of its data through reachedEnd()
FiniteInMemorySpout finiteSpout = new FiniteInMemorySpout(data);
```

### Creating File-Based Sources

```java
import org.apache.flink.storm.util.FileSpout;
import org.apache.flink.storm.util.FiniteFileSpout;

// Read from specific file
FileSpout fileSpout = new FileSpout("/path/to/input.txt");

// Read file and stop at end
FiniteFileSpout finiteFileSpout = new FiniteFileSpout("/path/to/input.txt");
```

### Creating Output Sinks

```java
import org.apache.flink.storm.util.*;

// Print to console with simple formatting
OutputFormatter simpleFormatter = new SimpleOutputFormatter();
BoltPrintSink printSink = new BoltPrintSink(simpleFormatter);

// Write to file with tuple formatting
OutputFormatter tupleFormatter = new TupleOutputFormatter();
BoltFileSink fileSink = new BoltFileSink("/path/to/output.txt", tupleFormatter);
```

### Building Custom Components

```java
import org.apache.flink.storm.util.AbstractLineSpout;
// Values lives in backtype.storm.tuple on Storm releases before 1.0
import org.apache.storm.tuple.Values;

public class CustomSpout extends AbstractLineSpout {
    private final String[] data;
    private int index = 0;

    public CustomSpout(String[] data) {
        this.data = data;
    }

    @Override
    public void nextTuple() {
        // `collector` is assumed to be the protected field that AbstractLineSpout sets in open()
        if (index < data.length) {
            collector.emit(new Values(data[index++]));
        }
    }
}
```