or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cancellation-testing.md · checkpointing-migration.md · fault-tolerance-recovery.md · index.md · operator-lifecycle.md · plugin-testing.md · runtime-utilities.md · session-window-testing.md · state-backend-restore.md · test-data-utilities.md

docs/runtime-utilities.md

0

# Runtime Utilities

1

2

Comprehensive collection of runtime utilities for test execution, process management, and common testing operations. These utilities provide essential infrastructure for running tests in controlled environments and managing Flink job execution.

3

4

## Capabilities

5

6

### Job Graph Execution Utilities

7

8

Utilities for executing JobGraphs on MiniCluster instances with comprehensive control and monitoring capabilities.

9

10

```java { .api }

11

/**

12

* Utility for running JobGraphs on MiniCluster for testing

13

*/

14

public class JobGraphRunningUtil {

15

16

/**

17

* Execute JobGraph on MiniCluster and wait for completion

18

* @param jobGraph JobGraph to execute

19

* @param miniCluster MiniCluster instance for execution

20

* @throws Exception if job execution fails

21

*/

22

public static void execute(JobGraph jobGraph, MiniCluster miniCluster) throws Exception;

23

24

/**

25

* Execute JobGraph with timeout

26

* @param jobGraph JobGraph to execute

27

* @param miniCluster MiniCluster instance

28

* @param timeoutMs timeout in milliseconds

29

* @return JobExecutionResult containing execution results

30

* @throws Exception if execution fails or times out

31

*/

32

public static JobExecutionResult executeWithTimeout(

33

JobGraph jobGraph,

34

MiniCluster miniCluster,

35

long timeoutMs) throws Exception;

36

37

/**

38

* Execute JobGraph and return execution result

39

* @param jobGraph JobGraph to execute

40

* @param miniCluster MiniCluster instance

41

* @return JobExecutionResult with job execution details

42

* @throws Exception if execution fails

43

*/

44

public static JobExecutionResult executeAndGetResult(

45

JobGraph jobGraph,

46

MiniCluster miniCluster) throws Exception;

47

48

/**

49

* Execute multiple JobGraphs sequentially

50

* @param jobGraphs list of JobGraphs to execute

51

* @param miniCluster MiniCluster instance

52

* @return List of JobExecutionResults

53

* @throws Exception if any job execution fails

54

*/

55

public static List<JobExecutionResult> executeSequentially(

56

List<JobGraph> jobGraphs,

57

MiniCluster miniCluster) throws Exception;

58

59

/**

60

* Submit JobGraph asynchronously and return CompletableFuture

61

* @param jobGraph JobGraph to submit

62

* @param miniCluster MiniCluster instance

63

* @return CompletableFuture containing JobExecutionResult

64

*/

65

public static CompletableFuture<JobExecutionResult> submitAsync(

66

JobGraph jobGraph,

67

MiniCluster miniCluster);

68

}

69

```

70

71

### Process Management Utilities

72

73

Entry points and utilities for managing external processes during testing scenarios.

74

75

```java { .api }

76

/**

77

* Entry point for task executor process testing

78

*/

79

public class TaskExecutorProcessEntryPoint {

80

81

/**

82

* Main entry point for standalone task executor process

83

* @param args command line arguments for task executor configuration

84

*/

85

public static void main(String[] args);

86

87

/**

88

* Start task executor with specific configuration

89

* @param config Configuration for task executor setup

90

* @throws Exception if task executor startup fails

91

*/

92

public static void startTaskExecutor(Configuration config) throws Exception;

93

94

/**

95

* Create default configuration for task executor testing

96

* @return Configuration with testing defaults

97

*/

98

public static Configuration createDefaultTestConfiguration();

99

}

100

```

101

102

### Test Function Utilities

103

104

Common test functions and utilities for data processing and validation scenarios.

105

106

```java { .api }

107

/**

108

* Tokenizer function for string processing tests

109

*/

110

public class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

111

112

/**

113

* Constructor for tokenizer with default configuration

114

*/

115

public Tokenizer();

116

117

/**

118

* Constructor for tokenizer with custom delimiter

119

* @param delimiter delimiter pattern for tokenization

120

*/

121

public Tokenizer(String delimiter);

122

123

@Override

124

public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception;

125

}

126

127

/**

128

* Identity mapper for testing data flow without transformation

129

*/

130

public class NoOpIntMap implements MapFunction<Integer, Integer> {

131

132

/**

133

* Constructor for no-operation integer mapper

134

*/

135

public NoOpIntMap();

136

137

@Override

138

public Integer map(Integer value) throws Exception;

139

}

140

141

/**

142

* No-operation sink for testing data flow completion

143

*/

144

public class ReceiveCheckNoOpSink<T> implements SinkFunction<T> {

145

146

/**

147

* Constructor for no-op sink with receive tracking

148

* @param expectedCount expected number of elements to receive

149

*/

150

public ReceiveCheckNoOpSink(int expectedCount);

151

152

@Override

153

public void invoke(T value, Context context) throws Exception;

154

155

/**

156

* Check if expected number of elements were received

157

* @return boolean indicating if expected count was reached

158

*/

159

public boolean receivedExpectedCount();

160

161

/**

162

* Get actual count of received elements

163

* @return int representing actual received count

164

*/

165

public int getReceivedCount();

166

167

/**

168

* Reset the counter for reuse in multiple tests

169

*/

170

public void reset();

171

}

172

```

173

174

### Recovery Testing Utilities

175

176

Utilities specifically designed for testing recovery scenarios and restart strategies.

177

178

```java { .api }

179

/**

180

* Utility class for recovery testing operations

181

*/

182

public class RecoveryTestUtils {

183

184

/**

185

* Validate recovery behavior from job execution result

186

* @param result JobExecutionResult to analyze

187

* @param expectedRestartCount expected number of restarts

188

* @return boolean indicating if recovery behavior is valid

189

*/

190

public static boolean validateRecoveryBehavior(

191

JobExecutionResult result,

192

int expectedRestartCount);

193

194

/**

195

* Create configuration for failure injection testing

196

* @param restartStrategy restart strategy to use

197

* @param maxFailures maximum number of failures to inject

198

* @return Configuration with failure injection settings

199

*/

200

public static Configuration createFailureInjectionConfig(

201

String restartStrategy,

202

int maxFailures);

203

204

/**

205

* Wait for job to reach specific state with timeout

206

* @param jobId JobID to monitor

207

* @param targetState target JobStatus to wait for

208

* @param timeoutMs timeout in milliseconds

209

* @param restGateway RestClient for job monitoring

210

* @return boolean indicating if state was reached

211

* @throws Exception if monitoring fails

212

*/

213

public static boolean waitForJobState(

214

JobID jobId,

215

JobStatus targetState,

216

long timeoutMs,

217

RestClusterClient<?> restGateway) throws Exception;

218

}

219

```

220

221

### Test Environment Utilities

222

223

Utilities for setting up and managing test environments and configurations.

224

225

```java { .api }

226

/**

227

* Utility for creating and managing test environments

228

*/

229

public class TestEnvironmentUtil {

230

231

/**

232

* Create MiniCluster configuration for testing

233

* @param parallelism desired parallelism

234

* @param numTaskManagers number of task managers

235

* @return Configuration for MiniCluster setup

236

*/

237

public static Configuration createTestClusterConfig(

238

int parallelism,

239

int numTaskManagers);

240

241

/**

242

* Create streaming environment for testing

243

* @param parallelism parallelism for the environment

244

* @param checkpointingEnabled whether to enable checkpointing

245

* @return StreamExecutionEnvironment configured for testing

246

*/

247

public static StreamExecutionEnvironment createTestStreamEnv(

248

int parallelism,

249

boolean checkpointingEnabled);

250

251

/**

252

* Create batch environment for testing

253

* @param parallelism parallelism for the environment

254

* @return ExecutionEnvironment configured for testing

255

*/

256

public static ExecutionEnvironment createTestBatchEnv(int parallelism);

257

258

/**

259

* Set up test-specific logging configuration

260

* @param logLevel logging level for tests

261

* @param logToConsole whether to log to console

262

*/

263

public static void setupTestLogging(Level logLevel, boolean logToConsole);

264

265

/**

266

* Clean up test environment resources

267

* @param environment execution environment to clean up

268

* @param miniCluster mini cluster to shut down

269

* @throws Exception if cleanup fails

270

*/

271

public static void cleanupTestEnvironment(

272

StreamExecutionEnvironment environment,

273

MiniCluster miniCluster) throws Exception;

274

}

275

```

276

277

### Metrics and Monitoring Utilities

278

279

Utilities for collecting and validating metrics during test execution.

280

281

```java { .api }

282

/**

283

* Utility for metrics collection and validation in tests

284

*/

285

public class TestMetricsUtil {

286

287

/**

288

* Collect all metrics from MiniCluster

289

* @param miniCluster cluster to collect metrics from

290

* @return Map of metric names to values

291

* @throws Exception if metrics collection fails

292

*/

293

public static Map<String, Object> collectAllMetrics(MiniCluster miniCluster) throws Exception;

294

295

/**

296

* Wait for specific metric to reach expected value

297

* @param miniCluster cluster to monitor

298

* @param metricName name of metric to monitor

299

* @param expectedValue expected metric value

300

* @param timeoutMs timeout in milliseconds

301

* @return boolean indicating if metric reached expected value

302

* @throws Exception if monitoring fails

303

*/

304

public static boolean waitForMetricValue(

305

MiniCluster miniCluster,

306

String metricName,

307

Object expectedValue,

308

long timeoutMs) throws Exception;

309

310

/**

311

* Validate job metrics against expected values

312

* @param jobId JobID to validate metrics for

313

* @param expectedMetrics map of expected metric values

314

* @param miniCluster cluster to collect metrics from

315

* @return boolean indicating if all metrics match expectations

316

* @throws Exception if validation fails

317

*/

318

public static boolean validateJobMetrics(

319

JobID jobId,

320

Map<String, Object> expectedMetrics,

321

MiniCluster miniCluster) throws Exception;

322

}

323

```

324

325

**Usage Examples:**

326

327

```java

328

import org.apache.flink.test.runtime.*;

329

import org.apache.flink.test.util.*;

330

331

// Example: Executing jobs with runtime utilities

332

public class JobExecutionTest {

333

334

@Test

335

public void testJobExecution() throws Exception {

336

// Create test job graph

337

JobGraph jobGraph = createTestJobGraph();

338

339

// Set up mini cluster for testing

340

Configuration config = TestEnvironmentUtil.createTestClusterConfig(4, 2);

341

MiniCluster miniCluster = new MiniCluster(config);

342

miniCluster.start();

343

344

try {

345

// Execute job with timeout

346

JobExecutionResult result = JobGraphRunningUtil.executeWithTimeout(

347

jobGraph, miniCluster, 30000L);

348

349

// Validate execution results

350

assertTrue(result.isSuccess());

351

assertFalse(result.getAllAccumulatorResults().isEmpty());

352

353

} finally {

354

miniCluster.close();

355

}

356

}

357

358

@Test

359

public void testSequentialJobExecution() throws Exception {

360

List<JobGraph> jobs = Arrays.asList(

361

createTestJobGraph("job1"),

362

createTestJobGraph("job2"),

363

createTestJobGraph("job3")

364

);

365

366

Configuration config = TestEnvironmentUtil.createTestClusterConfig(2, 1);

367

MiniCluster miniCluster = new MiniCluster(config);

368

miniCluster.start();

369

370

try {

371

// Execute jobs sequentially

372

List<JobExecutionResult> results = JobGraphRunningUtil.executeSequentially(

373

jobs, miniCluster);

374

375

// Validate all jobs completed successfully

376

assertEquals(3, results.size());

377

results.forEach(result -> assertTrue(result.isSuccess()));

378

379

} finally {

380

miniCluster.close();

381

}

382

}

383

}

384

385

// Example: Recovery testing with utilities

386

public class RecoveryUtilityTest {

387

388

@Test

389

public void testRecoveryBehavior() throws Exception {

390

// Create job with failure injection

391

JobGraph faultTolerantJob = createJobWithFailures();

392

393

// Configure recovery settings

394

Configuration config = RecoveryTestUtils.createFailureInjectionConfig(

395

"fixed-delay", 3);

396

config.setString("restart-strategy.fixed-delay.delay", "1s");

397

398

MiniCluster miniCluster = new MiniCluster(config);

399

miniCluster.start();

400

401

try {

402

// Execute job and wait for completion

403

JobExecutionResult result = JobGraphRunningUtil.executeAndGetResult(

404

faultTolerantJob, miniCluster);

405

406

// Validate recovery behavior

407

boolean recoveryValid = RecoveryTestUtils.validateRecoveryBehavior(

408

result, 2);

409

assertTrue(recoveryValid);

410

411

} finally {

412

miniCluster.close();

413

}

414

}

415

}

416

417

// Example: Using test functions

418

public class TestFunctionUsage {

419

420

@Test

421

public void testTokenizerFunction() throws Exception {

422

StreamExecutionEnvironment env = TestEnvironmentUtil.createTestStreamEnv(1, false);

423

424

// Create test data

425

DataStreamSource<String> textStream = env.fromElements(

426

"hello world", "test data", "apache flink"

427

);

428

429

// Apply tokenizer

430

DataStream<Tuple2<String, Integer>> tokens = textStream

431

.flatMap(new Tokenizer())

432

.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));

433

434

// Use no-op sink to count results

435

ReceiveCheckNoOpSink<Tuple2<String, Integer>> countingSink =

436

new ReceiveCheckNoOpSink<>(6); // Expecting 6 tokens

437

438

tokens.addSink(countingSink);

439

440

// Execute and validate

441

env.execute("Tokenizer Test");

442

assertTrue(countingSink.receivedExpectedCount());

443

}

444

}

445

446

// Example: Environment setup and cleanup

447

public class EnvironmentManagementTest {

448

449

private StreamExecutionEnvironment env;

450

private MiniCluster miniCluster;

451

452

@Before

453

public void setup() throws Exception {

454

// Set up test logging

455

TestEnvironmentUtil.setupTestLogging(Level.INFO, true);

456

457

// Create test environment

458

env = TestEnvironmentUtil.createTestStreamEnv(2, true);

459

460

// Start mini cluster

461

Configuration config = TestEnvironmentUtil.createTestClusterConfig(2, 1);

462

miniCluster = new MiniCluster(config);

463

miniCluster.start();

464

}

465

466

@After

467

public void cleanup() throws Exception {

468

// Clean up test resources

469

TestEnvironmentUtil.cleanupTestEnvironment(env, miniCluster);

470

}

471

472

@Test

473

public void testWithManagedEnvironment() throws Exception {

474

// Test implementation using managed environment

475

DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5);

476

477

DataStream<Integer> processed = source

478

.map(new NoOpIntMap())

479

.filter(x -> x > 2);

480

481

ReceiveCheckNoOpSink<Integer> sink = new ReceiveCheckNoOpSink<>(3);

482

processed.addSink(sink);

483

484

env.execute("Managed Environment Test");

485

assertTrue(sink.receivedExpectedCount());

486

}

487

}

488

```