Storm Operators

Production-ready Storm operators including Spouts for data ingestion and Bolts for data processing. These operators support both index-based and field name-based tuple access patterns, providing flexibility for different integration approaches and data processing requirements.

Capabilities

Data Source Spouts

Spouts for ingesting data from various sources including files and memory.

WordCountFileSpout

File-based spout that reads text data from a local file for word counting operations.

/**
 * Spout reading text data from files for word counting
 */
public class WordCountFileSpout extends FileSpout {
    /**
     * Create file spout for word count data
     * @param path Path to input text file
     */
    public WordCountFileSpout(String path);
    
    /**
     * Declare output fields for word count processing
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
}

WordCountInMemorySpout

Memory-based spout providing built-in text data for word counting examples and testing.

/**
 * Spout providing built-in text data for word counting
 */
public class WordCountInMemorySpout extends FiniteInMemorySpout {
    /**
     * Create in-memory spout with built-in word count data
     */
    public WordCountInMemorySpout();
    
    /**
     * Declare output fields for word count processing
     * @param declarer Output field declarer  
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
}

Processing Bolts

Bolts for tokenizing text and counting word occurrences with different tuple access patterns.

Index-Based Processing Bolts

Bolts that access tuple fields using index positions.

BoltTokenizer

Tokenizes sentences into words using tuple index access for high-performance processing.

/**
 * Tokenizes sentences into words using tuple index access
 */
public class BoltTokenizer implements IRichBolt {
    public static final String ATTRIBUTE_WORD = "word";
    public static final String ATTRIBUTE_COUNT = "count";
    public static final int ATTRIBUTE_WORD_INDEX = 0;
    public static final int ATTRIBUTE_COUNT_INDEX = 1;
    
    /**
     * Prepare bolt for execution
     * @param stormConf Storm configuration
     * @param context Topology context
     * @param collector Output collector
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
    
    /**
     * Execute tokenization on input tuple
     * @param input Input tuple containing text to tokenize
     */
    public void execute(Tuple input);
    
    /**
     * Cleanup bolt resources
     */
    public void cleanup();
    
    /**
     * Declare output fields
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
    
    /**
     * Get component configuration
     * @return Configuration map
     */
    public Map<String, Object> getComponentConfiguration();
}
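
The index constants above let downstream components avoid hard-coded tuple positions. The following execute excerpt is a minimal sketch (not part of the documented API) of how a consumer might read the tokenizer's (word, count) output by index:

public void execute(Tuple input) {
    // Read the tokenizer's output using the declared index constants
    String word = input.getString(BoltTokenizer.ATTRIBUTE_WORD_INDEX);
    Integer count = input.getInteger(BoltTokenizer.ATTRIBUTE_COUNT_INDEX);
    // ... aggregate or forward the (word, count) pair ...
}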

BoltCounter

Counts word occurrences using tuple index access for efficient aggregation.

/**
 * Counts word occurrences using tuple index access
 */
public class BoltCounter implements IRichBolt {
    public static final String ATTRIBUTE_WORD = "word";
    public static final String ATTRIBUTE_COUNT = "count";
    
    /**
     * Prepare bolt for execution
     * @param stormConf Storm configuration
     * @param context Topology context
     * @param collector Output collector
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
    
    /**
     * Execute counting on input tuple
     * @param input Input tuple containing word and count
     */
    public void execute(Tuple input);
    
    /**
     * Cleanup bolt resources
     */
    public void cleanup();
    
    /**
     * Declare output fields
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
    
    /**
     * Get component configuration
     * @return Configuration map
     */
    public Map<String, Object> getComponentConfiguration();
}

Field Name-Based Processing Bolts

Bolts that access tuple fields using field names for better readability and maintainability.

BoltTokenizerByName

Tokenizes sentences using tuple field name access.

/**
 * Tokenizes sentences using tuple field name access
 */
public class BoltTokenizerByName implements IRichBolt {
    public static final String ATTRIBUTE_WORD = "word";
    
    /**
     * Prepare bolt for execution
     * @param stormConf Storm configuration
     * @param context Topology context
     * @param collector Output collector
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
    
    /**
     * Execute tokenization using field name access
     * @param input Input tuple containing text field
     */
    public void execute(Tuple input);
    
    /**
     * Cleanup bolt resources
     */
    public void cleanup();
    
    /**
     * Declare output fields
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
    
    /**
     * Get component configuration
     * @return Configuration map
     */
    public Map<String, Object> getComponentConfiguration();
}

BoltCounterByName

Counts word occurrences using tuple field name access.

/**
 * Counts word occurrences using tuple field name access
 */
public class BoltCounterByName implements IRichBolt {
    /**
     * Prepare bolt for execution
     * @param stormConf Storm configuration
     * @param context Topology context  
     * @param collector Output collector
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
    
    /**
     * Execute counting using field name access
     * @param input Input tuple with named fields
     */
    public void execute(Tuple input);
    
    /**
     * Cleanup bolt resources
     */
    public void cleanup();
    
    /**
     * Declare output fields
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
    
    /**
     * Get component configuration
     * @return Configuration map
     */
    public Map<String, Object> getComponentConfiguration();
}

Data Wrapper Classes

Utility classes for wrapping test data in different formats.

WordCountDataTuple

Tuple wrapper for WordCount test data providing typed access to test sentences.

/**
 * Tuple wrapper for WordCount test data
 */
public class WordCountDataTuple {
    /**
     * Array of tuples containing test sentences
     */
    public static Tuple1<String>[] TUPLES;
}
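
A minimal sketch of iterating over the built-in tuples (using Flink's Tuple1 field accessor f0):

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.storm.wordcount.operators.WordCountDataTuple;

// Iterate over the built-in test sentences, each wrapped as a Tuple1<String>
for (Tuple1<String> tuple : WordCountDataTuple.TUPLES) {
    System.out.println("Sentence: " + tuple.f0);
}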

WordCountDataPojos

POJO wrapper for WordCount test data enabling object-oriented data access.

/**
 * POJO wrapper for WordCount test data
 */
public class WordCountDataPojos {
    /**
     * Array of sentence POJOs for testing
     */
    public static Sentence[] SENTENCES;
    
    /**
     * POJO representing a sentence for word count processing
     */
    public static class Sentence implements Serializable {
        /**
         * Default constructor
         */
        public Sentence();
        
        /**
         * Constructor with sentence text
         * @param sentence The sentence text
         */
        public Sentence(String sentence);
        
        /**
         * Get sentence text
         * @return The sentence text
         */
        public String getSentence();
        
        /**
         * Set sentence text
         * @param sentence The sentence text
         */
        public void setSentence(String sentence);
        
        /**
         * String representation of sentence
         * @return The sentence text
         */
        public String toString();
    }
}

Usage Examples

Creating Topology with Index-Based Access

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.flink.storm.wordcount.operators.*;

TopologyBuilder builder = new TopologyBuilder();

// Add spout
builder.setSpout("source", new WordCountInMemorySpout());

// Add tokenizer bolt (index-based)
builder.setBolt("tokenizer", new BoltTokenizer(), 4)
    .shuffleGrouping("source");

// Add counter bolt (index-based)  
builder.setBolt("counter", new BoltCounter(), 4)
    .fieldsGrouping("tokenizer", new Fields(BoltTokenizer.ATTRIBUTE_WORD));
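
To run the assembled topology on Flink, the builder can be handed to the Storm compatibility layer. The sketch below assumes the FlinkLocalCluster and FlinkTopology classes from Flink's flink-storm module; verify the exact API against your Flink version:

import org.apache.storm.Config;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;

// Submit the builder's topology to an embedded Flink cluster (API may differ per Flink version)
FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("WordCount", new Config(), FlinkTopology.createTopology(builder));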

Creating Topology with Field Name-Based Access

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.flink.storm.wordcount.operators.*;

TopologyBuilder builder = new TopologyBuilder();

// Add spout
builder.setSpout("source", new WordCountFileSpout("input.txt"));

// Add tokenizer bolt (name-based)
builder.setBolt("tokenizer", new BoltTokenizerByName(), 4)
    .shuffleGrouping("source");

// Add counter bolt (name-based)
builder.setBolt("counter", new BoltCounterByName(), 4)
    .fieldsGrouping("tokenizer", new Fields(BoltTokenizerByName.ATTRIBUTE_WORD));

Using Data Sources

import org.apache.flink.storm.wordcount.operators.*;

// File-based data source
WordCountFileSpout fileSpout = new WordCountFileSpout("/path/to/input.txt");

// Memory-based data source with built-in data
WordCountInMemorySpout memorySpout = new WordCountInMemorySpout();

Custom Processing Pipeline

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.wordcount.operators.*;
import org.apache.flink.storm.wrappers.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.storm.utils.Utils;

public class CustomStormPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Create spout wrapper
        SpoutWrapper<String> spoutWrapper = new SpoutWrapper<>(
            new WordCountInMemorySpout(),
            new String[]{ Utils.DEFAULT_STREAM_ID },
            -1
        );
        
        // Create bolt wrapper  
        BoltWrapper<String, Tuple2<String, Integer>> boltWrapper = 
            new BoltWrapper<>(new BoltTokenizer());
        
        // Build processing pipeline
// Pass the output type explicitly; it cannot be inferred from the generic wrapper
        DataStream<String> source = env.addSource(spoutWrapper, TypeExtractor.getForClass(String.class));
        DataStream<Tuple2<String, Integer>> processed = source.transform(
            "Tokenizer", 
            TypeExtractor.getForObject(new Tuple2<>("", 0)),
            boltWrapper
        );
        
        // Aggregate and output
        processed.keyBy(0).sum(1).print();
        env.execute("Storm Processing Pipeline");
    }
}

Working with POJO Data

import org.apache.flink.storm.wordcount.operators.WordCountDataPojos;
import org.apache.flink.storm.wordcount.operators.WordCountDataPojos.Sentence;

// Create sentence POJOs
Sentence sentence1 = new Sentence("Hello world from Storm");
Sentence sentence2 = new Sentence("Apache Flink processes streams");

// Access built-in test data
Sentence[] testSentences = WordCountDataPojos.SENTENCES;
for (Sentence sentence : testSentences) {
    System.out.println("Processing: " + sentence.getSentence());
}

Bolt Implementation Patterns

Index-Based Processing

public class CustomIndexBolt implements IRichBolt {
    private OutputCollector collector;
    
    public void execute(Tuple input) {
        // Access by index (fast)
        String text = input.getString(0);
        Integer count = input.getInteger(1);
        
        // Process and emit (illustrative transformation)
        String processedText = text.trim();
        Integer newCount = count + 1;
        collector.emit(new Values(processedText, newCount));
        collector.ack(input);
    }
    
    // prepare, cleanup, declareOutputFields, and getComponentConfiguration omitted for brevity
}

Field Name-Based Processing

public class CustomNameBolt implements IRichBolt {
    private OutputCollector collector;
    
    public void execute(Tuple input) {
        // Access by field name (readable)
        String word = input.getStringByField("word");
        Integer count = input.getIntegerByField("count");
        
        // Process and emit (illustrative transformation)
        String processedWord = word.toLowerCase();
        Integer newCount = count + 1;
        collector.emit(new Values(processedWord, newCount));
        collector.ack(input);
    }
    
    // prepare, cleanup, declareOutputFields, and getComponentConfiguration omitted for brevity
}
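
Field-name access only works if the upstream component declared names for its output fields. A minimal sketch of the corresponding declareOutputFields implementation (standard Storm API; the field names here are assumptions matching the examples above):

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Bind names to the emitted positions so downstream bolts can call getStringByField("word")
    declarer.declare(new Fields("word", "count"));
}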