# Socket Streaming Examples

Examples of real-time processing of text read from a socket connection. They demonstrate live data ingestion, time-based windowing, and windowed aggregation.

## Capabilities

### SocketWindowWordCount (Java)

A streaming, windowed word count that connects to a text server socket and counts the words it receives over 5-second time windows.

```java { .api }
/**
 * Windowed word count over a socket text stream.
 * Connects to a server socket and counts words in 5-second tumbling time windows.
 * @param args Command line arguments (--hostname, --port)
 */
public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception;
}
```
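
**Usage Example:**

```bash
# Start a text server (in a separate terminal)
nc -l 12345

# Run the example directly; the Flink runtime jars (e.g. those in a Flink
# distribution's lib/ directory) must also be on the classpath
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.socket.SocketWindowWordCount \
  --hostname localhost --port 12345

# Alternatively, submit the packaged example from a Flink distribution directory
./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 12345
```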

### WordWithCount Data Class

POJO representing a word and its count in the windowed aggregation. Because the class is public and static, has public fields, and has a public no-argument constructor, Flink treats it as a POJO type, which is what allows the stream to be keyed by the field name `"word"`.

```java { .api }
/**
 * Data type for words with count used in socket windowing
 */
public static class WordWithCount {
    public String word;
    public long count;

    /** Default constructor, required for POJO serialization */
    public WordWithCount();

    /**
     * Constructor with word and count
     * @param word The word string
     * @param count Occurrence count
     */
    public WordWithCount(String word, long count);

    /**
     * String representation in the format "word : count"
     * @return Formatted string representation
     */
    public String toString();
}
```
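
The merge performed by the example's `ReduceFunction` can be checked without a Flink cluster. The following is a minimal, dependency-free sketch, not the example's own source: it repeats the `WordWithCount` shape as a top-level class and adds a hypothetical helper, `WordCountReduce`, that folds same-word records pairwise, the way a window applies the reduce function as each element arrives.

```java
import java.util.List;

// Standalone copy of the example's WordWithCount POJO (no Flink dependency)
class WordWithCount {
    public String word;
    public long count;

    // Public no-argument constructor, as Flink's POJO rules require
    public WordWithCount() {}

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}

// Hypothetical helper that reproduces the example's reduce step
final class WordCountReduce {

    private WordCountReduce() {}

    // Same logic as the example's ReduceFunction: keep the word, add the counts
    static WordWithCount reduce(WordWithCount a, WordWithCount b) {
        return new WordWithCount(a.word, a.count + b.count);
    }

    // Folds same-word records pairwise, as a window applies reduce() to each new element
    static WordWithCount reduceAll(List<WordWithCount> records) {
        if (records.isEmpty()) {
            throw new IllegalArgumentException("records must not be empty");
        }
        WordWithCount acc = records.get(0);
        for (int i = 1; i < records.size(); i++) {
            acc = reduce(acc, records.get(i));
        }
        return acc;
    }
}
```

Reducing three `("hello", 1)` records with `reduceAll` returns an object whose `toString()` is `hello : 3`.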

### SocketWindowWordCount (Scala)

Scala implementation of the same windowed word count, using a case class and the Scala DataStream API.

```scala { .api }
/**
 * Scala version of socket windowed word count
 * @param args Command line arguments (--hostname, --port)
 */
object SocketWindowWordCount {
  def main(args: Array[String]): Unit

  /** Data type for words with count, defined as a case class inside the object */
  case class WordWithCount(word: String, count: Long)
}
```
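
**Usage Example:**

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Scala API usage pattern; in the example, hostname and port come from ParameterTool
val hostname = "localhost"
val port = 12345

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream(hostname, port, '\n')

val windowCounts = text
  .flatMap { w => w.split("\\s") }   // split each line into words
  .map { w => WordWithCount(w, 1) }  // pair each word with a count of 1
  .keyBy("word")                     // group by the case class field "word"
  .timeWindow(Time.seconds(5))       // 5-second tumbling windows
  .sum("count")                      // add up the counts per word and window

// Print with a single sink instance and start the job
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
```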
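
## Key Patterns Demonstrated

### Socket Data Ingestion
```java
// Obtain the execution environment and connect to a newline-delimited socket text stream
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
```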

### Windowed Processing Pipeline
```java
DataStream<WordWithCount> windowCounts = text
    // Split each line on whitespace and emit (word, 1) for every token
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    // Group by the "word" field of the POJO
    .keyBy("word")
    // Assign records to 5-second tumbling time windows
    .timeWindow(Time.seconds(5))
    // Sum the counts of each word within each window
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });
```
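
The pipeline's overall effect can be simulated in plain Java to see which counts each window emits. This sketch is an illustration, not Flink's execution model: the class `WindowedWordCountSimulation` is hypothetical, and each input line carries an explicit, non-negative millisecond timestamp (an assumption made for testability) instead of being stamped by the processing-time clock.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, dependency-free simulation of
// flatMap(split) -> keyBy("word") -> 5 s tumbling window -> reduce(sum).
// Assumes each line comes with an explicit non-negative timestamp in milliseconds.
final class WindowedWordCountSimulation {

    static final long WINDOW_SIZE_MS = 5_000L;

    private WindowedWordCountSimulation() {}

    // Returns window start -> (word -> count); TreeMaps keep both levels sorted
    static Map<Long, Map<String, Long>> run(List<Long> timestamps, List<String> lines) {
        if (timestamps.size() != lines.size()) {
            throw new IllegalArgumentException("need one timestamp per line");
        }
        Map<Long, Map<String, Long>> windows = new TreeMap<>();
        for (int i = 0; i < lines.size(); i++) {
            long ts = timestamps.get(i);
            // Tumbling windows: each timestamp falls into exactly one [start, start + size)
            long windowStart = ts - (ts % WINDOW_SIZE_MS);
            Map<String, Long> counts = windows.computeIfAbsent(windowStart, k -> new TreeMap<>());
            // Same tokenization as the example: split on single whitespace characters
            for (String word : lines.get(i).split("\\s")) {
                // Keyed sum, equivalent to reduce(a, b) -> a.count + b.count per word
                counts.merge(word, 1L, Long::sum);
            }
        }
        return windows;
    }
}
```

With `"hello world"` and `"hello flink"` at 1,000 ms and 3,000 ms and `"hello"` at 6,000 ms, the window starting at 0 reports `hello` = 2, while the window starting at 5,000 reports `hello` = 1. Because `split("\\s")` splits on single whitespace characters, an input such as `"a  b"` (two spaces) also yields an empty-string token that is counted like a word; splitting on `"\\s+"` instead collapses runs of whitespace.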

### Parameter Validation and Error Handling
```java
final ParameterTool params = ParameterTool.fromArgs(args);
// The hostname is optional and defaults to localhost
final String hostname = params.has("hostname") ? params.get("hostname") : "localhost";
final int port;

try {
    // getInt throws if --port is missing or is not an integer
    port = params.getInt("port");
} catch (Exception e) {
    System.err.println("No port specified. Please run 'SocketWindowWordCount " +
        "--hostname <hostname> --port <port>'");
    System.err.println("To start a simple text server, run 'netcat -l <port>'");
    return;
}
```
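
The contract of this check (an optional hostname defaulting to `localhost` and a mandatory integer port) can be expressed without Flink. The class below is a hypothetical, stdlib-only stand-in for `ParameterTool.fromArgs(...)` combined with the example's validation; it is a sketch and does not reproduce `ParameterTool`'s full parsing rules.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stdlib-only stand-in for ParameterTool.fromArgs(args) plus the
// example's validation: --hostname is optional, --port is required.
final class SocketArgs {
    final String hostname;
    final int port;

    private SocketArgs(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
    }

    // Parses "--key value" pairs; throws IllegalArgumentException on bad input
    static SocketArgs parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected '--key value' at: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        String hostname = params.getOrDefault("hostname", "localhost");
        String port = params.get("port");
        if (port == null) {
            throw new IllegalArgumentException("No port specified");
        }
        // NumberFormatException (a subclass of IllegalArgumentException) if not an integer
        return new SocketArgs(hostname, Integer.parseInt(port));
    }
}
```

`SocketArgs.parse(new String[] {"--port", "12345"})` yields hostname `localhost` and port `12345`, whereas omitting `--port` raises an `IllegalArgumentException`, which a caller could turn into the usage message printed by the original code.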

### Parallel Output Control
```java
// Print the results with a single sink instance rather than in parallel
windowCounts.print().setParallelism(1);

// Nothing runs until execute() is called; this submits the job
env.execute("Socket Window WordCount");
```
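
## Window Operations

### Time Windows
- **Window Size**: 5 seconds (`Time.seconds(5)`)
- **Window Type**: Tumbling (non-overlapping) time windows; because the default time characteristic is processing time, `timeWindow(Time.seconds(5))` creates processing-time windows
- **Trigger**: Time-based; a window fires when the processing-time clock reaches its end
- **Aggregation**: A `ReduceFunction` (Java) or `sum("count")` (Scala) that adds up word counts; it is applied incrementally as elements arrive, so only one running `WordWithCount` per word and window is kept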
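
The window an element lands in follows from simple arithmetic. The sketch below assumes non-negative millisecond timestamps and no window offset; the class name `TumblingWindowMath` is hypothetical. Because timestamps count from the Unix epoch, 5-second windows computed this way start at multiples of 5 seconds since the epoch rather than at the moment the job starts.

```java
// Hypothetical helper showing tumbling-window assignment arithmetic.
// Assumes non-negative millisecond timestamps and no window offset.
final class TumblingWindowMath {

    private TumblingWindowMath() {}

    // Inclusive start of the window that contains the timestamp
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Exclusive end of that window
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }
}
```

For example, an element processed at 12,345 ms belongs to the window [10,000, 15,000). In Flink the window's last included timestamp is `end - 1`, and the processing-time trigger fires once the clock reaches it.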

### Key-Based Partitioning
- Uses field-based keying: `.keyBy("word")`
- Routes all records with the same word to the same parallel operator instance, so each word has its own window for each 5-second interval
- Lets different words be counted in parallel by different operator instances
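
How `keyBy` keeps one word on one instance can be illustrated with a simplified routing function. This is a sketch under stated assumptions, not Flink's code: Flink first assigns each key to one of `maxParallelism` key groups and then maps key groups to operator instances, but it also applies a murmur hash to `key.hashCode()`, which this hypothetical `KeyRouting` class omits.

```java
// Simplified, hypothetical sketch of key-to-instance routing for a keyed stream.
// Flink additionally murmur-hashes key.hashCode() before the modulus; omitted here.
final class KeyRouting {

    private KeyRouting() {}

    // Assign the key to one of maxParallelism key groups
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Map the key group to an operator instance index in [0, parallelism)
    static int operatorIndex(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }
}
```

Since the index depends only on the key's `hashCode()`, every `"hello"` record reaches the same instance (and therefore the same window state), while other words may be spread across the remaining instances.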

## External System Requirements

### Socket Server Setup
```bash
# Use netcat to start a test socket server
# (some netcat variants require `nc -l -p 12345` instead)
nc -l 12345

# Then type lines of text into the netcat session; each line is sent to the Flink job
hello world
hello flink streaming
```
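
Where netcat is unavailable (for example, in an automated test), a few lines of plain Java can play the same role. This is a hypothetical helper, `LineServer`, not part of Flink: it accepts a single client connection, such as the one opened by `socketTextStream`, and writes each line followed by `'\n'`.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical stand-in for `nc -l <port>`: serves newline-delimited text to one client.
final class LineServer {

    private LineServer() {}

    // Port 0 lets the OS pick a free port; read it back with getLocalPort()
    static ServerSocket open(int port) throws IOException {
        return new ServerSocket(port);
    }

    // Blocks until a client (e.g. the Flink socket source) connects, writes each
    // line followed by '\n', then closes the connection to end the stream
    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             Writer out = new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8)) {
            for (String line : lines) {
                out.write(line);
                out.write('\n'); // explicit '\n', independent of the platform line separator
                out.flush();     // send each line immediately, as typing into netcat would
            }
        }
    }
}
```

Opening it on port 0 lets the operating system choose a free port; pass the value of `getLocalPort()` to the job's `--port` argument. Closing the connection is how this helper signals the end of its input.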

### Network Configuration
- **Default hostname**: localhost
- **Required port**: Must be specified with the `--port` parameter
- **Protocol**: TCP text stream; records are delimited by newlines (`"\n"`)
- **Data format**: Plain text lines
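
Newline-delimited framing means a record is complete only once its delimiter arrives, while TCP may split or merge lines arbitrarily. The sketch below (a hypothetical `DelimitedFramer`, not Flink's source) shows the usual technique: append each received chunk to a buffer, emit every complete record, and keep the unfinished tail for the next chunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of delimiter-based framing over a chunked byte stream:
// emit complete records, buffer the incomplete tail until more data arrives.
final class DelimitedFramer {
    private final String delimiter;
    private final StringBuilder buffer = new StringBuilder();

    DelimitedFramer(String delimiter) {
        if (delimiter.isEmpty()) {
            throw new IllegalArgumentException("delimiter must not be empty");
        }
        this.delimiter = delimiter;
    }

    // Appends a received chunk and returns every record it completes
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> records = new ArrayList<>();
        int pos;
        while ((pos = buffer.indexOf(delimiter)) != -1) {
            records.add(buffer.substring(0, pos));
            buffer.delete(0, pos + delimiter.length());
        }
        return records;
    }

    // Data received after the last delimiter, not yet emitted as a record
    String pending() {
        return buffer.toString();
    }
}
```

Feeding `"hello wor"` produces no record; feeding `"ld\nhello flink\nstream"` next produces `"hello world"` and `"hello flink"` and leaves `"stream"` pending.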

## Dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Required Imports

### Java Version
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
```

### Scala Version
```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
```