# Join and CoGroup Operations

Advanced operations for combining multiple datasets through joins, cross products, co-group operations, and unions. These operations enable complex data processing patterns that require coordination between multiple data sources.

## Capabilities

### Join Operations

#### Basic Join

Joins two DataSets on the specified key fields and passes each pair of matching elements to a join function.

```python { .api }
def join(self, other_set):
    """
    Starts a join with another DataSet. The key fields are specified on the
    returned builder.

    Parameters:
        other_set (DataSet): DataSet to join with

    Returns:
        JoinOperatorWhere: Chainable join builder for the .where().equal_to().using() pattern
    """
```
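
Semantically, `join` is an inner equi-join. Each pair of elements whose key fields are equal is passed to the join function once, and elements without a partner on the other side are dropped. The following is a minimal pure-Python model of that behavior on in-memory lists, useful for reasoning about results. `equi_join` is a hypothetical helper, not part of the Flink API. Its nested loop is only for clarity; Flink's optimizer chooses hash- or sort-based strategies instead.

```python
from typing import Callable, List, Sequence


def equi_join(
    first: List[tuple],
    second: List[tuple],
    first_keys: Sequence[int],
    second_keys: Sequence[int],
    join_fn: Callable[[tuple, tuple], object],
) -> list:
    """Hypothetical model of join(...).where(...).equal_to(...).using(...)."""
    result = []
    for left in first:
        left_key = tuple(left[i] for i in first_keys)
        for right in second:
            right_key = tuple(right[i] for i in second_keys)
            # Only pairs with equal keys reach join_fn (inner-join semantics).
            if left_key == right_key:
                result.append(join_fn(left, right))
    return result


orders = [(1, "customer1", 100), (2, "customer2", 200), (3, "customer3", 150)]
customers = [("customer1", "Alice"), ("customer2", "Bob")]

joined = equi_join(orders, customers, [1], [0], lambda o, c: (o[0], c[1], o[2]))
print(joined)  # [(1, 'Alice', 100), (2, 'Bob', 200)]
```

Order 3 references `customer3`, which has no match, so it does not appear in the output. Keeping unmatched elements requires an outer join, for example one built with `co_group`.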

#### Join with Optimization Hints

These variants give the optimizer a hint about the relative sizes of the two DataSets. `join_with_tiny` indicates that the second DataSet is much smaller than the first; `join_with_huge` indicates that it is much larger. The join result is the same as with `join`; only the execution strategy may change.

```python { .api }
def join_with_huge(self, other_set):
    """
    Joins with an optimization hint that the second DataSet is large.

    Parameters:
        other_set (DataSet): Large DataSet to join with

    Returns:
        JoinOperatorWhere: Chainable join builder
    """

def join_with_tiny(self, other_set):
    """
    Joins with an optimization hint that the second DataSet is small.

    Parameters:
        other_set (DataSet): Small DataSet to join with

    Returns:
        JoinOperatorWhere: Chainable join builder
    """
```
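
One way to picture what these hints enable is a broadcast hash join. The input expected to be small is replicated to every parallel instance and loaded into a hash table, and the other input is streamed past it. With `join_with_tiny` the second DataSet is the natural build side; with `join_with_huge` the first one is. The sketch below is a single-process model of that idea, not Flink's implementation, and `broadcast_hash_join` and its `build_side` parameter are hypothetical names. The join function always receives (first element, second element), so the output is the same whichever side builds the table.

```python
from collections import defaultdict
from typing import Callable, Dict, List


def broadcast_hash_join(
    first: List[tuple],
    second: List[tuple],
    first_key: int,
    second_key: int,
    join_fn: Callable[[tuple, tuple], object],
    build_side: str = "second",
) -> list:
    """Hypothetical model: hash the hinted-small side, probe with the other side."""
    if build_side == "second":  # analogous to join_with_tiny
        build, probe, build_key, probe_key = second, first, second_key, first_key
    elif build_side == "first":  # analogous to join_with_huge
        build, probe, build_key, probe_key = first, second, first_key, second_key
    else:
        raise ValueError("build_side must be 'first' or 'second'")

    # Build phase: index the small side by its key field.
    table: Dict[object, List[tuple]] = defaultdict(list)
    for row in build:
        table[row[build_key]].append(row)

    # Probe phase: look up every row of the large side.
    result = []
    for row in probe:
        for match in table.get(row[probe_key], []):
            # Keep the argument order (first, second) regardless of build side.
            pair = (row, match) if build_side == "second" else (match, row)
            result.append(join_fn(*pair))
    return result


large = [("key1", 10, 1.5), ("key2", 20, 2.5), ("key3", 30, 3.5)]
lookup = [("key1", "value1"), ("key2", "value2")]
combine = lambda big, small: (big[0], big[1], small[1])

build_second = broadcast_hash_join(large, lookup, 0, 0, combine, build_side="second")
build_first = broadcast_hash_join(large, lookup, 0, 0, combine, build_side="first")
print(sorted(build_second) == sorted(build_first))  # True
```

In this model the build side is the one held in memory on every instance, which is why a wrong hint costs network traffic and memory rather than correctness.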

### Cross Product Operations

#### Basic Cross Product

Creates the Cartesian product of two DataSets: every element of the first DataSet is paired with every element of the second.

```python { .api }
def cross(self, other_set):
    """
    Creates the cross product with another DataSet.

    Parameters:
        other_set (DataSet): DataSet to cross with

    Returns:
        CrossOperatorWhere: Cross operation builder
    """
```
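
The result size of a cross product is the product of the input sizes, so crossing two DataSets of 1,000 elements each already yields 1,000,000 pairs. A minimal pure-Python model of the semantics using `itertools.product`; `cross_model` is a hypothetical helper, not a Flink function:

```python
from itertools import product
from typing import Callable, List


def cross_model(first: list, second: list,
                cross_fn: Callable[[object, object], object]) -> List[object]:
    """Hypothetical model of cross(...).using(...): cross_fn sees every (first, second) pair."""
    return [cross_fn(a, b) for a, b in product(first, second)]


colors = ["red", "green", "blue"]
sizes = ["small", "medium", "large"]
combos = cross_model(colors, sizes, lambda color, size: "{} {} shirt".format(size, color))
print(len(combos))  # 9
print(combos[0])  # small red shirt
```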

#### Cross Product with Optimization Hints

Provides hints about the size of the second DataSet for cross product optimization.

```python { .api }
def cross_with_huge(self, other_set):
    """
    Cross product with an optimization hint that the second DataSet is large.

    Parameters:
        other_set (DataSet): Large DataSet

    Returns:
        CrossOperatorWhere: Cross operation builder
    """

def cross_with_tiny(self, other_set):
    """
    Cross product with an optimization hint that the second DataSet is small.

    Parameters:
        other_set (DataSet): Small DataSet

    Returns:
        CrossOperatorWhere: Cross operation builder
    """
```
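
A size hint is safe for a cross product because replicating one input does not change the result. If every parallel instance holds the whole small DataSet and crosses it with its own slice of the large one, the union of the per-instance outputs is exactly the full Cartesian product. The single-process sketch below checks that property. The round-robin `partition` helper and the parallelism of 3 are illustrative assumptions, not how Flink actually distributes data.

```python
from itertools import product
from typing import List


def partition(data: list, parallelism: int) -> List[list]:
    """Hypothetical round-robin split of one input across parallel instances."""
    return [data[i::parallelism] for i in range(parallelism)]


def broadcast_cross(first: list, second: list, parallelism: int) -> list:
    """Model of a replicated cross: `second` is copied to every instance holding a slice of `first`."""
    result = []
    for local_slice in partition(first, parallelism):
        # Each instance sees only its slice of `first` but all of `second`.
        result.extend(product(local_slice, second))
    return result


first = list(range(7))
second = ["a", "b"]
pairs = broadcast_cross(first, second, parallelism=3)
print(sorted(pairs) == sorted(product(first, second)))  # True
print(len(pairs))  # 14
```

In this model, replication ships `parallelism * len(second)` elements (6 here), which is why the replicated side should be the small one.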

### CoGroup Operations

Groups two DataSets by matching keys and processes the groups for each key together.

```python { .api }
def co_group(self, other_set):
    """
    Groups two DataSets by keys and processes the groups for each key together.

    Co-groups allow processing of the groups from two DataSets that share the
    same key, even when one of the two groups is empty.

    Parameters:
        other_set (DataSet): DataSet to co-group with

    Returns:
        CoGroupOperatorWhere: CoGroup operation builder
    """
```
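
Unlike a join, the co-group function is called once per key rather than once per matching pair, and it is also called for keys that appear in only one input. The pure-Python model below makes that visible. `co_group_model` and `ListCollector` are hypothetical stand-ins (the collector mimics the `collect` method used by the `LeftOuterJoin` example in this document), and keys are processed in first-seen order only to keep the output deterministic.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterator, List


class ListCollector:
    """Hypothetical stand-in for a Flink collector: stores emitted records in a list."""

    def __init__(self) -> None:
        self.records: List[object] = []

    def collect(self, value: object) -> None:
        self.records.append(value)


def co_group_model(first: List[tuple], second: List[tuple], first_key: int, second_key: int,
                   fn: Callable[[Iterator[tuple], Iterator[tuple], ListCollector], None]) -> List[object]:
    """Hypothetical model of co_group(...).where(...).equal_to(...).using(...)."""
    left: Dict[object, List[tuple]] = defaultdict(list)
    right: Dict[object, List[tuple]] = defaultdict(list)
    for row in first:
        left[row[first_key]].append(row)
    for row in second:
        right[row[second_key]].append(row)

    collector = ListCollector()
    # Every key from EITHER input gets exactly one call; a missing side is an empty iterator.
    for key in dict.fromkeys(list(left) + list(right)):
        fn(iter(left.get(key, [])), iter(right.get(key, [])), collector)
    return collector.records


def count_groups(iterator1, iterator2, collector):
    lefts, rights = list(iterator1), list(iterator2)
    key = (lefts or rights)[0][0]  # at least one side is non-empty; key is field 0 here
    collector.collect((key, len(lefts), len(rights)))


first = [(1, "a"), (1, "b"), (2, "c")]
second = [(2, "x"), (3, "y")]
print(co_group_model(first, second, 0, 0, count_groups))
# [(1, 2, 0), (2, 1, 1), (3, 0, 1)]
```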

### Union Operations

Combines two DataSets with compatible element types into a single DataSet. Duplicate elements are not removed.

```python { .api }
def union(self, other_set):
    """
    Creates a union with another DataSet.

    Both DataSets must have compatible element types.

    Parameters:
        other_set (DataSet): DataSet to union with

    Returns:
        OperatorSet: Union of both datasets
    """
```
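
`union` has bag (multiset) semantics: it keeps every element of both inputs, including duplicates, so removing duplicates takes a separate step. Because the order of elements in a distributed result should not be relied on, the pure-Python model below compares results as multisets with `collections.Counter`. `union_model` is a hypothetical helper, not a Flink function.

```python
from collections import Counter
from typing import List


def union_model(*datasets: List[object]) -> List[object]:
    """Hypothetical model of chained union(): concatenate all inputs, keeping duplicates."""
    result: List[object] = []
    for data in datasets:
        result.extend(data)
    return result


data1, data2 = [1, 2, 3], [3, 4]
combined = union_model(data1, data2)
print(Counter(combined) == Counter(data1) + Counter(data2))  # True
print(combined.count(3))  # 2: the duplicate 3 is kept
```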

## Join Operation Builders

### JoinOperatorWhere

Specifies join keys from the first DataSet.

```python { .api }
class JoinOperatorWhere:
    def where(self, *fields):
        """
        Specifies join key fields from first DataSet.

        Parameters:
            *fields (int): Field indices for join keys

        Returns:
            JoinOperatorTo: Next step in join building
        """
```

### JoinOperatorTo

Specifies join keys from the second DataSet.

```python { .api }
class JoinOperatorTo:
    def equal_to(self, *fields):
        """
        Specifies join key fields from second DataSet.

        Parameters:
            *fields (int): Field indices for join keys

        Returns:
            JoinOperator: Final join configuration
        """
```

### JoinOperator

Finalizes join operation with custom function.

```python { .api }
class JoinOperator:
    def using(self, operator):
        """
        Specifies JoinFunction to combine matching elements.

        Parameters:
            operator (JoinFunction): Function to combine joined elements

        Returns:
            OperatorSet: Joined dataset
        """
```
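
The three builder classes form a staged chain: `where` returns an object that only offers `equal_to`, which in turn only offers `using`. Both `where` and `equal_to` accept several field indices, which yields a composite key in which the i-th field of the first input is compared with the i-th field of the second. The pure-Python model below mirrors that staging on in-memory lists. The class names `WhereStep`, `ToStep` and `UsingStep` are hypothetical and deliberately differ from the real Flink classes.

```python
from typing import Callable, List, Sequence, Tuple


def _key(row: tuple, fields: Sequence[int]) -> Tuple[object, ...]:
    """Extract a (possibly composite) key as a tuple of the selected fields."""
    return tuple(row[i] for i in fields)


class WhereStep:
    def __init__(self, first: List[tuple], second: List[tuple]) -> None:
        self.first, self.second = first, second

    def where(self, *fields: int) -> "ToStep":
        return ToStep(self.first, self.second, fields)


class ToStep:
    def __init__(self, first: List[tuple], second: List[tuple], first_keys: Sequence[int]) -> None:
        self.first, self.second, self.first_keys = first, second, first_keys

    def equal_to(self, *fields: int) -> "UsingStep":
        # This model requires one second-input field per first-input field.
        if len(fields) != len(self.first_keys):
            raise ValueError("where() and equal_to() must name the same number of fields")
        return UsingStep(self.first, self.second, self.first_keys, fields)


class UsingStep:
    def __init__(self, first, second, first_keys, second_keys) -> None:
        self.first, self.second = first, second
        self.first_keys, self.second_keys = first_keys, second_keys

    def using(self, fn: Callable[[tuple, tuple], object]) -> List[object]:
        # The i-th key field of the first input is compared with the i-th of the second.
        return [fn(a, b) for a in self.first for b in self.second
                if _key(a, self.first_keys) == _key(b, self.second_keys)]


shipments = [("berlin", 2024, 5), ("paris", 2024, 7), ("berlin", 2023, 1)]  # city, year, count
targets = [(2024, "berlin", 4), (2024, "paris", 9)]  # year, city, target

diff = (WhereStep(shipments, targets)
        .where(0, 1)       # (city, year) from shipments
        .equal_to(1, 0)    # (city, year) from targets
        .using(lambda s, t: (s[0], s[1], s[2] - t[2])))
print(diff)  # [('berlin', 2024, 1), ('paris', 2024, -2)]
```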

## Cross Operation Builders

### CrossOperatorWhere

Configures cross product operation.

```python { .api }
class CrossOperatorWhere:
    def using(self, operator):
        """
        Specifies CrossFunction to combine elements.

        Parameters:
            operator (CrossFunction): Function to combine cross product elements

        Returns:
            OperatorSet: Cross product result
        """
```

## CoGroup Operation Builders

### CoGroupOperatorWhere

Specifies keys for the first DataSet in co-group.

```python { .api }
class CoGroupOperatorWhere:
    def where(self, *fields):
        """
        Specifies key fields from first DataSet.

        Parameters:
            *fields (int): Field indices for grouping keys

        Returns:
            CoGroupOperatorTo: Next step in cogroup building
        """
```

### CoGroupOperatorTo

Specifies keys for the second DataSet in co-group.

```python { .api }
class CoGroupOperatorTo:
    def equal_to(self, *fields):
        """
        Specifies key fields from second DataSet.

        Parameters:
            *fields (int): Field indices for grouping keys

        Returns:
            CoGroupOperatorUsing: Final cogroup configuration
        """
```

### CoGroupOperatorUsing

Finalizes co-group operation with custom function.

```python { .api }
class CoGroupOperatorUsing:
    def using(self, operator):
        """
        Specifies CoGroupFunction to process matching groups.

        Parameters:
            operator (CoGroupFunction): Function to process cogroup results

        Returns:
            OperatorSet: CoGroup result dataset
        """
```

## Projection Support

### Projector

Handles field projection in join and cross operations.

```python { .api }
class Projector:
    def project_first(self, *fields):
        """
        Projects specified fields from first DataSet.

        Parameters:
            *fields (int): Field indices to project

        Returns:
            Projector: Self for method chaining
        """

    def project_second(self, *fields):
        """
        Projects specified fields from second DataSet.

        Parameters:
            *fields (int): Field indices to project

        Returns:
            Projector: Self for method chaining
        """
```
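
By analogy with `projectFirst`/`projectSecond` in Flink's Java DataSet API, the output tuple is assumed to contain the selected fields in the order in which the projection calls are chained. The pure-Python model below illustrates that assumption for one joined pair. `project_pair` and its `steps` list are hypothetical and not part of the `Projector` class.

```python
from typing import List, Sequence, Tuple


def project_pair(first: tuple, second: tuple,
                 steps: List[Tuple[str, Sequence[int]]]) -> tuple:
    """Hypothetical model of chained project_first/project_second calls on one pair."""
    out: List[object] = []
    for side, fields in steps:
        source = first if side == "first" else second
        out.extend(source[i] for i in fields)  # fields keep the order they were listed in
    return tuple(out)


order = (1, "customer1", 100)
customer = ("customer1", "Alice")

# Models .project_first(0, 2).project_second(1)
print(project_pair(order, customer, [("first", [0, 2]), ("second", [1])]))  # (1, 100, 'Alice')
# Models .project_second(1).project_first(0): the call order changes the field order
print(project_pair(order, customer, [("second", [1]), ("first", [0])]))  # ('Alice', 1)
```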

## Usage Examples

### Basic Inner Join

```python
from flink.plan.Environment import get_environment
from flink.functions.JoinFunction import JoinFunction

env = get_environment()

# Create two datasets
orders = env.from_elements(
    (1, "customer1", 100),
    (2, "customer2", 200),
    (3, "customer1", 150)
)

customers = env.from_elements(
    ("customer1", "Alice"),
    ("customer2", "Bob")
)

# Define join function
class OrderCustomerJoin(JoinFunction):
    def join(self, order, customer):
        return (order[0], customer[1], order[2])  # (order_id, customer_name, amount)

# Perform join: orders field 1 (customer id) == customers field 0 (customer id)
result = orders.join(customers) \
    .where(1) \
    .equal_to(0) \
    .using(OrderCustomerJoin())

result.output()
env.execute()
```

### Cross Product with Function

```python
from flink.plan.Environment import get_environment
from flink.functions.CrossFunction import CrossFunction

env = get_environment()

# Create datasets
colors = env.from_elements("red", "green", "blue")
sizes = env.from_elements("small", "medium", "large")

# Define cross function
class ProductCombiner(CrossFunction):
    def cross(self, color, size):
        return "{} {} shirt".format(size, color)

# Create all 3 x 3 = 9 product combinations
products = colors.cross(sizes).using(ProductCombiner())

products.output()
env.execute()
```

### CoGroup Operation

```python
from flink.plan.Environment import get_environment
from flink.functions.CoGroupFunction import CoGroupFunction

env = get_environment()

# Example data: (key, value) tuples; key 3 has no match in dataset2
dataset1 = env.from_elements((1, "a"), (2, "b"), (3, "c"))
dataset2 = env.from_elements((1, "x"), (2, "y"))

# Example: left outer join using cogroup
class LeftOuterJoin(CoGroupFunction):
    def co_group(self, iterator1, iterator2, collector):
        left_items = list(iterator1)
        right_items = list(iterator2)

        if not right_items:
            # No matching items in the right dataset: pair each left item with None
            for left_item in left_items:
                collector.collect((left_item, None))
        else:
            # Emit every combination of matching left and right items
            for left_item in left_items:
                for right_item in right_items:
                    collector.collect((left_item, right_item))

result = dataset1.co_group(dataset2) \
    .where(0) \
    .equal_to(0) \
    .using(LeftOuterJoin())
```

### Multiple Dataset Union

```python
from flink.plan.Environment import get_environment

env = get_environment()

# Union multiple datasets
data1 = env.from_elements(1, 2, 3)
data2 = env.from_elements(4, 5, 6)
data3 = env.from_elements(7, 8, 9)

# Chain unions
combined = data1.union(data2).union(data3)
```

### Performance Optimization with Hints

```python
from flink.plan.Environment import get_environment

env = get_environment()

# Optimize join when one dataset is much smaller
large_dataset = env.read_csv("large_file.csv", [str, int, float])
small_lookup = env.from_elements(("key1", "value1"), ("key2", "value2"))

# Tiny hint: the small second dataset is a candidate for broadcasting
result = large_dataset.join_with_tiny(small_lookup) \
    .where(0) \
    .equal_to(0) \
    .using(lambda big, small: (big[0], big[1], big[2], small[1]))

# Huge hint: the second dataset is very large
result = small_lookup.join_with_huge(large_dataset) \
    .where(0) \
    .equal_to(0) \
    .using(lambda small, big: (small[0], small[1], big[1], big[2]))
```

### Complex Multi-Step Joins

```python
from flink.plan.Environment import get_environment

env = get_environment()

# Pipeline: join orders with customers, then aggregate per city
customers = env.read_csv("customers.csv", [str, str, str])  # id, name, city
orders = env.read_csv("orders.csv", [int, str, float])  # order_id, customer_id, amount

# Join orders with customers -> (order_id, customer_name, amount, city)
order_customer = orders.join(customers) \
    .where(1) \
    .equal_to(0) \
    .using(lambda order, customer: (order[0], customer[1], order[2], customer[2]))

# Further processing
result = order_customer.group_by(3).sum(2)  # Sum amounts by city
```
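
To sanity-check what this pipeline computes, the same logic can be modeled in plain Python on small in-memory lists. The rows below are invented sample data. The join uses a dictionary lookup that assumes customer ids are unique, and the model reports only `(city, total)` pairs, a simplification of the tuples Flink's `sum(2)` emits.

```python
from collections import defaultdict

customers = [("c1", "Alice", "Berlin"), ("c2", "Bob", "Paris"), ("c3", "Carol", "Berlin")]
orders = [(1, "c1", 100.0), (2, "c2", 200.0), (3, "c3", 50.0), (4, "c1", 25.0), (5, "c9", 75.0)]

# Step 1: inner join orders.customer_id (field 1) with customers.id (field 0).
# Order 5 references an unknown customer and is dropped, as in an inner join.
customer_by_id = {c[0]: c for c in customers}  # assumes unique customer ids
order_customer = [
    (o[0], customer_by_id[o[1]][1], o[2], customer_by_id[o[1]][2])
    for o in orders if o[1] in customer_by_id
]

# Step 2: group by city (field 3) and sum amounts (field 2).
totals = defaultdict(float)
for _, _, amount, city in order_customer:
    totals[city] += amount

print(dict(totals))  # {'Berlin': 175.0, 'Paris': 200.0}
```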