0
# Utility Functions
1
2
The Flink Scala API provides additional utility functions through the `utils` package, offering extended functionality for DataSet operations including sampling, indexing, and partitioning.
3
4
## Importing Utilities
5
6
```scala
7
import org.apache.flink.api.scala.utils._
8
```
9
10
This import adds implicit conversions that extend DataSet with additional utility methods.
11
12
## Element Counting and Indexing
13
14
### Count Elements Per Partition
15
16
```scala { .api }
17
class DataSet[T] {
18
def countElementsPerPartition(): DataSet[(Int, Long)] // (partitionId, elementCount)
19
}
20
```
21
22
```scala
23
// Note: fromElements creates a non-parallel source, so apply the desired
// parallelism to a downstream operator rather than to the source itself
val data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .rebalance().map(x => x).setParallelism(3)

// Count elements in each partition
val partitionCounts = data.countElementsPerPartition()
partitionCounts.collect()
// Result (example): (0, 4), (1, 3), (2, 3) - partition ID and count per partition
30
```
31
32
### Zip with Index
33
34
```scala { .api }
35
class DataSet[T] {
36
def zipWithIndex(): DataSet[(Long, T)] // (index, element)
37
}
38
```
39
40
```scala
41
val words = env.fromElements("apple", "banana", "cherry", "date")
42
43
// Add sequential index to each element
44
val indexed = words.zipWithIndex()
45
indexed.collect()
46
// Result: (0, "apple"), (1, "banana"), (2, "cherry"), (3, "date")
47
48
// Use indexed data
49
val evenIndexed = indexed.filter(_._1 % 2 == 0).map(_._2)
50
// Result: "apple", "cherry"
51
```
52
53
### Zip with Unique ID
54
55
```scala { .api }
56
class DataSet[T] {
57
def zipWithUniqueId(): DataSet[(Long, T)] // (uniqueId, element)
58
}
59
```
60
61
```scala
62
val data = env.fromElements("A", "B", "C", "D", "E")

// Add unique ID to each element (not necessarily sequential)
val withUniqueId = data.zipWithUniqueId()
withUniqueId.collect()
// Result (example): (0, "A"), (1, "B"), (2, "C"), (3, "D"), (4, "E")
// Note: IDs are guaranteed unique, but their exact values depend on how the
// elements are distributed across parallel tasks - do not rely on them being
// consecutive or ordered
70
```
71
72
## Sampling Operations
73
74
### Sample with Fraction
75
76
```scala { .api }
77
class DataSet[T] {
78
def sample(
79
withReplacement: Boolean,
80
fraction: Double,
81
seed: Long = Utils.RNG.nextLong()
82
): DataSet[T]
83
}
84
```
85
86
```scala
87
val numbers = env.fromElements(1 to 1000: _*)
88
89
// Sample approximately 10% of elements without replacement
90
val sample10Percent = numbers.sample(
91
withReplacement = false,
92
fraction = 0.1,
93
seed = 12345L
94
)
95
96
// Sample with replacement (elements can appear multiple times)
97
val sampleWithReplacement = numbers.sample(
98
withReplacement = true,
99
fraction = 0.05,
100
seed = 67890L
101
)
102
103
println(s"Original size: ${numbers.count()}")
104
println(s"Sample size: ${sample10Percent.count()}")
105
```
106
107
### Sample with Fixed Size
108
109
```scala { .api }
110
class DataSet[T] {
111
def sampleWithSize(
112
withReplacement: Boolean,
113
numSamples: Int,
114
seed: Long = Utils.RNG.nextLong()
115
): DataSet[T]
116
}
117
```
118
119
```scala
120
val largeDataset = env.fromElements(1 to 10000: _*)
121
122
// Sample exactly 100 elements without replacement
123
val fixedSample = largeDataset.sampleWithSize(
124
withReplacement = false,
125
numSamples = 100,
126
seed = 42L
127
)
128
129
// Sample with replacement - can get duplicates
130
val sampleWithDuplicates = largeDataset.sampleWithSize(
131
withReplacement = true,
132
numSamples = 50,
133
seed = 123L
134
)
135
136
fixedSample.count() // Always returns 100 (or dataset size if smaller)
137
```
138
139
## Advanced Partitioning
140
141
### Partition by Range with Distribution
142
143
```scala { .api }
144
class DataSet[T] {
145
def partitionByRange(
146
distribution: DataDistribution,
147
fields: Int*
148
): DataSet[T]
149
150
def partitionByRange(
151
distribution: DataDistribution,
152
firstField: String,
153
otherFields: String*
154
): DataSet[T]
155
}
156
```
157
158
```scala
159
import org.apache.flink.api.common.distributions.DataDistribution
160
161
val salesData = env.fromElements(
162
("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800),
163
("Q1", 1100), ("Q2", 1600), ("Q3", 1300), ("Q4", 1900)
164
)
165
166
// Create a custom data distribution
// Note: a complete DataDistribution implementation must also provide
// getNumberOfFields() and the read/write serialization methods; they are
// omitted here for brevity
class QuarterDistribution extends DataDistribution {
  override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] = {
    val quarters = Array("Q1", "Q2", "Q3", "Q4")
    Array(quarters(bucketNum * quarters.length / totalNumBuckets))
  }
}
173
174
// Partition by range using the custom distribution
175
val rangePartitioned = salesData.partitionByRange(
176
new QuarterDistribution(),
177
0 // Partition by first field (quarter)
178
)
179
```
180
181
## Checksum and Validation
182
183
### Checksum Hash Code
184
185
```scala { .api }
186
class DataSet[T] {
187
def checksumHashCode(): ChecksumHashCode
188
}
189
```
190
191
```scala
192
val data1 = env.fromElements("apple", "banana", "cherry")
val data2 = env.fromElements("apple", "banana", "cherry")
val data3 = env.fromElements("apple", "banana", "date") // Different content

// Calculate checksums
val checksum1 = data1.checksumHashCode()
val checksum2 = data2.checksumHashCode()
val checksum3 = data3.checksumHashCode()

// Compare datasets by content
println(s"data1 == data2: ${checksum1 == checksum2}") // true (same content)
println(s"data1 == data3: ${checksum1 == checksum3}") // false (different content)
// Note: the checksum aggregates per-element hash codes, so it is insensitive
// to element order - two datasets containing the same elements in a different
// order produce the same checksum
204
```
205
206
## Practical Usage Examples
207
208
### Data Quality Analysis
209
210
```scala
211
case class Record(id: Int, value: String, timestamp: Long)
212
213
val records = env.fromElements(
214
Record(1, "A", 1000L),
215
Record(2, "B", 2000L),
216
Record(3, "C", 3000L),
217
Record(4, "D", 4000L),
218
Record(5, "E", 5000L)
219
)
220
221
// Count records per partition for load balancing analysis
222
val partitionAnalysis = records.countElementsPerPartition()
223
224
// Sample data for quality checks
225
val qualitySample = records.sample(withReplacement = false, fraction = 0.1, seed = 42L)
226
227
// Add index for tracking
228
val indexedRecords = records.zipWithIndex()
229
230
// Verify data integrity
231
val dataChecksum = records.checksumHashCode()
232
```
233
234
### Statistical Sampling
235
236
```scala
237
val populationData = env.fromElements((1 to 100000).map(i => (i, s"Person$i", Math.random() * 100)): _*)
238
239
// Stratified sampling - sample from different age groups
240
val youngSample = populationData
241
.filter(_._3 < 30) // Age < 30
242
.sample(withReplacement = false, fraction = 0.05)
243
244
val middleAgedSample = populationData
245
.filter(p => p._3 >= 30 && p._3 < 60) // Age 30-60
246
.sample(withReplacement = false, fraction = 0.03)
247
248
val seniorSample = populationData
249
.filter(_._3 >= 60) // Age >= 60
250
.sample(withReplacement = false, fraction = 0.02)
251
252
// Combine stratified samples
253
val stratifiedSample = youngSample.union(middleAgedSample).union(seniorSample)
254
```
255
256
### Data Validation Pipeline
257
258
```scala
259
val inputData = env.readTextFile("input.txt")
260
261
// Add unique IDs for tracking
262
val trackedData = inputData.zipWithUniqueId()
263
264
// Validate and process
265
val validatedData = trackedData
266
.filter { case (id, line) =>
267
line.nonEmpty && line.length > 5 // Basic validation
268
}
269
.map { case (id, line) =>
270
(id, line.toUpperCase, System.currentTimeMillis())
271
}
272
273
// Sample for manual inspection
274
val inspectionSample = validatedData.sampleWithSize(
275
withReplacement = false,
276
numSamples = 1000,
277
seed = System.currentTimeMillis()
278
)
279
280
// Calculate checksum for integrity verification
281
val integrityChecksum = validatedData.checksumHashCode()
282
283
// Count distribution across partitions
284
val distributionCheck = validatedData.countElementsPerPartition()
285
```
286
287
### Performance Monitoring
288
289
```scala
290
// Note: fromElements creates a non-parallel source; spread the data across
// 8 parallel instances via a downstream operator
val largeBatch = env.fromElements((1 to 1000000): _*)
  .rebalance().map(x => x).setParallelism(8)
292
293
// Monitor partition distribution
294
val partitionStats = largeBatch
295
.countElementsPerPartition()
296
.collect()
297
298
partitionStats.foreach { case (partitionId, count) =>
299
println(s"Partition $partitionId: $count elements")
300
}
301
302
// Sample for performance testing
303
val performanceTestSample = largeBatch.sampleWithSize(
304
withReplacement = false,
305
numSamples = 10000,
306
seed = 42L
307
)
308
309
// Add tracking IDs
310
val trackedBatch = largeBatch.zipWithIndex()
311
val firstBatch = trackedBatch.filter(_._1 < 100000)
312
val secondBatch = trackedBatch.filter(_._1 >= 100000)
313
```
314
315
## Types
316
317
```scala { .api }
318
// Checksum type
319
class ChecksumHashCode {
  def getCount: Long    // number of elements folded into the checksum
  def getChecksum: Long // order-insensitive aggregate of element hash codes
  override def equals(obj: Any): Boolean
  override def hashCode(): Int
  override def toString: String
}
326
327
// Data distribution interface (simplified - implementations must also
// provide the read/write serialization methods)
trait DataDistribution {
  def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]
  def getNumberOfFields: Int
}
331
332
// Utility constants
333
object Utils {
334
object RNG {
335
def nextLong(): Long
336
}
337
}
338
```
339
340
## Performance Considerations
341
342
### Sampling Performance
343
344
```scala
345
// For large datasets, sample early in the pipeline to reduce data volume
346
val massiveDataset = env.fromElements((1 to 10000000): _*)
347
348
// Good - sample early
val efficientPipeline = massiveDataset
  .sample(withReplacement = false, fraction = 0.01) // Sample first
  .filter(_ % 2 == 0) // Then apply expensive operations
  .map(x => x * x)    // note: `_ * _` is a two-argument function and does not compile in map
353
354
// Less efficient - sample after expensive operations
val inefficientPipeline = massiveDataset
  .filter(_ % 2 == 0) // Expensive operation on full dataset
  .map(x => x * x)
  .sample(withReplacement = false, fraction = 0.01) // Sample last
359
```
360
361
### Index Assignment
362
363
```scala
364
// zipWithIndex requires global coordination - use sparingly on large datasets
365
val smallDataset = env.fromElements(1 to 1000: _*)
366
val indexedSmall = smallDataset.zipWithIndex() // OK for small data
367
368
val hugeDataset = env.fromElements(1 to 100000000: _*)
369
// Consider alternatives for huge datasets:
370
// 1. Use zipWithUniqueId if order doesn't matter
371
val uniqueIdVersion = hugeDataset.zipWithUniqueId()
372
373
// 2. Or add IDs during data generation if possible
374
val preIndexedData = env.fromElements((1 to 1000000).map(i => (i, s"value$i")): _*)
375
```
376
377
### Partition Analysis
378
379
```scala
380
// Use countElementsPerPartition for debugging partition skew
381
val skewedData = env.fromElements(
382
(1 to 1000).map(_ => "A") ++ // Most data in one key
383
(1 to 10).map(_ => "B") ++ // Little data in other keys
384
(1 to 5).map(_ => "C"): _*
385
)
386
387
val partitionAnalysis = skewedData
388
.groupBy(identity)
389
.reduceGroup(_.size)
390
.countElementsPerPartition()
391
392
// Use this information to rebalance if needed
393
val rebalanced = skewedData.rebalance()
394
```