or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

additional-examples.md index.md storm-operators.md utility-classes.md wordcount-examples.md

docs/utility-classes.md

0

# Utility Classes and Base Components

1

2

The core utility classes provide the foundation for Storm-Flink integration. They include abstract base classes for creating custom components, data sources for various input scenarios, output sinks with configurable formatting, and utilities for tuple formatting.

3

4

## Capabilities

5

6

### Abstract Base Classes

7

8

Foundation classes for building custom Storm components compatible with Flink execution.

9

10

#### AbstractLineSpout

11

12

Base class for creating line-based data source spouts.

13

14

```java { .api }

15

/**

16

* Abstract base class for spouts that read line-based text data

17

*/

18

public abstract class AbstractLineSpout implements IRichSpout {

19

public static final String ATTRIBUTE_LINE = "line";

20

21

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

22

public void close();

23

public void activate();

24

public void deactivate();

25

public void ack(Object msgId);

26

public void fail(Object msgId);

27

public void declareOutputFields(OutputFieldsDeclarer declarer);

28

public Map<String, Object> getComponentConfiguration();

29

30

// Implement in subclasses

31

public abstract void nextTuple();

32

}

33

```

34

35

#### AbstractBoltSink

36

37

Base class for creating output sink bolts with configurable formatting.

38

39

```java { .api }

40

/**

41

* Abstract base class for bolt sinks that write processed data

42

*/

43

public abstract class AbstractBoltSink implements IRichBolt {

44

public AbstractBoltSink(OutputFormatter formatter);

45

46

public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

47

public final void execute(Tuple input);

48

public void cleanup();

49

public final void declareOutputFields(OutputFieldsDeclarer declarer);

50

public Map<String, Object> getComponentConfiguration();

51

52

// Implement in subclasses

53

protected abstract void prepareSimple(Map<?, ?> stormConf, TopologyContext context);

54

protected abstract void writeExternal(String line);

55

}

56

```

57

58

### Data Source Spouts

59

60

Spouts for reading data from various sources, including in-memory arrays and files.

61

62

#### InMemorySpout

63

64

Spout for reading data from in-memory arrays.

65

66

```java { .api }

67

/**

68

* Spout for reading data from in-memory arrays

69

* @param <T> Type of data elements

70

*/

71

public class InMemorySpout<T> extends AbstractLineSpout {

72

/**

73

* Create spout with data source array

74

* @param source Array of data elements to emit

75

*/

76

public InMemorySpout(T[] source);

77

78

public void nextTuple();

79

}

80

```

81

82

#### FiniteInMemorySpout

83

84

Memory-based spout that terminates when data is exhausted.

85

86

```java { .api }

87

/**

88

* Memory spout that terminates when data is exhausted

89

*/

90

public class FiniteInMemorySpout extends InMemorySpout<String> implements FiniteSpout {

91

/**

92

* Create finite spout with string data source

93

* @param source Array of strings to emit

94

*/

95

public FiniteInMemorySpout(String[] source);

96

97

/**

98

* Check if spout has reached end of data

99

* @return true if no more data available

100

*/

101

public boolean reachedEnd();

102

}

103

```

104

105

#### FileSpout

106

107

Spout for reading lines from local files.

108

109

```java { .api }

110

/**

111

* Spout for reading lines from local files

112

*/

113

public class FileSpout extends AbstractLineSpout {

114

public static final String INPUT_FILE_PATH = "input.path";

115

116

/**

117

* Create file spout with path from configuration

118

*/

119

public FileSpout();

120

121

/**

122

* Create file spout with specific path

123

* @param path Path to input file

124

*/

125

public FileSpout(String path);

126

127

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

128

public void close();

129

public void nextTuple();

130

}

131

```

132

133

#### FiniteFileSpout

134

135

File-based spout that terminates at end of file.

136

137

```java { .api }

138

/**

139

* File spout that terminates at end of file

140

*/

141

public class FiniteFileSpout extends FileSpout implements FiniteSpout {

142

public FiniteFileSpout();

143

public FiniteFileSpout(String path);

144

145

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

146

public void nextTuple();

147

public boolean reachedEnd();

148

}

149

```

150

151

### Output Sinks

152

153

Configurable output sinks for writing processed data to various destinations.

154

155

#### BoltPrintSink

156

157

Sink that prints tuples to standard output.

158

159

```java { .api }

160

/**

161

* Sink that prints tuples to standard output

162

*/

163

public class BoltPrintSink extends AbstractBoltSink {

164

/**

165

* Create print sink with formatter

166

* @param formatter Formatter for tuple output

167

*/

168

public BoltPrintSink(OutputFormatter formatter);

169

170

protected void prepareSimple(Map stormConf, TopologyContext context);

171

protected void writeExternal(String line);

172

}

173

```

174

175

#### BoltFileSink

176

177

Sink that writes tuples to files.

178

179

```java { .api }

180

/**

181

* Sink that writes tuples to files

182

*/

183

public class BoltFileSink extends AbstractBoltSink {

184

/**

185

* Create file sink with path and default formatter

186

* @param path Output file path

187

*/

188

public BoltFileSink(String path);

189

190

/**

191

* Create file sink with path and custom formatter

192

* @param path Output file path

193

* @param formatter Formatter for tuple output

194

*/

195

public BoltFileSink(String path, OutputFormatter formatter);

196

197

protected void prepareSimple(Map stormConf, TopologyContext context);

198

protected void writeExternal(String line);

199

public void cleanup();

200

}

201

```

202

203

### Output Formatters

204

205

Interfaces and implementations for formatting tuples for output.

206

207

#### OutputFormatter Interface

208

209

Interface for custom tuple formatting.

210

211

```java { .api }

212

/**

213

* Interface for custom tuple formatting

214

*/

215

public interface OutputFormatter extends Serializable {

216

/**

217

* Format tuple for output

218

* @param input Tuple to format

219

* @return Formatted string representation

220

*/

221

String format(Tuple input);

222

}

223

```

224

225

#### SimpleOutputFormatter

226

227

Formats single-field tuples as strings.

228

229

```java { .api }

230

/**

231

* Formats single-field tuples as strings

232

*/

233

public class SimpleOutputFormatter implements OutputFormatter {

234

/**

235

* Format single-field tuple as string

236

* @param input Tuple with single field

237

* @return String representation of first field

238

*/

239

public String format(Tuple input);

240

}

241

```

242

243

#### TupleOutputFormatter

244

245

Formats multi-field tuples in (field1,field2,...) format.

246

247

```java { .api }

248

/**

249

* Formats multi-field tuples in (field1,field2,...) format

250

*/

251

public class TupleOutputFormatter implements OutputFormatter {

252

/**

253

* Format multi-field tuple in parentheses format

254

* @param input Tuple with multiple fields

255

* @return String in format (field1,field2,...)

256

*/

257

public String format(Tuple input);

258

}

259

```

260

261

## Usage Examples

262

263

### Creating Custom Data Sources

264

265

```java

266

import org.apache.flink.storm.util.FiniteInMemorySpout;
import org.apache.flink.storm.util.InMemorySpout;

267

268

// Create spout with string data

269

String[] data = {"Hello", "World", "Storm", "Flink"};

270

InMemorySpout<String> spout = new InMemorySpout<>(data);

271

272

// Create finite spout that stops automatically

273

FiniteInMemorySpout finiteSpout = new FiniteInMemorySpout(data);

274

```

275

276

### Creating File-Based Sources

277

278

```java

279

import org.apache.flink.storm.util.FileSpout;

280

import org.apache.flink.storm.util.FiniteFileSpout;

281

282

// Read from specific file

283

FileSpout fileSpout = new FileSpout("/path/to/input.txt");

284

285

// Read file and stop at end

286

FiniteFileSpout finiteFileSpout = new FiniteFileSpout("/path/to/input.txt");

287

```

288

289

### Creating Output Sinks

290

291

```java

292

import org.apache.flink.storm.util.*;

293

294

// Print to console with simple formatting

295

OutputFormatter simpleFormatter = new SimpleOutputFormatter();

296

BoltPrintSink printSink = new BoltPrintSink(simpleFormatter);

297

298

// Write to file with tuple formatting

299

OutputFormatter tupleFormatter = new TupleOutputFormatter();

300

BoltFileSink fileSink = new BoltFileSink("/path/to/output.txt", tupleFormatter);

301

```

302

303

### Building Custom Components

304

305

```java

306

import org.apache.flink.storm.util.AbstractLineSpout;

307

308

public class CustomSpout extends AbstractLineSpout {

309

private String[] data;

310

private int index = 0;

311

312

public CustomSpout(String[] data) {

313

this.data = data;

314

}

315

316

@Override

317

public void nextTuple() {

318

if (index < data.length) {

319

collector.emit(new Values(data[index++]));

320

}

321

}

322

}

323

```