Apache Flink streaming examples that demonstrate common stream-processing patterns and use cases
—
Real-time processing of data read from a socket connection. These examples demonstrate live data ingestion, time-based windowing, and aggregation within each window.
A windowed streaming version of word count: it connects to a server socket, splits each incoming line into words, and counts the words in 5-second time windows as the data arrives.
/**
 * Windowed word count over a socket text stream.
 *
 * <p>Connects to a server socket, splits every received line on whitespace and
 * counts how often each word occurs within 5-second time windows.
 */
public class SocketWindowWordCount {
    /**
     * Program entry point.
     *
     * @param args command line arguments: {@code --hostname} (optional, defaults to
     *             {@code localhost}) and {@code --port} (required)
     */
    public static void main(String[] args) throws Exception;
}

Usage Example:
# Start a text server (in separate terminal)
nc -l 12345
# Run the socket word count example
java -cp flink-examples-streaming_2.10-1.3.3.jar \
org.apache.flink.streaming.examples.socket.SocketWindowWordCount \
--hostname localhost --port 12345POJO class for representing words with their counts in windowed aggregations.
/**
 * Data type holding a word and its occurrence count, used for the windowed
 * aggregation in the socket word count example.
 *
 * <p>The fields are public and a no-argument constructor is provided. The pipeline
 * keys the stream by field name ({@code keyBy("word")}), so these are presumably
 * required for Flink to treat the class as a POJO — confirm against Flink's POJO
 * type rules.
 */
public static class WordWithCount {
    /** The word being counted. */
    public String word;
    /** Occurrence count: 1 for a freshly parsed word, summed by the window reduce. */
    public long count;

    /** Default constructor for serialization. */
    public WordWithCount();

    /**
     * Creates a word/count pair.
     *
     * @param word  the word string
     * @param count occurrence count
     */
    public WordWithCount(String word, long count);

    /**
     * Returns a string representation in the format {@code "word : count"}.
     *
     * @return formatted string representation
     */
    public String toString();
}

Scala implementation of socket-based windowed word count using case classes and functional API.
/**
 * Scala version of the socket windowed word count.
 */
object SocketWindowWordCount {
  /**
   * Program entry point.
   *
   * @param args command line arguments (--hostname, --port)
   */
  def main(args: Array[String]): Unit;
}

/**
 * Data type for a word and its count, modelled as a Scala case class.
 * Its field names are referenced by name in the pipeline (keyBy("word"), sum("count")).
 */
case class WordWithCount(word: String, count: Long)

Usage Example:
// Scala API usage pattern.
// Assumes `hostname: String` and `port: Int` have already been parsed from the
// command line (e.g. with ParameterTool).
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

// Read the socket as a stream of newline-delimited lines
val text = env.socketTextStream(hostname, port, '\n')

val windowCounts = text
  .flatMap { w => w.split("\\s") }   // one element per whitespace-separated token
  .map { w => WordWithCount(w, 1) }  // pair every word with an initial count of 1
  .keyBy("word")                     // group by the case-class field name
  .timeWindow(Time.seconds(5))       // 5-second tumbling windows
  .sum("count")                      // add up the counts per word and window

// Print with a single sink instance so output lines are not interleaved
windowCounts.print().setParallelism(1)

// The calls above only build the dataflow graph; nothing runs until execute() is called
env.execute("Socket Window WordCount")

// Connect to socket text stream
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

DataStream<WordWithCount> windowCounts = text
    // Parse each line and emit one WordWithCount(word, 1) per whitespace-separated token
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    // Group by the POJO field "word"
    .keyBy("word")
    // Collect elements into 5-second time windows
    .timeWindow(Time.seconds(5))
    // Sum the counts of identical words within each window
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });

// Parse command line arguments: --hostname defaults to "localhost", --port is required
final ParameterTool params = ParameterTool.fromArgs(args);
final String hostname = params.has("hostname") ? params.get("hostname") : "localhost";
final int port;
try {
    // getInt throws when --port is absent (and, presumably, when it is not an integer)
    port = params.getInt("port");
} catch (Exception e) {
    System.err.println("No port specified. Please run 'SocketWindowWordCount " +
        "--hostname <hostname> --port <port>'");
    System.err.println("To start a simple text server, run 'netcat -l <port>'");
    return;
}

// Print results with a single sink instance so output lines are not interleaved
windowCounts.print().setParallelism(1);

// Building the pipeline does not run it; the job starts only when execute() is called
env.execute("Socket Window WordCount");

// NOTE(review): the fragment below appears truncated in the source page
// (unbalanced parenthesis); kept verbatim.
Time.seconds(5)).keyBy("word")

# Using netcat to create test socket server
nc -l 12345
# Type text lines into the nc terminal; they are sent to the Flink application
hello world
hello flink streaming

<!-- Maven dependencies: Flink 1.3.3 Java and Scala DataStream APIs (Scala 2.10 builds) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

// Imports used by the Java example
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Imports used by the Scala example
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-examples-streaming-2-10