or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md, external-systems.md, index.md, iteration.md, joins.md, machine-learning.md, side-output.md, socket.md, utilities.md, windowing.md, wordcount.md

docs/joins.md

0

# Stream Joins

1

2

Time-based stream joins with coordinated watermarks and window-based join operations. Demonstrates joining two data streams within time windows using event-time processing.

3

4

## Capabilities

5

6

### WindowJoin (Java)

7

8

Window-based join of two data streams with coordinated watermarks and time-based join conditions.

9

10

```java { .api }

11

/**

12

* Example of windowed stream joins

13

* Joins two streams within time windows using event-time processing

14

* @param args Command line arguments (--input path, --output path)

15

*/

16

public class WindowJoin {

17

public static void main(String[] args) throws Exception;

18

}

19

```

20

21

**Usage Example:**

22

23

```bash

24

# Run with default sample data

25

java -cp flink-examples-streaming_2.10-1.3.3.jar \

26

org.apache.flink.streaming.examples.join.WindowJoin

27

28

# Run with file input

29

java -cp flink-examples-streaming_2.10-1.3.3.jar \

30

org.apache.flink.streaming.examples.join.WindowJoin \

31

--input /path/to/input.txt --output /path/to/results.txt

32

```

33

34

### WindowJoin (Scala)

35

36

Scala implementation of windowed stream joins using the functional API and case classes.

37

38

```scala { .api }

39

/**

40

* Scala version of windowed stream joins

41

* @param args Command line arguments

42

*/

43

object WindowJoin {

44

def main(args: Array[String]): Unit;

45

}

46

```

47

48

## Join Patterns

49

50

### Time Window Join Setup

51

52

```java

53

// Enable event time processing

54

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

55

56

// Create two data streams with timestamps

57

DataStream<Tuple3<String, String, Long>> orangeStream = env

58

.addSource(new ThrottledIterator<>(OrangeSourceData.ORANGE_DATA.iterator(), elementsPerSecond))

59

.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

60

61

DataStream<Tuple3<String, String, Long>> greenStream = env

62

.addSource(new ThrottledIterator<>(GreenSourceData.GREEN_DATA.iterator(), elementsPerSecond))

63

.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

64

65

// Perform windowed join

66

DataStream<Tuple6<String, String, String, String, Long, Long>> joinedStream = orangeStream

67

.join(greenStream)

68

.where(new KeySelector<Tuple3<String, String, Long>, String>() {

69

@Override

70

public String getKey(Tuple3<String, String, Long> value) throws Exception {

71

return value.f0; // Join on first field

72

}

73

})

74

.equalTo(new KeySelector<Tuple3<String, String, Long>, String>() {

75

@Override

76

public String getKey(Tuple3<String, String, Long> value) throws Exception {

77

return value.f0; // Join on first field

78

}

79

})

80

.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))

81

.apply(new JoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>,

82

Tuple6<String, String, String, String, Long, Long>>() {

83

@Override

84

public Tuple6<String, String, String, String, Long, Long> join(

85

Tuple3<String, String, Long> orange,

86

Tuple3<String, String, Long> green) throws Exception {

87

return new Tuple6<>(orange.f0, orange.f1, green.f1,

88

"JOINED", orange.f2, green.f2);

89

}

90

});

91

```

92

93

### Timestamp and Watermark Assignment

94

95

```java

96

private static class Tuple3TimestampExtractor

97

extends AscendingTimestampExtractor<Tuple3<String, String, Long>> {

98

99

@Override

100

public long extractAscendingTimestamp(Tuple3<String, String, Long> element) {

101

return element.f2; // Use third field as timestamp

102

}

103

}

104

```

105

106

### Key-Based Join Logic

107

108

```java

109

// Define join keys

110

KeySelector<Tuple3<String, String, Long>, String> keySelector =

111

new KeySelector<Tuple3<String, String, Long>, String>() {

112

@Override

113

public String getKey(Tuple3<String, String, Long> value) throws Exception {

114

return value.f0; // Join on first field (ID)

115

}

116

};

117

118

// Apply join with key selectors

119

orangeStream.join(greenStream)

120

.where(keySelector)

121

.equalTo(keySelector)

122

.window(TumblingEventTimeWindows.of(Time.seconds(windowSizeSeconds)))

123

.apply(joinFunction);

124

```

125

126

## Data Structures

127

128

### Input Data Format

129

130

```java

131

// Input tuples: (id, value, timestamp)

132

Tuple3<String, String, Long> inputElement;

133

// f0: Join key (ID)

134

// f1: Data value

135

// f2: Event timestamp (milliseconds)

136

```

137

138

### Join Result Format

139

140

```java

141

// Join result: (id, leftValue, rightValue, joinType, leftTimestamp, rightTimestamp)

142

Tuple6<String, String, String, String, Long, Long> joinResult;

143

// f0: Join key

144

// f1: Left stream value

145

// f2: Right stream value

146

// f3: Join type indicator ("JOINED")

147

// f4: Left stream timestamp

148

// f5: Right stream timestamp

149

```

150

151

## Sample Data Sources

152

153

### Orange Stream Data

154

155

```java

156

public class WindowJoinSampleData {

157

public static final String[] ORANGE_DATA = {

158

"orange-1,orange-data-1,1000",

159

"orange-2,orange-data-2,2000",

160

"orange-3,orange-data-3,3000",

161

// More sample data...

162

};

163

}

164

```

165

166

### Throttled Data Generation

167

168

```java

169

// Use ThrottledIterator for controlled data emission

170

DataStream<Tuple3<String, String, Long>> orangeStream = env

171

.addSource(new ThrottledIterator<>(

172

OrangeSourceData.ORANGE_DATA.iterator(),

173

elementsPerSecond // Control emission rate

174

))

175

.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

176

```

177

178

## Window Configuration

179

180

### Tumbling Event Time Windows

181

182

```java

183

// Fixed-size non-overlapping windows based on event time

184

.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))

185

```

186

187

### Sliding Event Time Windows

188

189

```java

190

// Overlapping windows with slide interval

191

.window(SlidingEventTimeWindows.of(

192

Time.milliseconds(windowSize), // Window size

193

Time.milliseconds(slideInterval) // Slide interval

194

))

195

```

196

197

### Session Windows for Joins

198

199

```java

200

// Variable-size windows based on activity gaps

201

.window(EventTimeSessionWindows.withGap(Time.minutes(sessionGapMinutes)))

202

```

203

204

## Join Function Implementations

205

206

### Basic Join Function

207

208

```java

209

private static class BasicJoinFunction

210

implements JoinFunction<Tuple3<String, String, Long>,

211

Tuple3<String, String, Long>,

212

Tuple6<String, String, String, String, Long, Long>> {

213

214

@Override

215

public Tuple6<String, String, String, String, Long, Long> join(

216

Tuple3<String, String, Long> left,

217

Tuple3<String, String, Long> right) throws Exception {

218

219

return new Tuple6<>(

220

left.f0, // Join key

221

left.f1, // Left value

222

right.f1, // Right value

223

"INNER_JOIN", // Join type

224

left.f2, // Left timestamp

225

right.f2 // Right timestamp

226

);

227

}

228

}

229

```

230

231

### Rich Join Function with State

232

233

```java

234

private static class StatefulJoinFunction

235

extends RichJoinFunction<Tuple3<String, String, Long>,

236

Tuple3<String, String, Long>,

237

Tuple6<String, String, String, String, Long, Long>> {

238

239

private ValueState<Long> joinCountState;

240

241

@Override

242

public void open(Configuration parameters) throws Exception {

243

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(

244

"joinCount", Long.class, 0L);

245

joinCountState = getRuntimeContext().getState(descriptor);

246

}

247

248

@Override

249

public Tuple6<String, String, String, String, Long, Long> join(

250

Tuple3<String, String, Long> left,

251

Tuple3<String, String, Long> right) throws Exception {

252

253

Long currentCount = joinCountState.value();

254

joinCountState.update(currentCount + 1);

255

256

return new Tuple6<>(left.f0, left.f1, right.f1,

257

"JOIN_" + currentCount, left.f2, right.f2);

258

}

259

}

260

```

261

262

## Advanced Join Patterns

263

264

### CoGroup for Custom Join Logic

265

266

```java

267

// CoGroup allows custom join logic including outer joins

268

orangeStream.coGroup(greenStream)

269

.where(keySelector)

270

.equalTo(keySelector)

271

.window(TumblingEventTimeWindows.of(Time.seconds(windowSizeSeconds)))

272

.apply(new CoGroupFunction<Tuple3<String, String, Long>,

273

Tuple3<String, String, Long>,

274

String>() {

275

@Override

276

public void coGroup(

277

Iterable<Tuple3<String, String, Long>> left,

278

Iterable<Tuple3<String, String, Long>> right,

279

Collector<String> out) throws Exception {

280

281

// Custom join logic - can handle outer joins

282

for (Tuple3<String, String, Long> leftElement : left) {

283

boolean hasMatch = false;

284

for (Tuple3<String, String, Long> rightElement : right) {

285

out.collect("INNER: " + leftElement + " + " + rightElement);

286

hasMatch = true;

287

}

288

if (!hasMatch) {

289

out.collect("LEFT_OUTER: " + leftElement + " + null");

290

}

291

}

292

}

293

});

294

```

295

296

### Interval Join

297

298

```java

299

// Join elements within time intervals (intervalJoin requires Flink 1.6 or later; it is not available in Flink 1.3.3)

300

orangeStream.keyBy(keySelector)

301

.intervalJoin(greenStream.keyBy(keySelector))

302

.between(Time.milliseconds(-100), Time.milliseconds(100)) // ±100ms window

303

.process(new ProcessJoinFunction<Tuple3<String, String, Long>,

304

Tuple3<String, String, Long>,

305

String>() {

306

@Override

307

public void processElement(

308

Tuple3<String, String, Long> left,

309

Tuple3<String, String, Long> right,

310

Context ctx,

311

Collector<String> out) throws Exception {

312

out.collect("INTERVAL_JOIN: " + left + " + " + right);

313

}

314

});

315

```

316

317

## Event Time and Watermarks

318

319

### Watermark Strategy

320

321

```java

322

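// Assign watermarks with bounded out-of-orderness (WatermarkStrategy requires Flink 1.11 or later; on Flink 1.3.3 use BoundedOutOfOrdernessTimestampExtractor instead)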

323

orangeStream.assignTimestampsAndWatermarks(

324

WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(

325

Duration.ofMillis(100)) // 100ms max out-of-order

326

.withTimestampAssigner((element, timestamp) -> element.f2)

327

);

328

```

329

330

### Late Data Handling

331

332

```java

333

// Configure late data handling

334

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

335

.allowedLateness(Time.seconds(2)) // Allow 2 seconds late data

336

.sideOutputLateData(lateDataTag) // Collect late data in side output

337

```

338

339

## Dependencies

340

341

```xml

342

<dependency>

343

<groupId>org.apache.flink</groupId>

344

<artifactId>flink-streaming-java_2.10</artifactId>

345

<version>1.3.3</version>

346

</dependency>

347

```

348

349

## Required Imports

350

351

```java

352

import org.apache.flink.api.common.functions.JoinFunction;

353

import org.apache.flink.api.java.functions.KeySelector;

354

import org.apache.flink.api.java.tuple.Tuple3;

355

import org.apache.flink.api.java.tuple.Tuple6;

356

import org.apache.flink.streaming.api.TimeCharacteristic;

357

import org.apache.flink.streaming.api.datastream.DataStream;

358

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

359

import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

360

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

361

import org.apache.flink.streaming.api.windowing.time.Time;

362

```