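# Stream Joins

Event-time joins of two data streams using window-based join operations. Elements from both streams are matched on a key and joined only when they fall into the same time window. Each stream carries its own timestamps and watermarks. The join operator advances its event-time clock to the smaller of the two input watermarks, so the slower stream determines when windows fire.
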
## Capabilities

### WindowJoin (Java)

Window-based join of two data streams. Elements are matched by key equality within event-time windows, driven by per-stream timestamps and watermarks.

```java { .api }
/**
 * Example of windowed stream joins: joins two streams within
 * time windows using event-time processing.
 */
public class WindowJoin {
    /**
     * @param args command-line arguments (--input <path>, --output <path>)
     */
    public static void main(String[] args) throws Exception;
}
```
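
**Usage Example:**

```bash
# Run with default sample data
java -cp flink-examples-streaming_2.10-1.3.3.jar \
    org.apache.flink.streaming.examples.join.WindowJoin

# Run with file input
java -cp flink-examples-streaming_2.10-1.3.3.jar \
    org.apache.flink.streaming.examples.join.WindowJoin \
    --input /path/to/input.txt --output /path/to/results.txt

# The examples jar typically does not bundle Flink itself, so plain `java -cp`
# also needs the Flink runtime jars (e.g. Flink's lib/ directory) on the classpath.
# Alternatively, submit the job through the Flink CLI:
bin/flink run -c org.apache.flink.streaming.examples.join.WindowJoin \
    flink-examples-streaming_2.10-1.3.3.jar
```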

### WindowJoin (Scala)

Scala implementation of windowed stream joins using the functional Scala API and case classes.

```scala { .api }
/**
 * Scala version of windowed stream joins.
 */
object WindowJoin {
  /**
   * @param args command-line arguments
   */
  def main(args: Array[String]): Unit
}
```

## Join Patterns

### Time Window Join Setup

```java
// Enable event-time processing (the default time characteristic in Flink 1.3 is processing time)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// ThrottledIterator is an Iterator, not a SourceFunction, so it is passed to
// fromCollection() together with the element type. The wrapped iterator must
// implement java.io.Serializable, otherwise ThrottledIterator rejects it.
TypeInformation<Tuple3<String, String, Long>> tupleType =
        TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {});

// Create two data streams with timestamps and watermarks
DataStream<Tuple3<String, String, Long>> orangeStream = env
        .fromCollection(new ThrottledIterator<>(OrangeSourceData.ORANGE_DATA.iterator(), elementsPerSecond), tupleType)
        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

DataStream<Tuple3<String, String, Long>> greenStream = env
        .fromCollection(new ThrottledIterator<>(GreenSourceData.GREEN_DATA.iterator(), elementsPerSecond), tupleType)
        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

// Perform windowed join: equal keys (field f0) within the same tumbling window
DataStream<Tuple6<String, String, String, String, Long, Long>> joinedStream = orangeStream
        .join(greenStream)
        .where(new KeySelector<Tuple3<String, String, Long>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Long> value) throws Exception {
                return value.f0; // Join on first field
            }
        })
        .equalTo(new KeySelector<Tuple3<String, String, Long>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Long> value) throws Exception {
                return value.f0; // Join on first field
            }
        })
        .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
        .apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>,
                Tuple6<String, String, String, String, Long, Long>>() {
            @Override
            public Tuple6<String, String, String, String, Long, Long> join(
                    Tuple3<String, String, Long> orange,
                    Tuple3<String, String, Long> green) throws Exception {
                // Called once per orange/green pair with equal keys in the same window
                return new Tuple6<>(orange.f0, orange.f1, green.f1,
                        "JOINED", orange.f2, green.f2);
            }
        });
```
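
The result of the join above can be reproduced without Flink. The plain-Java sketch below assumes zero-offset tumbling windows. Each element is assigned to the window `[start, start + windowSize)` that contains its timestamp. Every orange/green pair with equal keys in the same window then yields one result, so a key with m orange and n green elements in one window produces m × n results. `Event`, `windowStart`, and `join` are hypothetical names. The sketch models the join semantics only, not how Flink buffers and fires windows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class WindowJoinSketch {

    /** Hypothetical stand-in for Tuple3<String, String, Long>: (key, value, timestamp). */
    static final class Event {
        final String key;
        final String value;
        final long timestamp;

        Event(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Start of the zero-offset tumbling window that contains the timestamp. */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    /** Inner window join: one "key,left,right" result per same-key pair in the same window. */
    static List<String> join(List<Event> left, List<Event> right, long windowSize) {
        List<String> results = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = Objects.equals(l.key, r.key);
                boolean sameWindow =
                        windowStart(l.timestamp, windowSize) == windowStart(r.timestamp, windowSize);
                if (sameKey && sameWindow) {
                    results.add(l.key + "," + l.value + "," + r.value);
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> orange = Arrays.asList(new Event("a", "o1", 500), new Event("a", "o2", 2500));
        List<Event> green = Arrays.asList(new Event("a", "g1", 1500), new Event("b", "g2", 700));
        // Window size 2000 ms: o1 (500) and g1 (1500) share [0, 2000); o2 (2500) falls in [2000, 4000)
        System.out.println(join(orange, green, 2000)); // prints [a,o1,g1]
    }
}
```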

### Timestamp and Watermark Assignment

```java
private static class Tuple3TimestampExtractor
        extends AscendingTimestampExtractor<Tuple3<String, String, Long>> {

    @Override
    public long extractAscendingTimestamp(Tuple3<String, String, Long> element) {
        return element.f2; // Use third field as timestamp
    }
}
```
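
`AscendingTimestampExtractor` treats timestamps as non-decreasing. As far as we know, it emits a watermark one millisecond below the highest timestamp seen so far. The join is a two-input operator, so its event-time clock is the minimum of the two input watermarks. An event-time window `[start, end)` fires once that clock reaches `end - 1`. The plain-Java sketch below models these three rules using hypothetical method names. It shows why a slow or idle input holds back every join window.

```java
class WatermarkSketch {

    /** Watermark of an ascending-timestamp source: highest timestamp seen minus 1 ms. */
    static long ascendingWatermark(long maxTimestampSeen) {
        return maxTimestampSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestampSeen - 1;
    }

    /** A two-input operator such as the join uses the minimum of its input watermarks. */
    static long combinedWatermark(long leftWatermark, long rightWatermark) {
        return Math.min(leftWatermark, rightWatermark);
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long orangeWatermark = ascendingWatermark(2500); // 2499
        long greenWatermark = ascendingWatermark(1800);  // 1799
        long joinWatermark = combinedWatermark(orangeWatermark, greenWatermark);
        // The slower green stream holds back the window [0, 2000)
        System.out.println(joinWatermark + " " + windowFires(2000, joinWatermark)); // prints 1799 false
    }
}
```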

### Key-Based Join Logic

```java
// One key selector can serve both sides because both streams have the same element type
KeySelector<Tuple3<String, String, Long>, String> keySelector =
        new KeySelector<Tuple3<String, String, Long>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Long> value) throws Exception {
                return value.f0; // Join on first field (ID)
            }
        };

// Apply join with key selectors; joinFunction is any JoinFunction, such as
// BasicJoinFunction under "Join Function Implementations"
orangeStream.join(greenStream)
        .where(keySelector)
        .equalTo(keySelector)
        .window(TumblingEventTimeWindows.of(Time.seconds(windowSizeSeconds)))
        .apply(joinFunction);
```

## Data Structures

### Input Data Format

```java
// Input tuples: (id, value, timestamp)
Tuple3<String, String, Long> inputElement;
// f0: Join key (ID)
// f1: Data value
// f2: Event timestamp (milliseconds)
```

### Join Result Format

```java
// Join result: (id, leftValue, rightValue, joinType, leftTimestamp, rightTimestamp)
Tuple6<String, String, String, String, Long, Long> joinResult;
// f0: Join key
// f1: Left stream value
// f2: Right stream value
// f3: Join type indicator (e.g. "JOINED")
// f4: Left stream timestamp
// f5: Right stream timestamp
```

## Sample Data Sources

### Orange Stream Data

```java
public class WindowJoinSampleData {
    public static final String[] ORANGE_DATA = {
        "orange-1,orange-data-1,1000",
        "orange-2,orange-data-2,2000",
        "orange-3,orange-data-3,3000",
        // More sample data...
    };
}
```
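
`ThrottledIterator` accepts only a source iterator that implements `java.io.Serializable`, and the iterators returned by standard Java collections do not. One way to feed the CSV strings above into a throttled source is a small serializable iterator that parses each `id,value,timestamp` line. The sketch below is a hypothetical helper, not part of the Flink examples. To compile without Flink it returns a plain `Row` class; in a job it would return `Tuple3<String, String, Long>` instead.

```java
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;

class CsvRecordIterator implements Iterator<CsvRecordIterator.Row>, Serializable {

    private static final long serialVersionUID = 1L;

    /** Hypothetical stand-in for Tuple3<String, String, Long>: (id, value, timestamp). */
    static final class Row implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String value;
        final long timestamp;

        Row(String id, String value, long timestamp) {
            this.id = id;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final String[] lines; // String arrays are serializable
    private int position = 0;

    CsvRecordIterator(String[] lines) {
        this.lines = lines.clone();
    }

    @Override
    public boolean hasNext() {
        return position < lines.length;
    }

    @Override
    public Row next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // Each line has the form "id,value,timestamp"
        String[] fields = lines[position++].split(",", 3);
        return new Row(fields[0].trim(), fields[1].trim(), Long.parseLong(fields[2].trim()));
    }

    public static void main(String[] args) {
        CsvRecordIterator it = new CsvRecordIterator(new String[] {"orange-1,orange-data-1,1000"});
        Row row = it.next();
        System.out.println(row.id + " " + row.value + " " + row.timestamp); // prints orange-1 orange-data-1 1000
    }
}
```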

### Throttled Data Generation

```java
// ThrottledIterator (Flink examples' utils package) is an Iterator, so it is
// wrapped with fromCollection() rather than addSource(). The source iterator
// must implement java.io.Serializable.
DataStream<Tuple3<String, String, Long>> orangeStream = env
        .fromCollection(
                new ThrottledIterator<>(
                        OrangeSourceData.ORANGE_DATA.iterator(),
                        elementsPerSecond), // Control emission rate
                TypeInformation.of(new TypeHint<Tuple3<String, String, Long>>() {}))
        .assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());
```
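
Throttling can be sketched in a few lines. Each `next()` call is delayed so that elements leave at no more than `elementsPerSecond`. The class below is a hypothetical, simplified rate limiter written for illustration. It is not the code of Flink's `ThrottledIterator`, which also requires a serializable source so it can be shipped to the cluster.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.locks.LockSupport;

class SimpleThrottledIterator<T> implements Iterator<T> {

    private final Iterator<T> source;
    private final long nanosPerElement;
    private long nextEmitNanos; // earliest time at which the next element may be returned

    SimpleThrottledIterator(Iterator<T> source, long elementsPerSecond) {
        if (elementsPerSecond <= 0) {
            throw new IllegalArgumentException("elementsPerSecond must be positive");
        }
        this.source = source;
        this.nanosPerElement = 1_000_000_000L / elementsPerSecond;
        this.nextEmitNanos = System.nanoTime();
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public T next() {
        long remaining;
        // parkNanos may return early, so re-check until the emit time is reached
        while ((remaining = nextEmitNanos - System.nanoTime()) > 0) {
            LockSupport.parkNanos(remaining);
        }
        nextEmitNanos += nanosPerElement;
        return source.next();
    }

    public static void main(String[] args) {
        Iterator<String> it = new SimpleThrottledIterator<>(Arrays.asList("a", "b", "c").iterator(), 10);
        long start = System.nanoTime();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
        // Three elements at 10 per second take roughly 200 ms
        System.out.println((System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```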

## Window Configuration

### Tumbling Event Time Windows

```java
// Fixed-size non-overlapping windows based on event time
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
```
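
A tumbling assigner maps each timestamp to exactly one window. The sketch below reproduces the window-start arithmetic of Flink's `TimeWindow.getWindowStartWithOffset` helper, as we understand it. `offset` is 0 unless the assigner is created with one. You can use it to check which window an element lands in. The class name is hypothetical.

```java
class TumblingWindowSketch {

    /** Start of the tumbling window [start, start + windowSize) containing the timestamp. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 5000;
        for (long ts : new long[] {0, 4999, 5000, 12345}) {
            long start = windowStart(ts, 0, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
        // last line printed: 12345 -> [10000, 15000)
    }
}
```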

### Sliding Event Time Windows

```java
// Overlapping windows with slide interval
.window(SlidingEventTimeWindows.of(
        Time.milliseconds(windowSize),    // Window size
        Time.milliseconds(slideInterval)  // Slide interval
))
```
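
With sliding windows, each element belongs to `windowSize / slideInterval` windows when the size is a multiple of the slide. As a result, one orange/green pair is joined once in every window the two elements share. The sketch below lists the windows for a given timestamp. It follows the loop of Flink's `SlidingEventTimeWindows` assigner as we understand it and assumes a zero offset. The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    /** Start timestamps of every sliding window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp + slide) % slide; // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10 s, slide 5 s: each element belongs to two windows
        System.out.println(windowStarts(12000, 10000, 5000)); // prints [10000, 5000]
    }
}
```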

### Session Windows for Joins

```java
// Variable-size windows based on activity gaps
.window(EventTimeSessionWindows.withGap(Time.minutes(sessionGapMinutes)))
```
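
Session windows start with one window `[timestamp, timestamp + gap)` per element and merge windows that overlap. In a join, Flink evaluates the window over both inputs of a key together, because the join is built on a keyed union of the two streams. An element from either side can therefore extend a session. The sketch below merges ascending timestamps under that rule. Treating elements exactly one gap apart as the same session is our assumption about the boundary case, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SessionWindowSketch {

    /** Merges ascending timestamps into sessions {start, end}, where end = last timestamp + gap. */
    static List<long[]> sessions(long[] ascendingTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : ascendingTimestamps) {
            if (current != null && ts <= current[1]) {
                current[1] = ts + gap; // [ts, ts + gap) overlaps the session: extend it
            } else {
                current = new long[] {ts, ts + gap}; // gap exceeded: start a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Orange and green timestamps for one key, merged and sorted; gap = 3000 ms
        for (long[] s : sessions(new long[] {1000, 2000, 9000, 9500}, 3000)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1000, 5000) then [9000, 12500)
    }
}
```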

## Join Function Implementations

### Basic Join Function

```java
private static class BasicJoinFunction
        implements JoinFunction<Tuple3<String, String, Long>,
                                Tuple3<String, String, Long>,
                                Tuple6<String, String, String, String, Long, Long>> {

    @Override
    public Tuple6<String, String, String, String, Long, Long> join(
            Tuple3<String, String, Long> left,
            Tuple3<String, String, Long> right) throws Exception {

        return new Tuple6<>(
                left.f0,      // Join key
                left.f1,      // Left value
                right.f1,     // Right value
                "INNER_JOIN", // Join type
                left.f2,      // Left timestamp
                right.f2      // Right timestamp
        );
    }
}
```

### Rich Join Function with State

```java
private static class StatefulJoinFunction
        extends RichJoinFunction<Tuple3<String, String, Long>,
                                 Tuple3<String, String, Long>,
                                 Tuple6<String, String, String, String, Long, Long>> {

    // Keyed state: one counter per join key
    private transient ValueState<Long> joinCountState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("joinCount", Long.class);
        joinCountState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public Tuple6<String, String, String, String, Long, Long> join(
            Tuple3<String, String, Long> left,
            Tuple3<String, String, Long> right) throws Exception {

        // value() returns null until the first update for this key
        Long stored = joinCountState.value();
        long currentCount = (stored == null) ? 0L : stored;
        joinCountState.update(currentCount + 1);

        return new Tuple6<>(left.f0, left.f1, right.f1,
                "JOIN_" + currentCount, left.f2, right.f2);
    }
}
```
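
`joinCountState` is keyed state. Flink scopes it to the join key of the pair being joined. To our understanding, state obtained through `getRuntimeContext()` is not cleared when a window fires, so each key's counter keeps growing across windows. The plain-Java sketch below models that scoping with a map from key to count. It illustrates the semantics under that assumption, not how Flink stores state, and the names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class KeyedCounterSketch {

    // One counter per join key, like a ValueState<Long> scoped to the current key
    private final Map<String, Long> countPerKey = new HashMap<>();

    /** Returns the label for the next join result of this key: "JOIN_0", "JOIN_1", ... */
    String nextLabel(String key) {
        long current = countPerKey.getOrDefault(key, 0L);
        countPerKey.put(key, current + 1);
        return "JOIN_" + current;
    }

    public static void main(String[] args) {
        KeyedCounterSketch counter = new KeyedCounterSketch();
        System.out.println(counter.nextLabel("orange-1")); // JOIN_0
        System.out.println(counter.nextLabel("orange-1")); // JOIN_1
        System.out.println(counter.nextLabel("orange-2")); // JOIN_0 (separate key, separate counter)
    }
}
```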
261
262
## Advanced Join Patterns
263
264
### CoGroup for Custom Join Logic
265
266
```java
267
// CoGroup allows custom join logic including outer joins
268
orangeStream.coGroup(greenStream)
269
.where(keySelector)
270
.equalTo(keySelector)
271
.window(TumblingEventTimeWindows.of(Time.seconds(windowSizeSeconds)))
272
.apply(new CoGroupFunction<Tuple3<String, String, Long>,
273
Tuple3<String, String, Long>,
274
String>() {
275
@Override
276
public void coGroup(
277
Iterable<Tuple3<String, String, Long>> left,
278
Iterable<Tuple3<String, String, Long>> right,
279
Collector<String> out) throws Exception {
280
281
// Custom join logic - can handle outer joins
282
for (Tuple3<String, String, Long> leftElement : left) {
283
boolean hasMatch = false;
284
for (Tuple3<String, String, Long> rightElement : right) {
285
out.collect("INNER: " + leftElement + " + " + rightElement);
286
hasMatch = true;
287
}
288
if (!hasMatch) {
289
out.collect("LEFT_OUTER: " + leftElement + " + null");
290
}
291
}
292
}
293
});
294
```
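
The function above produces only inner matches and left-outer rows. `coGroup` is also invoked when only one side has elements for a key and window, so a full outer join needs just one more, symmetric branch for right elements without a partner. The plain-Java sketch below shows that per-key, per-window logic on ordinary lists so it runs without Flink. In a real `CoGroupFunction` the inputs are `Iterable`s, and emptiness can be checked with `iterator().hasNext()`. The method name and output strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class OuterCoGroupSketch {

    /** Full outer join of the elements that share one key and one window. */
    static List<String> fullOuterJoin(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        if (left.isEmpty()) {
            for (String r : right) {
                out.add("RIGHT_OUTER: null + " + r);
            }
        } else if (right.isEmpty()) {
            for (String l : left) {
                out.add("LEFT_OUTER: " + l + " + null");
            }
        } else {
            for (String l : left) {
                for (String r : right) {
                    out.add("INNER: " + l + " + " + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(fullOuterJoin(Arrays.asList("o1", "o2"), Arrays.asList("g1")));
        // prints [INNER: o1 + g1, INNER: o2 + g1]
        System.out.println(fullOuterJoin(Collections.<String>emptyList(), Arrays.asList("g2")));
        // prints [RIGHT_OUTER: null + g2]
    }
}
```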
295
296
### Interval Join
297
298
```java
299
// Join elements within time intervals
300
orangeStream.keyBy(keySelector)
301
.intervalJoin(greenStream.keyBy(keySelector))
302
.between(Time.milliseconds(-100), Time.milliseconds(100)) // ±100ms window
303
.process(new ProcessJoinFunction<Tuple3<String, String, Long>,
304
Tuple3<String, String, Long>,
305
String>() {
306
@Override
307
public void processElement(
308
Tuple3<String, String, Long> left,
309
Tuple3<String, String, Long> right,
310
Context ctx,
311
Collector<String> out) throws Exception {
312
out.collect("INTERVAL_JOIN: " + left + " + " + right);
313
}
314
});
315
```
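
An interval join has no fixed windows. An orange element with timestamp `t` is paired with every green element of the same key whose timestamp lies in `[t + lower, t + upper]`. Both bounds are inclusive unless `lowerBoundExclusive()` or `upperBoundExclusive()` is called. The plain-Java sketch below checks that condition for the ±100 ms bounds above. The names are hypothetical.

```java
class IntervalJoinSketch {

    /** True if rightTs lies in [leftTs + lowerBound, leftTs + upperBound], both bounds inclusive. */
    static boolean inInterval(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
    }

    public static void main(String[] args) {
        long orangeTs = 1000;
        for (long greenTs : new long[] {899, 900, 1100, 1101}) {
            System.out.println(greenTs + " -> " + inInterval(orangeTs, greenTs, -100, 100));
        }
        // prints 899 -> false, 900 -> true, 1100 -> true, 1101 -> false (one per line)
    }
}
```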
316
317
## Event Time and Watermarks
318
319
### Watermark Strategy
320
321
```java
322
// Assign watermarks with bounded out-of-orderness
323
orangeStream.assignTimestampsAndWatermarks(
324
WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(
325
Duration.ofMillis(100)) // 100ms max out-of-order
326
.withTimestampAssigner((element, timestamp) -> element.f2)
327
);
328
```
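
With a bound of `B` milliseconds, the watermark trails the highest timestamp seen so far by `B + 1` ms. This declares that no more elements with a timestamp at or below the watermark are expected. The plain-Java sketch below mirrors that calculation as we understand Flink's `BoundedOutOfOrdernessWatermarks` generator performs it. The class name is hypothetical, and watermark emission (periodic in Flink) is reduced to a method call.

```java
class BoundedOutOfOrdernessSketch {

    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Records an element's timestamp; out-of-order elements never move the maximum backwards. */
    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    /** Highest timestamp seen, minus the bound, minus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(100);
        generator.onEvent(1000);
        generator.onEvent(1200);
        generator.onEvent(1150); // out of order, but within the 100 ms bound
        System.out.println(generator.currentWatermark()); // prints 1099
    }
}
```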
329
330
### Late Data Handling
331
332
```java
333
// Configure late data handling
334
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
335
.allowedLateness(Time.seconds(2)) // Allow 2 seconds late data
336
.sideOutputLateData(lateDataTag) // Collect late data in side output
337
```
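
Allowed lateness extends how long a window's state is kept. A window `[start, end)` still accepts elements until the watermark reaches `end - 1 + allowedLateness`, and with the default event-time trigger it fires again for each late element. After that point, elements for the window are dropped, or emitted to the side output when `sideOutputLateData` is set. The plain-Java sketch below expresses that cut-off as we understand Flink's window operator applies it. The names are hypothetical.

```java
class LatenessSketch {

    /** A window [start, end) stops accepting elements once end - 1 + allowedLateness <= watermark. */
    static boolean isWindowLate(long windowEnd, long allowedLateness, long watermark) {
        return windowEnd - 1 + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long windowEnd = 10_000; // tumbling 10 s window [0, 10000)
        long lateness = 2_000;   // allowedLateness(Time.seconds(2))
        System.out.println(isWindowLate(windowEnd, lateness, 11_000)); // false: element still accepted
        System.out.println(isWindowLate(windowEnd, lateness, 11_999)); // true: element goes to the side output
    }
}
```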
338
339
## Dependencies
340
341
```xml
342
<dependency>
343
<groupId>org.apache.flink</groupId>
344
<artifactId>flink-streaming-java_2.10</artifactId>
345
<version>1.3.3</version>
346
</dependency>
347
```
348
349
## Required Imports
350
351
```java
352
import org.apache.flink.api.common.functions.JoinFunction;
353
import org.apache.flink.api.java.functions.KeySelector;
354
import org.apache.flink.api.java.tuple.Tuple3;
355
import org.apache.flink.api.java.tuple.Tuple6;
356
import org.apache.flink.streaming.api.TimeCharacteristic;
357
import org.apache.flink.streaming.api.datastream.DataStream;
358
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
359
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
360
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
361
import org.apache.flink.streaming.api.windowing.time.Time;
362
```