or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

dataset-operations.mdexecution-environment.mdextensions.mdgrouped-dataset-operations.mdindex.mdjoin-operations.mdtype-system.mdutility-functions.md

utility-functions.mddocs/

0

# Utility Functions

1

2

The Flink Scala API provides additional utility functions through the `utils` package, offering extended functionality for DataSet operations including sampling, indexing, and partitioning.

3

4

## Importing Utilities

5

6

```scala

7

import org.apache.flink.api.scala.utils._

8

```

9

10

This import adds implicit conversions that extend DataSet with additional utility methods.

11

12

## Element Counting and Indexing

13

14

### Count Elements Per Partition

15

16

```scala { .api }

17

class DataSet[T] {

18

def countElementsPerPartition(): DataSet[(Int, Long)] // (partitionId, elementCount)

19

}

20

```

21

22

```scala

23

val data = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

24

.setParallelism(3)

25

26

// Count elements in each partition

27

val partitionCounts = data.countElementsPerPartition()

28

partitionCounts.collect()

29

// Result: (0, 3), (1, 3), (2, 4) - partition ID and count per partition

30

```

31

32

### Zip with Index

33

34

```scala { .api }

35

class DataSet[T] {

36

def zipWithIndex(): DataSet[(Long, T)] // (index, element)

37

}

38

```

39

40

```scala

41

val words = env.fromElements("apple", "banana", "cherry", "date")

42

43

// Add sequential index to each element

44

val indexed = words.zipWithIndex()

45

indexed.collect()

46

// Result: (0, "apple"), (1, "banana"), (2, "cherry"), (3, "date")

47

48

// Use indexed data

49

val evenIndexed = indexed.filter(_._1 % 2 == 0).map(_._2)

50

// Result: "apple", "cherry"

51

```

52

53

### Zip with Unique ID

54

55

```scala { .api }

56

class DataSet[T] {

57

def zipWithUniqueId(): DataSet[(Long, T)] // (uniqueId, element)

58

}

59

```

60

61

```scala

62

val data = env.fromElements("A", "B", "C", "D", "E")

63

.setParallelism(2)

64

65

// Add unique ID to each element (not necessarily sequential)

66

val withUniqueId = data.zipWithUniqueId()

67

withUniqueId.collect()

68

// Result: five (id, element) pairs with distinct Long IDs, e.g. (0, "A"), (2, "B"), (4, "C"), (1, "D"), (3, "E")

69

// Note: IDs are unique but may not be in exact sequential order across partitions

70

```

71

72

## Sampling Operations

73

74

### Sample with Fraction

75

76

```scala { .api }

77

class DataSet[T] {

78

def sample(

79

withReplacement: Boolean,

80

fraction: Double,

81

seed: Long = Utils.RNG.nextLong()

82

): DataSet[T]

83

}

84

```

85

86

```scala

87

val numbers = env.fromElements(1 to 1000: _*)

88

89

// Sample approximately 10% of elements without replacement

90

val sample10Percent = numbers.sample(

91

withReplacement = false,

92

fraction = 0.1,

93

seed = 12345L

94

)

95

96

// Sample with replacement (elements can appear multiple times)

97

val sampleWithReplacement = numbers.sample(

98

withReplacement = true,

99

fraction = 0.05,

100

seed = 67890L

101

)

102

103

println(s"Original size: ${numbers.count()}")

104

println(s"Sample size: ${sample10Percent.count()}")

105

```

106

107

### Sample with Fixed Size

108

109

```scala { .api }

110

class DataSet[T] {

111

def sampleWithSize(

112

withReplacement: Boolean,

113

numSamples: Int,

114

seed: Long = Utils.RNG.nextLong()

115

): DataSet[T]

116

}

117

```

118

119

```scala

120

val largeDataset = env.fromElements(1 to 10000: _*)

121

122

// Sample exactly 100 elements without replacement

123

val fixedSample = largeDataset.sampleWithSize(

124

withReplacement = false,

125

numSamples = 100,

126

seed = 42L

127

)

128

129

// Sample with replacement - can get duplicates

130

val sampleWithDuplicates = largeDataset.sampleWithSize(

131

withReplacement = true,

132

numSamples = 50,

133

seed = 123L

134

)

135

136

fixedSample.count() // Always returns 100 (or dataset size if smaller)

137

```

138

139

## Advanced Partitioning

140

141

### Partition by Range with Distribution

142

143

```scala { .api }

144

class DataSet[T] {

145

def partitionByRange(

146

distribution: DataDistribution,

147

fields: Int*

148

): DataSet[T]

149

150

def partitionByRange(

151

distribution: DataDistribution,

152

firstField: String,

153

otherFields: String*

154

): DataSet[T]

155

}

156

```

157

158

```scala

159

import org.apache.flink.api.common.distributions.DataDistribution

160

161

val salesData = env.fromElements(

162

("Q1", 1000), ("Q2", 1500), ("Q3", 1200), ("Q4", 1800),

163

("Q1", 1100), ("Q2", 1600), ("Q3", 1300), ("Q4", 1900)

164

)

165

166

// Create a custom data distribution

167

class QuarterDistribution extends DataDistribution {
  override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] = {
    val quarters = Array("Q1", "Q2", "Q3", "Q4")
    Array(quarters(bucketNum * quarters.length / totalNumBuckets))
  }
  // Required by DataDistribution: number of fields in each boundary key
  override def getNumberOfFields: Int = 1
}

173

174

// Partition by range using the custom distribution

175

val rangePartitioned = salesData.partitionByRange(

176

new QuarterDistribution(),

177

0 // Partition by first field (quarter)

178

)

179

```

180

181

## Checksum and Validation

182

183

### Checksum Hash Code

184

185

```scala { .api }

186

class DataSet[T] {

187

def checksumHashCode(): ChecksumHashCode

188

}

189

```

190

191

```scala

192

val data1 = env.fromElements("apple", "banana", "cherry")

193

val data2 = env.fromElements("apple", "banana", "cherry")

194

val data3 = env.fromElements("apple", "cherry", "banana") // Different order

195

196

// Calculate checksums

197

val checksum1 = data1.checksumHashCode()

198

val checksum2 = data2.checksumHashCode()

199

val checksum3 = data3.checksumHashCode()

200

201

// Compare datasets for equality

202

println(s"data1 == data2: ${checksum1 == checksum2}") // true (same content)

203

println(s"data1 == data3: ${checksum1 == checksum3}") // also true — the checksum sums per-element hash codes, so it is insensitive to element order

204

```

205

206

## Practical Usage Examples

207

208

### Data Quality Analysis

209

210

```scala

211

case class Record(id: Int, value: String, timestamp: Long)

212

213

val records = env.fromElements(

214

Record(1, "A", 1000L),

215

Record(2, "B", 2000L),

216

Record(3, "C", 3000L),

217

Record(4, "D", 4000L),

218

Record(5, "E", 5000L)

219

)

220

221

// Count records per partition for load balancing analysis

222

val partitionAnalysis = records.countElementsPerPartition()

223

224

// Sample data for quality checks

225

val qualitySample = records.sample(withReplacement = false, fraction = 0.1, seed = 42L)

226

227

// Add index for tracking

228

val indexedRecords = records.zipWithIndex()

229

230

// Verify data integrity

231

val dataChecksum = records.checksumHashCode()

232

```

233

234

### Statistical Sampling

235

236

```scala

237

val populationData = env.fromElements((1 to 100000).map(i => (i, s"Person$i", Math.random() * 100)): _*)

238

239

// Stratified sampling - sample from different age groups

240

val youngSample = populationData

241

.filter(_._3 < 30) // Age < 30

242

.sample(withReplacement = false, fraction = 0.05)

243

244

val middleAgedSample = populationData

245

.filter(p => p._3 >= 30 && p._3 < 60) // Age 30-60

246

.sample(withReplacement = false, fraction = 0.03)

247

248

val seniorSample = populationData

249

.filter(_._3 >= 60) // Age >= 60

250

.sample(withReplacement = false, fraction = 0.02)

251

252

// Combine stratified samples

253

val stratifiedSample = youngSample.union(middleAgedSample).union(seniorSample)

254

```

255

256

### Data Validation Pipeline

257

258

```scala

259

val inputData = env.readTextFile("input.txt")

260

261

// Add unique IDs for tracking

262

val trackedData = inputData.zipWithUniqueId()

263

264

// Validate and process

265

val validatedData = trackedData

266

.filter { case (id, line) =>

267

line.nonEmpty && line.length > 5 // Basic validation

268

}

269

.map { case (id, line) =>

270

(id, line.toUpperCase, System.currentTimeMillis())

271

}

272

273

// Sample for manual inspection

274

val inspectionSample = validatedData.sampleWithSize(

275

withReplacement = false,

276

numSamples = 1000,

277

seed = System.currentTimeMillis()

278

)

279

280

// Calculate checksum for integrity verification

281

val integrityChecksum = validatedData.checksumHashCode()

282

283

// Count distribution across partitions

284

val distributionCheck = validatedData.countElementsPerPartition()

285

```

286

287

### Performance Monitoring

288

289

```scala

290

val largeBatch = env.fromElements((1 to 1000000): _*)

291

.setParallelism(8)

292

293

// Monitor partition distribution

294

val partitionStats = largeBatch

295

.countElementsPerPartition()

296

.collect()

297

298

partitionStats.foreach { case (partitionId, count) =>

299

println(s"Partition $partitionId: $count elements")

300

}

301

302

// Sample for performance testing

303

val performanceTestSample = largeBatch.sampleWithSize(

304

withReplacement = false,

305

numSamples = 10000,

306

seed = 42L

307

)

308

309

// Add tracking IDs

310

val trackedBatch = largeBatch.zipWithIndex()

311

val firstBatch = trackedBatch.filter(_._1 < 100000)

312

val secondBatch = trackedBatch.filter(_._1 >= 100000)

313

```

314

315

## Types

316

317

```scala { .api }

318

// Checksum type

319

class ChecksumHashCode {

320

def getChecksum: Long

321

def getHashCode: Long

322

override def equals(obj: Any): Boolean

323

override def hashCode(): Int

324

override def toString: String

325

}

326

327

// Data distribution interface

328

trait DataDistribution {

329

def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]

330

}

331

332

// Utility constants

333

object Utils {

334

object RNG {

335

def nextLong(): Long

336

}

337

}

338

```

339

340

## Performance Considerations

341

342

### Sampling Performance

343

344

```scala

345

// For large datasets, sample early in the pipeline to reduce data volume

346

val massiveDataset = env.fromElements((1 to 10000000): _*)

347

348

// Good - sample early

349

val efficientPipeline = massiveDataset

350

.sample(withReplacement = false, fraction = 0.01) // Sample first

351

.filter(_ % 2 == 0) // Then apply expensive operations

352

.map(x => x * x) // note: `.map(_ * _)` would not compile — map takes a unary function

353

354

// Less efficient - sample after expensive operations

355

val inefficientPipeline = massiveDataset

356

.filter(_ % 2 == 0) // Expensive operation on full dataset

357

.map(x => x * x)

358

.sample(withReplacement = false, fraction = 0.01) // Sample last

359

```

360

361

### Index Assignment

362

363

```scala

364

// zipWithIndex requires global coordination - use sparingly on large datasets

365

val smallDataset = env.fromElements(1 to 1000: _*)

366

val indexedSmall = smallDataset.zipWithIndex() // OK for small data

367

368

val hugeDataset = env.fromElements(1 to 100000000: _*)

369

// Consider alternatives for huge datasets:

370

// 1. Use zipWithUniqueId if order doesn't matter

371

val uniqueIdVersion = hugeDataset.zipWithUniqueId()

372

373

// 2. Or add IDs during data generation if possible

374

val preIndexedData = env.fromElements((1 to 1000000).map(i => (i, s"value$i")): _*)

375

```

376

377

### Partition Analysis

378

379

```scala

380

// Use countElementsPerPartition for debugging partition skew

381

val skewedData = env.fromElements(

382

(1 to 1000).map(_ => "A") ++ // Most data in one key

383

(1 to 10).map(_ => "B") ++ // Little data in other keys

384

(1 to 5).map(_ => "C"): _*

385

)

386

387

val partitionAnalysis = skewedData

388

.groupBy(identity)

389

.reduceGroup(_.size)

390

.countElementsPerPartition()

391

392

// Use this information to rebalance if needed

393

val rebalanced = skewedData.rebalance()

394

```