# Socket Streaming Examples

These examples process text from a socket connection in real time. They demonstrate live data ingestion, time-based windowing, and windowed aggregation.

## Capabilities

### SocketWindowWordCount (Java)

Streaming windowed version of word count that connects to a server socket and processes strings in real-time with time-based windows.

```java { .api }
/**
 * Windowed word count from socket text stream
 * Connects to server socket and processes text with 5-second time windows
 * @param args Command line arguments (--hostname, --port)
 */
public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception;
}
```

**Usage Example:**

```bash
# Start a text server (in a separate terminal)
nc -l 12345

# Run the socket word count example
java -cp flink-examples-streaming_2.10-1.3.3.jar \
  org.apache.flink.streaming.examples.socket.SocketWindowWordCount \
  --hostname localhost --port 12345
```

### WordWithCount Data Class

POJO class for representing words with their counts in windowed aggregations.

```java { .api }
/**
 * Data type for words with count used in socket windowing
 */
public static class WordWithCount {
    public String word;
    public long count;

    /** Default constructor for serialization */
    public WordWithCount();

    /**
     * Constructor with word and count
     * @param word The word string
     * @param count Occurrence count
     */
    public WordWithCount(String word, long count);

    /**
     * String representation in format "word : count"
     * @return Formatted string representation
     */
    public String toString();
}
```

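The signatures above omit the method bodies. A minimal sketch of how such a POJO could be filled in follows, assuming only what the signatures and the `toString` comment state. The class name `WordWithCountSketch` is hypothetical and chosen to avoid implying this is the original source.

```java
/** Minimal sketch of a word/count POJO that matches the signatures listed above. */
class WordWithCountSketch {
    public String word;
    public long count;

    /** No-argument constructor, kept so the type can be instantiated without values. */
    public WordWithCountSketch() {}

    public WordWithCountSketch(String word, long count) {
        this.word = word;
        this.count = count;
    }

    /** Formats the pair as "word : count", as the API comment describes. */
    @Override
    public String toString() {
        return word + " : " + count;
    }

    public static void main(String[] args) {
        System.out.println(new WordWithCountSketch("hello", 2L)); // prints "hello : 2"
    }
}
```

Public fields plus a no-argument constructor mirror the shape shown in the API listing, which is what lets the pipeline refer to the fields by name in `keyBy("word")`.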

### SocketWindowWordCount (Scala)

Scala implementation of the socket-based windowed word count, using a case class and the functional API.

```scala { .api }
/**
 * Scala version of socket windowed word count
 * @param args Command line arguments (--hostname, --port)
 */
object SocketWindowWordCount {
  def main(args: Array[String]): Unit;
}

/** Data type for words with count using Scala case class */
case class WordWithCount(word: String, count: Long)
```

**Usage Example:**

```scala
// Scala API usage pattern
// hostname and port are assumed to have been parsed from the command-line arguments
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream(hostname, port, '\n')

val windowCounts = text
  .flatMap { w => w.split("\\s") }
  .map { w => WordWithCount(w, 1) }
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .sum("count")
```

## Key Patterns Demonstrated

### Socket Data Ingestion

```java
// Connect to socket text stream
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
```

### Windowed Processing Pipeline

```java
DataStream<WordWithCount> windowCounts = text
    // Parse and create word objects
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    // Group by word field
    .keyBy("word")
    // Create 5-second time windows
    .timeWindow(Time.seconds(5))
    // Aggregate counts within each window
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });
```

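The `flatMap` step in this pipeline tokenizes each line with `split("\\s")`, which matches one whitespace character at a time. The plain-Java sketch below (no Flink dependency; `TokenizeSketch`, `splitSingle`, and `splitRuns` are hypothetical names) shows what that implies for input with repeated spaces, next to one possible alternative that collapses whitespace runs. It illustrates `String.split` behaviour only and is not a change the original example makes.

```java
import java.util.Arrays;

/** Compares two ways of splitting a line into words using only the JDK. */
class TokenizeSketch {
    /** Same pattern as the pipeline above: one whitespace character per split. */
    static String[] splitSingle(String line) {
        return line.split("\\s");
    }

    /** Assumed alternative: trim the line, then treat each whitespace run as one separator. */
    static String[] splitRuns(String line) {
        return line.trim().split("\\s+");
    }

    public static void main(String[] args) {
        String line = "hello  world"; // two spaces between the words
        System.out.println(Arrays.toString(splitSingle(line))); // prints "[hello, , world]"
        System.out.println(Arrays.toString(splitRuns(line)));   // prints "[hello, world]"
    }
}
```

With the single-character pattern, the empty string between the two spaces is emitted as a token, so it would be counted like any other word.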

### Parameter Validation and Error Handling

```java
final ParameterTool params = ParameterTool.fromArgs(args);
final String hostname = params.has("hostname") ? params.get("hostname") : "localhost";
final int port;

try {
    port = params.getInt("port");
} catch (Exception e) {
    System.err.println("No port specified. Please run 'SocketWindowWordCount " +
        "--hostname <hostname> --port <port>'");
    System.err.println("To start a simple text server, run 'netcat -l <port>'");
    return;
}
```

### Parallel Output Control

```java
// Print results with single thread for ordered output
windowCounts.print().setParallelism(1);
```

## Window Operations

### Time Windows

- **Window Size**: 5 seconds (`Time.seconds(5)`)
- **Window Type**: Tumbling time windows
- **Trigger**: Time-based (fires when window time expires)
- **Aggregation**: Reduce function summing word counts

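To make the tumbling-window behaviour concrete, here is a plain-Java model of it. It is a simplified sketch, not Flink code: it assumes non-negative millisecond timestamps supplied by the caller and windows aligned to multiples of the window size, and every name in it (`TumblingWindowSketch`, `windowStart`, `countPerWindow`) is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of 5-second tumbling windows with per-word sums. */
class TumblingWindowSketch {
    static final long WINDOW_SIZE_MS = 5_000L;

    /** Start of the window that contains the given non-negative timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    /** Returns window start -> (word -> count) for lines paired with timestamps. */
    static Map<Long, Map<String, Long>> countPerWindow(long[] timestampsMs, String[] lines) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            Map<String, Long> counts =
                result.computeIfAbsent(windowStart(timestampsMs[i]), k -> new LinkedHashMap<>());
            for (String word : lines[i].split("\\s")) {
                counts.merge(word, 1L, Long::sum); // same role as the reduce step: add counts
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] times = {1_000L, 4_999L, 5_000L};
        String[] lines = {"hello world", "hello flink", "hello"};
        // prints "{0={hello=2, world=1, flink=1}, 5000={hello=1}}"
        System.out.println(countPerWindow(times, lines));
    }
}
```

Each timestamp belongs to exactly one window, which is what distinguishes tumbling windows from overlapping ones: the line at 4,999 ms lands in the first window and the line at 5,000 ms opens the next.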

### Key-Based Partitioning

- Uses field-based keying: `.keyBy("word")`
- Routes all records with the same word to the same key, so they are counted together within each window
- Lets different words be processed in parallel, each with its own window state

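The idea behind key-based partitioning can be sketched with a plain hash-and-modulo rule. This is a deliberately simplified assumption for illustration: Flink's actual key-to-subtask assignment is more involved, and `KeyPartitionSketch` and `subtaskFor` are hypothetical names.

```java
/** Simplified model of routing records to parallel subtasks by key. */
class KeyPartitionSketch {
    /** Maps a key to a subtask index in [0, parallelism); equal keys get equal indexes. */
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (String word : new String[] {"hello", "world", "hello", "flink"}) {
            // Both occurrences of "hello" report the same subtask index.
            System.out.println(word + " -> subtask " + subtaskFor(word, parallelism));
        }
    }
}
```

Because the index depends only on the key, every occurrence of a word reaches the same subtask, while different words may be spread across subtasks.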

## External System Requirements

### Socket Server Setup

```bash
# Using netcat to create test socket server
nc -l 12345

# Type text lines to send to the Flink application
hello world
hello flink streaming
```

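Where netcat is not available, a small JDK-only server can play the same role. The sketch below is one possible stand-in, not part of the Flink examples: `LineServerSketch` and `writeLines` are hypothetical names, and the fallback port 12345 simply mirrors the netcat command above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Minimal test server: accepts one client and sends newline-terminated text lines. */
class LineServerSketch {
    /** Writes each line followed by '\n' and flushes, so the client sees complete records. */
    static void writeLines(OutputStream rawOut, List<String> lines) {
        PrintWriter out = new PrintWriter(new OutputStreamWriter(rawOut, StandardCharsets.UTF_8));
        for (String line : lines) {
            out.print(line);
            out.print('\n');
        }
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 12345;
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept()) { // blocks until the streaming job connects
            writeLines(client.getOutputStream(), Arrays.asList("hello world", "hello flink streaming"));
        }
    }
}
```

Unlike an interactive netcat session, this sketch sends a fixed list of lines and then closes the connection.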

### Network Configuration

- **Default hostname**: localhost
- **Required port**: Must be specified via the `--port` parameter
- **Protocol**: TCP text stream with newline delimiters
- **Data format**: Plain text lines

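To check the server side independently of Flink, a small JDK-only client can read the same newline-delimited stream. This is a sketch under stated assumptions: `LineClientSketch` and `readLines` are hypothetical names, and `BufferedReader.readLine` also accepts `\r` and `\r\n` as line endings, which is looser than a strict `"\n"` delimiter.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Reads newline-delimited text records from a stream until it ends. */
class LineClientSketch {
    /** Collects every line from the stream; I/O failures surface as UncheckedIOException. */
    static List<String> readLines(InputStream in) {
        List<String> lines = new ArrayList<>();
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        String hostname = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 12345;
        try (Socket socket = new Socket(hostname, port)) {
            // Prints whatever the server sent once the server closes the connection.
            readLines(socket.getInputStream()).forEach(System.out::println);
        }
    }
}
```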

## Dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>1.3.3</version>
</dependency>
```

## Required Imports

### Java Version

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
```

### Scala Version

```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
```