or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-collection.md index.md metrics-testing.md security-testing.md test-data-providers.md test-environments.md

docs/test-environments.md

0

# Test Environment Setup

1

2

Test environment utilities for setting up and managing Flink execution environments in tests, including both streaming and batch processing environments with MiniCluster integration.

3

4

## Capabilities

5

6

### Test Streaming Environment

7

8

`TestStreamEnvironment` provides a streaming execution environment that runs jobs on a MiniCluster for testing purposes.

9

10

```java { .api }

11

/**

12

* StreamExecutionEnvironment that executes jobs on MiniCluster for testing

13

*/

14

public class TestStreamEnvironment extends StreamExecutionEnvironment {

15

/**

16

* Create test streaming environment with full configuration

17

* @param cluster The MiniCluster to execute on

18

* @param configuration Flink configuration

19

* @param parallelism Default parallelism

20

* @param jarFiles JAR files to include in classpath

21

* @param classPaths Additional classpaths

22

*/

23

public TestStreamEnvironment(

24

MiniCluster cluster,

25

Configuration configuration,

26

int parallelism,

27

Collection<Path> jarFiles,

28

Collection<URL> classPaths

29

);

30

31

/**

32

* Create test streaming environment with simplified configuration

33

* @param cluster The MiniCluster to execute on

34

* @param parallelism Default parallelism

35

*/

36

public TestStreamEnvironment(MiniCluster cluster, int parallelism);

37

38

/**

39

* Set as the context environment with full configuration

40

* @param cluster The MiniCluster to execute on

41

* @param parallelism Default parallelism

42

* @param jarFiles JAR files to include in classpath

43

* @param classPaths Additional classpaths

44

*/

45

public static void setAsContext(

46

MiniCluster cluster,

47

int parallelism,

48

Collection<Path> jarFiles,

49

Collection<URL> classPaths

50

);

51

52

/**

53

* Set as the context environment with simplified configuration

54

* @param cluster The MiniCluster to execute on

55

* @param parallelism Default parallelism

56

*/

57

public static void setAsContext(MiniCluster cluster, int parallelism);

58

59

/**

60

* Reset the context environment

61

*/

62

public static void unsetAsContext();

63

}

64

```

65

66

**Usage Example:**

67

68

```java

69

import org.apache.flink.streaming.util.TestStreamEnvironment;

70

import org.apache.flink.runtime.minicluster.MiniCluster;

71

72

@Test

73

public void testStreamingJob() throws Exception {

74

MiniCluster cluster = // ... create cluster

75

76

// Set up test streaming environment

77

TestStreamEnvironment.setAsContext(cluster, 4);

78

79

try {

80

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

81

82

// Your streaming job logic here

83

DataStream<String> input = env.fromElements("test", "data");

84

DataStream<String> result = input.map(String::toUpperCase);

85

86

result.print();

87

env.execute("Test Streaming Job");

88

89

} finally {

90

TestStreamEnvironment.unsetAsContext();

91

}

92

}

93

```

94

95

### Test Batch Environment

96

97

`TestEnvironment` provides a batch execution environment that runs jobs on a MiniCluster for testing purposes.

98

99

```java { .api }

100

/**

101

* ExecutionEnvironment implementation that executes jobs on MiniCluster

102

*/

103

public class TestEnvironment extends ExecutionEnvironment {

104

/**

105

* Create test environment with full configuration

106

* @param cluster The MiniCluster to execute on

107

* @param parallelism Default parallelism

108

* @param objectReuse Whether to enable object reuse

109

* @param jarFiles JAR files to include in classpath

110

* @param classPaths Additional classpaths

111

*/

112

public TestEnvironment(

113

MiniCluster cluster,

114

int parallelism,

115

boolean objectReuse,

116

Collection<Path> jarFiles,

117

Collection<URL> classPaths

118

);

119

120

/**

121

* Create test environment with simplified configuration

122

* @param cluster The MiniCluster to execute on

123

* @param parallelism Default parallelism

124

* @param objectReuse Whether to enable object reuse

125

*/

126

public TestEnvironment(MiniCluster cluster, int parallelism, boolean objectReuse);

127

128

/**

129

* Get the result of the last job execution

130

* @return JobExecutionResult of the last executed job

131

*/

132

public JobExecutionResult getLastJobExecutionResult();

133

134

/**

135

* Set this environment as the context environment

136

*/

137

public void setAsContext();

138

139

/**

140

* Set as the context environment with full configuration

141

* @param cluster The MiniCluster to execute on

142

* @param parallelism Default parallelism

143

* @param jarFiles JAR files to include in classpath

144

* @param classPaths Additional classpaths

145

*/

146

public static void setAsContext(

147

MiniCluster cluster,

148

int parallelism,

149

Collection<Path> jarFiles,

150

Collection<URL> classPaths

151

);

152

153

/**

154

* Set as the context environment with simplified configuration

155

* @param cluster The MiniCluster to execute on

156

* @param parallelism Default parallelism

157

*/

158

public static void setAsContext(MiniCluster cluster, int parallelism);

159

160

/**

161

* Reset the context environment

162

*/

163

public static void unsetAsContext();

164

}

165

```

166

167

### MiniCluster Resource Management

168

169

`MiniClusterWithClientResource` provides JUnit integration for managing MiniCluster lifecycle in tests.

170

171

```java { .api }

172

/**

173

* Starts Flink mini cluster and registers execution environments as JUnit rule

174

*/

175

public class MiniClusterWithClientResource extends MiniClusterResource {

176

/**

177

* Create cluster resource with configuration

178

* @param configuration MiniCluster configuration

179

*/

180

public MiniClusterWithClientResource(MiniClusterResourceConfiguration configuration);

181

182

/**

183

* Get cluster client for job submission

184

* @return ClusterClient for interacting with cluster

185

*/

186

public ClusterClient<?> getClusterClient();

187

188

/**

189

* Get REST cluster client

190

* @return RestClusterClient for REST API access

191

*/

192

public RestClusterClient<?> getRestClusterClient();

193

194

/**

195

* Get test environment configured for this cluster

196

* @return TestEnvironment configured for this cluster

197

*/

198

public TestEnvironment getTestEnvironment();

199

}

200

```

201

202

**Usage Example:**

203

204

```java

205

import org.apache.flink.test.util.MiniClusterWithClientResource;

206

import org.apache.flink.test.util.MiniClusterResourceConfiguration;

207

208

public class MyFlinkTest {

209

210

@ClassRule

211

public static MiniClusterWithClientResource flinkCluster =

212

new MiniClusterWithClientResource(

213

new MiniClusterResourceConfiguration.Builder()

214

.setNumberSlotsPerTaskManager(2)

215

.setNumberTaskManagers(1)

216

.build());

217

218

@Test

219

public void testBatchJob() throws Exception {

220

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

221

222

// Your batch job logic here

223

DataSet<String> input = env.fromElements("hello", "world");

224

DataSet<String> result = input.map(String::toUpperCase);

225

226

List<String> output = result.collect();

227

228

// Validate results

229

assertEquals(Arrays.asList("HELLO", "WORLD"), output);

230

}

231

}

232

```

233

234

### Pipeline Executor Service Loader

235

236

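`MiniClusterPipelineExecutorServiceLoader` is a `PipelineExecutorServiceLoader` implementation bound to a given MiniCluster; it exposes the `minicluster` executor name and supplies the executor factory used to run pipelines on that cluster.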

237

238

```java { .api }

239

/**

240

* Pipeline executor service loader for MiniCluster execution

241

*/

242

public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecutorServiceLoader {

243

public static final String NAME = "minicluster";

244

245

/**

246

* Create executor service loader for given MiniCluster

247

* @param miniCluster The MiniCluster to execute on

248

*/

249

public MiniClusterPipelineExecutorServiceLoader(MiniCluster miniCluster);

250

251

/**

252

* Update configuration for MiniCluster execution

253

* @param configuration Configuration to update

254

* @param jarFiles JAR files to include

255

* @param classpaths Additional classpaths

256

* @return Updated configuration

257

*/

258

public static Configuration updateConfigurationForMiniCluster(

259

Configuration configuration,

260

Collection<Path> jarFiles,

261

Collection<URL> classpaths

262

);

263

264

/**

265

* Get executor factory for configuration

266

* @param configuration Flink configuration

267

* @return PipelineExecutorFactory for the configuration

268

*/

269

public PipelineExecutorFactory getExecutorFactory(Configuration configuration);

270

271

/**

272

* Get supported executor names

273

* @return Stream of executor names

274

*/

275

public Stream<String> getExecutorNames();

276

}

277

```

278

279

### Test Base Utils

280

281

`TestBaseUtils` provides a comprehensive collection of utility methods for testing Flink applications, including result comparison, file operations, and configuration management.

282

283

```java { .api }

284

/**

285

* Utility class with various methods for testing purposes

286

*/

287

public class TestBaseUtils {

288

// Configuration constants

289

protected static final int MINIMUM_HEAP_SIZE_MB = 192;

290

protected static final String TASK_MANAGER_MEMORY_SIZE = "80m";

291

protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;

292

protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";

293

public static final FiniteDuration DEFAULT_TIMEOUT;

294

public static final Time DEFAULT_HTTP_TIMEOUT = Time.seconds(10L);

295

296

// Result reading methods

297

/**

298

* Get result readers for result files

299

* @param resultPath Path to result directory

300

* @return Array of BufferedReaders for result files

301

*/

302

public static BufferedReader[] getResultReader(String resultPath) throws IOException;

303

304

/**

305

* Get result readers with exclusion prefixes and ordering

306

* @param resultPath Path to result directory

307

* @param excludePrefixes Prefixes to exclude from results

308

* @param inOrderOfFiles Whether to maintain file order

309

* @return Array of BufferedReaders for result files

310

*/

311

public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException;

312

313

/**

314

* Get result input streams

315

* @param resultPath Path to result directory

316

* @return Array of BufferedInputStreams for result files

317

*/

318

public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException;

319

320

/**

321

* Get result input streams with exclusion prefixes

322

* @param resultPath Path to result directory

323

* @param excludePrefixes Prefixes to exclude from results

324

* @return Array of BufferedInputStreams for result files

325

*/

326

public static BufferedInputStream[] getResultInputStream(String resultPath, String[] excludePrefixes) throws IOException;

327

328

/**

329

* Read all result lines into target list

330

* @param target List to store result lines

331

* @param resultPath Path to result directory

332

*/

333

public static void readAllResultLines(List<String> target, String resultPath) throws IOException;

334

335

/**

336

* Read all result lines with exclusions and ordering

337

* @param target List to store result lines

338

* @param resultPath Path to result directory

339

* @param excludePrefixes Prefixes to exclude from results

340

* @param inOrderOfFiles Whether to maintain file order

341

*/

342

public static void readAllResultLines(List<String> target, String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException;

343

344

// Result comparison methods

345

/**

346

* Compare results by lines in memory

347

* @param expectedResultStr Expected result as string

348

* @param resultPath Path to actual results

349

*/

350

public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath) throws Exception;

351

352

/**

353

* Compare results by lines in memory with exclusions

354

* @param expectedResultStr Expected result as string

355

* @param resultPath Path to actual results

356

* @param excludePrefixes Prefixes to exclude from comparison

357

*/

358

public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception;

359

360

/**

361

* Compare results by lines with strict ordering

362

* @param expectedResultStr Expected result as string

363

* @param resultPath Path to actual results

364

*/

365

public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath) throws Exception;

366

367

/**

368

* Compare results by lines with strict ordering and exclusions

369

* @param expectedResultStr Expected result as string

370

* @param resultPath Path to actual results

371

* @param excludePrefixes Prefixes to exclude from comparison

372

*/

373

public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, String resultPath, String[] excludePrefixes) throws Exception;

374

375

/**

376

* Check lines against regular expression pattern

377

* @param resultPath Path to result files

378

* @param regexp Regular expression pattern to match

379

*/

380

public static void checkLinesAgainstRegexp(String resultPath, String regexp);

381

382

/**

383

* Compare key-value pairs with delta tolerance

384

* @param expectedLines Expected key-value pairs

385

* @param resultPath Path to actual results

386

* @param delimiter Key-value delimiter

387

* @param maxDelta Maximum allowed delta for numeric comparisons

388

*/

389

public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String delimiter, double maxDelta) throws Exception;

390

391

/**

392

* Compare key-value pairs with delta tolerance and exclusions

393

* @param expectedLines Expected key-value pairs

394

* @param resultPath Path to actual results

395

* @param excludePrefixes Prefixes to exclude from comparison

396

* @param delimiter Key-value delimiter

397

* @param maxDelta Maximum allowed delta for numeric comparisons

398

*/

399

public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath, String[] excludePrefixes, String delimiter, double maxDelta) throws Exception;

400

401

/**

402

* Compare result collections with custom comparator

403

* @param expected Expected results list

404

* @param actual Actual results list

405

* @param comparator Comparator for element comparison

406

*/

407

public static <X> void compareResultCollections(List<X> expected, List<X> actual, Comparator<X> comparator);

408

409

// Collection comparison methods

410

/**

411

* Compare result as tuples

412

* @param result Actual result list

413

* @param expected Expected result as string

414

*/

415

public static <T> void compareResultAsTuples(List<T> result, String expected);

416

417

/**

418

* Compare result as text

419

* @param result Actual result list

420

* @param expected Expected result as string

421

*/

422

public static <T> void compareResultAsText(List<T> result, String expected);

423

424

/**

425

* Compare ordered result as text

426

* @param result Actual result list

427

* @param expected Expected result as string

428

*/

429

public static <T> void compareOrderedResultAsText(List<T> result, String expected);

430

431

/**

432

* Compare ordered result as text with tuple option

433

* @param result Actual result list

434

* @param expected Expected result as string

435

* @param asTuples Whether to treat as tuples

436

*/

437

public static <T> void compareOrderedResultAsText(List<T> result, String expected, boolean asTuples);

438

439

/**

440

* Check if result contains expected content

441

* @param result Actual result list

442

* @param expected Expected content as string

443

*/

444

public static <T> void containsResultAsText(List<T> result, String expected);

445

446

// Utility methods

447

/**

448

* Set environment variables

449

* @param newenv Map of environment variables to set

450

*/

451

public static void setEnv(Map<String, String> newenv);

452

453

/**

454

* Construct test path for class

455

* @param forClass Class to construct path for

456

* @param folder Folder name

457

* @return Constructed test path

458

*/

459

public static String constructTestPath(Class<?> forClass, String folder);

460

461

/**

462

* Construct test URI for class

463

* @param forClass Class to construct URI for

464

* @param folder Folder name

465

* @return Constructed test URI

466

*/

467

public static String constructTestURI(Class<?> forClass, String folder);

468

469

/**

470

* Get content from HTTP URL

471

* @param url URL to fetch from

472

* @return HTTP response content

473

*/

474

public static String getFromHTTP(String url) throws Exception;

475

476

/**

477

* Get content from HTTP URL with timeout

478

* @param url URL to fetch from

479

* @param timeout Request timeout

480

* @return HTTP response content

481

*/

482

public static String getFromHTTP(String url, Time timeout) throws Exception;

483

484

// Configuration methods

485

/**

486

* Convert configurations to parameter list

487

* @param testConfigs Configuration objects

488

* @return Parameter list for parameterized tests

489

*/

490

protected static Collection<Object[]> toParameterList(Configuration... testConfigs);

491

492

/**

493

* Convert configuration list to parameter list

494

* @param testConfigs List of configuration objects

495

* @return Parameter list for parameterized tests

496

*/

497

protected static Collection<Object[]> toParameterList(List<Configuration> testConfigs);

498

499

/**

500

* Tuple comparator for comparing Tuple objects

501

*/

502

public static class TupleComparator<T extends Tuple> implements Comparator<T> {

503

public int compare(T o1, T o2);

504

}

505

}

506

```

507

508

**Usage Example:**

509

510

```java

511

import org.apache.flink.test.util.TestBaseUtils;

512

import org.apache.flink.api.java.tuple.Tuple2;

513

514

@Test

515

public void testResultComparison() throws Exception {

516

// Test result comparison

517

String expected = "hello\nworld\nflink";

518

String resultPath = "/path/to/results";

519

520

TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);

521

522

// Test with exclusions

523

String[] excludePrefixes = {"debug:", "info:"};

524

TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath, excludePrefixes);

525

526

// Test ordered comparison

527

TestBaseUtils.compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);

528

529

// Test collection comparison

530

List<String> actualResults = Arrays.asList("HELLO", "WORLD", "FLINK");

531

String expectedResults = "HELLO\nWORLD\nFLINK";

532

TestBaseUtils.compareResultAsText(actualResults, expectedResults);

533

534

// Test tuple comparison

535

List<Tuple2<String, Integer>> tupleResults = Arrays.asList(

536

new Tuple2<>("hello", 1),

537

new Tuple2<>("world", 2)

538

);

539

String expectedTuples = "(hello,1)\n(world,2)";

540

TestBaseUtils.compareResultAsTuples(tupleResults, expectedTuples);

541

}

542

543

@Test

544

public void testKeyValueComparison() throws Exception {

545

// Test with numeric delta comparison

546

String expected = "a,1.0\nb,2.5\nc,3.7";

547

String resultPath = "/path/to/numeric/results";

548

double maxDelta = 0.1;

549

550

TestBaseUtils.compareKeyValuePairsWithDelta(expected, resultPath, ",", maxDelta);

551

}

552

553

@Test

554

public void testHTTPUtilities() throws Exception {

555

// Test HTTP fetching

556

String content = TestBaseUtils.getFromHTTP("http://example.com/test");

557

assertNotNull(content);

558

559

// Test with timeout

560

Time timeout = Time.seconds(30);

561

String contentWithTimeout = TestBaseUtils.getFromHTTP("http://example.com/test", timeout);

562

assertNotNull(contentWithTimeout);

563

}

564

565

@Test

566

public void testPathConstruction() {

567

String testPath = TestBaseUtils.constructTestPath(MyTest.class, "testdata");

568

String testURI = TestBaseUtils.constructTestURI(MyTest.class, "testdata");

569

570

assertNotNull(testPath);

571

assertNotNull(testURI);

572

}

573

```

574

575

## Test Base Classes

576

577

### Abstract Test Base

578

579

Base class for unit tests that reuse the same Flink cluster across multiple test methods.

580

581

```java { .api }

582

/**

583

* Base class for unit tests that reuse the same Flink cluster

584

*/

585

public abstract class AbstractTestBase extends TestBaseUtils {

586

/**

587

* Static class rule for managing cluster lifecycle

588

*/

589

public static MiniClusterWithClientResource miniClusterResource;

590

591

/**

592

* Static temporary folder rule

593

*/

594

public static TemporaryFolder TEMPORARY_FOLDER;

595

596

/**

597

* Get temporary directory path

598

* @param dirName Directory name

599

* @return Path to temporary directory

600

*/

601

public String getTempDirPath(String dirName);

602

603

/**

604

* Get temporary file path

605

* @param fileName File name

606

* @return Path to temporary file

607

*/

608

public String getTempFilePath(String fileName);

609

610

/**

611

* Create temporary file with contents

612

* @param fileName File name

613

* @param contents File contents

614

* @return Path to created file

615

*/

616

public String createTempFile(String fileName, String contents);

617

618

/**

619

* Create and register temporary file for cleanup

620

* @param fileName File name

621

* @return Created File object

622

*/

623

public File createAndRegisterTempFile(String fileName);

624

}

625

```

626

627

### Java Program Test Base

628

629

Base class for tests that run a single test program with object reuse enabled/disabled.

630

631

```java { .api }

632

/**

633

* Base for tests that run single test with object reuse enabled/disabled

634

*/

635

public abstract class JavaProgramTestBase extends AbstractTestBase {

636

/**

637

* Set number of test repetitions

638

* @param numberOfTestRepetitions Number of times to repeat test

639

*/

640

public void setNumberOfTestRepetitions(int numberOfTestRepetitions);

641

642

/**

643

* Get parallelism level

644

* @return Current parallelism setting

645

*/

646

public int getParallelism();

647

648

/**

649

* Get latest execution result

650

* @return JobExecutionResult of latest execution

651

*/

652

public JobExecutionResult getLatestExecutionResult();

653

654

/**

655

* Check if using collection execution

656

* @return true if collection execution mode

657

*/

658

public boolean isCollectionExecution();

659

660

/**

661

* Test program implementation - must be implemented by subclasses

662

*/

663

protected abstract void testProgram() throws Exception;

664

665

/**

666

* Pre-submission work - override if needed

667

*/

668

protected abstract void preSubmit() throws Exception;

669

670

/**

671

* Post-submission work - override if needed

672

*/

673

protected abstract void postSubmit() throws Exception;

674

675

/**

676

* Whether to skip collection execution - override if needed

677

* @return true to skip collection execution

678

*/

679

protected abstract boolean skipCollectionExecution();

680

}

681

```

682

683

### Multiple Programs Test Base

684

685

Base class for parameterized tests that run in different execution modes (cluster, collection, etc.).

686

687

```java { .api }

688

/**

689

* Base for parameterized tests that run in different execution modes

690

*/

691

public class MultipleProgramsTestBase extends AbstractTestBase {

692

/**

693

* Test execution modes

694

*/

695

public enum TestExecutionMode {

696

CLUSTER,

697

CLUSTER_OBJECT_REUSE,

698

COLLECTION

699

}

700

701

/**

702

* Create test base with execution mode

703

* @param mode Test execution mode

704

*/

705

public MultipleProgramsTestBase(TestExecutionMode mode);

706

707

/**

708

* Provides the test execution modes as parameters for JUnit parameterized tests

709

* @return Collection of execution mode parameters

710

*/

711

public static Collection<Object[]> executionModes();

712

}

713

```

714

715

**Usage Example:**

716

717

```java

718

import org.apache.flink.test.util.MultipleProgramsTestBase;

719

720

@RunWith(Parameterized.class)

721

public class MyParameterizedTest extends MultipleProgramsTestBase {

722

723

public MyParameterizedTest(TestExecutionMode mode) {

724

super(mode);

725

}

726

727

@Parameterized.Parameters

728

public static Collection<Object[]> executionModes() {

729

return MultipleProgramsTestBase.executionModes();

730

}

731

732

@Test

733

public void testInAllModes() throws Exception {

734

// Test will run in all execution modes

735

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

736

// ... test logic

737

}

738

}

739

```

740

741

## Utility Classes

742

743

### Test Process Builder

744

745

`TestProcessBuilder` provides utilities for creating and managing external test processes with JVM configuration.

746

747

```java { .api }

748

/**

749

* Utility wrapping ProcessBuilder with common testing options

750

*/

751

public class TestProcessBuilder {

752

/**

753

* Create process builder for main class

754

* @param mainClass Main class name to execute

755

*/

756

public TestProcessBuilder(String mainClass) throws IOException;

757

758

/**

759

* Start the configured process

760

* @return TestProcess wrapper for managing the process

761

*/

762

public TestProcess start() throws IOException;

763

764

/**

765

* Set JVM memory configuration

766

* @param jvmMemory Memory size for JVM

767

* @return This builder for method chaining

768

*/

769

public TestProcessBuilder setJvmMemory(MemorySize jvmMemory);

770

771

/**

772

* Add JVM argument

773

* @param arg JVM argument to add

774

* @return This builder for method chaining

775

*/

776

public TestProcessBuilder addJvmArg(String arg);

777

778

/**

779

* Add main class argument

780

* @param arg Argument for main class

781

* @return This builder for method chaining

782

*/

783

public TestProcessBuilder addMainClassArg(String arg);

784

785

/**

786

* Add Flink configuration as main class arguments

787

* @param config Flink configuration to add

788

* @return This builder for method chaining

789

*/

790

public TestProcessBuilder addConfigAsMainClassArgs(Configuration config);

791

792

/**

793

* Use clean environment for process

794

* @return This builder for method chaining

795

*/

796

public TestProcessBuilder withCleanEnvironment();

797

798

/**

799

* Wrapper for managing test process execution

800

*/

801

public static class TestProcess {

802

/**

803

* Get underlying Process object

804

* @return Process instance

805

*/

806

public Process getProcess();

807

808

/**

809

* Get process output writer

810

* @return StringWriter with process output

811

*/

812

public StringWriter getProcessOutput();

813

814

/**

815

* Get process error output writer

816

* @return StringWriter with error output

817

*/

818

public StringWriter getErrorOutput();

819

820

/**

821

* Destroy the process

822

*/

823

public void destroy();

824

}

825

}

826

```

827

828

### Shell Script Builder

829

830

`ShellScript` provides utilities for creating cross-platform shell scripts for testing.

831

832

```java { .api }

833

/**

834

* Utility for creating shell scripts on Linux and Windows

835

*/

836

public class ShellScript {

837

/**

838

* Create shell script builder

839

* @return Platform-appropriate shell script builder

840

*/

841

public static ShellScriptBuilder createShellScriptBuilder();

842

843

/**

844

* Get script file extension for current platform

845

* @return Script extension (.sh on Unix-like systems, .cmd on Windows)

846

*/

847

public static String getScriptExtension();

848

849

/**

850

* Abstract builder for creating shell scripts

851

*/

852

public abstract static class ShellScriptBuilder {

853

/**

854

* Write script to file

855

* @param file Target file for script

856

*/

857

public void write(File file) throws IOException;

858

859

/**

860

* Add command to script

861

* @param command Command components to execute

862

*/

863

public abstract void command(List<String> command);

864

865

/**

866

* Set environment variable in script

867

* @param key Environment variable name

868

* @param value Environment variable value

869

*/

870

public abstract void env(String key, String value);

871

}

872

}

873

```

874

875

### Collection Test Environment

876

877

`CollectionTestEnvironment` provides collection-based execution environment for testing without cluster setup.

878

879

```java { .api }

880

/**

881

* Collection execution environment for testing

882

*/

883

public class CollectionTestEnvironment extends ExecutionEnvironment {

884

/**

885

* Get result of last job execution

886

* @return JobExecutionResult of last execution

887

*/

888

public JobExecutionResult getLastJobExecutionResult();

889

890

/**

891

* Execute job with given name

892

* @param jobName Name for the job execution

893

* @return JobExecutionResult after execution

894

*/

895

public JobExecutionResult execute(String jobName) throws Exception;

896

897

/**

898

* Set this environment as context environment

899

*/

900

protected void setAsContext();

901

902

/**

903

* Unset this environment from context

904

*/

905

protected static void unsetAsContext();

906

}

907

```

908

909

### Testing Security Context

910

911

`TestingSecurityContext` provides security context management for Kerberos-enabled testing scenarios.

912

913

```java { .api }

914

/**

915

* Security context for handling client and server principals in MiniKDC testing

916

*/

917

public class TestingSecurityContext {

918

/**

919

* Install security context with configurations

920

* @param config Security configuration

921

* @param clientSecurityConfigurationMap Client security configurations

922

*/

923

public static void install(

924

SecurityConfiguration config,

925

Map<String, ClientSecurityConfiguration> clientSecurityConfigurationMap

926

) throws Exception;

927

928

/**

929

* Client security configuration for testing

930

*/

931

static class ClientSecurityConfiguration {

932

/**

933

* Create client security configuration

934

* @param principal Kerberos principal

935

* @param keytab Path to keytab file

936

*/

937

public ClientSecurityConfiguration(String principal, String keytab);

938

939

/**

940

* Get Kerberos principal

941

* @return Principal string

942

*/

943

public String getPrincipal();

944

945

/**

946

* Get keytab file path

947

* @return Keytab file path

948

*/

949

public String getKeytab();

950

}

951

}

952

```