# Utilities

Advanced utilities for sampling, indexing, data analysis, and partial function support. These utilities extend `DataSet` with specialized operations.

## Capabilities

### DataSet Utilities

Additional operations available on any `DataSet` through an implicit conversion, enabled by `import org.apache.flink.api.scala.utils._`.

```scala { .api }
implicit class DataSetUtils[T: TypeInformation: ClassTag](dataSet: DataSet[T]) {

  /**
   * Assigns consecutive indices, starting from 0, to the elements of the DataSet.
   *
   * Indices are assigned partition by partition (a first pass counts the elements of
   * every partition), so the index order follows the partition layout and need not
   * match the order of the input.
   *
   * @return DataSet of (index, element) tuples
   */
  def zipWithIndex: DataSet[(Long, T)]

  /**
   * Assigns a unique identifier to each element in a single pass.
   *
   * The identifiers are unique but, unlike those of [[zipWithIndex]], not consecutive.
   *
   * @return DataSet of (uniqueId, element) tuples
   */
  def zipWithUniqueId: DataSet[(Long, T)]

  /**
   * Counts the number of elements in each partition.
   *
   * @return DataSet of (partitionIndex, elementCount) tuples
   */
  def countElementsPerPartition: DataSet[(Int, Long)]

  /**
   * Computes a checksum over the hash codes of all elements, together with the element count.
   *
   * This is an eager operation: it triggers execution of the program and returns the
   * result directly instead of a DataSet.
   *
   * @return ChecksumHashCode holding the checksum and the element count
   */
  def checksumHashCode(): ChecksumHashCode
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("apple", "banana", "cherry", "date")

// Add consecutive indices (the index order follows the partition layout)
val indexed = data.zipWithIndex
// Result, e.g.: [(0, "apple"), (1, "banana"), (2, "cherry"), (3, "date")]

// Add unique IDs (unique, but not necessarily consecutive; computed in a single pass)
val withIds = data.zipWithUniqueId
// Result: [(uniqueId1, "apple"), (uniqueId2, "banana"), ...]

// Count elements per partition
val partitionCounts = data.countElementsPerPartition
// Result, e.g.: [(0, 2), (1, 2)] for 2 partitions with 2 elements each

// Get checksum and count (this call executes the program immediately)
val checksum = data.checksumHashCode()
println(s"Checksum: ${checksum.getChecksum}, Count: ${checksum.getCount}")
```

### Sampling Operations

Random sampling methods for data analysis and for creating representative subsets.

```scala { .api }
implicit class DataSetUtils[T: TypeInformation: ClassTag](dataSet: DataSet[T]) {

  /**
   * Randomly samples elements by fraction. Every element is considered independently,
   * so the size of the result is not fixed.
   *
   * @param withReplacement whether an element may be selected more than once
   * @param fraction        without replacement: probability that each element is selected,
   *                        in [0, 1]; with replacement: expected number of times each
   *                        element is selected, in [0, ∞) (values above 1 are allowed)
   * @param seed            random seed; pass a fixed value for reproducible samples
   *                        (for the same input partitioning)
   * @return DataSet with the sampled elements
   */
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Random.nextLong()): DataSet[T]

  /**
   * Samples a fixed number of elements.
   *
   * This is less efficient than [[sample]]; prefer sampling by fraction unless an exact
   * sample size is required.
   *
   * @param withReplacement whether an element may be selected more than once
   * @param numSamples      number of elements to sample
   * @param seed            random seed; pass a fixed value for reproducible samples
   *                        (for the same input partitioning)
   * @return DataSet with the sampled elements
   */
  def sampleWithSize(withReplacement: Boolean, numSamples: Int, seed: Long = Random.nextLong()): DataSet[T]
}
```

**Usage Examples:**

```scala
// Continues the previous example (same imports and `env`)
val largeDataset = env.generateSequence(1, 1000000)

// Sample about 10% of the elements without replacement (the exact size varies per run)
val sample10Percent = largeDataset.sample(withReplacement = false, fraction = 0.1)

// Sample exactly 1000 elements with replacement
val exactSample = largeDataset.sampleWithSize(withReplacement = true, numSamples = 1000)

// Reproducible sampling with a fixed seed (for the same input partitioning)
val reproducibleSample = largeDataset.sample(
  withReplacement = false,
  fraction = 0.05,
  seed = 12345L
)
```

### Advanced Range Partitioning

Range partitioning with a user-supplied data distribution. The distribution defines the key boundaries between partitions, so Flink does not have to sample the data to compute them.

```scala { .api }
implicit class DataSetUtils[T: TypeInformation: ClassTag](dataSet: DataSet[T]) {

  /**
   * Range-partitions the DataSet on the given field positions. The boundaries between
   * partitions come from `distribution` instead of being computed by sampling the data.
   *
   * @param distribution data distribution of the key; it must provide at least as many
   *                     fields as the key, with matching key types
   * @param fields       positions of the key fields (e.g. tuple or case class fields)
   * @return DataSet partitioned by key range
   */
  def partitionByRange(distribution: DataDistribution, fields: Int*): DataSet[T]

  /**
   * Range-partitions the DataSet on the given field names, with the partition boundaries
   * taken from `distribution`.
   *
   * @param distribution data distribution of the key; it must provide at least as many
   *                     fields as the key, with matching key types
   * @param firstField   name of the first key field
   * @param otherFields  names of additional key fields
   * @return DataSet partitioned by key range
   */
  def partitionByRange(
    distribution: DataDistribution,
    firstField: String,
    otherFields: String*
  ): DataSet[T]

  /**
   * Range-partitions the DataSet on the key extracted by `fun`, with the partition
   * boundaries taken from `distribution`.
   *
   * @param distribution data distribution of the key; its key type must match `K`
   * @param fun          key selector function
   * @return DataSet partitioned by key range
   */
  def partitionByRange[K: TypeInformation](
    distribution: DataDistribution,
    fun: T => K
  ): DataSet[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.common.distributions.DataDistribution
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

// Custom data distribution for range partitioning
case class SalesRecord(region: String, amount: Double, date: String)

val env = ExecutionEnvironment.getExecutionEnvironment

val salesData = env.fromElements(
  SalesRecord("North", 1000.0, "2023-01-01"),
  SalesRecord("South", 1500.0, "2023-01-02"),
  SalesRecord("East", 800.0, "2023-01-03")
)

// Splits [0, maxAmount] into equally wide ranges of a single Double key.
// Flink may recreate the distribution through its no-argument constructor and `read`,
// so both are provided and the state is kept in a mutable field.
class AmountDistribution(private var maxAmount: Double) extends DataDistribution {
  def this() = this(0.0)

  // Upper boundary of bucket `bucketNum`, with one value per key field
  override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] =
    Array[AnyRef](Double.box(maxAmount * (bucketNum + 1) / totalNumBuckets))

  override def getNumberOfFields(): Int = 1

  // Must match the type of the partitioning key (`amount: Double`)
  override def getKeyTypes(): Array[TypeInformation[_]] =
    Array[TypeInformation[_]](BasicTypeInfo.DOUBLE_TYPE_INFO)

  override def write(out: DataOutputView): Unit = out.writeDouble(maxAmount)

  override def read(in: DataInputView): Unit = {
    maxAmount = in.readDouble()
  }
}

val customDistribution = new AmountDistribution(2000.0)

val partitionedSales = salesData.partitionByRange(customDistribution, _.amount)
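```

### Partial Function Extensions

Enable pattern matching with anonymous case functions (`{ case ... => ... }`) in transformations. The regular Scala API methods are overloaded, so they cannot take such case blocks directly; these extension methods provide non-overloaded alternatives. The functions are still applied to every element, so a case block that does not cover an element fails with a `MatchError`.

```scala { .api }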
// Import for partial function support
import org.apache.flink.api.scala.extensions._

implicit class OnDataSet[T](dataSet: DataSet[T]) {

  /**
   * Maps each element with `fun`, which may be written as a pattern-matching
   * anonymous function (`{ case ... => ... }`).
   *
   * @param fun transformation applied to every element; a case block that does not
   *            match an element throws a MatchError
   * @return DataSet with the mapped elements
   */
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]

  /**
   * Maps each element to zero or more output elements.
   *
   * @param fun function returning the output elements for one input element
   * @return DataSet with all returned elements
   */
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]

  /**
   * Keeps the elements for which `fun` returns true.
   *
   * @param fun predicate applied to every element
   * @return DataSet with the retained elements
   */
  def filterWith(fun: T => Boolean): DataSet[T]

  /**
   * Combines all elements into one by repeatedly applying `fun` to pairs of elements.
   *
   * @param fun binary reduce function
   * @return DataSet with the single reduced element
   */
  def reduceWith(fun: (T, T) => T): DataSet[T]

  /**
   * Applies `fun` to all elements of the DataSet, viewed as one Stream, and emits its result.
   *
   * @param fun function from all elements to a single result
   * @return DataSet with the result of `fun`
   */
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]

  /**
   * Groups the elements by the key that `fun` extracts.
   *
   * @param fun key selector function
   * @return GroupedDataSet for further grouped operations
   */
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]

  /**
   * Applies `fun` to the elements of each partition, viewed as a Stream, and emits
   * one result per partition.
   *
   * @param fun function from the elements of one partition to a single result
   * @return DataSet with one result per partition
   */
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

val env = ExecutionEnvironment.getExecutionEnvironment

sealed trait Event
case class UserEvent(userId: String, action: String) extends Event
case class SystemEvent(level: String, message: String) extends Event
case class ErrorEvent(error: String, stackTrace: String) extends Event

val events: DataSet[Event] = env.fromElements(
  UserEvent("user1", "login"),
  SystemEvent("INFO", "System started"),
  ErrorEvent("NPE", "NullPointerException at line 42")
)

// Pattern matching with case blocks; the catch-all case keeps the function total
val userActions = events.mapWith {
  case UserEvent(userId, action) => s"$userId performed $action"
  case _                         => "Not a user event"
}

val criticalEvents = events.filterWith {
  case ErrorEvent(_, _)        => true
  case SystemEvent("ERROR", _) => true
  case _                       => false
}

// Group by event type using pattern matching (exhaustive over the sealed trait)
val groupedByType = events.groupingBy {
  case _: UserEvent   => "user"
  case _: SystemEvent => "system"
  case _: ErrorEvent  => "error"
}
```

### Grouped DataSet Extensions

Partial function support for operations on a `GroupedDataSet`, such as the result of `groupingBy`.

```scala { .api }
implicit class OnGroupedDataSet[T](groupedDataSet: GroupedDataSet[T]) {

  /**
   * Sorts the elements within each group by the key that `fun` extracts.
   *
   * @param order sort order (ascending or descending)
   * @param fun   key selector used for sorting
   * @return GroupedDataSet with sorted groups
   */
  def sortGroupWith[K: TypeInformation](order: Order)(fun: T => K): GroupedDataSet[T]

  /**
   * Reduces every group to a single element by repeatedly applying `fun` to pairs of elements.
   *
   * @param fun binary reduce function
   * @return DataSet with one reduced element per group
   */
  def reduceWith(fun: (T, T) => T): DataSet[T]

  /**
   * Applies `fun` to each group, viewed as a Stream, and emits one result per group.
   *
   * @param fun function from the elements of one group to a single result
   * @return DataSet with one result per group
   */
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]

  /**
   * Like [[reduceGroupWith]], but as a combine step. `fun` may be applied to partial
   * groups, so its results are not final per-group results.
   *
   * @param fun function from a (possibly partial) group to a single result
   * @return DataSet with the combined results
   */
  def combineGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
}
```

### Binary Operation Extensions

Partial function support for completing join, cross, and coGroup operations with a single function.

```scala { .api }
implicit class OnJoinFunctionAssigner[L, R](joiner: JoinFunctionAssigner[L, R]) {
  /**
   * Finishes the join by projecting each matching pair of elements with `fun`,
   * which may be a pattern-matching anonymous function.
   *
   * @param fun function combining a left and a right element
   * @return DataSet with one result per matching pair
   */
  def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

implicit class OnCrossDataSet[L, R](crossDataSet: CrossDataSet[L, R]) {
  /**
   * Finishes the cross product by projecting each pair of elements with `fun`.
   *
   * @param fun function combining a left and a right element
   * @return DataSet with one result per pair
   */
  def projecting[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

implicit class OnCoGroupDataSet[L, R](coGroupDataSet: CoGroupDataSet[L, R]) {
  /**
   * Finishes the coGroup by applying `fun` to the left and right elements that share
   * a key, each side viewed as a Stream (either side may be empty).
   *
   * @param fun function from the left and right groups of one key to a single result
   * @return DataSet with one result per key
   */
  def projecting[O: TypeInformation: ClassTag](fun: (Stream[L], Stream[R]) => O): DataSet[O]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

case class Order(customerId: Int, productId: Int, amount: Double)
case class Customer(id: Int, name: String, segment: String)

val env = ExecutionEnvironment.getExecutionEnvironment

val orders = env.fromElements(
  Order(1, 101, 99.99),
  Order(2, 102, 149.99)
)

val customers = env.fromElements(
  Customer(1, "Alice", "Premium"),
  Customer(2, "Bob", "Standard")
)

// Join with a projecting function.
// `$$` emits a literal dollar sign; `${order.amount}` then interpolates the amount.
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .projecting { (customer, order) =>
    s"${customer.name} (${customer.segment}) ordered product ${order.productId} for $$${order.amount}"
  }
```

### Metrics Integration

A Scala-friendly `Gauge` implementation that reports the value returned by a Scala function, for exposing operator metrics.

```scala { .api }
// The constructor parameter is named `func` so it does not clash with the
// overridden `getValue` method.
class ScalaGauge[T](func: () => T) extends Gauge[T] {
  /**
   * Gets the current gauge value by evaluating `func` on every call.
   *
   * @return current value
   */
  override def getValue: T = func()
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricGroup

class MonitoredMapFunction extends RichMapFunction[String, String] {
  // Updated in map(); @volatile so the gauge sees the latest value when it is read
  @volatile private var processedCount = 0L

  override def open(parameters: Configuration): Unit = {
    val metricGroup: MetricGroup = getRuntimeContext.getMetricGroup

    // Register Scala gauge. The explicit type arguments are needed because Scala does not
    // reliably infer the Java signature `<T, G extends Gauge<T>> G gauge(String, G)`.
    metricGroup.gauge[Long, ScalaGauge[Long]](
      "processedCount",
      new ScalaGauge[Long](() => processedCount)
    )
  }

  override def map(value: String): String = {
    processedCount += 1
    value.toUpperCase
  }
}
```

### Package Utilities

Helper functions defined in the `utils` package object.

```scala { .api }
package object utils {

  /**
   * Describes a frame of the current call stack, for example to name an operator
   * after the user code location that created it (useful when debugging).
   *
   * @param depth index of the stack frame to describe
   * @return string describing the selected call location
   */
  def getCallLocationName(depth: Int = 3): String
}
```

## Types

```scala { .api }
// Result type of checksumHashCode() (org.apache.flink.api.java.Utils.ChecksumHashCode)
class ChecksumHashCode {
  /**
   * Gets the computed checksum, the sum of the hash codes of all elements.
   *
   * @return checksum value
   */
  def getChecksum: Long

  /**
   * Gets the element count.
   *
   * @return number of elements
   */
  def getCount: Long
}

// Describes how the key domain is split into ranges for partitionByRange
// (org.apache.flink.api.common.distributions.DataDistribution, a Java interface)
trait DataDistribution extends IOReadableWritable with Serializable {
  /**
   * Gets the upper boundary of a bucket when the key domain is split into
   * `totalNumBuckets` buckets.
   *
   * @param bucketNum       bucket number
   * @param totalNumBuckets total number of buckets
   * @return boundary key, with one value per key field
   */
  def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]

  /**
   * Gets the number of key fields described by this distribution.
   *
   * @return number of key fields
   */
  def getNumberOfFields(): Int

  /**
   * Gets the type of each key field.
   *
   * @return key field types, one per key field
   */
  def getKeyTypes(): Array[TypeInformation[_]]
}

// Wrapper classes for partial function support; they are applied implicitly
// after `import org.apache.flink.api.scala.extensions._`
class OnDataSet[T](dataSet: DataSet[T]) {
  // Enhanced DataSet with partial function capabilities
}

class OnGroupedDataSet[T](groupedDataSet: GroupedDataSet[T]) {
  // Enhanced GroupedDataSet with partial function capabilities
}

class OnJoinFunctionAssigner[L, R](joiner: JoinFunctionAssigner[L, R]) {
  // Enhanced join operations with partial function capabilities
}

class OnCrossDataSet[L, R](crossDataSet: CrossDataSet[L, R]) {
  // Enhanced cross operations with partial function capabilities
}

class OnCoGroupDataSet[L, R](coGroupDataSet: CoGroupDataSet[L, R]) {
  // Enhanced coGroup operations with partial function capabilities
}

class ScalaGauge[T](func: () => T) extends Gauge[T] {
  // Scala-friendly gauge metric: getValue returns the result of func()
}

// Stream type used by the Stream-based partial function extensions
type Stream[T] = scala.collection.immutable.Stream[T]
```