or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-operations.mddata-transformations.mdexecution-environment.mdgrouping-aggregation.mdindex.mdinput-output.mdpartitioning-distribution.mdtype-system.mdutilities.md

utilities.mddocs/

# Utilities

Utilities for indexing, sampling, custom range partitioning, pattern-matching-friendly transformations, and metrics in the Flink Scala DataSet API. They are added to `DataSet` and related types through implicit conversions.

## Capabilities

### DataSet Utilities

Additional operations available on any `DataSet` through the implicit class `DataSetUtils`, brought into scope with `import org.apache.flink.api.scala.utils._`.

```scala { .api }
implicit class DataSetUtils[T](dataSet: DataSet[T]) {

  /**
   * Assigns consecutive `Long` indices to the elements, starting from 0.
   *
   * The indices are dense (0 until count). In a parallel DataSet this requires knowing how
   * many elements every partition holds. Which element receives which index depends on how
   * the data is partitioned.
   *
   * @return DataSet of (index, element) tuples
   */
  def zipWithIndex: DataSet[(Long, T)]

  /**
   * Assigns a unique `Long` identifier to every element.
   *
   * Unlike `zipWithIndex`, the identifiers are unique but not necessarily consecutive.
   * Use this when only uniqueness matters.
   *
   * @return DataSet of (uniqueId, element) tuples
   */
  def zipWithUniqueId: DataSet[(Long, T)]

  /**
   * Counts the elements held by each parallel partition.
   *
   * @return DataSet of (partitionIndex, elementCount) tuples, one per partition
   */
  def countElementsPerPartition: DataSet[(Int, Long)]

  /**
   * Computes a checksum over the elements' hash codes together with the element count.
   *
   * The result is a plain value rather than a DataSet, so calling this method executes
   * the program.
   *
   * @return ChecksumHashCode holding the checksum and the count
   */
  def checksumHashCode(): ChecksumHashCode
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("apple", "banana", "cherry", "date")

// Add consecutive indices 0..3 (which element gets which index depends on partitioning)
val indexed = data.zipWithIndex
// e.g. [(0, "apple"), (1, "banana"), (2, "cherry"), (3, "date")]

// Add unique, but not necessarily consecutive, IDs (useful in a distributed environment)
val withIds = data.zipWithUniqueId
// e.g. [(uniqueId1, "apple"), (uniqueId2, "banana"), ...]

// Count elements per partition
val partitionCounts = data.countElementsPerPartition
// e.g. [(0, 2), (1, 2)] for 2 partitions with 2 elements each

// Get checksum and count (returns a value, so this runs the program)
val checksum = data.checksumHashCode()
println(s"Checksum: ${checksum.getChecksum}, Count: ${checksum.getCount}")
```

### Sampling Operations

Random sampling of a DataSet, either by fraction (the sample size is only approximate) or by a fixed number of elements.

```scala { .api }
implicit class DataSetUtils[T](dataSet: DataSet[T]) {

  /**
   * Randomly samples elements by fraction.
   *
   * The size of the result is not fixed. It is only expected to be about
   * `fraction * count` and varies from run to run (and from seed to seed).
   *
   * @param withReplacement whether an element may be selected more than once
   * @param fraction        without replacement: the probability that each element is
   *                        selected, in [0.0, 1.0]; with replacement: the expected number of
   *                        times each element is selected, any value >= 0.0
   * @param seed            random seed; pass a fixed value for reproducible samples
   *                        (the default draws a new random seed on every call)
   * @return DataSet with the sampled elements
   */
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Random.nextLong()): DataSet[T]

  /**
   * Samples a fixed number of elements.
   *
   * @param withReplacement whether an element may be selected more than once
   * @param numSamples      number of elements to sample
   * @param seed            random seed; pass a fixed value for reproducible samples
   *                        (the default draws a new random seed on every call)
   * @return DataSet with the sampled elements
   */
  def sampleWithSize(withReplacement: Boolean, numSamples: Int, seed: Long = Random.nextLong()): DataSet[T]
}
```

**Usage Examples:**

```scala
val largeDataset = env.generateSequence(1, 1000000)

// Sample roughly 10% of the elements without replacement (the exact size varies)
val sample10Percent = largeDataset.sample(withReplacement = false, fraction = 0.1)

// Sample exactly 1000 elements with replacement
val exactSample = largeDataset.sampleWithSize(withReplacement = true, numSamples = 1000)

// Reproducible sampling with a fixed seed
val reproducibleSample = largeDataset.sample(
  withReplacement = false,
  fraction = 0.05,
  seed = 12345L
)
```

### Advanced Range Partitioning

Range partitioning whose bucket boundaries come from a custom `DataDistribution`. This gives you control over which key ranges end up in which partition.

```scala { .api }
implicit class DataSetUtils[T](dataSet: DataSet[T]) {

  /**
   * Range-partitions the DataSet on the given field positions, using the bucket
   * boundaries supplied by `distribution`.
   *
   * @param distribution custom data distribution; it should describe the same key fields
   *                     (number and types) as `fields`
   * @param fields       field positions (e.g. tuple indices) that form the partitioning key
   * @return DataSet with custom range partitioning
   */
  def partitionByRange(distribution: DataDistribution, fields: Int*): DataSet[T]

  /**
   * Range-partitions the DataSet on the named fields, using the bucket boundaries
   * supplied by `distribution`.
   *
   * @param distribution custom data distribution; it should describe the same key fields
   * @param firstField   first field name of the partitioning key
   * @param otherFields  additional field names of the partitioning key
   * @return DataSet with custom range partitioning
   */
  def partitionByRange(
      distribution: DataDistribution,
      firstField: String,
      otherFields: String*
  ): DataSet[T]

  /**
   * Range-partitions the DataSet on the key extracted by `fun`, using the bucket
   * boundaries supplied by `distribution`.
   *
   * @param distribution custom data distribution; its key type should match `K`
   * @param fun          key selector function
   * @return DataSet with custom range partitioning
   */
  def partitionByRange[K: TypeInformation](
      distribution: DataDistribution,
      fun: T => K
  ): DataSet[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.common.distributions.DataDistribution
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

// Custom data distribution for range partitioning
case class SalesRecord(region: String, amount: Double, date: String)

val salesData = env.fromElements(
  SalesRecord("North", 1000.0, "2023-01-01"),
  SalesRecord("South", 1500.0, "2023-01-02"),
  SalesRecord("East", 800.0, "2023-01-03")
)

/**
 * Splits the `amount` key range [0, maxAmount] into equal-width buckets.
 *
 * getBucketBoundary(i, n) returns the upper bound of bucket i. Values above the last
 * boundary fall into the last bucket.
 */
class AmountDistribution(private var maxAmount: Double) extends DataDistribution {

  // No-arg constructor plus write/read: the distribution is shipped in serialized form and
  // re-created on the workers, so its state must round-trip through write()/read().
  def this() = this(0.0)

  override def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef] =
    Array(java.lang.Double.valueOf(maxAmount * (bucketNum + 1) / totalNumBuckets))

  // One key field (amount) of type Double
  override def getNumberOfFields(): Int = 1

  override def getKeyTypes(): Array[TypeInformation[_]] = Array(BasicTypeInfo.DOUBLE_TYPE_INFO)

  override def write(out: DataOutputView): Unit = out.writeDouble(maxAmount)

  override def read(in: DataInputView): Unit = maxAmount = in.readDouble()
}

val customDistribution = new AmountDistribution(2000.0)

val partitionedSales = salesData.partitionByRange(customDistribution, _.amount)
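```

### Partial Function Extensions

The `...With` extension methods (enabled by `import org.apache.flink.api.scala.extensions._`) let you pass pattern-matching anonymous functions such as `{ case (a, b) => ... }` to transformations. The regular `DataSet` methods are overloaded, so they cannot accept such literals directly. The extension methods take ordinary functions: an element that matches none of the cases causes a `MatchError` at runtime, so keep the cases exhaustive or add a catch-all case.

```scala { .api }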
// Import for the `...With` extension methods
import org.apache.flink.api.scala.extensions._

/**
 * Extension methods on DataSet.
 *
 * Each method takes an ordinary Scala function, so a pattern-matching anonymous function
 * (`{ case ... => ... }`) can be passed directly. An element that matches none of its
 * cases causes a MatchError at runtime.
 */
implicit class OnDataSet[T](dataSet: DataSet[T]) {

  /**
   * Transforms each element with `fun`.
   *
   * @param fun function applied to every element
   * @return DataSet of the results
   */
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]

  /**
   * Transforms each element into zero or more results.
   *
   * @param fun function returning the results for one element
   * @return DataSet of all results, flattened
   */
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]

  /**
   * Keeps the elements for which `fun` returns true.
   *
   * @param fun predicate applied to every element
   * @return DataSet of the retained elements
   */
  def filterWith(fun: T => Boolean): DataSet[T]

  /**
   * Combines all elements of the DataSet pairwise into a single element.
   *
   * @param fun binary combining function
   * @return DataSet holding the reduced element
   */
  def reduceWith(fun: (T, T) => T): DataSet[T]

  /**
   * Applies `fun` to all elements of the DataSet, presented as one Stream.
   *
   * @param fun function from the elements to a single result
   * @return DataSet holding the result
   */
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]

  /**
   * Groups the elements by the key that `fun` extracts.
   *
   * @param fun key extractor
   * @return GroupedDataSet keyed by the result of `fun`
   */
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]

  /**
   * Applies `fun` once per parallel partition, with that partition's elements as a Stream.
   *
   * @param fun function from one partition's elements to a single result
   * @return DataSet with one result per partition
   */
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

sealed trait Event
case class UserEvent(userId: String, action: String) extends Event
case class SystemEvent(level: String, message: String) extends Event
case class ErrorEvent(error: String, stackTrace: String) extends Event

val events: DataSet[Event] = env.fromElements(
  UserEvent("user1", "login"),
  SystemEvent("INFO", "System started"),
  ErrorEvent("NPE", "NullPointerException at line 42")
)

// Pattern matching in mapWith; the catch-all case prevents a MatchError for other events
val userActions = events.mapWith {
  case UserEvent(userId, action) => s"$userId performed $action"
  case _                         => "Not a user event"
}

val criticalEvents = events.filterWith {
  case ErrorEvent(_, _)        => true
  case SystemEvent("ERROR", _) => true
  case _                       => false
}

// Group by event type; these cases are exhaustive because Event is sealed
val groupedByType = events.groupingBy {
  case _: UserEvent   => "user"
  case _: SystemEvent => "system"
  case _: ErrorEvent  => "error"
}
```

### Grouped DataSet Extensions

The same `...With` style for a `GroupedDataSet` (the result of `groupBy` or `groupingBy`).

```scala { .api }
/**
 * Extension methods on GroupedDataSet. A pattern-matching anonymous function
 * (`{ case ... => ... }`) may be passed as `fun`.
 */
implicit class OnGroupedDataSet[T](groupedDataSet: GroupedDataSet[T]) {

  /**
   * Reduces each group to a single element by combining its elements pairwise.
   *
   * @param fun binary combining function
   * @return DataSet with one reduced element per group
   */
  def reduceWith(fun: (T, T) => T): DataSet[T]

  /**
   * Applies `fun` once per group, with that group's elements as a Stream.
   *
   * @param fun function from one group's elements to a single result
   * @return DataSet with one result per group
   */
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
}
```

### Binary Operation Extensions

`applyWith` for join, cross, and coGroup, so the combining function can be written as a function literal, including a pattern-matching one.

```scala { .api }
/** `applyWith` for a join whose keys were set with `where(...).equalTo(...)`. */
implicit class OnJoinFunctionAssigner[L, R](joiner: JoinFunctionAssigner[L, R]) {

  /**
   * Combines each matching (left, right) pair into one result.
   *
   * @param fun function applied to every matching pair
   * @return DataSet of the join results
   */
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

/** `applyWith` for a cross product. */
implicit class OnCrossDataSet[L, R](crossDataSet: CrossDataSet[L, R]) {

  /**
   * Combines each (left, right) pair of the cross product into one result.
   *
   * @param fun function applied to every pair
   * @return DataSet of the cross results
   */
  def applyWith[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}

/** `applyWith` for a coGroup. */
implicit class OnCoGroupDataSet[L, R](coGroupDataSet: CoGroupDataSet[L, R]) {

  /**
   * Applies `fun` once per key, with the left-side and right-side elements sharing that
   * key given as two Streams (either one may be empty).
   *
   * @param fun function from the two co-grouped Streams to a single result
   * @return DataSet with one result per key
   */
  def applyWith[O: TypeInformation: ClassTag](fun: (Stream[L], Stream[R]) => O): DataSet[O]
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

case class Order(customerId: Int, productId: Int, amount: Double)
case class Customer(id: Int, name: String, segment: String)

val orders = env.fromElements(
  Order(1, 101, 99.99),
  Order(2, 102, 149.99)
)

val customers = env.fromElements(
  Customer(1, "Alice", "Premium"),
  Customer(2, "Bob", "Standard")
)

// Join, then build each result with applyWith
val customerOrders = customers
  .join(orders)
  .where(_.id)
  .equalTo(_.customerId)
  .applyWith { (customer, order) =>
    // `$$` emits a literal '$'; the following `${...}` interpolates the amount
    s"${customer.name} (${customer.segment}) ordered product ${order.productId} for $$${order.amount}"
  }
```

### Metrics Integration

`ScalaGauge` wraps a Scala function `() => T` as a Flink `Gauge[T]`, so a metric value can be exposed from a Scala closure.

```scala { .api }
/**
 * Adapts a Scala function to Flink's Gauge metric interface.
 *
 * @param func function evaluated every time the gauge is read
 * @tparam T type of the reported value
 */
class ScalaGauge[T](func: () => T) extends Gauge[T] {

  /**
   * Gets the current gauge value by evaluating `func`.
   *
   * @return current value
   */
  override def getValue: T = func()
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.metrics.ScalaGauge
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricGroup

class MonitoredMapFunction extends RichMapFunction[String, String] {

  // Incremented in map() and read through the gauge, possibly from another thread;
  // @volatile makes the latest value visible to that reader.
  @volatile private var processedCount = 0L

  override def open(parameters: Configuration): Unit = {
    val metricGroup: MetricGroup = getRuntimeContext.getMetricGroup

    // Register a Scala gauge. The type arguments are explicit because Scala cannot infer T
    // from the signature gauge[T, G <: Gauge[T]](name: String, gauge: G).
    metricGroup.gauge[Long, ScalaGauge[Long]](
      "processedCount",
      new ScalaGauge[Long](() => processedCount)
    )
  }

  override def map(value: String): String = {
    processedCount += 1
    value.toUpperCase
  }
}
```

### Package Utilities

Helper functions defined in the `utils` package object.

```scala { .api }
package object utils {

  /**
   * Gets a name for the code location that called into the API, for debugging.
   *
   * @param depth how many stack frames up to examine (default 3)
   * @return call location name
   */
  def getCallLocationName(depth: Int = 3): String
}
```

## Types

```scala { .api }
/** Result of `checksumHashCode()`: a checksum over element hash codes plus the count. */
class ChecksumHashCode {

  /**
   * Gets the computed checksum.
   *
   * @return checksum value
   */
  def getChecksum: Long

  /**
   * Gets the element count.
   *
   * @return number of elements
   */
  def getCount: Long
}

/**
 * Describes how a key domain is split into buckets for range partitioning
 * (Java interface org.apache.flink.api.common.distributions.DataDistribution).
 * Implementations are serialized through IOReadableWritable's write/read.
 */
trait DataDistribution extends IOReadableWritable with Serializable {

  /**
   * Gets the upper boundary of one bucket for range partitioning.
   *
   * @param bucketNum       bucket index, starting at 0
   * @param totalNumBuckets total number of buckets
   * @return key values forming the bucket's upper boundary, one per key field
   */
  def getBucketBoundary(bucketNum: Int, totalNumBuckets: Int): Array[AnyRef]

  /**
   * Gets the number of key fields this distribution describes.
   *
   * @return number of key fields
   */
  def getNumberOfFields(): Int

  /**
   * Gets the types of the key fields.
   *
   * @return key field types, in key order
   */
  def getKeyTypes(): Array[TypeInformation[_]]
}

// Wrapper classes for partial function support

class OnDataSet[T](dataSet: DataSet[T]) {
  // Enhanced DataSet with partial function capabilities
}

class OnGroupedDataSet[T](groupedDataSet: GroupedDataSet[T]) {
  // Enhanced GroupedDataSet with partial function capabilities
}

class OnJoinFunctionAssigner[L, R](joiner: JoinFunctionAssigner[L, R]) {
  // Enhanced join operations with partial function capabilities
}

class OnCrossDataSet[L, R](crossDataSet: CrossDataSet[L, R]) {
  // Enhanced cross operations with partial function capabilities
}

class OnCoGroupDataSet[L, R](coGroupDataSet: CoGroupDataSet[L, R]) {
  // Enhanced coGroup operations with partial function capabilities
}

class ScalaGauge[T](func: () => T) extends Gauge[T] {
  // Scala-friendly gauge metric implementation: getValue returns func()
}

// `Stream[T]` in the extension signatures above is scala.collection.immutable.Stream[T]
```