or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

aggregation-grouping.md data-input-output.md dataset-operations.md execution-environments.md index.md iteration-operations.md join-cogroup-operations.md utility-functions.md

utility-functions.mddocs/

0

# Utility Functions

1

2

Helper utilities for common operations, parameter handling, and DataSet manipulation. These utilities simplify common tasks and provide additional functionality for Flink batch programs.

3

4

## Capabilities

5

6

### Parameter Handling

7

8

Utility for reading program parameters from several sources: command line arguments, properties files, maps, and system properties.

9

10

```java { .api }

11

/**

12

* Utility class for handling program parameters

13

*/

14

public class ParameterTool implements Serializable {

15

16

/**

17

* Create ParameterTool from command line arguments

18

* @param args command line arguments in key-value format

19

* @return ParameterTool instance

20

*/

21

public static ParameterTool fromArgs(String[] args);

22

23

/**

24

* Create ParameterTool from properties file

25

* @param path path to the properties file

26

* @return ParameterTool instance

27

* @throws IOException if file cannot be read

28

*/

29

public static ParameterTool fromPropertiesFile(String path) throws IOException;

30

31

/**

32

* Create ParameterTool from properties file with ClassLoader

33

* @param file properties file

34

* @param classLoader class loader to use

35

* @return ParameterTool instance

36

* @throws IOException if file cannot be read

37

*/

38

public static ParameterTool fromPropertiesFile(File file, ClassLoader classLoader) throws IOException;

39

40

/**

41

* Create ParameterTool from Map

42

* @param map map containing key-value pairs

43

* @return ParameterTool instance

44

*/

45

public static ParameterTool fromMap(Map<String, String> map);

46

47

/**

48

* Create ParameterTool from system properties

49

* @return ParameterTool instance with system properties

50

*/

51

public static ParameterTool fromSystemProperties();

52

53

/**

54

* Get parameter value as String

55

* @param key parameter key

56

* @return parameter value or null if not found

57

*/

58

public String get(String key);

59

60

/**

61

* Get parameter value with default

62

* @param key parameter key

63

* @param defaultValue default value if key not found

64

* @return parameter value or default value

65

*/

66

public String get(String key, String defaultValue);

67

68

/**

69

* Get parameter value as integer

70

* @param key parameter key

71

* @return parameter value as integer

72

* @throws NumberFormatException if value is not a valid integer

73

*/

74

public int getInt(String key);

75

76

/**

77

* Get parameter value as integer with default

78

* @param key parameter key

79

* @param defaultValue default value if key not found

80

* @return parameter value as integer or default value

81

*/

82

public int getInt(String key, int defaultValue);

83

84

/**

85

* Get parameter value as long

86

* @param key parameter key

87

* @return parameter value as long

88

*/

89

public long getLong(String key);

90

91

/**

92

* Get parameter value as long with default

93

* @param key parameter key

94

* @param defaultValue default value if key not found

95

* @return parameter value as long or default value

96

*/

97

public long getLong(String key, long defaultValue);

98

99

/**

100

* Get parameter value as double

101

* @param key parameter key

102

* @return parameter value as double

103

*/

104

public double getDouble(String key);

105

106

/**

107

* Get parameter value as double with default

108

* @param key parameter key

109

* @param defaultValue default value if key not found

110

* @return parameter value as double or default value

111

*/

112

public double getDouble(String key, double defaultValue);

113

114

/**

115

* Get parameter value as boolean

116

* @param key parameter key

117

* @return parameter value as boolean

118

*/

119

public boolean getBoolean(String key);

120

121

/**

122

* Get parameter value as boolean with default

123

* @param key parameter key

124

* @param defaultValue default value if key not found

125

* @return parameter value as boolean or default value

126

*/

127

public boolean getBoolean(String key, boolean defaultValue);

128

129

/**

130

* Check if parameter key exists

131

* @param key parameter key

132

* @return true if key exists, false otherwise

133

*/

134

public boolean has(String key);

135

136

/**

137

* Convert to Flink Configuration object

138

* @return Configuration object with all parameters

139

*/

140

public Configuration getConfiguration();

141

142

/**

143

* Convert to Properties object

144

* @return Properties object with all parameters

145

*/

146

public Properties getProperties();

147

}

148

```

149

150

**Usage Examples:**

151

152

```java

153

public static void main(String[] args) throws Exception {

154

// Parse command line arguments

155

ParameterTool params = ParameterTool.fromArgs(args);

156

157

// Get parameters with defaults

158

String inputPath = params.get("input", "/default/input/path");

159

String outputPath = params.get("output", "/default/output/path");

160

int parallelism = params.getInt("parallelism", 1);

161

boolean verbose = params.getBoolean("verbose", false);

162

163

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

164

165

// Use configuration from parameters

166

env.getConfig().setGlobalJobParameters(params);

167

env.setParallelism(parallelism);

168

169

// Create program using parameters

170

DataSet<String> input = env.readTextFile(inputPath);

171

// ... process data ...

172

result.writeAsText(outputPath);

173

174

env.execute("Parameterized Job");

175

}

176

177

// Usage from properties file

178

ParameterTool fileParams = ParameterTool.fromPropertiesFile("/path/to/config.properties");

179

String dbHost = fileParams.get("database.host", "localhost");

180

int dbPort = fileParams.getInt("database.port", 5432);

181

182

// Combine multiple parameter sources (on duplicate keys, values from the argument, here systemParams, take precedence)

183

ParameterTool systemParams = ParameterTool.fromSystemProperties();

184

ParameterTool combinedParams = fileParams.mergeWith(systemParams);

185

```

186

187

### Multiple Parameter Tool

188

189

Enhanced parameter tool supporting multiple values per key.

190

191

```java { .api }

192

/**

193

* Parameter tool supporting multiple values for the same key

194

*/

195

public class MultipleParameterTool implements Serializable {

196

197

/**

198

* Create from command line arguments allowing multiple values

199

* @param args command line arguments

200

* @return MultipleParameterTool instance

201

*/

202

public static MultipleParameterTool fromArgs(String[] args);

203

204

/**

205

* Get all values for a key

206

* @param key parameter key

207

* @return list of all values for the key

208

*/

209

public List<String> getMultiple(String key);

210

211

/**

212

* Get first value for a key

213

* @param key parameter key

214

* @return first value or null if not found

215

*/

216

public String get(String key);

217

218

/**

219

* Convert to regular ParameterTool (keeps only first value per key)

220

* @return ParameterTool instance

221

*/

222

public ParameterTool getParameterTool();

223

}

224

```

225

226

### DataSet Utilities

227

228

Utility functions for common DataSet operations and manipulations.

229

230

```java { .api }

231

/**

232

* Utility class for DataSet operations

233

*/

234

public class DataSetUtils {

235

236

/**

237

* Zip DataSet elements with sequential index

238

* @param input input DataSet

239

* @return DataSet of Tuple2 with index and original element

240

*/

241

public static <T> DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input);

242

243

/**

244

* Zip DataSet elements with unique ID

245

* @param input input DataSet

246

* @return DataSet of Tuple2 with unique ID and original element

247

*/

248

public static <T> DataSet<Tuple2<Long, T>> zipWithUniqueId(DataSet<T> input);

249

250

/**

251

* Sample elements from DataSet

252

* @param input input DataSet

253

* @param withReplacement whether to sample with replacement

254

* @param fraction fraction of elements to sample (0.0 to 1.0)

255

* @return DataSet with sampled elements

256

*/

257

public static <T> DataSet<T> sample(DataSet<T> input, boolean withReplacement, double fraction);

258

259

/**

260

* Sample elements from DataSet with random seed

261

* @param input input DataSet

262

* @param withReplacement whether to sample with replacement

263

* @param fraction fraction of elements to sample

264

* @param seed random seed for reproducible sampling

265

* @return DataSet with sampled elements

266

*/

267

public static <T> DataSet<T> sample(DataSet<T> input, boolean withReplacement, double fraction, long seed);

268

269

/**

270

* Sample fixed number of elements from DataSet

271

* @param input input DataSet

272

* @param withReplacement whether to sample with replacement

273

* @param numSamples number of samples to take

274

* @return DataSet with sampled elements

275

*/

276

public static <T> DataSet<T> sampleWithSize(DataSet<T> input, boolean withReplacement, int numSamples);

277

278

/**

279

* Sample fixed number of elements with random seed

280

* @param input input DataSet

281

* @param withReplacement whether to sample with replacement

282

* @param numSamples number of samples to take

283

* @param seed random seed for reproducible sampling

284

* @return DataSet with sampled elements

285

*/

286

public static <T> DataSet<T> sampleWithSize(DataSet<T> input, boolean withReplacement, int numSamples, long seed);

287

288

/**

289

* Count elements in each partition

290

* @param input input DataSet

291

* @return DataSet with partition ID and element count pairs

292

*/

293

public static <T> DataSet<Tuple2<Integer, Long>> countElementsPerPartition(DataSet<T> input);

294

}

295

```

296

297

**Usage Examples:**

298

299

```java

300

// Add sequential index to elements

301

DataSet<String> words = env.fromElements("hello", "world", "flink");

302

DataSet<Tuple2<Long, String>> indexed = DataSetUtils.zipWithIndex(words);

303

// Result: (0, "hello"), (1, "world"), (2, "flink")

304

305

// Add unique IDs (not necessarily sequential)

306

DataSet<Tuple2<Long, String>> withIds = DataSetUtils.zipWithUniqueId(words);

307

308

// Sample 50% of elements

309

DataSet<String> largeDataSet = env.readTextFile("/path/to/large/file.txt");

310

DataSet<String> sample = DataSetUtils.sample(largeDataSet, false, 0.5);

311

312

// Sample fixed number of elements

313

DataSet<String> fixedSample = DataSetUtils.sampleWithSize(largeDataSet, false, 1000);

314

315

// Sample with seed for reproducible results

316

DataSet<String> reproducibleSample = DataSetUtils.sample(largeDataSet, false, 0.1, 12345L);

317

318

// Count elements per partition

319

DataSet<Tuple2<Integer, Long>> partitionCounts = DataSetUtils.countElementsPerPartition(largeDataSet);

320

```

321

322

### Data Summarization

323

324

Utilities for computing summary statistics on DataSets.

325

326

```java { .api }

327

/**

328

* Interface for column summary statistics

329

*/

330

public interface ColumnSummary {

331

/**

332

* Get total count of elements

333

* @return total element count

334

*/

335

long getTotalCount();

336

337

/**

338

* Get count of null elements

339

* @return null element count

340

*/

341

long getNullCount();

342

343

/**

344

* Get count of non-null elements

345

* @return non-null element count

346

*/

347

long getNonNullCount();

348

}

349

350

/**

351

* Summary statistics for numeric columns

352

* @param <T> numeric type (Integer, Long, Double, etc.)

353

*/

354

public interface NumericColumnSummary<T> extends ColumnSummary {

355

/**

356

* Get minimum value

357

* @return minimum value

358

*/

359

T getMin();

360

361

/**

362

* Get maximum value

363

* @return maximum value

364

*/

365

T getMax();

366

367

/**

368

* Get sum of all values

369

* @return sum

370

*/

371

Double getSum();

372

373

/**

374

* Get mean (average) value

375

* @return mean value

376

*/

377

Double getMean();

378

379

/**

380

* Get variance

381

* @return variance

382

*/

383

Double getVariance();

384

385

/**

386

* Get standard deviation

387

* @return standard deviation

388

*/

389

Double getStandardDeviation();

390

}

391

392

/**

393

* Summary statistics for string columns

394

*/

395

public interface StringColumnSummary extends ColumnSummary {

396

/**

397

* Get minimum string length

398

* @return minimum length

399

*/

400

Integer getMinLength();

401

402

/**

403

* Get maximum string length

404

* @return maximum length

405

*/

406

Integer getMaxLength();

407

408

/**

409

* Get mean string length

410

* @return mean length

411

*/

412

Double getMeanLength();

413

414

/**

415

* Check if all values are numeric

416

* @return true if all non-null values are numeric

417

*/

418

Boolean getIsNumeric();

419

}

420

421

/**

422

* Summary statistics for boolean columns

423

*/

424

public interface BooleanColumnSummary extends ColumnSummary {

425

/**

426

* Get count of true values

427

* @return true count

428

*/

429

Long getTrueCount();

430

431

/**

432

* Get count of false values

433

* @return false count

434

*/

435

Long getFalseCount();

436

}

437

```

438

439

### Function Annotations

440

441

Annotations for optimizing user-defined functions by specifying field forwarding patterns.

442

443

```java { .api }

444

/**

445

* Container class for function annotations

446

*/

447

public class FunctionAnnotation {

448

449

/**

450

* Annotation to specify forwarded fields

451

*/

452

@Target(ElementType.TYPE)

453

@Retention(RetentionPolicy.RUNTIME)

454

public @interface ForwardedFields {

455

String[] value();

456

}

457

458

/**

459

* Annotation for forwarded fields from first input (for two-input functions)

460

*/

461

@Target(ElementType.TYPE)

462

@Retention(RetentionPolicy.RUNTIME)

463

public @interface ForwardedFieldsFirst {

464

String[] value();

465

}

466

467

/**

468

* Annotation for forwarded fields from second input (for two-input functions)

469

*/

470

@Target(ElementType.TYPE)

471

@Retention(RetentionPolicy.RUNTIME)

472

public @interface ForwardedFieldsSecond {

473

String[] value();

474

}

475

476

/**

477

* Annotation to specify non-forwarded fields

478

*/

479

@Target(ElementType.TYPE)

480

@Retention(RetentionPolicy.RUNTIME)

481

public @interface NonForwardedFields {

482

String[] value();

483

}

484

485

/**

486

* Annotation to specify which fields are read by the function

487

*/

488

@Target(ElementType.TYPE)

489

@Retention(RetentionPolicy.RUNTIME)

490

public @interface ReadFields {

491

String[] value();

492

}

493

494

/**

495

* Annotation to specify which fields are not read by the function

496

*/

497

@Target(ElementType.TYPE)

498

@Retention(RetentionPolicy.RUNTIME)

499

public @interface SkippedFields {

500

String[] value();

501

}

502

}

503

```

504

505

**Usage Examples:**

506

507

```java

508

// Function that forwards first field unchanged

509

@FunctionAnnotation.ForwardedFields("0")

510

public static class AddConstantMap implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

511

@Override

512

public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {

513

return new Tuple2<>(value.f0, value.f1 + 10); // field 0 is forwarded, field 1 is modified

514

}

515

}

516

517

// Join function that forwards fields from both inputs

518

@FunctionAnnotation.ForwardedFieldsFirst("0")

519

@FunctionAnnotation.ForwardedFieldsSecond("1->2") // second.f1 is copied to output position 2

520

public static class CombineJoin implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Tuple3<String, Integer, Double>> {

521

@Override

522

public Tuple3<String, Integer, Double> join(Tuple2<String, Integer> first, Tuple2<String, Double> second) {

523

return new Tuple3<>(first.f0, first.f1, second.f1);

524

}

525

}

526

527

// Function that only reads certain fields

528

@FunctionAnnotation.ReadFields("1;2") // only reads fields 1 and 2

529

public static class PartialReader implements MapFunction<Tuple4<String, Integer, Double, Boolean>, String> {

530

@Override

531

public String map(Tuple4<String, Integer, Double, Boolean> value) {

532

return "Value: " + value.f1 + ", " + value.f2; // only uses f1 and f2

533

}

534

}

535

```

536

537

## Types

538

539

```java { .api }

540

import org.apache.flink.api.java.utils.ParameterTool;

541

import org.apache.flink.api.java.utils.MultipleParameterTool;

542

import org.apache.flink.api.java.utils.DataSetUtils;

543

import org.apache.flink.api.java.summarize.*;

544

import org.apache.flink.api.java.functions.FunctionAnnotation;

545

import org.apache.flink.configuration.Configuration;

546

import org.apache.flink.api.java.tuple.Tuple2;

547

import java.util.Properties;

548

import java.util.List;

549

import java.util.Map;

550

import java.io.IOException;

551

```