tessl/maven-org-apache-flink--flink-examples-streaming-2-10

Apache Flink streaming examples demonstrating various stream processing patterns and use cases

Socket Streaming Examples

Real-time processing of text read from a socket connection. The examples demonstrate live data ingestion, time-based windowing, and per-window aggregation.

Capabilities

SocketWindowWordCount (Java)

A streaming, windowed version of word count: it connects to a server socket, reads text lines as they arrive, and counts words per time window.

/**
 * Windowed word count from socket text stream
 * Connects to server socket and processes text with 5-second time windows
 * @param args Command line arguments (--hostname, optional, defaults to localhost; --port, required)
 */
public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception;
}

Usage Example:

# Start a text server (in a separate terminal)
nc -l 12345

# Run the socket word count example
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.socket.SocketWindowWordCount \
  --hostname localhost --port 12345

WordWithCount Data Class

POJO class for representing words with their counts in windowed aggregations.

/**
 * Data type for words with count used in socket windowing
 */
public static class WordWithCount {
    public String word;
    public long count;
    
    /** Default constructor for serialization */
    public WordWithCount();
    
    /** 
     * Constructor with word and count
     * @param word The word string
     * @param count Occurrence count
     */
    public WordWithCount(String word, long count);
    
    /**
     * String representation in format "word : count"
     * @return Formatted string representation
     */
    public String toString();
}
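
The signatures above omit the method bodies. A minimal sketch of how the class could be filled in, assuming nothing beyond what the Javadoc states (public fields, a no-argument constructor, and a "word : count" string form). It is written as a top-level, package-private class only so that it compiles on its own; in the example it is a public static nested class.

```java
/** Sketch of the WordWithCount POJO; bodies are inferred from the documented signatures. */
class WordWithCount {
    public String word;
    public long count;

    /** No-argument constructor, used when instances are created during deserialization. */
    public WordWithCount() {}

    /** Creates a word with an initial count. */
    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    /** Formats the pair as "word : count". */
    @Override
    public String toString() {
        return word + " : " + count;
    }
}
```

The public fields and the no-argument constructor are what let Flink treat the type as a POJO, which in turn is what makes the field expression in .keyBy("word") possible. Flink's POJO rules also require the class itself to be public, which this standalone sketch relaxes.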

SocketWindowWordCount (Scala)

Scala implementation of socket-based windowed word count using case classes and functional API.

/**
 * Scala version of socket windowed word count
 * @param args Command line arguments (--hostname, --port)
 */
object SocketWindowWordCount {
    def main(args: Array[String]): Unit;
}

/** Data type for words with count using Scala case class */
case class WordWithCount(word: String, count: Long)

Usage Example:

// Scala API usage pattern
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream(hostname, port, '\n')

val windowCounts = text
  .flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")

Key Patterns Demonstrated

Socket Data Ingestion

// Connect to socket text stream
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

Windowed Processing Pipeline

DataStream<WordWithCount> windowCounts = text
    // Parse and create word objects
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    // Group by word field
    .keyBy("word")
    // Create 5-second time windows
    .timeWindow(Time.seconds(5))
    // Aggregate counts within each window
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });
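
To make the result of this pipeline concrete, the sketch below reproduces its per-window logic in plain Java with no Flink dependency: split each line on whitespace, key by word, and combine counts pairwise the way the ReduceFunction does. WindowReduceSketch and countWords are hypothetical names used for illustration, and the input list stands in for the lines that arrived during one 5-second window.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of flatMap -> keyBy -> reduce for a single window. */
class WindowReduceSketch {

    /** Counts words across all lines assumed to belong to the same window. */
    static Map<String, Long> countWords(List<String> linesInWindow) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : linesInWindow) {
            // Same split as the example: tokens are separated by a single whitespace character.
            for (String word : line.split("\\s")) {
                // merge() plays the role of the ReduceFunction: a.count + b.count.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }
}
```

For the two sample lines in Socket Server Setup, this gives a count of 2 for "hello" and 1 for each other word, provided both lines land in the same window. Note that split("\\s") splits on every single whitespace character, so consecutive spaces produce empty-string tokens that are counted like any other word; a pattern such as "\\s+" would be one way to avoid that.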

Parameter Validation and Error Handling

final ParameterTool params = ParameterTool.fromArgs(args);
final String hostname = params.has("hostname") ? params.get("hostname") : "localhost";
final int port;

try {
    port = params.getInt("port");
} catch (Exception e) {
    System.err.println("No port specified. Please run 'SocketWindowWordCount " +
        "--hostname <hostname> --port <port>'");
    System.err.println("To start a simple text server, run 'netcat -l <port>'");
    return;
}
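
The same defaulting and validation rules can be illustrated without Flink on the classpath. The following is a JDK-only stand-in, not ParameterTool's implementation; SocketArgsSketch and its methods are hypothetical names, and only the simple "--key value" form is handled.

```java
import java.util.HashMap;
import java.util.Map;

/** JDK-only stand-in for the --hostname/--port handling; not ParameterTool itself. */
class SocketArgsSketch {

    /** Collects "--key value" pairs into a map; other tokens are ignored. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].startsWith("--")) {
                params.put(args[i].substring(2), args[i + 1]);
                i++; // skip the value that was just consumed
            }
        }
        return params;
    }

    /** Returns the hostname, falling back to "localhost" as the example does. */
    static String hostname(Map<String, String> params) {
        return params.getOrDefault("hostname", "localhost");
    }

    /** Returns the port, or throws IllegalArgumentException if it is missing or not a number. */
    static int port(Map<String, String> params) {
        String raw = params.get("port");
        if (raw == null) {
            throw new IllegalArgumentException("No port specified");
        }
        // NumberFormatException is a subclass of IllegalArgumentException.
        return Integer.parseInt(raw);
    }
}
```

As in the example, the hostname is optional while a missing or malformed port is treated as a usage error that the caller should report before exiting.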

Parallel Output Control

// Print results with a single thread rather than in parallel, so output from different subtasks is not interleaved
windowCounts.print().setParallelism(1);

Window Operations

Time Windows

  • Window Size: 5 seconds (Time.seconds(5))
  • Window Type: Tumbling (non-overlapping) time windows
  • Trigger: Time-based (fires once the window's 5-second interval has elapsed)
  • Aggregation: Reduce function summing word counts (the Scala version uses .sum("count"))
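
The arithmetic behind a tumbling window can be sketched in a few lines. This is an illustration under stated assumptions, not Flink's window assigner: it assumes non-negative millisecond timestamps and windows aligned to the epoch with no offset, and TumblingWindowSketch is a hypothetical name.

```java
/** Illustrates how a timestamp maps to a fixed-size tumbling window. */
class TumblingWindowSketch {

    /** Start (inclusive) of the window containing the timestamp; assumes timestamp >= 0. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    /** End (exclusive) of that window; a time-based trigger fires once time passes this point. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }
}
```

With a 5,000 ms window, a record at 12,345 ms falls into the window [10,000, 15,000). Because the windows do not overlap, every record belongs to exactly one of them.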

Key-Based Partitioning

  • Uses field-based keying: .keyBy("word")
  • Routes all records with the same word to the same parallel task, so each word is counted in one place per window
  • Lets different words be processed in parallel, each with its own window state
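
The routing idea can be pictured as hash partitioning. The sketch below is a deliberate simplification: Flink's actual assignment goes through key groups and a different hash function, and KeyPartitionSketch and subtaskFor are hypothetical names.

```java
/** Simplified picture of key-based partitioning; Flink's real scheme uses key groups. */
class KeyPartitionSketch {

    /** Maps a key to one of `parallelism` subtasks using its hash code. */
    static int subtaskFor(String key, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes.
        return Math.floorMod(key.hashCode(), parallelism);
    }
}
```

The property that matters is determinism: the same word always maps to the same subtask, so its running count never has to be merged across tasks.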

External System Requirements

Socket Server Setup

# Use netcat to create a test socket server
nc -l 12345

# Type text lines to send to the Flink application
hello world
hello flink streaming

Network Configuration

  • Default hostname: localhost
  • Required port: Must be specified via --port parameter
  • Protocol: TCP text stream with newline delimiters
  • Data format: Plain text lines
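
The wire format is simple enough to sketch directly. The helper below writes and reads newline-delimited records over arbitrary streams; LineProtocolSketch and its methods are hypothetical names, and the UTF-8 encoding is an assumption of this sketch rather than something the example specifies.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Newline-delimited text framing over plain byte streams. */
class LineProtocolSketch {

    /** Writes each record followed by a single '\n' delimiter. */
    static void writeLines(OutputStream out, List<String> records) {
        try {
            for (String record : records) {
                out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads records until end of stream, splitting on line terminators. */
    static List<String> readLines(InputStream in) {
        List<String> records = new ArrayList<>();
        try {
            BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                records.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }
}
```

To use this in place of netcat, writeLines could be given the output stream of a connection accepted by a java.net.ServerSocket listening on the chosen port; each record then arrives at the Flink job as one line.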

Dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Required Imports

Java Version

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

Scala Version

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
