or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

dataset-operations.md · execution-environment.md · extensions.md · grouped-dataset-operations.md · index.md · join-operations.md · type-system.md · utility-functions.md

docs/join-operations.md

0

# Join Operations

1

2

Join operations combine two DataSets based on key equality. Flink supports various join types and provides optimization hints for better performance.

3

4

## Join Types

5

6

### Inner Join

7

8

```scala { .api }

9

class DataSet[T] {

10

def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

11

}

12

```

13

14

### Outer Joins

15

16

```scala { .api }

17

class DataSet[T] {

18

def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

19

def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

20

def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

21

}

22

```

23

24

## Join Key Specification

25

26

### UnfinishedJoinOperation

27

28

```scala { .api }

29

class UnfinishedJoinOperation[L, R] {

30

def where[K: TypeInformation](fun: L => K): HalfUnfinishedJoinOperation[L, R]

31

def where(fields: Int*): HalfUnfinishedJoinOperation[L, R]

32

def where(firstField: String, otherFields: String*): HalfUnfinishedJoinOperation[L, R]

33

}

34

35

class HalfUnfinishedJoinOperation[L, R] {

36

def equalTo[K: TypeInformation](fun: R => K): JoinDataSet[L, R]

37

def equalTo(fields: Int*): JoinDataSet[L, R]

38

def equalTo(firstField: String, otherFields: String*): JoinDataSet[L, R]

39

}

40

```

41

42

## Join Function Application

43

44

```scala { .api }

45

class JoinDataSet[L, R] {

46

def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

47

def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]

48

def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]

49

def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]

50

51

// Optimization hints

52

def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinDataSet[L, R]

53

}

54

```

55

56

## Usage Examples

57

58

### Basic Inner Join

59

60

```scala

61

case class Employee(id: Int, name: String, deptId: Int)

62

case class Department(id: Int, name: String)

63

64

val employees = env.fromElements(

65

Employee(1, "Alice", 10),

66

Employee(2, "Bob", 20),

67

Employee(3, "Charlie", 10)

68

)

69

70

val departments = env.fromElements(

71

Department(10, "Engineering"),

72

Department(20, "Sales"),

73

Department(30, "Marketing")

74

)

75

76

// Join by department ID

77

val employeeDepartments = employees

78

.join(departments)

79

.where(_.deptId)

80

.equalTo(_.id)

81

.apply((emp, dept) => (emp.name, dept.name))

82

83

// Result: ("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering")

84

```

85

86

### Join with Field Positions

87

88

```scala

89

val orders = env.fromElements(

90

(1, "Order1", 100), // (orderId, orderName, customerId)

91

(2, "Order2", 200),

92

(3, "Order3", 100)

93

)

94

95

val customers = env.fromElements(

96

(100, "Customer A"), // (customerId, customerName)

97

(200, "Customer B"),

98

(300, "Customer C")

99

)

100

101

// Join using field positions

102

val orderCustomers = orders

103

.join(customers)

104

.where(2) // customerId field in orders (index 2)

105

.equalTo(0) // customerId field in customers (index 0)

106

.apply((order, customer) => (order._2, customer._2))

107

```

108

109

### Join with Field Names

110

111

```scala

112

case class Order(orderId: Int, orderName: String, customerId: Int)

113

case class Customer(customerId: Int, customerName: String)

114

115

val orders = env.fromElements(

116

Order(1, "Order1", 100),

117

Order(2, "Order2", 200)

118

)

119

120

val customers = env.fromElements(

121

Customer(100, "Customer A"),

122

Customer(200, "Customer B")

123

)

124

125

// Join using field names

126

val result = orders

127

.join(customers)

128

.where("customerId")

129

.equalTo("customerId")

130

.apply((order, customer) => s"${order.orderName} for ${customer.customerName}")

131

```

132

133

### Left Outer Join

134

135

```scala

136

val leftData = env.fromElements(("A", 1), ("B", 2), ("C", 3))

137

val rightData = env.fromElements(("A", 10), ("C", 30), ("D", 40))

138

139

val leftOuterResult = leftData

140

.leftOuterJoin(rightData)

141

.where(_._1)

142

.equalTo(_._1)

143

.apply { (left, right) =>

144

val rightValue = Option(right).map(_._2).getOrElse(0)

145

(left._1, left._2, rightValue)

146

}

147

148

// Result: ("A", 1, 10), ("B", 2, 0), ("C", 3, 30)

149

```

150

151

### Right Outer Join

152

153

```scala

154

val rightOuterResult = leftData

155

.rightOuterJoin(rightData)

156

.where(_._1)

157

.equalTo(_._1)

158

.apply { (left, right) =>

159

val leftValue = Option(left).map(_._2).getOrElse(0)

160

(right._1, leftValue, right._2)

161

}

162

163

// Result: ("A", 1, 10), ("C", 3, 30), ("D", 0, 40)

164

```

165

166

### Full Outer Join

167

168

```scala

169

val fullOuterResult = leftData

170

.fullOuterJoin(rightData)

171

.where(_._1)

172

.equalTo(_._1)

173

.apply { (left, right) =>

174

val key = if (left != null) left._1 else right._1

175

val leftValue = Option(left).map(_._2).getOrElse(0)

176

val rightValue = Option(right).map(_._2).getOrElse(0)

177

(key, leftValue, rightValue)

178

}

179

180

// Result: ("A", 1, 10), ("B", 2, 0), ("C", 3, 30), ("D", 0, 40)

181

```

182

183

## Multiple Key Fields

184

185

```scala

186

case class Sale(region: String, product: String, amount: Double)

187

case class Target(region: String, product: String, target: Double)

188

189

val sales = env.fromElements(

190

Sale("US", "Product A", 1000),

191

Sale("EU", "Product B", 1500)

192

)

193

194

val targets = env.fromElements(

195

Target("US", "Product A", 1200),

196

Target("EU", "Product B", 1400)

197

)

198

199

// Join on multiple fields

200

val comparison = sales

201

.join(targets)

202

.where(s => (s.region, s.product))

203

.equalTo(t => (t.region, t.product))

204

.apply { (sale, target) =>

205

(sale.region, sale.product, sale.amount, target.target, sale.amount - target.target)

206

}

207

```

208

209

## Cross Product Operations

210

211

Cross operations create a Cartesian product of two DataSets.

212

213

### Cross Product

214

215

```scala { .api }

216

class DataSet[T] {

217

def cross[O](other: DataSet[O]): CrossDataSet[T, O]

218

def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]

219

def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]

220

}

221

222

class CrossDataSet[L, R] {

223

def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

224

def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]

225

def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]

226

}

227

```

228

229

### Cross Examples

230

231

```scala

232

val left = env.fromElements("A", "B")

233

val right = env.fromElements(1, 2, 3)

234

235

// Simple cross product

236

val crossed = left

237

.cross(right)

238

.apply((l, r) => s"$l-$r")

239

240

// Result: "A-1", "A-2", "A-3", "B-1", "B-2", "B-3"

241

242

// Cross with hints for optimization

243

val leftSmall = env.fromElements(1, 2) // Small dataset

244

val rightLarge = env.fromElements((1 to 1000000): _*) // Large dataset

245

246

// Hint that left is tiny (will be broadcast)

247

val crossedWithHint = leftSmall

248

.crossWithHuge(rightLarge)

249

.apply((small, large) => small * large)

250

```

251

252

## CoGroup Operations

253

254

CoGroup operations group elements from two DataSets by key and process groups together.

255

256

### CoGroup API

257

258

```scala { .api }

259

class DataSet[T] {

260

def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]

261

}

262

263

class UnfinishedCoGroupOperation[L, R] {

264

def where[K: TypeInformation](fun: L => K): HalfUnfinishedCoGroupOperation[L, R]

265

def where(fields: Int*): HalfUnfinishedCoGroupOperation[L, R]

266

}

267

268

class CoGroupDataSet[L, R] {

269

def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]

270

def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]

271

def apply[O: TypeInformation: ClassTag](coGroupFunction: CoGroupFunction[L, R, O]): DataSet[O]

272

}

273

```

274

275

### CoGroup Examples

276

277

```scala

278

val left = env.fromElements(("A", 1), ("B", 2), ("A", 3))

279

val right = env.fromElements(("A", 10), ("C", 30), ("A", 20))

280

281

val coGrouped = left

282

.coGroup(right)

283

.where(_._1)

284

.equalTo(_._1)

285

.apply { (leftIterator, rightIterator) =>

286

val leftList = leftIterator.toList

287

val rightList = rightIterator.toList

288

val key = if (leftList.nonEmpty) leftList.head._1 else rightList.head._1

289

val leftSum = leftList.map(_._2).sum

290

val rightSum = rightList.map(_._2).sum

291

(key, leftSum, rightSum)

292

}

293

294

// Result: ("A", 4, 30), ("B", 2, 0), ("C", 0, 30)

295

```

296

297

## Types

298

299

```scala { .api }

300

// Join function interfaces

301

trait JoinFunction[IN1, IN2, OUT] extends Function {

302

def join(first: IN1, second: IN2): OUT

303

}

304

305

trait FlatJoinFunction[IN1, IN2, OUT] extends Function {

306

def join(first: IN1, second: IN2, out: Collector[OUT]): Unit

307

}

308

309

// Cross function interfaces

310

trait CrossFunction[IN1, IN2, OUT] extends Function {

311

def cross(val1: IN1, val2: IN2): OUT

312

}

313

314

// CoGroup function interface

315

trait CoGroupFunction[IN1, IN2, OUT] extends Function {

316

def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit

317

}

318

319

// For outer joins, null values are possible

320

// Always check for null in outer join functions:

321

def handleOuterJoin(left: LeftType, right: RightType): ResultType = {

322

val leftValue = Option(left).getOrElse(defaultLeft)

323

val rightValue = Option(right).getOrElse(defaultRight)

324

// Process values...

325

}

326

```

327

328

## Performance Hints

329

330

### Join Hints

331

332

```scala

333

// Join with a custom partitioner that decides how join keys are distributed across partitions

334

val result = largeDataSet

335

.join(smallDataSet)

336

.where(_.key)

337

.equalTo(_.key)

338

.withPartitioner(new CustomPartitioner()) // Custom partitioning

339

.apply((large, small) => processJoin(large, small))

340

341

// Cross product hints

342

val crossResult = smallDataSet

343

.crossWithHuge(largeDataSet) // smallDataSet will be broadcast

344

.apply((small, large) => combine(small, large))

345

```

346

347

### Join Strategy Selection

348

349

Flink automatically chooses execution strategies for joins and cross products, but you can influence the choice:

350

351

- Use `crossWithTiny(other)` when the `other` DataSet (the argument) is much smaller than the DataSet it is called on and fits in memory

352

- Use `crossWithHuge(other)` when the `other` DataSet (the argument) is much larger than the DataSet it is called on

353

- CoGroup is efficient for joining datasets with many-to-many relationships

354

- Consider using broadcast variables for very small lookup datasets