or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdmulti-node-spec.mdsystem-properties.mdtest-conductor.mdtest-configuration.md

multi-node-spec.mddocs/

0

# Multi-Node Test Specification

1

2

The MultiNodeSpec class is the main framework for creating and executing coordinated multi-node tests. It extends TestKit and provides barrier synchronization, node-specific execution, and access to the TestConductor extension.

3

4

## MultiNodeSpec Class

5

6

```scala { .api }

7

abstract class MultiNodeSpec(config: MultiNodeConfig)

8

extends TestKit(system) with MultiNodeSpecCallbacks

9

```

10

11

### Constructors

12

13

```scala { .api }

14

// Primary constructor (most commonly used)

15

def this(config: MultiNodeConfig)

16

17

// Constructor with custom ActorSystem creator

18

def this(config: MultiNodeConfig, actorSystemCreator: Config => ActorSystem)

19

```

20

21

**Usage Example:**

22

23

```scala

24

object MyTestConfig extends MultiNodeConfig {

25

val first = role("first")

26

val second = role("second")

27

}

28

29

class MyMultiNodeTest extends MultiNodeSpec(MyTestConfig)

30

with AnyWordSpecLike with Matchers {

31

32

def initialParticipants = roles.size

33

34

// Test implementation

35

}

36

```

37

38

## Required Implementation

39

40

### Initial Participants

41

42

```scala { .api }

43

def initialParticipants: Int

44

```

45

46

**Must be implemented** by subclasses to define the number of participants required before starting the test. This is typically set to the number of roles.

47

48

**Usage Example:**

49

50

```scala

51

class ClusterTest extends MultiNodeSpec(ClusterConfig) {

52

import ClusterConfig._

53

54

def initialParticipants = roles.size // Most common pattern

55

56

// Alternative: specific number

57

// def initialParticipants = 3

58

}

59

```

60

61

**Requirements:**

62

- Must be a `def` (not `val` or `lazy val`)

63

- Must return a value greater than 0

64

- Must not exceed the total number of available nodes

65

66

## Node Execution Control

67

68

### Run On Specific Nodes

69

70

```scala { .api }

71

def runOn(nodes: RoleName*)(thunk: => Unit): Unit

72

```

73

74

Executes the given code block only on the specified nodes. Other nodes will skip the block entirely.

75

76

**Usage Example:**

77

78

```scala

79

runOn(first) {

80

// This code only runs on the 'first' node

81

val cluster = Cluster(system)

82

cluster.join(cluster.selfAddress)

83

84

system.actorOf(Props[ClusterListener](), "listener")

85

}

86

87

runOn(second, third) {

88

// This code runs on both 'second' and 'third' nodes

89

val cluster = Cluster(system)

90

cluster.join(node(first).address)

91

}

92

```

93

94

**Parameters:**

95

- `nodes: RoleName*` - Variable number of role names to execute on

96

- `thunk: => Unit` - Code block to execute (call-by-name)

97

98

### Node Identity Check

99

100

```scala { .api }

101

def isNode(nodes: RoleName*): Boolean

102

```

103

104

Checks if the current node matches any of the specified roles.

105

106

**Usage Example:**

107

108

```scala

109

if (isNode(first)) {

110

log.info("I am the first node")

111

} else if (isNode(second, third)) {

112

log.info("I am either second or third node")

113

}

114

115

// Using in conditionals

116

val seedNode = if (isNode(first)) cluster.selfAddress else node(first).address

117

```

118

119

**Parameters:**

120

- `nodes: RoleName*` - Role names to check against

121

122

**Returns:** `Boolean` - True if current node matches any specified role

123

124

## Barrier Synchronization

125

126

### Basic Barrier Entry

127

128

```scala { .api }

129

def enterBarrier(name: String*): Unit

130

```

131

132

Enters the named barriers in the specified order using the default barrier timeout.

133

134

**Usage Example:**

135

136

```scala

137

"cluster formation" must {

138

"start all nodes" in {

139

// All nodes wait here until everyone reaches this point

140

enterBarrier("startup")

141

}

142

143

"form cluster" in {

144

runOn(first) {

145

Cluster(system).join(Cluster(system).selfAddress)

146

}

147

enterBarrier("cluster-started")

148

149

runOn(second, third) {

150

Cluster(system).join(node(first).address)

151

}

152

enterBarrier("all-joined")

153

}

154

}

155

```

156

157

**Parameters:**

158

- `name: String*` - Variable number of barrier names to enter in sequence

159

160

### Barrier Entry with Timeout

161

162

```scala { .api }

163

def enterBarrier(max: FiniteDuration, name: String*): Unit

164

```

165

166

Enters barriers with a custom timeout, overriding the default barrier timeout.

167

168

**Usage Example:**

169

170

```scala

171

// Use longer timeout for slow operations

172

enterBarrier(60.seconds, "slow-initialization")

173

174

// Multiple barriers with custom timeout

175

enterBarrier(30.seconds, "phase1", "phase2", "phase3")

176

```

177

178

**Parameters:**

179

- `max: FiniteDuration` - Maximum time to wait for all nodes to reach barriers

180

- `name: String*` - Barrier names to enter in sequence

181

182

**Note:** The timeout is automatically scaled using `Duration.dilated` based on the `akka.test.timefactor` configuration.

183

184

## Node Communication

185

186

### Node Address Resolution

187

188

```scala { .api }

189

def node(role: RoleName): ActorPath

190

```

191

192

Returns the root ActorPath for the specified role, enabling actor selection and messaging between nodes.

193

194

**Usage Example:**

195

196

```scala

197

// Get reference to actor on another node

198

val remoteActor = system.actorSelection(node(first) / "user" / "serviceActor")

199

200

// Send message to remote actor

201

remoteActor ! "Hello from another node"

202

203

// Create actor path for deployment

204

val remotePath = RootActorPath(node(second).address) / "user" / "worker"

205

```

206

207

**Parameters:**

208

- `role: RoleName` - Role name to get address for

209

210

**Returns:** `ActorPath` - Root actor path for the specified node

211

212

## Utility Methods

213

214

### Dead Letter Suppression

215

216

```scala { .api }

217

def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit

218

```

219

220

Suppresses dead letter logging for specified message types, useful for tests that intentionally create dead letters.

221

222

**Usage Example:**

223

224

```scala

225

// Mute all dead letters

226

muteDeadLetters()

227

228

// Mute specific message types

229

muteDeadLetters(classOf[String], classOf[MyMessage])

230

231

// Mute on specific system

232

muteDeadLetters(classOf[ClusterEvent.MemberUp])(otherSystem)

233

```

234

235

**Parameters:**

236

- `messageClasses: Class[_]*` - Message classes to mute (empty = mute all)

237

- `sys: ActorSystem` - Actor system to apply muting (defaults to current system)

238

239

### System Restart

240

241

```scala { .api }

242

protected def startNewSystem(): ActorSystem

243

```

244

245

Creates a new ActorSystem with the same configuration and re-registers with the TestConductor. Used for testing system restarts and recovery scenarios.

246

247

**Usage Example:**

248

249

```scala

250

"system recovery" must {

251

"restart and rejoin cluster" in {

252

runOn(second) {

253

// Shutdown current system

254

system.terminate()

255

Await.ready(system.whenTerminated, 10.seconds)

256

257

// Start new system with same configuration

258

val newSystem = startNewSystem()

259

// New system is automatically registered with TestConductor

260

261

enterBarrier("system-restarted")

262

}

263

}

264

}

265

```

266

267

**Returns:** `ActorSystem` - New actor system with injected deployments and TestConductor

268

269

**Note:** Must be called before entering barriers or using TestConductor after system termination.

270

271

## TestKit Inherited Methods

272

273

MultiNodeSpec extends TestKit, providing access to essential testing utilities:

274

275

### Message Expectations

276

277

```scala { .api }

278

def expectMsg[T](obj: T): T

279

def expectMsg[T](d: FiniteDuration, obj: T): T

280

def expectMsgType[T](implicit t: ClassTag[T]): T

281

def expectMsgType[T](d: FiniteDuration)(implicit t: ClassTag[T]): T

282

def expectNoMsg(): Unit

283

def expectNoMsg(d: FiniteDuration): Unit

284

```

285

286

**Usage Example:**

287

288

```scala

289

runOn(first) {

290

val probe = TestProbe()

291

val actor = system.actorOf(Props[MyActor]())

292

293

actor.tell("ping", probe.ref)

294

probe.expectMsg(5.seconds, "pong")

295

probe.expectNoMsg(1.second)

296

}

297

```

298

299

### Conditional Waiting

300

301

```scala { .api }

302

def awaitCond(p: => Boolean): Unit

303

def awaitCond(p: => Boolean, max: Duration): Unit

304

def awaitCond(p: => Boolean, max: Duration, interval: Duration): Unit

305

```

306

307

Repeatedly evaluates condition until it becomes true or timeout is reached.

308

309

**Usage Example:**

310

311

```scala

312

runOn(first, second) {

313

awaitCond(Cluster(system).state.members.size == 2, 10.seconds)

314

}

315

```

316

317

### Time-Bounded Execution

318

319

```scala { .api }

320

def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T

321

def within[T](max: FiniteDuration)(f: => T): T

322

```

323

324

Executes block and verifies it completes within specified time bounds.

325

326

**Usage Example:**

327

328

```scala

329

within(5.seconds, 15.seconds) {

330

// This block must complete between 5 and 15 seconds

331

val cluster = Cluster(system)

332

cluster.join(node(first).address)

333

awaitCond(cluster.readView.members.size == roles.size)

334

}

335

```

336

337

## MultiNodeSpecCallbacks Interface

338

339

MultiNodeSpec implements the MultiNodeSpecCallbacks trait for test framework integration:

340

341

```scala { .api }

342

trait MultiNodeSpecCallbacks {

343

def multiNodeSpecBeforeAll(): Unit

344

def multiNodeSpecAfterAll(): Unit

345

}

346

```

347

348

### Before All Hook

349

350

```scala { .api }

351

def multiNodeSpecBeforeAll(): Unit

352

```

353

354

Called once before all test cases start. Override for custom initialization.

355

356

**Usage Example:**

357

358

```scala

359

class MyTest extends MultiNodeSpec(MyConfig) {

360

override def multiNodeSpecBeforeAll(): Unit = {

361

multiNodeSpecBeforeAll() // Call parent implementation

362

// Custom setup logic

363

log.info("Starting multi-node test suite")

364

}

365

}

366

```

367

368

### After All Hook

369

370

```scala { .api }

371

def multiNodeSpecAfterAll(): Unit

372

```

373

374

Called once after all test cases complete. Override for custom cleanup.

375

376

**Usage Example:**

377

378

```scala

379

class MyTest extends MultiNodeSpec(MyConfig) {

380

override def multiNodeSpecAfterAll(): Unit = {

381

// Custom cleanup logic

382

log.info("Multi-node test suite completed")

383

multiNodeSpecAfterAll() // Call parent implementation

384

}

385

}

386

```

387

388

### ScalaTest Integration

389

390

For ScalaTest integration, use the STMultiNodeSpec trait:

391

392

```scala { .api }

393

trait STMultiNodeSpec extends MultiNodeSpecCallbacks

394

with AnyWordSpecLike with Matchers with BeforeAndAfterAll {

395

396

override def beforeAll(): Unit = multiNodeSpecBeforeAll()

397

override def afterAll(): Unit = multiNodeSpecAfterAll()

398

}

399

```

400

401

**Usage Example:**

402

403

```scala

404

class MyTest extends MultiNodeSpec(MyConfig) with STMultiNodeSpec {

405

// Test implementation automatically gets proper setup/teardown

406

407

"My distributed system" must {

408

"handle basic operations" in {

409

// Test code here

410

}

411

}

412

}

413

```

414

415

## Properties and State

416

417

### Role Information

418

419

```scala { .api }

420

val myself: RoleName // Current node's role

421

def roles: immutable.Seq[RoleName] // All registered roles

422

val log: LoggingAdapter // Logging adapter

423

```

424

425

### TestConductor Access

426

427

```scala { .api }

428

var testConductor: TestConductorExt // Access to barriers and failure injection

429

```

430

431

The TestConductor is automatically initialized and provides access to advanced coordination features like network failure injection.

432

433

**Usage Example:**

434

435

```scala

436

"network partition test" must {

437

"handle split brain" in {

438

// Create network partition

439

testConductor.blackhole(first, second, Direction.Both).await

440

441

enterBarrier("partition-created")

442

443

// Test behavior during partition

444

445

// Restore network

446

testConductor.passThrough(first, second, Direction.Both).await

447

448

enterBarrier("partition-healed")

449

}

450

}

451

```

452

453

### Configuration Access

454

455

```scala { .api }

456

def shutdownTimeout: FiniteDuration // Timeout for system shutdown (default 15.seconds)

457

def verifySystemShutdown: Boolean // Whether to verify clean shutdown (default false)

458

```

459

460

## Lifecycle Hooks

461

462

### Startup Hook

463

464

```scala { .api }

465

protected def atStartup(): Unit

466

```

467

468

Override to perform initialization when the entire test is starting up.

469

470

**Usage Example:**

471

472

```scala

473

class MyTest extends MultiNodeSpec(MyConfig) {

474

override protected def atStartup(): Unit = {

475

log.info("Multi-node test starting")

476

// Custom initialization logic

477

}

478

}

479

```

480

481

### Termination Hook

482

483

```scala { .api }

484

protected def afterTermination(): Unit

485

```

486

487

Override to perform cleanup when the entire test is terminating.

488

489

**Usage Example:**

490

491

```scala

492

class MyTest extends MultiNodeSpec(MyConfig) {

493

override protected def afterTermination(): Unit = {

494

log.info("Multi-node test completed")

495

// Custom cleanup logic

496

}

497

}

498

```

499

500

## Awaitable Helper

501

502

MultiNodeSpec provides an implicit conversion for enhanced Future handling:

503

504

```scala { .api }

505

implicit def awaitHelper[T](w: Awaitable[T]): AwaitHelper[T]

506

507

class AwaitHelper[T](w: Awaitable[T]) {

508

def await: T // Uses remaining duration from enclosing within block or QueryTimeout

509

}

510

```

511

512

**Usage Example:**

513

514

```scala

515

// Instead of using Await.result explicitly

516

val result = someFuture.await

517

518

// Automatically uses remaining time from within block

519

within(30.seconds) {

520

val result1 = future1.await // Has up to 30 seconds

521

val result2 = future2.await // Has remaining time after future1

522

}

523

```

524

525

## Complete Test Example

526

527

```scala

528

object ClusterTestConfig extends MultiNodeConfig {

529

val first = role("first")

530

val second = role("second")

531

val third = role("third")

532

533

commonConfig(ConfigFactory.parseString("""

534

akka {

535

actor.provider = cluster

536

remote.artery.canonical.port = 0

537

cluster {

538

seed-nodes = []

539

auto-down-unreachable-after = 2s

540

}

541

}

542

"""))

543

}

544

545

class ClusterTest extends MultiNodeSpec(ClusterTestConfig)

546

with AnyWordSpecLike with Matchers {

547

548

import ClusterTestConfig._

549

550

def initialParticipants = roles.size

551

552

"A cluster" must {

553

"start up" in {

554

enterBarrier("startup")

555

}

556

557

"form cluster" in {

558

runOn(first) {

559

Cluster(system).join(Cluster(system).selfAddress)

560

}

561

enterBarrier("first-joined")

562

563

runOn(second, third) {

564

Cluster(system).join(node(first).address)

565

}

566

567

within(15.seconds) {

568

awaitCond(Cluster(system).state.members.size == 3)

569

}

570

571

enterBarrier("cluster-formed")

572

}

573

574

"handle node removal" in {

575

runOn(first) {

576

Cluster(system).leave(node(third).address)

577

}

578

579

runOn(first, second) {

580

within(10.seconds) {

581

awaitCond(Cluster(system).state.members.size == 2)

582

}

583

}

584

585

runOn(third) {

586

within(10.seconds) {

587

awaitCond(Cluster(system).isTerminated)

588

}

589

}

590

591

enterBarrier("node-removed")

592

}

593

}

594

}

595

```