or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

additional-examples.md, index.md, storm-operators.md, utility-classes.md, wordcount-examples.md
tile.json

docs/wordcount-examples.md

Word Count Examples

Comprehensive word count examples demonstrating various Storm-Flink integration patterns. These examples show embedded mode usage (Storm components within Flink streaming), complete topology execution, and different data access patterns including index-based, field name-based, and POJO-based approaches.

Capabilities

Embedded Mode Examples

Examples integrating individual Storm components within Flink streaming programs.

SpoutSourceWordCount

Word count example using Storm spout as data source within Flink streaming program.

/**
 * Word count using Storm spout as data source in Flink streaming
 */
public class SpoutSourceWordCount {
    /**
     * Main entry point for spout-based word count
     * @param args Command line arguments: [input-path] [output-path]
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * Tokenizer that splits sentences into words
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception;
    }
}

Usage Example:

// Run with default data
SpoutSourceWordCount.main(new String[]{});

// Run with file input/output
SpoutSourceWordCount.main(new String[]{"input.txt", "output.txt"});

BoltTokenizerWordCount

Word count example using Storm bolt for tokenization within Flink streaming program.

/**
 * Word count using Storm bolt for tokenization in Flink streaming
 */
public class BoltTokenizerWordCount {
    /**
     * Main entry point for bolt-based word count
     * @param args Command line arguments: [input-path] [output-path]
     */
    public static void main(String[] args) throws Exception;
}

Field Name Access Variants

Word count examples demonstrating field name-based tuple access patterns.

/**
 * Word count accessing tuple fields by name rather than index
 */
public class BoltTokenizerWordCountWithNames {
    public static void main(String[] args) throws Exception;
}

/**
 * Word count using POJO input types with field name access
 */
public class BoltTokenizerWordCountPojo {
    public static void main(String[] args) throws Exception;
}

Full Topology Examples

Examples executing complete Storm topologies on Flink clusters.

Local Topology Execution

Examples for local testing and development.

/**
 * Local execution of complete Storm topology for word counting
 */
public class WordCountLocal {
    public static final String topologyId = "Storm WordCount";
    
    /**
     * Main entry point for local topology execution
     * @param args Command line arguments: [input-path] [output-path]  
     */
    public static void main(String[] args) throws Exception;
}

/**
 * Local topology execution using field names instead of indices
 */
public class WordCountLocalByName {
    /**
     * Main entry point for local execution with field names
     * @param args Command line arguments: [input-path] [output-path]
     */
    public static void main(String[] args) throws Exception;
}

Remote Topology Submission

Examples for submitting topologies to remote Flink clusters.

/**
 * Remote cluster submission using FlinkClient
 */
public class WordCountRemoteByClient {
    public static final String topologyId = "Storm WordCount";
    
    /**
     * Submit topology to remote cluster via client
     * @param args Command line arguments: [input-path] [output-path]
     */
    public static void main(String[] args) throws Exception;
}

/**
 * Remote cluster submission using Storm submitter pattern
 */
public class WordCountRemoteBySubmitter {
    /**
     * Submit topology to remote cluster via submitter
     * @param args Command line arguments: [input-path] [output-path]
     */
    public static void main(String[] args) throws Exception;
}

Topology Builder

Utility for constructing word count Storm topologies with various configuration options.

WordCountTopology

Builder for word count Storm topologies supporting both index-based and field name-based access patterns.

/**
 * Builder for word count Storm topologies
 */
public class WordCountTopology {
    public static final String spoutId = "source";
    public static final String tokenierzerId = "tokenizer"; 
    public static final String counterId = "counter";
    public static final String sinkId = "sink";
    
    /**
     * Build topology with default index-based access
     * @return Configured TopologyBuilder
     */
    public static TopologyBuilder buildTopology();
    
    /**
     * Build topology with configurable access pattern
     * @param indexOrName true for index-based access, false for field name-based
     * @return Configured TopologyBuilder  
     */
    public static TopologyBuilder buildTopology(boolean indexOrName);
    
    /**
     * Parse command line parameters
     * @param args Command line arguments
     * @return true if parameters valid, false otherwise
     */
    static boolean parseParameters(String[] args);
}

Usage Example:

import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.wordcount.WordCountTopology;

// Build topology with index-based access
TopologyBuilder builder = WordCountTopology.buildTopology();

// Build topology with field name-based access  
TopologyBuilder builderByName = WordCountTopology.buildTopology(false);

Integration Patterns

Embedded Mode Pattern

Integrating Storm components within Flink streaming programs:

import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.storm.utils.Utils;

// Using Storm spout in Flink streaming
DataStream<String> source = env.addSource(
    new SpoutWrapper<String>(new WordCountFileSpout("input.txt"),
        new String[] { Utils.DEFAULT_STREAM_ID }, -1),
    TypeExtractor.getForClass(String.class)
).setParallelism(1);

// Using Storm bolt in Flink streaming  
DataStream<Tuple2<String, Integer>> tokenized = source.transform(
    "BoltTokenizer",
    TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)),
    new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())
);

Full Topology Pattern

Building and executing complete Storm topologies:

import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.api.FlinkLocalCluster;

// Build topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("source", new WordCountInMemorySpout());
builder.setBolt("tokenizer", new BoltTokenizer(), 4).shuffleGrouping("source");
builder.setBolt("counter", new BoltCounter(), 4).fieldsGrouping("tokenizer", new Fields("word"));
builder.setBolt("sink", new BoltPrintSink(new TupleOutputFormatter())).shuffleGrouping("counter");

// Submit to local cluster
FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("WordCount", new Config(), FlinkTopology.createTopology(builder));

Data Access Patterns

Index-Based Access

Accessing tuple fields by position:

public class IndexBasedBolt implements IRichBolt {
    public void execute(Tuple input) {
        String word = input.getString(0);  // First field
        Integer count = input.getInteger(1);  // Second field
        // Process data...
    }
}

Field Name-Based Access

Accessing tuple fields by name:

public class NameBasedBolt implements IRichBolt {
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = input.getIntegerByField("count");
        // Process data...
    }
}

POJO-Based Access

Using Plain Old Java Objects:

public class WordCountPojo {
    public static class Sentence implements Serializable {
        private String sentence;
        
        public Sentence() {}
        public Sentence(String sentence) { this.sentence = sentence; }
        
        public String getSentence() { return sentence; }
        public void setSentence(String sentence) { this.sentence = sentence; }
    }
}

Usage Examples

Running Examples

// Embedded mode examples
SpoutSourceWordCount.main(new String[]{"input.txt", "output.txt"});
BoltTokenizerWordCount.main(new String[]{"input.txt", "output.txt"});

// Full topology examples  
WordCountLocal.main(new String[]{"input.txt", "output.txt"});
WordCountRemoteByClient.main(new String[]{"input.txt", "output.txt"});

// Field name variants
BoltTokenizerWordCountWithNames.main(new String[]{"input.txt", "output.txt"});
WordCountLocalByName.main(new String[]{"input.txt", "output.txt"});

Custom Integration

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.storm.wordcount.operators.*;

public class CustomWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Create data source
        DataStream<String> source = env.addSource(
            new SpoutWrapper<>(new WordCountInMemorySpout(), 
                new String[]{ Utils.DEFAULT_STREAM_ID }, -1)
        );
        
        // Process with Storm bolt
        DataStream<Tuple2<String, Integer>> processed = source.transform(
            "Tokenizer", 
            TypeExtractor.getForObject(new Tuple2<>("", 0)),
            new BoltWrapper<>(new BoltTokenizer())
        );
        
        // Aggregate and output
        processed.keyBy(0).sum(1).print();
        env.execute("Custom Word Count");
    }
}