# Extensions Package - Partial Functions Support

The Flink Scala API extensions package provides partial function support for DataSet operations, enabling more idiomatic Scala pattern matching and case class decomposition directly in transformation functions.

## Overview

The extensions package provides methods with the `-With` suffix that accept partial functions, allowing you to use pattern matching syntax directly in transformations without wrapping it in an explicit function.
## Importing Extensions

```scala
import org.apache.flink.api.scala.extensions._
```

This import adds implicit conversions that extend `DataSet` and related classes with partial function methods.
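To see why the extensions exist: the regular transformation methods are overloaded (they also accept Flink's function interfaces such as `MapFunction`), so a bare pattern-matching block is ambiguous there and must be wrapped in a total function. The `-With` variants accept the partial function directly. A minimal sketch of the difference, assuming a `DataSet[(String, Int)]` named `counts`:

```scala
// Without the extensions: wrap the match in a total function
// to avoid the overload ambiguity of the plain `map`.
val upperA = counts.map { pair =>
  pair match {
    case (word, n) => (word.toUpperCase, n)
  }
}

// With the extensions: pass the pattern-matching block directly.
val upperB = counts.mapWith {
  case (word, n) => (word.toUpperCase, n)
}
```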
## DataSet Extensions

### OnDataSet Extension Methods

```scala { .api }
class OnDataSet[T] {
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filterWith(fun: T => Boolean): DataSet[T]
  def reduceWith(fun: (T, T) => T): DataSet[T]
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def sortingBy[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}
```
### Usage Examples

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

case class Person(name: String, age: Int, city: String)
case class Sale(product: String, amount: Double, region: String)

val env = ExecutionEnvironment.getExecutionEnvironment

val people = env.fromElements(
  Person("Alice", 25, "NYC"),
  Person("Bob", 30, "LA"),
  Person("Charlie", 35, "NYC")
)

val sales = env.fromElements(
  Sale("ProductA", 100.0, "US"),
  Sale("ProductB", 200.0, "EU"),
  Sale("ProductA", 150.0, "US")
)

// Pattern matching with mapWith
val names = people.mapWith {
  case Person(name, _, _) => name.toUpperCase
}

// Filter with pattern matching
val youngPeople = people.filterWith {
  case Person(_, age, _) => age < 30
}

// FlatMap with pattern matching
val cityLetters = people.flatMapWith {
  case Person(_, _, city) => city.toCharArray
}

// Group by with pattern matching
val groupedByCity = people.groupingBy {
  case Person(_, _, city) => city
}

// Complex pattern matching example
val salesSummary = sales
  .groupingBy(_.product)
  .reduceGroupWith { salesStream =>
    val salesList = salesStream.toList
    val product = salesList.head.product
    val totalAmount = salesList.map(_.amount).sum
    val regions = salesList.map(_.region).distinct
    (product, totalAmount, regions.mkString(","))
  }
```
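`mapPartitionWith` from the API listing is not demonstrated above. Going by its signature, it receives each partition as a `Stream[T]` and produces one result per partition, which makes stream pattern matching (`#::`) usable as well. A hedged sketch, continuing with the `people` dataset from the previous example:

```scala
// One result per partition: count the elements in each partition.
val partitionSizes = people.mapPartitionWith { partition =>
  partition.size
}

// Stream pattern matching: take the first element of each partition.
// Note: `#::` assumes the partition is non-empty; an empty partition
// would cause a MatchError at runtime.
val firstPerPartition = people.mapPartitionWith {
  case firstPerson #:: _ => firstPerson
}
```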
## GroupedDataSet Extensions

### OnGroupedDataSet Extension Methods

```scala { .api }
class OnGroupedDataSet[T] {
  def reduceWith(fun: (T, T) => T): DataSet[T]
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def sortingBy[K: TypeInformation](fun: T => K, order: Order): GroupedDataSet[T]
}
```
### Usage Examples

```scala
val sales = env.fromElements(
  ("ProductA", 100), ("ProductB", 200), ("ProductA", 150), ("ProductB", 50)
)

val groupedSales = sales.groupingBy(_._1)

// Reduce with pattern matching
val productTotals = groupedSales.reduceWith {
  case ((product, amount1), (_, amount2)) => (product, amount1 + amount2)
}

// Reduce group with pattern matching and complex logic
val productStats = groupedSales.reduceGroupWith { salesStream =>
  val salesList = salesStream.toList
  val product = salesList.head._1
  val amounts = salesList.map(_._2)
  val total = amounts.sum
  val average = total.toDouble / amounts.length // avoid integer division
  val max = amounts.max
  (product, total, average, max)
}
```
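`sortingBy` on grouped data is listed above but not demonstrated. Assuming the signature shown (a key extractor plus an `Order`), it sorts the elements within each group before a group-wise transformation, for example to pick the largest sale per product:

```scala
import org.apache.flink.api.common.operators.Order

// Per product: sort the group descending by amount, then take the
// first (i.e. largest) element of each sorted group.
val topSalePerProduct = groupedSales
  .sortingBy(_._2, Order.DESCENDING)
  .reduceGroupWith { salesStream =>
    salesStream.head
  }
```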
## Cross Product Extensions

### OnCrossDataSet Extension Methods

```scala { .api }
class OnCrossDataSet[L, R] {
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
```
### Usage Examples

```scala
val left = env.fromElements(("A", 1), ("B", 2))
val right = env.fromElements(("X", 10), ("Y", 20))

val crossed = left
  .cross(right)
  .applyWith {
    case ((leftKey, leftVal), (rightKey, rightVal)) =>
      s"$leftKey-$rightKey: ${leftVal * rightVal}"
  }
```
## CoGroup Extensions

### OnCoGroupDataSet Extension Methods

```scala { .api }
class OnCoGroupDataSet[L, R] {
  def applyWith[O: TypeInformation: ClassTag](
    fun: (Stream[L], Stream[R]) => O
  ): DataSet[O]
}
```
### Usage Examples

```scala
val left = env.fromElements(("A", 1), ("B", 2), ("A", 3))
val right = env.fromElements(("A", 10), ("C", 30), ("A", 20))

val coGrouped = left
  .coGroup(right)
  .where(_._1)
  .equalTo(_._1)
  .applyWith {
    case (leftStream, rightStream) =>
      val leftList = leftStream.toList
      val rightList = rightStream.toList
      // One side may be empty for keys present in only one input
      val key = if (leftList.nonEmpty) leftList.head._1 else rightList.head._1
      val leftSum = leftList.map(_._2).sum
      val rightSum = rightList.map(_._2).sum
      (key, leftSum, rightSum)
  }
```
## Join Extensions

### OnJoinFunctionAssigner Extension Methods

```scala { .api }
class OnJoinFunctionAssigner[L, R] {
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
```
### Usage Examples

```scala
val employees = env.fromElements(
  ("Alice", "Engineering"), ("Bob", "Sales"), ("Charlie", "Engineering")
)
val salaries = env.fromElements(
  ("Alice", 75000), ("Bob", 65000), ("Charlie", 80000)
)

val employeeData = employees
  .join(salaries)
  .where(_._1)
  .equalTo(_._1)
  .applyWith {
    case ((name, dept), (_, salary)) => (name, dept, salary)
  }
```
## Advanced Pattern Matching Examples

### Working with Complex Case Classes

```scala
case class Order(id: Int, customerId: String, items: List[String], total: Double)
case class Customer(id: String, name: String, region: String)

val orders = env.fromElements(
  Order(1, "C001", List("A", "B"), 100.0),
  Order(2, "C002", List("C", "D", "E"), 250.0),
  Order(3, "C001", List("F"), 50.0)
)

val customers = env.fromElements(
  Customer("C001", "Alice Corp", "US"),
  Customer("C002", "Bob Industries", "EU")
)

// Extract order details with pattern matching
val orderSummary = orders.mapWith {
  case Order(id, customerId, items, total) =>
    (id, customerId, items.length, total)
}

// Filter orders by complex criteria
val largeOrders = orders.filterWith {
  case Order(_, _, items, total) => items.length > 2 && total > 200
}

// Join with pattern matching
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .applyWith {
    case (Customer(_, name, region), Order(orderId, _, items, total)) =>
      (orderId, name, region, items.length, total)
  }
```
### Working with Tuples and Collections

```scala
val tupleData = env.fromElements(
  ("A", List(1, 2, 3), Map("x" -> 10)),
  ("B", List(4, 5), Map("y" -> 20, "z" -> 30)),
  ("C", List(6), Map("x" -> 40))
)

// Pattern match on complex tuple structure
val extracted = tupleData.flatMapWith {
  case (key, numbers, mapping) =>
    for {
      num <- numbers
      (mapKey, mapValue) <- mapping
    } yield (key, mapKey, num * mapValue)
}

// Filter with nested pattern matching
val filtered = tupleData.filterWith {
  case (_, numbers, mapping) =>
    numbers.sum > 5 && mapping.values.max > 15
}
```
## Key Advantages

1. **Pattern Matching Support**: Use Scala's powerful pattern matching directly in transformations
2. **Case Class Decomposition**: Extract fields from case classes without explicit getters
3. **Tuple Deconstruction**: Break down tuples in a readable way
4. **Collection Pattern Matching**: Match on collection structures and contents
5. **Cleaner Code**: More concise and idiomatic Scala code
## Important Notes

- Extensions require an explicit import: `import org.apache.flink.api.scala.extensions._`
- All methods carry the `-With` suffix to avoid conflicts with the regular DataSet methods
- Full type information is still required for transformations
- Performance is equivalent to the regular DataSet operations: the extensions are purely syntactic
- Particularly useful for complex data structures and ETL operations
The extensions package makes the Flink Scala API more idiomatic and easier to work with, especially for Scala developers dealing with complex data structures that call for pattern matching.