
# Extensions Package - Partial Functions Support

The Flink Scala API extensions package adds partial-function support to DataSet operations, enabling idiomatic Scala pattern matching and case class decomposition directly in transformation functions.

## Overview

The extensions package provides methods with a `With` suffix that accept partial functions, allowing you to use pattern matching syntax directly in transformations without wrapping it in an explicit function.

## Importing Extensions

```scala
import org.apache.flink.api.scala.extensions._
```

This import adds implicit conversions that extend `DataSet` and related classes with the partial function methods.
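To see what the import buys you, here is a minimal before/after sketch (assuming a local `ExecutionEnvironment` and a hypothetical `Person` case class): without the extensions, `map` expects a total function, so the pattern match must be wrapped in an explicit lambda; with them, `mapWith` accepts the case clauses directly.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

case class Person(name: String, age: Int, city: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val people = env.fromElements(Person("Alice", 25, "NYC"))

// Without extensions: wrap the pattern match in an explicit function literal
val names1 = people.map { p => p match { case Person(name, _, _) => name } }

// With extensions: pass the partial-function literal directly
val names2 = people.mapWith { case Person(name, _, _) => name }
```

Both transformations are equivalent at runtime; the second form simply removes the wrapping boilerplate.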

## DataSet Extensions

### OnDataSet Extension Methods

```scala { .api }
class OnDataSet[T] {
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filterWith(fun: T => Boolean): DataSet[T]
  def reduceWith(fun: (T, T) => T): DataSet[T]
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def sortingBy[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}
```

### Usage Examples

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

case class Person(name: String, age: Int, city: String)
case class Sale(product: String, amount: Double, region: String)

val env = ExecutionEnvironment.getExecutionEnvironment

val people = env.fromElements(
  Person("Alice", 25, "NYC"),
  Person("Bob", 30, "LA"),
  Person("Charlie", 35, "NYC")
)

val sales = env.fromElements(
  Sale("ProductA", 100.0, "US"),
  Sale("ProductB", 200.0, "EU"),
  Sale("ProductA", 150.0, "US")
)

// Pattern matching with mapWith
val names = people.mapWith {
  case Person(name, _, _) => name.toUpperCase
}

// Filter with pattern matching
val youngPeople = people.filterWith {
  case Person(_, age, _) => age < 30
}

// FlatMap with pattern matching
val cityLetters = people.flatMapWith {
  case Person(_, _, city) => city.toCharArray
}

// Group by with pattern matching
val groupedByCity = people.groupingBy {
  case Person(_, _, city) => city
}

// Aggregate each group with reduceGroupWith
val salesSummary = sales
  .groupingBy(_.product)
  .reduceGroupWith { salesStream =>
    val salesList = salesStream.toList
    val product = salesList.head.product
    val totalAmount = salesList.map(_.amount).sum
    val regions = salesList.map(_.region).distinct
    (product, totalAmount, regions.mkString(","))
  }
```
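The API block above also lists `mapPartitionWith`, which the examples do not exercise. A minimal sketch, reusing the `people` dataset defined above: the function receives each partition's elements as a `Stream[T]` and emits one result per partition.

```scala
// Count the elements in each partition; with parallelism n this
// produces n integers, one per partition
val partitionSizes = people.mapPartitionWith { stream =>
  stream.size
}
```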

## GroupedDataSet Extensions

### OnGroupedDataSet Extension Methods

```scala { .api }
class OnGroupedDataSet[T] {
  def reduceWith(fun: (T, T) => T): DataSet[T]
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def sortingBy[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}
```

### Usage Examples

```scala
val sales = env.fromElements(
  ("ProductA", 100), ("ProductB", 200), ("ProductA", 150), ("ProductB", 50)
)

val groupedSales = sales.groupingBy(_._1)

// Reduce with pattern matching
val productTotals = groupedSales.reduceWith {
  case ((product, amount1), (_, amount2)) => (product, amount1 + amount2)
}

// Reduce each group with more complex logic
val productStats = groupedSales.reduceGroupWith { salesStream =>
  val salesList = salesStream.toList
  val product = salesList.head._1
  val amounts = salesList.map(_._2)
  val total = amounts.sum
  val average = total.toDouble / amounts.length // avoid integer division
  val max = amounts.max
  (product, total, average, max)
}
```
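`sortingBy` from the API block above is not exercised in these examples. A hedged sketch, reusing `sales` and following the signature shown above (with `Order` assumed to be `org.apache.flink.api.common.operators.Order`): sorting a group before reducing it means the group function sees elements in a known order.

```scala
import org.apache.flink.api.common.operators.Order

// Sort each product group by amount ascending, then take the first
// element of each sorted group: the cheapest sale per product
val cheapestPerProduct = sales
  .groupingBy(_._1)
  .sortingBy(_._2, Order.ASCENDING)
  .reduceGroupWith { stream => stream.toList.head }
```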

## Cross Product Extensions

### OnCrossDataSet Extension Methods

```scala { .api }
class OnCrossDataSet[L, R] {
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
```

### Usage Examples

```scala
val left = env.fromElements(("A", 1), ("B", 2))
val right = env.fromElements(("X", 10), ("Y", 20))

val crossed = left
  .cross(right)
  .applyWith {
    case ((leftKey, leftVal), (rightKey, rightVal)) =>
      s"$leftKey-$rightKey: ${leftVal * rightVal}"
  }
```

## CoGroup Extensions

### OnCoGroupDataSet Extension Methods

```scala { .api }
class OnCoGroupDataSet[L, R] {
  def applyWith[O: TypeInformation: ClassTag](
    fun: (Stream[L], Stream[R]) => O
  ): DataSet[O]
}
```

### Usage Examples

```scala
val left = env.fromElements(("A", 1), ("B", 2), ("A", 3))
val right = env.fromElements(("A", 10), ("C", 30), ("A", 20))

val coGrouped = left
  .coGroup(right)
  .where(_._1)
  .equalTo(_._1)
  .applyWith {
    case (leftStream, rightStream) =>
      val leftList = leftStream.toList
      val rightList = rightStream.toList
      val key = if (leftList.nonEmpty) leftList.head._1 else rightList.head._1
      val leftSum = leftList.map(_._2).sum
      val rightSum = rightList.map(_._2).sum
      (key, leftSum, rightSum)
  }
```

## Join Extensions

### OnJoinFunctionAssigner Extension Methods

```scala { .api }
class OnJoinFunctionAssigner[L, R] {
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
```

### Usage Examples

```scala
val employees = env.fromElements(
  ("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering")
)
val salaries = env.fromElements(
  ("Alice", 75000), ("Bob", 65000), ("Charlie", 80000)
)

val employeeData = employees
  .join(salaries)
  .where(_._1)
  .equalTo(_._1)
  .applyWith {
    case ((name, dept), (_, salary)) => (name, dept, salary)
  }
```

## Advanced Pattern Matching Examples

### Working with Complex Case Classes

```scala
case class Order(id: Int, customerId: String, items: List[String], total: Double)
case class Customer(id: String, name: String, region: String)

val orders = env.fromElements(
  Order(1, "C001", List("A", "B"), 100.0),
  Order(2, "C002", List("C", "D", "E"), 250.0),
  Order(3, "C001", List("F"), 50.0)
)

val customers = env.fromElements(
  Customer("C001", "Alice Corp", "US"),
  Customer("C002", "Bob Industries", "EU")
)

// Extract order details with pattern matching
val orderSummary = orders.mapWith {
  case Order(id, customerId, items, total) =>
    (id, customerId, items.length, total)
}

// Filter orders by complex criteria
val largeOrders = orders.filterWith {
  case Order(_, _, items, total) => items.length > 2 && total > 200
}

// Join with pattern matching
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .applyWith {
    case (Customer(_, name, region), Order(orderId, _, items, total)) =>
      (orderId, name, region, items.length, total)
  }
```

### Working with Tuples and Collections

```scala
val tupleData = env.fromElements(
  ("A", List(1, 2, 3), Map("x" -> 10)),
  ("B", List(4, 5), Map("y" -> 20, "z" -> 30)),
  ("C", List(6), Map("x" -> 40))
)

// Pattern match on a complex tuple structure
val extracted = tupleData.flatMapWith {
  case (key, numbers, mapping) =>
    for {
      num <- numbers
      (mapKey, mapValue) <- mapping
    } yield (key, mapKey, num * mapValue)
}

// Filter with nested pattern matching
val filtered = tupleData.filterWith {
  case (_, numbers, mapping) =>
    numbers.sum > 5 && mapping.values.max > 15
}
```

## Key Advantages

1. **Pattern Matching Support**: Use Scala's powerful pattern matching directly in transformations
2. **Case Class Decomposition**: Extract fields from case classes without explicit getters
3. **Tuple Deconstruction**: Break down tuples in a readable way
4. **Collection Pattern Matching**: Match on collection structures and contents
5. **Cleaner Code**: More concise and idiomatic Scala

283

284

## Important Notes

285

286

- Extensions require explicit import: `import org.apache.flink.api.scala.extensions._`

287

- All methods have `-With` suffix to avoid conflicts with regular DataSet methods

288

- Full type information is still required for transformations

289

- Performance is equivalent to regular DataSet operations - extensions are purely syntactic

290

- Particularly useful for complex data structures and ETL operations

291

292

The extensions package makes Flink Scala API more idiomatic and easier to work with for Scala developers, especially when dealing with complex data structures and requiring pattern matching capabilities.