or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- additional-examples.md
- index.md
- storm-operators.md
- utility-classes.md
- wordcount-examples.md

docs/storm-operators.md

# Storm Operators

Storm operators used by the WordCount examples: Spouts for data ingestion and Bolts for data processing. Both the tokenizer and the counter come in two variants — one that accesses tuple fields by index and one that accesses them by field name — so you can pick the access pattern that best fits your integration approach and processing requirements.

## Capabilities

### Data Source Spouts

Spouts that ingest text either from a local file or from built-in in-memory data.

#### WordCountFileSpout

File-based spout that reads text from a local file for word counting.

```java { .api }

15

/**

16

* Spout reading text data from files for word counting

17

*/

18

public class WordCountFileSpout extends FileSpout {

19

/**

20

* Create file spout for word count data

21

* @param path Path to input text file

22

*/

23

public WordCountFileSpout(String path);

24

25

/**

26

* Declare output fields for word count processing

27

* @param declarer Output field declarer

28

*/

29

public void declareOutputFields(OutputFieldsDeclarer declarer);

30

}

31

```

#### WordCountInMemorySpout

Memory-based spout that emits built-in text data, intended for word count examples and tests.

```java { .api }

38

/**

39

* Spout providing built-in text data for word counting

40

*/

41

public class WordCountInMemorySpout extends FiniteInMemorySpout {

42

/**

43

* Create in-memory spout with built-in word count data

44

*/

45

public WordCountInMemorySpout();

46

47

/**

48

* Declare output fields for word count processing

49

* @param declarer Output field declarer

50

*/

51

public void declareOutputFields(OutputFieldsDeclarer declarer);

52

}

53

```

### Processing Bolts

Bolts that tokenize text and count word occurrences, offered for both tuple access patterns.

#### Index-Based Processing Bolts

Bolts that access tuple fields by their position in the tuple.

##### BoltTokenizer

Tokenizes sentences into words, accessing the input tuple by field index.

```java { .api }

68

/**

69

* Tokenizes sentences into words using tuple index access

70

*/

71

public class BoltTokenizer implements IRichBolt {

72

public static final String ATTRIBUTE_WORD = "word";

73

public static final String ATTRIBUTE_COUNT = "count";

74

public static final int ATTRIBUTE_WORD_INDEX = 0;

75

public static final int ATTRIBUTE_COUNT_INDEX = 1;

76

77

/**

78

* Prepare bolt for execution

79

* @param stormConf Storm configuration

80

* @param context Topology context

81

* @param collector Output collector

82

*/

83

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

84

85

/**

86

* Execute tokenization on input tuple

87

* @param input Input tuple containing text to tokenize

88

*/

89

public void execute(Tuple input);

90

91

/**

92

* Cleanup bolt resources

93

*/

94

public void cleanup();

95

96

/**

97

* Declare output fields

98

* @param declarer Output field declarer

99

*/

100

public void declareOutputFields(OutputFieldsDeclarer declarer);

101

102

/**

103

* Get component configuration

104

* @return Configuration map

105

*/

106

public Map<String, Object> getComponentConfiguration();

107

}

108

```

##### BoltCounter

Counts word occurrences, accessing the input tuple by field index.

```java { .api }

115

/**

116

* Counts word occurrences using tuple index access

117

*/

118

public class BoltCounter implements IRichBolt {

119

public static final String ATTRIBUTE_WORD = "word";

120

public static final String ATTRIBUTE_COUNT = "count";

121

122

/**

123

* Prepare bolt for execution

124

* @param stormConf Storm configuration

125

* @param context Topology context

126

* @param collector Output collector

127

*/

128

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

129

130

/**

131

* Execute counting on input tuple

132

* @param input Input tuple containing word and count

133

*/

134

public void execute(Tuple input);

135

136

/**

137

* Cleanup bolt resources

138

*/

139

public void cleanup();

140

141

/**

142

* Declare output fields

143

* @param declarer Output field declarer

144

*/

145

public void declareOutputFields(OutputFieldsDeclarer declarer);

146

147

/**

148

* Get component configuration

149

* @return Configuration map

150

*/

151

public Map<String, Object> getComponentConfiguration();

152

}

153

```

#### Field Name-Based Processing Bolts

Bolts that access tuple fields by field name, for better readability and maintainability.

##### BoltTokenizerByName

Tokenizes sentences into words, accessing the input tuple by field name.

```java { .api }

164

/**

165

* Tokenizes sentences using tuple field name access

166

*/

167

public class BoltTokenizerByName implements IRichBolt {

168

public static final String ATTRIBUTE_WORD = "word";

169

170

/**

171

* Prepare bolt for execution

172

* @param stormConf Storm configuration

173

* @param context Topology context

174

* @param collector Output collector

175

*/

176

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

177

178

/**

179

* Execute tokenization using field name access

180

* @param input Input tuple containing text field

181

*/

182

public void execute(Tuple input);

183

184

/**

185

* Cleanup bolt resources

186

*/

187

public void cleanup();

188

189

/**

190

* Declare output fields

191

* @param declarer Output field declarer

192

*/

193

public void declareOutputFields(OutputFieldsDeclarer declarer);

194

195

/**

196

* Get component configuration

197

* @return Configuration map

198

*/

199

public Map<String, Object> getComponentConfiguration();

200

}

201

```

##### BoltCounterByName

Counts word occurrences, accessing the input tuple by field name.

```java { .api }

208

/**

209

* Counts word occurrences using tuple field name access

210

*/

211

public class BoltCounterByName implements IRichBolt {

212

/**

213

* Prepare bolt for execution

214

* @param stormConf Storm configuration

215

* @param context Topology context

216

* @param collector Output collector

217

*/

218

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

219

220

/**

221

* Execute counting using field name access

222

* @param input Input tuple with named fields

223

*/

224

public void execute(Tuple input);

225

226

/**

227

* Cleanup bolt resources

228

*/

229

public void cleanup();

230

231

/**

232

* Declare output fields

233

* @param declarer Output field declarer

234

*/

235

public void declareOutputFields(OutputFieldsDeclarer declarer);

236

237

/**

238

* Get component configuration

239

* @return Configuration map

240

*/

241

public Map<String, Object> getComponentConfiguration();

242

}

243

```

### Data Wrapper Classes

Classes that expose the WordCount test data in different formats.

#### WordCountDataTuple

Exposes the WordCount test sentences as typed `Tuple1<String>` values.

```java { .api }

254

/**

255

* Tuple wrapper for WordCount test data

256

*/

257

public class WordCountDataTuple {

258

/**

259

* Array of tuples containing test sentences

260

*/

261

public static Tuple1<String>[] TUPLES;

262

}

263

```

#### WordCountDataPojos

Exposes the WordCount test sentences as `Sentence` POJOs for object-oriented data access.

```java { .api }

270

/**

271

* POJO wrapper for WordCount test data

272

*/

273

public class WordCountDataPojos {

274

/**

275

* Array of sentence POJOs for testing

276

*/

277

public static Sentence[] SENTENCES;

278

279

/**

280

* POJO representing a sentence for word count processing

281

*/

282

public static class Sentence implements Serializable {

283

/**

284

* Default constructor

285

*/

286

public Sentence();

287

288

/**

289

* Constructor with sentence text

290

* @param sentence The sentence text

291

*/

292

public Sentence(String sentence);

293

294

/**

295

* Get sentence text

296

* @return The sentence text

297

*/

298

public String getSentence();

299

300

/**

301

* Set sentence text

302

* @param sentence The sentence text

303

*/

304

public void setSentence(String sentence);

305

306

/**

307

* String representation of sentence

308

* @return The sentence text

309

*/

310

public String toString();

311

}

312

}

313

```

## Usage Examples

### Creating a Topology with Index-Based Access

```java

320

import org.apache.storm.topology.TopologyBuilder;

321

import org.apache.storm.tuple.Fields;

322

import org.apache.flink.storm.wordcount.operators.*;

323

324

TopologyBuilder builder = new TopologyBuilder();

325

326

// Add spout

327

builder.setSpout("source", new WordCountInMemorySpout());

328

329

// Add tokenizer bolt (index-based)

330

builder.setBolt("tokenizer", new BoltTokenizer(), 4)

331

.shuffleGrouping("source");

332

333

// Add counter bolt (index-based)

334

builder.setBolt("counter", new BoltCounter(), 4)

335

.fieldsGrouping("tokenizer", new Fields(BoltTokenizer.ATTRIBUTE_WORD));

336

```

### Creating a Topology with Field Name-Based Access

```java

341

import org.apache.storm.topology.TopologyBuilder;

342

import org.apache.storm.tuple.Fields;

343

import org.apache.flink.storm.wordcount.operators.*;

344

345

TopologyBuilder builder = new TopologyBuilder();

346

347

// Add spout

348

builder.setSpout("source", new WordCountFileSpout("input.txt"));

349

350

// Add tokenizer bolt (name-based)

351

builder.setBolt("tokenizer", new BoltTokenizerByName(), 4)

352

.shuffleGrouping("source");

353

354

// Add counter bolt (name-based)

355

builder.setBolt("counter", new BoltCounterByName(), 4)

356

.fieldsGrouping("tokenizer", new Fields(BoltTokenizerByName.ATTRIBUTE_WORD));

357

```

### Using Data Sources

```java

362

import org.apache.flink.storm.wordcount.operators.*;

363

364

// File-based data source

365

WordCountFileSpout fileSpout = new WordCountFileSpout("/path/to/input.txt");

366

367

// Memory-based data source with built-in data

368

WordCountInMemorySpout memorySpout = new WordCountInMemorySpout();

369

```

### Custom Processing Pipeline

Runs a Storm spout and the tokenizer bolt inside a Flink streaming job using `SpoutWrapper` and `BoltWrapper`.

```java

374

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

375

import org.apache.flink.storm.wrappers.*;

376

import org.apache.flink.storm.wordcount.operators.*;

377

378

public class CustomStormPipeline {

379

public static void main(String[] args) throws Exception {

380

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

381

382

// Create spout wrapper

383

SpoutWrapper<String> spoutWrapper = new SpoutWrapper<>(

384

new WordCountInMemorySpout(),

385

new String[]{ Utils.DEFAULT_STREAM_ID },

386

-1

387

);

388

389

// Create bolt wrapper

390

BoltWrapper<String, Tuple2<String, Integer>> boltWrapper =

391

new BoltWrapper<>(new BoltTokenizer());

392

393

// Build processing pipeline

394

DataStream<String> source = env.addSource(spoutWrapper);

395

DataStream<Tuple2<String, Integer>> processed = source.transform(

396

"Tokenizer",

397

TypeExtractor.getForObject(new Tuple2<>("", 0)),

398

boltWrapper

399

);

400

401

// Aggregate and output

402

processed.keyBy(0).sum(1).print();

403

env.execute("Storm Processing Pipeline");

404

}

405

}

406

```

### Working with POJO Data

```java

411

import org.apache.flink.storm.wordcount.operators.WordCountDataPojos.*;

412

413

// Create sentence POJOs

414

Sentence sentence1 = new Sentence("Hello world from Storm");

415

Sentence sentence2 = new Sentence("Apache Flink processes streams");

416

417

// Access built-in test data

418

Sentence[] testSentences = WordCountDataPojos.SENTENCES;

419

for (Sentence sentence : testSentences) {

420

System.out.println("Processing: " + sentence.getSentence());

421

}

422

```

### Bolt Implementation Patterns

#### Index-Based Processing

```java

429

public class CustomIndexBolt implements IRichBolt {

430

private OutputCollector collector;

431

432

public void execute(Tuple input) {

433

// Access by index (fast)

434

String text = input.getString(0);

435

Integer count = input.getInteger(1);

436

437

// Process and emit

438

collector.emit(new Values(processedText, newCount));

439

collector.ack(input);

440

}

441

}

442

```

#### Field Name-Based Processing

```java

447

public class CustomNameBolt implements IRichBolt {

448

private OutputCollector collector;

449

450

public void execute(Tuple input) {

451

// Access by field name (readable)

452

String word = input.getStringByField("word");

453

Integer count = input.getIntegerByField("count");

454

455

// Process and emit

456

collector.emit(new Values(processedWord, newCount));

457

collector.ack(input);

458

}

459

}

460

```