or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-operations.md · data-transformations.md · execution-environment.md · grouping-aggregation.md · index.md · input-output.md · partitioning-distribution.md · type-system.md · utilities.md

docs/binary-operations.md

# Binary Operations

Operations that combine two DataSets — inner joins, outer joins, crosses, and coGroups — plus union, which merges DataSets of the same type. Together they enable relationships and analysis that span multiple datasets.

## Capabilities

### Join Operations

Combine two DataSets by matching keys, optionally choosing a join strategy (broadcast variants or an explicit optimizer hint).

```scala { .api }
class DataSet[T] {

  /**
   * Begins an inner (equi-)join of this DataSet with `other`.
   *
   * The join is completed by naming the left key with `where(...)` and the right
   * key with `equalTo(...)`.
   *
   * @param other the second (right-hand) input of the join
   * @return an UnfinishedJoinOperation awaiting key specification
   */
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /**
   * Begins an inner join of this DataSet with `other`, passing a strategy hint to
   * the optimizer.
   *
   * @param other    the second (right-hand) input of the join
   * @param strategy execution-strategy hint such as `OPTIMIZER_CHOOSES` or
   *                 `BROADCAST_HASH_FIRST`
   * @return an UnfinishedJoinOperation awaiting key specification
   */
  def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O]

  /**
   * Begins an inner join in which `other` is expected to be small; `other` is the
   * input that gets broadcast.
   *
   * @param other the small DataSet to broadcast
   * @return an UnfinishedJoinOperation awaiting key specification
   */
  def joinWithTiny[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

  /**
   * Begins an inner join in which `other` is expected to be large; this DataSet is
   * the input that gets broadcast.
   *
   * @param other the large DataSet
   * @return an UnfinishedJoinOperation awaiting key specification
   */
  def joinWithHuge[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
}
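```

### Outer Join Operations

Perform left, right, and full outer joins, which also keep elements that have no matching partner on the other side. For such an unmatched element, the join function receives `null` in place of the missing element.

```scala { .api }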
class DataSet[T] {

  /**
   * Starts a full outer join with another DataSet.
   *
   * Every element of both inputs appears in the result. An element without a partner
   * is handed to the join function together with `null` for the missing side, so the
   * function must tolerate `null` in either argument.
   *
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a full outer join with an optimizer hint.
   *
   * Not every JoinHint is accepted for every outer-join type. Per the Flink DataSet
   * transformations guide, a full outer join supports `OPTIMIZER_CHOOSES` and
   * `REPARTITION_SORT_MERGE` (confirm against the guide for your Flink version).
   *
   * @param other DataSet to join with
   * @param strategy Join strategy hint
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def fullOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a left outer join: every element of this DataSet is kept.
   *
   * A left element without a match is handed to the join function with `null` as the
   * right-hand argument; unmatched elements of `other` are dropped.
   *
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a left outer join with an optimizer hint.
   *
   * Per the Flink DataSet transformations guide, a left outer join supports
   * `OPTIMIZER_CHOOSES`, `BROADCAST_HASH_SECOND`, `REPARTITION_HASH_SECOND` and
   * `REPARTITION_SORT_MERGE` (confirm against the guide for your Flink version).
   *
   * @param other DataSet to join with
   * @param strategy Join strategy hint
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a right outer join: every element of `other` is kept.
   *
   * A right element without a match is handed to the join function with `null` as the
   * left-hand argument; unmatched elements of this DataSet are dropped.
   *
   * @param other DataSet to join with
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

  /**
   * Starts a right outer join with an optimizer hint.
   *
   * Per the Flink DataSet transformations guide, a right outer join supports
   * `OPTIMIZER_CHOOSES`, `BROADCAST_HASH_FIRST`, `REPARTITION_HASH_FIRST` and
   * `REPARTITION_SORT_MERGE` (confirm against the guide for your Flink version).
   *
   * @param other DataSet to join with
   * @param strategy Join strategy hint
   * @return UnfinishedOuterJoinOperation for key specification
   */
  def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
}
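```

### Join Key Specification

Specify the join keys: `where` selects the key of the first (left) DataSet and `equalTo` selects the key of the second (right) DataSet. Each key can be given as field positions, field names, or a key selector function.

```scala { .api }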
class UnfinishedJoinOperation[L, R] {

  /**
   * Names the key of the first (left) input by field position.
   *
   * @param fields zero-based positions of the tuple or case-class fields forming the key
   * @return the half-specified join; name the right-hand key with `equalTo(...)`
   */
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[L, R, JoinDataSet[L, R]]

  /**
   * Names the key of the first (left) input by field name.
   *
   * @param firstField  first field name of the key
   * @param otherFields additional field names for a composite key
   * @return the half-specified join; name the right-hand key with `equalTo(...)`
   */
  def where(firstField: String, otherFields: String*): HalfUnfinishedKeyPairOperation[L, R, JoinDataSet[L, R]]

  /**
   * Names the key of the first (left) input with a key selector function.
   *
   * @param fun extracts the key from a left element
   * @return the half-specified join; name the right-hand key with `equalTo(...)`
   */
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedKeyPairOperation[L, R, JoinDataSet[L, R]]
}

/**
 * A join or coGroup whose left-hand key is set and whose right-hand key is still missing.
 *
 * Completing it with `equalTo(...)` yields `O`. For an inner join, `O` is
 * `JoinDataSet[L, R]`: a `DataSet[(L, R)]` of the matched pairs that also offers the
 * `apply` and `withPartitioner` methods of `JoinFunctionAssigner[L, R]`.
 *
 * @tparam L element type of the first (left) input
 * @tparam R element type of the second (right) input
 * @tparam O the operation produced once both keys are known
 */
class HalfUnfinishedKeyPairOperation[L, R, O] {

  /**
   * Names the key of the second (right) input by field position. The right key must
   * match the left key in number and types of fields.
   *
   * @param fields zero-based positions of the right-hand key fields
   * @return the completed operation
   */
  def equalTo(fields: Int*): O

  /**
   * Names the key of the second (right) input by field name.
   *
   * @param firstField  first field name of the key
   * @param otherFields additional field names for a composite key
   * @return the completed operation
   */
  def equalTo(firstField: String, otherFields: String*): O

  /**
   * Names the key of the second (right) input with a key selector function.
   *
   * @param fun extracts the key from a right element
   * @return the completed operation
   */
  def equalTo[K: TypeInformation](fun: R => K): O
}
```

### Join Function Application

Apply a function to each pair of joined elements to produce the final results.

```scala { .api }
trait JoinFunctionAssigner[L, R] {

  /**
   * Turns every matched (left, right) pair into exactly one result element.
   *
   * @param fun combines one left and one right element into an output
   * @return the DataSet of combined results
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /**
   * Turns every matched (left, right) pair into any number of results, each emitted
   * through the supplied collector.
   *
   * @param fun receives a left element, a right element and the collector to emit into
   * @return the DataSet of everything emitted
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R, Collector[O]) => Unit): DataSet[O]

  /**
   * Turns every matched pair into one result using a JoinFunction instance.
   *
   * @param joiner the JoinFunction to invoke per pair
   * @return the DataSet of combined results
   */
  def apply[O: TypeInformation: ClassTag](joiner: JoinFunction[L, R, O]): DataSet[O]

  /**
   * Turns every matched pair into any number of results using a FlatJoinFunction.
   *
   * @param joiner the FlatJoinFunction to invoke per pair
   * @return the DataSet of everything the joiner emitted
   */
  def apply[O: TypeInformation: ClassTag](joiner: FlatJoinFunction[L, R, O]): DataSet[O]

  /**
   * Distributes the join inputs with a user-supplied partitioner rather than the
   * partitioning the optimizer would otherwise choose.
   *
   * @param partitioner the custom partitioner applied to the join keys
   * @return this assigner, configured with the custom partitioner
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): JoinFunctionAssigner[L, R]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._

case class Customer(id: Int, name: String, city: String)
case class Order(id: Int, customerId: Int, product: String, amount: Double)
case class CustomerOrder(customerName: String, product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val customers = env.fromElements(
  Customer(1, "Alice", "NYC"),
  Customer(2, "Bob", "LA"),
  Customer(3, "Charlie", "Chicago")
)

val orders = env.fromElements(
  Order(101, 1, "Laptop", 999.99),
  Order(102, 2, "Phone", 599.99),
  Order(103, 1, "Mouse", 29.99)
)

// Inner join: one CustomerOrder per (customer, order) pair with customer.id == order.customerId.
// Charlie (id 3) has no orders, so he does not appear in this result.
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply((customer, order) => CustomerOrder(customer.name, order.product, order.amount))

// Left outer join: every customer appears at least once. For a customer without orders
// the right-hand element is null, so wrap it in Option instead of comparing with null.
val allCustomerOrders = customers
  .leftOuterJoin(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .apply { (customer, order) =>
    Option(order).fold(CustomerOrder(customer.name, "No orders", 0.0)) { o =>
      CustomerOrder(customer.name, o.product, o.amount)
    }
  }
```

### Cross Operations

Create the Cartesian product of two DataSets for all-pairs operations. The result has as many elements as the product of the two input sizes, so use cross with care on large inputs.

```scala { .api }
class DataSet[T] {

  /**
   * Pairs every element of this DataSet with every element of `other`.
   *
   * @param other the second (right-hand) input of the cross
   * @return a CrossDataSet on which a cross function can be applied
   */
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]

  /**
   * Cartesian product in which `other` is expected to be small; `other` is the input
   * that gets broadcast.
   *
   * @param other the small DataSet to broadcast
   * @return a CrossDataSet on which a cross function can be applied
   */
  def crossWithTiny[O](other: DataSet[O]): CrossDataSet[T, O]

  /**
   * Cartesian product in which `other` is expected to be large; this DataSet is the
   * input that gets broadcast.
   *
   * @param other the large DataSet
   * @return a CrossDataSet on which a cross function can be applied
   */
  def crossWithHuge[O](other: DataSet[O]): CrossDataSet[T, O]
}
```

### Cross Function Application

Apply a function to every pair of elements from the crossed DataSets.

```scala { .api }
class CrossDataSet[L, R] {

  /**
   * Maps every (left, right) pair of the Cartesian product to one result element.
   *
   * @param fun combines one left and one right element into an output
   * @return the DataSet of combined results
   */
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]

  /**
   * Maps every (left, right) pair of the Cartesian product to one result element
   * using a CrossFunction instance.
   *
   * @param crosser the CrossFunction to invoke per pair
   * @return the DataSet of combined results
   */
  def apply[O: TypeInformation: ClassTag](crosser: CrossFunction[L, R, O]): DataSet[O]
}
```

**Usage Examples:**

```scala
// `env` is the ExecutionEnvironment created in the join example above.
val colors = env.fromElements("Red", "Green", "Blue")
val sizes  = env.fromElements("Small", "Medium", "Large")

// Every color paired with every size: 3 x 3 = 9 combinations.
val combinations = colors
  .cross(sizes)
  .apply { (color, size) => s"$color $size" }

// Results (DataSet order is not guaranteed):
// ["Red Small", "Red Medium", "Red Large", "Green Small", ...]
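```

### CoGroup Operations

Group the elements of two DataSets by key and process each pair of groups together. Unlike a join, the function is called once per key with all matching elements from both sides, even when one side has no elements for that key.

```scala { .api }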
class DataSet[T] {

  /**
   * Begins a coGroup of this DataSet with `other`; keys are named next with
   * `where(...)` and `equalTo(...)`.
   *
   * @param other the second (right-hand) input of the coGroup
   * @return an UnfinishedCoGroupOperation awaiting key specification
   */
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
```

### CoGroup Key Specification and Function Application

Name the key of each input with `where` and `equalTo`, then apply a function to the two groups that share each key.

```scala { .api }
class UnfinishedCoGroupOperation[L, R] {

  // Each `where` returns a HalfUnfinishedKeyPairOperation: an operation whose left key
  // is set. Calling `equalTo(...)` on it (by field positions, field names, or a key
  // selector on R) yields its third type parameter, here a CoGroupDataSet[L, R].

  /**
   * Names the coGroup key of the first (left) input by field position.
   *
   * @param fields zero-based positions of the fields forming the key
   * @return the half-specified coGroup; `equalTo(...)` on it yields a `CoGroupDataSet[L, R]`
   */
  def where(fields: Int*): HalfUnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]]

  /**
   * Names the coGroup key of the first (left) input by field name.
   *
   * @param firstField  first field name of the key
   * @param otherFields additional field names for a composite key
   * @return the half-specified coGroup; `equalTo(...)` on it yields a `CoGroupDataSet[L, R]`
   */
  def where(firstField: String, otherFields: String*): HalfUnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]]

  /**
   * Names the coGroup key of the first (left) input with a key selector function.
   *
   * @param fun extracts the key from a left element
   * @return the half-specified coGroup; `equalTo(...)` on it yields a `CoGroupDataSet[L, R]`
   */
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedKeyPairOperation[L, R, CoGroupDataSet[L, R]]
}

class CoGroupDataSet[L, R] {

  /**
   * Calls `fun` once per key with the left and right groups for that key and keeps
   * one result per call. Either iterator may be empty, but not both.
   *
   * @param fun maps the two group iterators to a single output
   * @return the DataSet of per-key results
   */
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R]) => O): DataSet[O]

  /**
   * Calls `fun` once per key with the left and right groups for that key; `fun` may
   * emit any number of results through the collector.
   *
   * @param fun receives both group iterators and the collector to emit into
   * @return the DataSet of everything emitted
   */
  def apply[O: TypeInformation: ClassTag](fun: (Iterator[L], Iterator[R], Collector[O]) => Unit): DataSet[O]

  /**
   * Processes each pair of groups with a CoGroupFunction instance.
   *
   * @param coGrouper the CoGroupFunction to invoke per key
   * @return the DataSet of everything the coGrouper emitted
   */
  def apply[O: TypeInformation: ClassTag](coGrouper: CoGroupFunction[L, R, O]): DataSet[O]

  /**
   * Distributes the coGroup inputs with a user-supplied partitioner.
   *
   * @param partitioner the custom partitioner applied to the coGroup keys
   * @return this CoGroupDataSet, configured with the custom partitioner
   */
  def withPartitioner[K: TypeInformation](partitioner: Partitioner[K]): CoGroupDataSet[L, R]
}
```

### CoGroup Sorting

Sort the elements within each group before they are passed to the coGroup function.

```scala { .api }
class CoGroupDataSet[L, R] {

  // `Order` below is Flink's sort-order type (e.g. ASCENDING / DESCENDING), not the
  // `Order` case class used in the join example.

  /**
   * Orders the elements of each left-hand group by the field at `field`.
   *
   * @param field position of the field to sort on
   * @param order sort direction
   * @return this CoGroupDataSet with sorted left-hand groups
   */
  def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R]

  /**
   * Orders the elements of each left-hand group by the named field.
   *
   * @param field name of the field to sort on
   * @param order sort direction
   * @return this CoGroupDataSet with sorted left-hand groups
   */
  def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R]

  /**
   * Orders the elements of each right-hand group by the field at `field`.
   *
   * @param field position of the field to sort on
   * @param order sort direction
   * @return this CoGroupDataSet with sorted right-hand groups
   */
  def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R]

  /**
   * Orders the elements of each right-hand group by the named field.
   *
   * @param field name of the field to sort on
   * @param order sort direction
   * @return this CoGroupDataSet with sorted right-hand groups
   */
  def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R]
}
```

**Usage Examples:**

```scala
case class LeftData(key: String, value: Int)
case class RightData(key: String, description: String)

val leftData = env.fromElements(
  LeftData("A", 1),
  LeftData("A", 2),
  LeftData("B", 3)
)

val rightData = env.fromElements(
  RightData("A", "Group A data"),
  RightData("C", "Group C data")
)

// One call per distinct key found in either input. The iterators are single-pass,
// so they are materialized into lists before being inspected more than once.
val coGroupResult = leftData
  .coGroup(rightData)
  .where(_.key)
  .equalTo(_.key)
  .apply { (leftIter, rightIter) =>
    val lefts  = leftIter.toList
    val rights = rightIter.toList
    // Every group has at least one element on some side, so when the left side is
    // empty the right side supplies the key.
    val key = lefts.headOption.map(_.key).getOrElse(rights.head.key)
    s"Key: $key, LeftSum: ${lefts.map(_.value).sum}, RightCount: ${rights.length}"
  }

// Results (DataSet order is not guaranteed):
//   "Key: A, LeftSum: 3, RightCount: 1"
//   "Key: B, LeftSum: 3, RightCount: 0"
//   "Key: C, LeftSum: 0, RightCount: 1"
```

### Union Operations

Combine DataSets of the same element type into one DataSet, without any key matching.

```scala { .api }
class DataSet[T] {

  /**
   * Merges this DataSet with `other`, which must have the same element type.
   *
   * @param other the DataSet whose elements are added
   * @return a DataSet holding the elements of both inputs
   */
  def union(other: DataSet[T]): DataSet[T]
}

class ExecutionEnvironment {

  /**
   * Merges any number of DataSets of the same element type in one call.
   *
   * @param sets the DataSets to merge
   * @return a single DataSet holding the elements of all inputs
   */
  def union[T](sets: Seq[DataSet[T]]): DataSet[T]
}
```

**Usage Examples:**

```scala
val numbers1 = env.fromElements(1, 2, 3)
val numbers2 = env.fromElements(4, 5, 6)
val numbers3 = env.fromElements(7, 8, 9)

// Exactly two inputs: DataSet.union
val combined = numbers1.union(numbers2)

// Any number of inputs at once: ExecutionEnvironment.union
val allParts   = Seq(numbers1, numbers2, numbers3)
val allNumbers = env.union(allParts)
```

## Types

```scala { .api }
/**
 * User function that turns each joined (first, second) pair into exactly one result.
 *
 * Flink declares this as an interface, so it is shown as a trait. It can be
 * implemented with an anonymous class and passed to `JoinFunctionAssigner.apply`.
 * In an outer join, the argument for the unmatched side may be `null`.
 *
 * @tparam IN1 element type of the first (left) input
 * @tparam IN2 element type of the second (right) input
 * @tparam OUT result type
 */
trait JoinFunction[IN1, IN2, OUT] extends Function {
  /** Combines one left and one right element into a single result. */
  def join(first: IN1, second: IN2): OUT
}

/**
 * User function that turns each joined (first, second) pair into any number of
 * results, emitted through a collector.
 *
 * Flink declares this as an interface, so it is shown as a trait. In an outer join,
 * the argument for the unmatched side may be `null`.
 *
 * @tparam IN1 element type of the first (left) input
 * @tparam IN2 element type of the second (right) input
 * @tparam OUT result type
 */
trait FlatJoinFunction[IN1, IN2, OUT] extends Function {
  /** Emits the results for one left/right pair into `out`. */
  def join(first: IN1, second: IN2, out: Collector[OUT]): Unit
}

/**
 * User function that turns each pair of the Cartesian product into exactly one result.
 *
 * Flink declares this as an interface, so it is shown as a trait.
 *
 * @tparam IN1 element type of the first input
 * @tparam IN2 element type of the second input
 * @tparam OUT result type
 */
trait CrossFunction[IN1, IN2, OUT] extends Function {
  /** Combines one element of each input into a single result. */
  def cross(val1: IN1, val2: IN2): OUT
}

/**
 * User function invoked once per key with all elements of both inputs that share that
 * key; results are emitted through a collector.
 *
 * Flink declares this as an interface, so it is shown as a trait. The groups are Java
 * `Iterable`s, and either may be empty for a given key.
 *
 * @tparam IN1 element type of the first input
 * @tparam IN2 element type of the second input
 * @tparam OUT result type
 */
trait CoGroupFunction[IN1, IN2, OUT] extends Function {
  /** Processes the two groups for one key and emits any number of results into `out`. */
  def coGroup(first: java.lang.Iterable[IN1], second: java.lang.Iterable[IN2], out: Collector[OUT]): Unit
}

/** Strategy hint passed to the optimizer when a join is created. */
sealed trait JoinHint

object JoinHint {
  /** Let the optimizer pick the join strategy. */
  case object OPTIMIZER_CHOOSES extends JoinHint

  /** Broadcast the first input and build a hash table from it; suits a small first input. */
  case object BROADCAST_HASH_FIRST extends JoinHint

  /** Broadcast the second input and build a hash table from it; suits a small second input. */
  case object BROADCAST_HASH_SECOND extends JoinHint

  /** Partition both inputs and build the hash table from the first. */
  case object REPARTITION_HASH_FIRST extends JoinHint

  /** Partition both inputs and build the hash table from the second. */
  case object REPARTITION_HASH_SECOND extends JoinHint

  /** Partition both inputs, sort them on the key, and merge. */
  case object REPARTITION_SORT_MERGE extends JoinHint
}

/** Which unmatched elements a join keeps. */
sealed trait JoinType

object JoinType {
  /** Keep only pairs whose keys match. */
  case object INNER extends JoinType

  /** Also keep unmatched elements of the first (left) input. */
  case object LEFT_OUTER extends JoinType

  /** Also keep unmatched elements of the second (right) input. */
  case object RIGHT_OUTER extends JoinType

  /** Also keep unmatched elements of both inputs. */
  case object FULL_OUTER extends JoinType
}

/**
 * Builder returned by `join`, `joinWithTiny` and `joinWithHuge`. Name the left key
 * with `where(...)`, then the right key with `equalTo(...)`.
 */
class UnfinishedJoinOperation[L, R]

/**
 * Builder returned by `leftOuterJoin`, `rightOuterJoin` and `fullOuterJoin`. Keys are
 * named with `where(...)` / `equalTo(...)` as for an inner join; the function passed
 * to `apply` afterwards may receive `null` for the side that has no match.
 */
class UnfinishedOuterJoinOperation[L, R]

/**
 * Builder returned by `coGroup`. Name the left key with `where(...)` and the right key
 * with `equalTo(...)` before applying a coGroup function.
 */
class UnfinishedCoGroupOperation[L, R]

/**
 * Offers the `apply` overloads that turn matched (left, right) pairs into results,
 * plus `withPartitioner`.
 */
trait JoinFunctionAssigner[L, R]

/**
 * Result of `cross`, `crossWithTiny` or `crossWithHuge`. It can be used directly as a
 * DataSet of (left, right) pairs, or transformed pair by pair with `apply`.
 */
class CrossDataSet[L, R] extends DataSet[(L, R)]

/**
 * A coGroup whose keys on both sides are fixed. It is ready for `apply` and can
 * optionally be configured with group sorting or a custom partitioner.
 */
class CoGroupDataSet[L, R]
```