or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

additional-examples.md
index.md
storm-operators.md
utility-classes.md
wordcount-examples.md
tile.json

docs/additional-examples.md

Additional Examples

Extended examples demonstrating advanced Storm-Flink integration patterns beyond basic word counting, including stream splitting with multiple outputs, text processing with exclamation marks, stream joins, and real-time data printing capabilities.

Capabilities

Exclamation Processing Examples

Examples demonstrating text processing with Storm topologies that add exclamation marks to text streams.

ExclamationLocal

Local execution example for exclamation processing topology.

/**
 * Local execution example for exclamation processing topology.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class ExclamationLocal {
    /** Topology identifier constant; its value is the literal "Streaming Exclamation". */
    public static final String topologyId = "Streaming Exclamation";
    
    /**
     * Main entry point for local exclamation topology
     * @param args Command line arguments for configuration
     * @throws Exception declared by the signature; the concrete failure causes are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
}

ExclamationWithBolt

Example showing Storm bolt usage within Flink streaming for text processing.

/**
 * Shows using Storm bolt within Flink streaming program for exclamation processing.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class ExclamationWithBolt {
    /**
     * Main entry point for bolt-based exclamation processing
     * @param args Command line arguments
     * @throws Exception declared by the signature; the concrete failure causes are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * Map function for adding exclamation marks to text
     */
    public static class ExclamationMap implements MapFunction<String, String> {
        /**
         * Maps one input string to one output string.
         * @param value Input text
         * @return Transformed text (presumably the input with exclamation marks added; body not shown)
         * @throws Exception declared by the signature
         */
        public String map(String value) throws Exception;
    }
}

ExclamationWithSpout

Example showing Storm spout usage within Flink streaming for data sourcing.

/**
 * Shows using Storm spout within Flink streaming program.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class ExclamationWithSpout {
    /**
     * Main entry point for spout-based exclamation processing
     * @param args Command line arguments
     * @throws Exception declared by the signature; the concrete failure causes are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * Map function for processing spout data
     */
    public static class ExclamationMap implements MapFunction<String, String> {
        /**
         * Maps one input string to one output string.
         * @param value Input text (presumably a value emitted by the spout; confirm against the implementation)
         * @return Transformed text; the exact transformation is not shown in this listing
         * @throws Exception declared by the signature
         */
        public String map(String value) throws Exception;
    }
}

ExclamationTopology

Topology builder for exclamation processing with configurable parameters.

/**
 * Builder for exclamation processing topology.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 * The four id constants below presumably name the components wired together by
 * {@link #buildTopology()} (confirm against the implementation).
 */
public class ExclamationTopology {
    /** Identifier constant with value "source". */
    public static final String spoutId = "source";
    /** Identifier constant with value "exclamation1". */
    public static final String firstBoltId = "exclamation1";
    /** Identifier constant with value "exclamation2". */
    public static final String secondBoltId = "exclamation2";
    /** Identifier constant with value "sink". */
    public static final String sinkId = "sink";
    
    /**
     * Build exclamation processing topology
     * @return Configured TopologyBuilder
     */
    public static TopologyBuilder buildTopology();
    
    /**
     * Get exclamation count configuration.
     * This is a read-only accessor: it takes no arguments and returns the current value.
     * @return Number of exclamation marks to add
     */
    public static int getExclamation();
    
    /**
     * Parse command line parameters
     * @param args Command line arguments
     * @return true if parameters valid, false otherwise
     */
    public static boolean parseParameters(String[] args);
}

ExclamationBolt

Bolt operator that adds configurable exclamation marks to text.

/**
 * Bolt that adds configurable exclamation marks to text.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class ExclamationBolt implements IRichBolt {
    /**
     * Configuration key with value "exclamation.count".
     * Presumably looked up in the configuration map passed to {@code prepare} (confirm against the implementation).
     */
    public static final String EXCLAMATION_COUNT = "exclamation.count";
    
    /**
     * Prepare bolt for execution
     * @param conf Storm configuration
     * @param context Topology context
     * @param collector Output collector
     */
    public void prepare(Map conf, TopologyContext context, OutputCollector collector);
    
    /**
     * Cleanup bolt resources
     */
    public void cleanup();
    
    /**
     * Execute exclamation processing on tuple
     * @param tuple Input tuple containing text
     */
    public void execute(Tuple tuple);
    
    /**
     * Declare output fields
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
    
    /**
     * Get component configuration
     * @return Configuration map
     */
    public Map<String, Object> getComponentConfiguration();
}

Stream Splitting Examples

Examples demonstrating multiple output streams from Storm components with even/odd number splitting.

SpoutSplitExample

Example demonstrating spouts with multiple output streams and stream-specific processing.

/**
 * Demonstrates spouts with multiple output streams.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class SpoutSplitExample {
    /**
     * Main entry point for stream splitting example
     * @param args Command line arguments
     * @throws Exception declared by the signature; the concrete failure causes are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * Map function for enriching split stream data.
     * Takes an Integer and produces a (String, Integer) pair.
     */
    public static class Enrich implements MapFunction<Integer, Tuple2<String, Integer>> {
        /**
         * Error tracking for stream processing.
         * Public, static and mutable; initial value is false.
         * NOTE: the identifier is spelled "errorOccured" (single 'r'); it is a public name
         * and is reproduced verbatim.
         */
        public static boolean errorOccured = false;
        
        /**
         * Maps one number to an enriched pair.
         * @param value Input number
         * @return Pair of a String and an Integer; the content of the String is not shown in this listing
         * @throws Exception declared by the signature
         */
        public Tuple2<String, Integer> map(Integer value) throws Exception;
    }
}

RandomSpout

Spout generating random numbers with separate even and odd output streams.

/**
 * Spout generating random numbers with separate even/odd streams.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class RandomSpout extends BaseRichSpout {
    /** Stream id constant for the even-number stream; value "even". */
    public static final String EVEN_STREAM = "even";
    /** Stream id constant for the odd-number stream; value "odd". */
    public static final String ODD_STREAM = "odd";
    
    /**
     * Create random spout with stream splitting
     * @param split true to enable stream splitting, false for single stream
     * @param seed Random seed for reproducible results
     */
    public RandomSpout(boolean split, long seed);
    
    /**
     * Initialize spout
     * @param conf Storm configuration
     * @param context Topology context
     * @param collector Spout output collector
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    
    /**
     * Emit next tuple to appropriate stream
     */
    public void nextTuple();
    
    /**
     * Declare output fields for both streams
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
}

VerifyAndEnrichBolt

Bolt for verifying and enriching data from split streams with error tracking.

/**
 * Bolt for verifying and enriching data from split streams.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class VerifyAndEnrichBolt extends BaseRichBolt {
    /**
     * Global error tracking across bolt instances.
     * Public, static and mutable; initial value is false.
     * NOTE: the identifier is spelled "errorOccured" (single 'r'); it is a public name
     * and is reproduced verbatim.
     */
    public static boolean errorOccured = false;
    
    /**
     * Create verification bolt for specific stream type
     * @param evenOrOdd true for even stream processing, false for odd stream
     */
    public VerifyAndEnrichBolt(boolean evenOrOdd);
    
    /**
     * Prepare bolt for execution
     * @param stormConf Storm configuration
     * @param context Topology context
     * @param collector Output collector
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
    
    /**
     * Execute verification and enrichment
     * @param input Input tuple from split stream
     */
    public void execute(Tuple input);
    
    /**
     * Declare output fields
     * @param declarer Output field declarer
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer);
}

Join Examples

Examples demonstrating stream joins in Storm topologies.

SingleJoinExample

Example demonstrating joins between multiple data streams in Storm.

/**
 * Example demonstrating joins in Storm topologies.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class SingleJoinExample {
    /**
     * Main entry point for join example
     * @param args Command line arguments
     * @throws Exception declared by the signature; the concrete failure causes are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
}

Print Examples

Examples for printing and displaying real-time data streams.

PrintSampleStream

Example for printing real-time streams with Twitter integration support.

/**
 * Example for printing real-time streams (requires Twitter API credentials).
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class PrintSampleStream {
    /**
     * Main entry point for stream printing example
     * @param args Command line arguments with Twitter credentials
     *             (the expected order and number of arguments are not shown in this listing)
     * @throws Exception declared by the signature; the concrete failure causes are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
}

Remote Execution Examples

Examples demonstrating remote Storm topology execution on Flink clusters.

WordCountRemoteByClient

Example showing remote topology submission using FlinkClient.

/**
 * Remote topology submission using FlinkClient.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class WordCountRemoteByClient {
    /** Topology identifier constant; its value is the literal "Storm WordCount". */
    public static final String topologyId = "Storm WordCount";
    
    /**
     * Main entry point for remote client submission.
     * Unlike the other examples, this method declares three specific checked Storm
     * exceptions rather than {@code Exception}, so callers must handle or propagate each.
     * @param args Command line arguments containing cluster configuration
     * @throws AlreadyAliveException declared by the signature; trigger condition not shown in this listing
     * @throws InvalidTopologyException declared by the signature; trigger condition not shown in this listing
     * @throws NotAliveException declared by the signature; trigger condition not shown in this listing
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, NotAliveException;
}

WordCountRemoteBySubmitter

Example showing remote topology submission using FlinkSubmitter.

/**
 * Remote topology submission using FlinkSubmitter.
 *
 * This is an API listing: only the public surface is shown and method bodies are omitted.
 */
public class WordCountRemoteBySubmitter {
    /** Topology identifier constant; its value is the literal "Storm WordCount". */
    public static final String topologyId = "Storm WordCount";
    
    /**
     * Main entry point for remote submitter
     * @param args Command line arguments
     * @throws Exception declared by the signature; the concrete failure causes are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
}

Usage Examples

Running Exclamation Examples

// Each main(...) below is declared `throws Exception`, so these calls must sit in a
// method that catches or declares Exception. An empty argument array is passed to each.

// Local exclamation processing
ExclamationLocal.main(new String[]{});

// Embedded bolt usage (Storm bolt inside a Flink streaming program)
ExclamationWithBolt.main(new String[]{});

// Embedded spout usage (Storm spout inside a Flink streaming program)
ExclamationWithSpout.main(new String[]{});

Remote Topology Execution

import org.apache.flink.storm.wordcount.*;

// Submit topology to remote Flink cluster using client.
// This main(...) declares AlreadyAliveException, InvalidTopologyException and
// NotAliveException, which the caller must catch or declare.
// NOTE(review): the meaning and order of the four arguments are not defined anywhere in
// this document; confirm them against the example's own parameter parsing.
WordCountRemoteByClient.main(new String[]{
    "localhost", "6123", "input.txt", "output.txt"
});

// Submit topology using FlinkSubmitter pattern.
// This main(...) is declared `throws Exception`.
WordCountRemoteBySubmitter.main(new String[]{
    "input.txt", "output.txt" 
});

Types

// Storm exception types for remote execution
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NotAliveException;

// Core Storm tuple and field types
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Flink tuple types  
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Map and function interfaces
import java.util.Map;
import java.io.Serializable;

// POJO classes for data exchange
/**
 * Serializable holder for a single sentence string.
 * Declared {@code static}, so it is meant to be nested inside another class.
 * This is an API listing: method bodies are omitted.
 */
public static class Sentence implements Serializable {
    // The wrapped sentence text.
    private String sentence;
    
    /** No-argument constructor. */
    public Sentence();
    /** @param sentence Sentence text (presumably stored in the field; body not shown) */
    public Sentence(String sentence);
    /** @return Sentence text (presumably the stored field value; body not shown) */
    public String getSentence();
    /** @param sentence New sentence text (presumably replaces the stored value; body not shown) */
    public void setSentence(String sentence);
    /** @return String form of this object; the exact format is not shown in this listing */
    public String toString();
}

Building Exclamation Topology

import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.exclamation.ExclamationTopology;

// Build exclamation topology; buildTopology() returns a configured TopologyBuilder
TopologyBuilder builder = ExclamationTopology.buildTopology();

// Read the configured exclamation count. getExclamation() is a getter that returns the
// number of exclamation marks to add; this call does not change the configuration.
int count = ExclamationTopology.getExclamation();

Stream Splitting with RandomSpout

import org.apache.storm.topology.TopologyBuilder;
import org.apache.flink.storm.split.operators.*;

TopologyBuilder builder = new TopologyBuilder();

// Add random spout with stream splitting:
// first argument `true` enables splitting, second argument is the random seed
RandomSpout spout = new RandomSpout(true, 12345L);
builder.setSpout("random", spout);

// Process even stream: `true` selects even-stream processing in VerifyAndEnrichBolt,
// and the bolt subscribes to the spout's EVEN_STREAM ("even")
VerifyAndEnrichBolt evenBolt = new VerifyAndEnrichBolt(true);
builder.setBolt("even-processor", evenBolt)
    .shuffleGrouping("random", RandomSpout.EVEN_STREAM);

// Process odd stream: `false` selects odd-stream processing,
// and the bolt subscribes to the spout's ODD_STREAM ("odd")
VerifyAndEnrichBolt oddBolt = new VerifyAndEnrichBolt(false);
builder.setBolt("odd-processor", oddBolt)
    .shuffleGrouping("random", RandomSpout.ODD_STREAM);

Custom Exclamation Processing

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
import org.apache.flink.storm.wrappers.BoltWrapper;

public class CustomExclamation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Create data source
        DataStream<String> source = env.fromElements(
            "Hello", "World", "Storm", "Flink"
        );
        
        // Apply exclamation bolt
        DataStream<String> processed = source.transform(
            "ExclamationBolt",
            BasicTypeInfo.STRING_TYPE_INFO,
            new BoltWrapper<String, String>(new ExclamationBolt())
        );
        
        processed.print();
        env.execute("Custom Exclamation");
    }
}

Multi-Stream Processing

import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.storm.split.SpoutSplitExample;

public class MultiStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Create random number source
        DataStream<Integer> numbers = env.addSource(
            new SpoutWrapper<>(new RandomSpout(false, 42), 
                new String[]{ Utils.DEFAULT_STREAM_ID }, -1)
        );
        
        // Split stream based on even/odd
        SplitStream<Integer> split = numbers.split(new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                return value % 2 == 0 ? 
                    Collections.singletonList("even") : 
                    Collections.singletonList("odd");
            }
        });
        
        // Process even numbers
        DataStream<Integer> evenStream = split.select("even");
        evenStream.map(x -> "Even: " + x).print();
        
        // Process odd numbers
        DataStream<Integer> oddStream = split.select("odd");
        oddStream.map(x -> "Odd: " + x).print();
        
        env.execute("Multi-Stream Processing");
    }
}

Join Processing

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

public class StreamJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Create two data streams
        DataStream<Tuple2<String, Integer>> stream1 = env.fromElements(
            new Tuple2<>("a", 1), new Tuple2<>("b", 2)
        );
        
        DataStream<Tuple2<String, String>> stream2 = env.fromElements(
            new Tuple2<>("a", "apple"), new Tuple2<>("b", "banana")
        );
        
        // Join streams by key within time window
        DataStream<Tuple3<String, Integer, String>> joined = stream1
            .join(stream2)
            .where(t -> t.f0)
            .equalTo(t -> t.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .apply((t1, t2) -> new Tuple3<>(t1.f0, t1.f1, t2.f1));
        
        joined.print();
        env.execute("Stream Join");
    }
}