
# Joins and CoGroups

Apache Flink Scala API provides comprehensive support for combining multiple DataSets using joins, co-groups, and cross products with flexible key selection and customizable join strategies.

## Join Operations

### Basic Join Setup

```scala { .api }
class DataSet[T] {
  // Start join operation
  def join[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  // Join with size hints for optimization
  def joinWithTiny[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def joinWithHuge[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}
```

### Outer Joins

```scala { .api }
class DataSet[T] {
  // Left outer join
  def leftOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  // Right outer join
  def rightOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  // Full outer join
  def fullOuterJoin[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}
```

### Join Key Specification

```scala { .api }
class UnfinishedJoinOperation[T, O] {
  // Specify left side key using function
  def where[K: TypeInformation](keySelector: T => K): UnfinishedJoinOperationWhere[T, O]

  // Specify left side key using field positions
  def where(fields: Int*): UnfinishedJoinOperationWhere[T, O]

  // Specify left side key using field names
  def where(firstField: String, otherFields: String*): UnfinishedJoinOperationWhere[T, O]
}

class UnfinishedJoinOperationWhere[T, O] {
  // Specify right side key using function
  def equalTo[K: TypeInformation](keySelector: O => K): UnfinishedJoinOperationWhereEqual[T, O]

  // Specify right side key using field positions
  def equalTo(fields: Int*): UnfinishedJoinOperationWhereEqual[T, O]

  // Specify right side key using field names
  def equalTo(firstField: String, otherFields: String*): UnfinishedJoinOperationWhereEqual[T, O]
}
```

### Join Function Application

```scala { .api }
class UnfinishedJoinOperationWhereEqual[T, O] {
  // Apply join function - creates tuple by default
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]

  // Use JoinFunction
  def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]

  // Get JoinDataSet for further configuration
  def getJoinDataSet: JoinDataSet[T, O]
}

class JoinDataSet[T, O] {
  // Apply join function
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]
  def apply[R: TypeInformation: ClassTag](joiner: JoinFunction[T, O, R]): DataSet[R]

  // Configuration methods
  def withJoinHint(joinHint: JoinHint): JoinDataSet[T, O]
}
```

## CoGroup Operations

CoGroup operations group elements from two DataSets by key and provide access to all elements from both groups.

```scala { .api }
class DataSet[T] {
  // Start coGroup operation
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}

class UnfinishedCoGroupOperation[T, O] {
  // Specify left side key
  def where[K: TypeInformation](keySelector: T => K): HalfUnfinishedKeyPairOperation[T, O, K]
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[T, O, _]
  def where(firstField: String, otherFields: String*): HalfUnfinishedKeyPairOperation[T, O, _]
}

class HalfUnfinishedKeyPairOperation[T, O, K] {
  // Specify right side key
  def equalTo[K2: TypeInformation](keySelector: O => K2): CoGroupDataSet[T, O]
  def equalTo(fields: Int*): CoGroupDataSet[T, O]
  def equalTo(firstField: String, otherFields: String*): CoGroupDataSet[T, O]
}
```

### CoGroup Function Application

```scala { .api }
class CoGroupDataSet[T, O] {
  // Apply function with iterators
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O]) => R): DataSet[R]

  // Apply function with iterators returning multiple results
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O]) => TraversableOnce[R]): DataSet[R]

  // Apply function with iterators and collector
  def apply[R: TypeInformation: ClassTag](fun: (Iterator[T], Iterator[O], Collector[R]) => Unit): DataSet[R]

  // Use CoGroupFunction
  def apply[R: TypeInformation: ClassTag](coGrouper: CoGroupFunction[T, O, R]): DataSet[R]
}
```

## Cross Operations

Cross operations create Cartesian products of two DataSets.

```scala { .api }
class DataSet[T] {
  // Cross with other DataSet
  def cross[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]

  // Cross with size hints
  def crossWithTiny[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
  def crossWithHuge[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
}

class CrossDataSet[T, O] {
  // Apply cross function
  def apply[R: TypeInformation: ClassTag](fun: (T, O) => R): DataSet[R]

  // Use CrossFunction
  def apply[R: TypeInformation: ClassTag](crosser: CrossFunction[T, O, R]): DataSet[R]

  // Configuration
  def withCrossHint(crossHint: CrossHint): CrossDataSet[T, O]
}
```

## Usage Examples

### Inner Join

```scala
import org.apache.flink.api.scala._

case class Customer(id: Int, name: String, city: String)
case class Order(customerId: Int, product: String, amount: Double)
case class CustomerOrder(customerName: String, city: String, product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val customers = env.fromElements(
  Customer(1, "Alice", "New York"),
  Customer(2, "Bob", "London"),
  Customer(3, "Charlie", "Paris")
)

val orders = env.fromElements(
  Order(1, "laptop", 1000.0),
  Order(2, "phone", 500.0),
  Order(1, "mouse", 25.0)
)

// Join customers with orders
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply { (customer, order) =>
    CustomerOrder(customer.name, customer.city, order.product, order.amount)
  }
```

### Left Outer Join

```scala
import org.apache.flink.api.scala._

case class Employee(id: Int, name: String, deptId: Int)
case class Department(id: Int, name: String)
case class EmployeeDept(empName: String, deptName: Option[String])

val env = ExecutionEnvironment.getExecutionEnvironment

val employees = env.fromElements(
  Employee(1, "Alice", 10),
  Employee(2, "Bob", 20),
  Employee(3, "Charlie", 99) // Department doesn't exist
)

val departments = env.fromElements(
  Department(10, "Engineering"),
  Department(20, "Sales")
)

// Left outer join - all employees, with department if it exists
val employeeDepts = employees
  .leftOuterJoin(departments)
  .where(_.deptId)
  .equalTo(_.id)
  .apply { (emp, dept) =>
    // dept is null when the employee has no matching department;
    // Option(...) maps that null to None
    EmployeeDept(emp.name, Option(dept).map(_.name))
  }
```

### CoGroup Operation

```scala
import org.apache.flink.api.scala._

case class Student(id: Int, name: String)
case class Grade(studentId: Int, subject: String, score: Int)
case class StudentReport(name: String, grades: List[Grade], avgScore: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val students = env.fromElements(
  Student(1, "Alice"),
  Student(2, "Bob"),
  Student(3, "Charlie")
)

val grades = env.fromElements(
  Grade(1, "Math", 90),
  Grade(1, "Science", 85),
  Grade(2, "Math", 78),
  Grade(2, "Science", 82),
  Grade(1, "History", 88)
)

// CoGroup to create comprehensive student reports
val studentReports = students
  .coGroup(grades)
  .where(_.id)
  .equalTo(_.studentId)
  .apply { (studentIter, gradeIter) =>
    // Assumes exactly one student per key; next() would throw if a grade
    // had no matching student
    val student = studentIter.next()
    val gradeList = gradeIter.toList
    val avgScore = if (gradeList.nonEmpty) gradeList.map(_.score).sum.toDouble / gradeList.length else 0.0
    StudentReport(student.name, gradeList, avgScore)
  }
```

### Multiple Join Conditions

```scala
import org.apache.flink.api.scala._

case class Sale(year: Int, quarter: Int, region: String, amount: Double)
case class Target(year: Int, quarter: Int, region: String, target: Double)
case class Performance(year: Int, quarter: Int, region: String, actual: Double, target: Double, achievement: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val sales = env.fromElements(
  Sale(2023, 1, "US", 1000000),
  Sale(2023, 1, "EU", 800000),
  Sale(2023, 2, "US", 1200000)
)

val targets = env.fromElements(
  Target(2023, 1, "US", 900000),
  Target(2023, 1, "EU", 750000),
  Target(2023, 2, "US", 1100000)
)

// Join on multiple fields: year, quarter, region
val performance = sales
  .join(targets)
  .where(s => (s.year, s.quarter, s.region))
  .equalTo(t => (t.year, t.quarter, t.region))
  .apply { (sale, target) =>
    Performance(
      sale.year,
      sale.quarter,
      sale.region,
      sale.amount,
      target.target,
      sale.amount / target.target
    )
  }
```

### Cross Product

```scala
import org.apache.flink.api.scala._

case class Color(name: String, hex: String)
case class Size(name: String, dimension: String)
case class Product(color: String, size: String, colorHex: String, sizeDimension: String)

val env = ExecutionEnvironment.getExecutionEnvironment

val colors = env.fromElements(
  Color("Red", "#FF0000"),
  Color("Blue", "#0000FF"),
  Color("Green", "#00FF00")
)

val sizes = env.fromElements(
  Size("Small", "S"),
  Size("Medium", "M"),
  Size("Large", "L")
)

// Create all color-size combinations
val products = colors
  .cross(sizes)
  .apply { (color, size) =>
    Product(color.name, size.name, color.hex, size.dimension)
  }
```

### Join with Custom Functions

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.JoinFunction

case class User(id: Int, name: String, email: String)
case class Activity(userId: Int, activity: String, timestamp: Long)
case class UserActivity(userName: String, email: String, activities: List[String])

val env = ExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice", "alice@example.com"),
  User(2, "Bob", "bob@example.com")
)

val activities = env.fromElements(
  Activity(1, "login", 1000L),
  Activity(1, "view_page", 1001L),
  Activity(2, "login", 1002L)
)

// Using JoinFunction for complex join logic
class UserActivityJoinFunction extends JoinFunction[User, Activity, (Int, String, String, String)] {
  def join(user: User, activity: Activity): (Int, String, String, String) = {
    (user.id, user.name, user.email, activity.activity)
  }
}

val userActivities = users
  .join(activities)
  .where(_.id)
  .equalTo(_.userId)
  .apply(new UserActivityJoinFunction())
```

### Broadcast Join

```scala
import org.apache.flink.api.scala._

case class Transaction(id: String, productCode: String, amount: Double)
case class ProductInfo(code: String, name: String, category: String)
case class EnrichedTransaction(id: String, productName: String, category: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Large transaction dataset
val transactions = env.fromElements(
  Transaction("t1", "P001", 100.0),
  Transaction("t2", "P002", 200.0),
  Transaction("t3", "P001", 150.0)
)

// Small product info dataset (suitable for broadcasting)
val productInfo = env.fromElements(
  ProductInfo("P001", "Laptop", "Electronics"),
  ProductInfo("P002", "Phone", "Electronics")
)

// Join with broadcast hint for small dataset
val enrichedTransactions = transactions
  .joinWithTiny(productInfo) // Hint that productInfo is small
  .where(_.productCode)
  .equalTo(_.code)
  .apply { (transaction, product) =>
    EnrichedTransaction(
      transaction.id,
      product.name,
      product.category,
      transaction.amount
    )
  }
```