or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation-grouping.md, data-input-output.md, dataset-operations.md, execution-environments.md, index.md, iteration-operations.md, join-cogroup-operations.md, utility-functions.md

docs/join-cogroup-operations.md

0

# Join and CoGroup Operations

1

2

Advanced operations for combining multiple DataSets using various join strategies, coGroup operations, and cross products. These operations enable complex data processing workflows involving multiple data sources.

3

4

## Capabilities

5

6

### Join Operations

7

8

Join two DataSets based on key equality with support for different join types.

9

10

```java { .api }

11

/**

12

* Join with another DataSet

13

* @param other the other DataSet to join with

14

* @return JoinOperatorSets for key specification and join configuration

15

*/

16

public <R> JoinOperatorSets<T, R> join(DataSet<R> other);

17

```

18

19

**Usage Examples:**

20

21

```java

22

// Inner join on key fields

23

DataSet<Tuple2<Long, String>> users = env.fromElements(

24

new Tuple2<>(1L, "Alice"), new Tuple2<>(2L, "Bob"));

25

DataSet<Tuple2<Long, String>> orders = env.fromElements(

26

new Tuple2<>(1L, "Order1"), new Tuple2<>(1L, "Order2"), new Tuple2<>(2L, "Order3"));

27

28

// Join users and orders on user ID

29

DataSet<Tuple2<Tuple2<Long, String>, Tuple2<Long, String>>> joined = users

30

.join(orders)

31

.where(0) // user ID field in users

32

.equalTo(0) // user ID field in orders

33

.types(Tuple2.class, Tuple2.class);

34

35

// Custom join function

36

DataSet<String> customJoin = users

37

.join(orders)

38

.where(0)

39

.equalTo(0)

40

.with(new JoinFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {

41

@Override

42

public String join(Tuple2<Long, String> user, Tuple2<Long, String> order) {

43

return user.f1 + " has " + order.f1;

44

}

45

});

46

```

47

48

### Join Types

49

50

Different types of joins supported by the join operation.

51

52

```java { .api }

53

/**

54

* Enum defining join types

55

*/

56

public enum JoinType {

57

INNER, // Inner join (default)

58

LEFT_OUTER, // Left outer join

59

RIGHT_OUTER, // Right outer join

60

FULL_OUTER // Full outer join

61

}

62

63

/**

64

* Specify the join type

65

* @param joinType the type of join to perform

66

* @return configured join operator

67

*/

68

public JoinOperator<T1, T2, R> with(JoinType joinType);

69

```

70

71

### Join Function Interface

72

73

Custom join logic for combining elements from both DataSets.

74

75

```java { .api }

76

/**

77

* Interface for join functions

78

* @param <IN1> type of elements from first DataSet

79

* @param <IN2> type of elements from second DataSet

80

* @param <OUT> type of result elements

81

*/

82

public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {

83

/**

84

* Join function that combines two elements

85

* @param first element from first DataSet

86

* @param second element from second DataSet

87

* @return combined result element

88

*/

89

OUT join(IN1 first, IN2 second) throws Exception;

90

}

91

92

/**

93

* Rich version with access to runtime context

94

*/

95

public abstract class RichJoinFunction<IN1, IN2, OUT>

96

extends AbstractRichFunction implements JoinFunction<IN1, IN2, OUT> {

97

}

98

```

99

100

### CoGroup Operations

101

102

Group elements from two DataSets by key and process groups together.

103

104

```java { .api }

105

/**

106

* CoGroup with another DataSet

107

* @param other the other DataSet to coGroup with

108

* @return CoGroupOperatorSets for key specification and coGroup configuration

109

*/

110

public <R> CoGroupOperator.CoGroupOperatorSets<T, R> coGroup(DataSet<R> other);

111

```

112

113

**Usage Examples:**

114

115

```java

116

// CoGroup users and orders

117

DataSet<String> coGroupResult = users

118

.coGroup(orders)

119

.where(0)

120

.equalTo(0)

121

.with(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {

122

@Override

123

public void coGroup(

124

Iterable<Tuple2<Long, String>> users,

125

Iterable<Tuple2<Long, String>> orders,

126

Collector<String> out) {

127

128

Iterator<Tuple2<Long, String>> userIter = users.iterator();

129

if (userIter.hasNext()) {

130

Tuple2<Long, String> user = userIter.next();

131

int orderCount = 0;

132

for (Tuple2<Long, String> order : orders) {

133

orderCount++;

134

}

135

out.collect(user.f1 + " has " + orderCount + " orders");

136

}

137

}

138

});

139

```

140

141

### CoGroup Function Interface

142

143

Custom coGroup logic for processing groups from both DataSets.

144

145

```java { .api }

146

/**

147

* Interface for coGroup functions

148

* @param <IN1> type of elements from first DataSet

149

* @param <IN2> type of elements from second DataSet

150

* @param <OUT> type of result elements

151

*/

152

public interface CoGroupFunction<IN1, IN2, OUT> extends Function, Serializable {

153

/**

154

* CoGroup function that processes groups from both sides

155

* @param first iterable of elements from first DataSet

156

* @param second iterable of elements from second DataSet

157

* @param out collector for result elements

158

*/

159

void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception;

160

}

161

162

/**

163

* Rich version with access to runtime context

164

*/

165

public abstract class RichCoGroupFunction<IN1, IN2, OUT>

166

extends AbstractRichFunction implements CoGroupFunction<IN1, IN2, OUT> {

167

}

168

```

169

170

### Cross Operations

171

172

Compute the cross product (Cartesian product) of two DataSets.

173

174

```java { .api }

175

/**

176

* Cross product with another DataSet

177

* @param other the other DataSet to cross with

178

* @return CrossOperator for cross product configuration

179

*/

180

public <R> CrossOperator.DefaultCross<T, R> cross(DataSet<R> other);

181

```

182

183

**Usage Examples:**

184

185

```java

186

// Simple cross product

187

DataSet<String> colors = env.fromElements("red", "blue");

188

DataSet<String> sizes = env.fromElements("small", "large");

189

190

DataSet<Tuple2<String, String>> crossed = colors

191

.cross(sizes)

192

.types(String.class, String.class);

193

194

// Cross with custom function

195

DataSet<String> customCross = colors

196

.cross(sizes)

197

.with(new CrossFunction<String, String, String>() {

198

@Override

199

public String cross(String color, String size) {

200

return size + " " + color + " item";

201

}

202

});

203

```

204

205

### Cross Function Interface

206

207

Custom cross logic for combining every element from the first DataSet with every element from the second DataSet.

208

209

```java { .api }

210

/**

211

* Interface for cross functions

212

* @param <IN1> type of elements from first DataSet

213

* @param <IN2> type of elements from second DataSet

214

* @param <OUT> type of result elements

215

*/

216

public interface CrossFunction<IN1, IN2, OUT> extends Function, Serializable {

217

/**

218

* Cross function that combines elements from both DataSets

219

* @param first element from first DataSet

220

* @param second element from second DataSet

221

* @return combined result element

222

*/

223

OUT cross(IN1 first, IN2 second) throws Exception;

224

}

225

226

/**

227

* Rich version with access to runtime context

228

*/

229

public abstract class RichCrossFunction<IN1, IN2, OUT>

230

extends AbstractRichFunction implements CrossFunction<IN1, IN2, OUT> {

231

}

232

```

233

234

### Union Operations

235

236

Combine two DataSets of the same type into a single DataSet.

237

238

```java { .api }

239

/**

240

* Union with another DataSet of the same type

241

* @param other the other DataSet to union with

242

* @return UnionOperator containing elements from both DataSets

243

*/

244

public UnionOperator<T> union(DataSet<T> other);

245

```

246

247

**Usage Examples:**

248

249

```java

250

// Union two DataSets

251

DataSet<String> dataset1 = env.fromElements("a", "b", "c");

252

DataSet<String> dataset2 = env.fromElements("d", "e", "f");

253

254

DataSet<String> combined = dataset1.union(dataset2);

255

// Result contains: a, b, c, d, e, f

256

257

// Union multiple DataSets

258

DataSet<String> dataset3 = env.fromElements("g", "h");

259

DataSet<String> allCombined = dataset1.union(dataset2).union(dataset3);

260

```

261

262

### Key Specification

263

264

Methods for specifying keys for join and coGroup operations.

265

266

```java { .api }

267

/**

268

* Specify key fields by position (for Tuple types)

269

* @param fields the field positions to use as keys

270

* @return key specification for further configuration

271

*/

272

public JoinOperatorSets.JoinOperatorSetsPredicate where(int... fields);

273

274

/**

275

* Specify key fields by name (for POJO types)

276

* @param fields the field names to use as keys

277

* @return key specification for further configuration

278

*/

279

public JoinOperatorSets.JoinOperatorSetsPredicate where(String... fields);

280

281

/**

282

* Specify key using key selector function

283

* @param keyExtractor function to extract the key

284

* @return key specification for further configuration

285

*/

286

public <K> JoinOperatorSets.JoinOperatorSetsPredicateWithKeySelector<K> where(KeySelector<T, K> keyExtractor);

287

288

/**

289

* Specify the matching key fields in the other DataSet

290

* @param fields the field positions in the other DataSet

291

* @return configured join operator

292

*/

293

public JoinOperator.EquiJoin<T1, T2> equalTo(int... fields);

294

```

295

296

### Join Hints

297

298

Performance hints for join execution strategy.

299

300

```java { .api }

301

/**

302

* Enum for join strategy hints

303

*/

304

public enum JoinHint {

305

OPTIMIZER_CHOOSES, // Let optimizer choose strategy

306

BROADCAST_HASH_FIRST, // Broadcast first DataSet and use hash join

307

BROADCAST_HASH_SECOND, // Broadcast second DataSet and use hash join

308

REPARTITION_HASH_FIRST, // Repartition both, use first as build side

309

REPARTITION_HASH_SECOND, // Repartition both, use second as build side

310

REPARTITION_SORT_MERGE // Repartition and sort-merge join

311

}

312

313

/**

314

* Provide a hint for join execution strategy

315

* @param hint the execution strategy hint

316

* @return configured join operator

317

*/

318

public JoinOperator<T1, T2, R> strategy(JoinHint hint);

319

```

320

321

## Types

322

323

```java { .api }

324

import org.apache.flink.api.java.operators.JoinOperator;

325

import org.apache.flink.api.java.operators.CoGroupOperator;

326

import org.apache.flink.api.java.operators.CrossOperator;

327

import org.apache.flink.api.java.operators.UnionOperator;

328

import org.apache.flink.api.java.operators.join.JoinType;

329

import org.apache.flink.api.java.operators.join.JoinOperatorSets;

330

import org.apache.flink.api.common.functions.JoinFunction;

331

import org.apache.flink.api.common.functions.CoGroupFunction;

332

import org.apache.flink.api.common.functions.CrossFunction;

333

import org.apache.flink.api.common.functions.RichJoinFunction;

334

import org.apache.flink.api.common.functions.RichCoGroupFunction;

335

import org.apache.flink.api.common.functions.RichCrossFunction;

336

import org.apache.flink.util.Collector;

337

```