0
# Multi-Node Test Specification
1
2
The MultiNodeSpec class is the main framework for creating and executing coordinated multi-node tests. It extends TestKit and provides barrier synchronization, node-specific execution, and access to the TestConductor extension.
3
4
## MultiNodeSpec Class
5
6
```scala { .api }
7
abstract class MultiNodeSpec(config: MultiNodeConfig)
8
extends TestKit(system) with MultiNodeSpecCallbacks
9
```
10
11
### Constructors
12
13
```scala { .api }
14
// Primary constructor (most commonly used)
15
def this(config: MultiNodeConfig)
16
17
// Constructor with custom ActorSystem creator
18
def this(config: MultiNodeConfig, actorSystemCreator: Config => ActorSystem)
19
```
20
21
**Usage Example:**
22
23
```scala
24
object MyTestConfig extends MultiNodeConfig {
25
val first = role("first")
26
val second = role("second")
27
}
28
29
class MyMultiNodeTest extends MultiNodeSpec(MyTestConfig)
30
with AnyWordSpecLike with Matchers {
31
32
def initialParticipants = roles.size
33
34
// Test implementation
35
}
36
```
37
38
## Required Implementation
39
40
### Initial Participants
41
42
```scala { .api }
43
def initialParticipants: Int
44
```
45
46
**Must be implemented** by subclasses to define the number of participants required before starting the test. This is typically set to the number of roles.
47
48
**Usage Example:**
49
50
```scala
51
class ClusterTest extends MultiNodeSpec(ClusterConfig) {
52
import ClusterConfig._
53
54
def initialParticipants = roles.size // Most common pattern
55
56
// Alternative: specific number
57
// def initialParticipants = 3
58
}
59
```
60
61
**Requirements:**
62
- Must be a `def` (not `val` or `lazy val`)
63
- Must return a value greater than 0
64
- Must not exceed the total number of available nodes
65
66
## Node Execution Control
67
68
### Run On Specific Nodes
69
70
```scala { .api }
71
def runOn(nodes: RoleName*)(thunk: => Unit): Unit
72
```
73
74
Executes the given code block only on the specified nodes. Other nodes will skip the block entirely.
75
76
**Usage Example:**
77
78
```scala
79
runOn(first) {
80
// This code only runs on the 'first' node
81
val cluster = Cluster(system)
82
cluster.join(cluster.selfAddress)
83
84
system.actorOf(Props[ClusterListener](), "listener")
85
}
86
87
runOn(second, third) {
88
// This code runs on both 'second' and 'third' nodes
89
val cluster = Cluster(system)
90
cluster.join(node(first).address)
91
}
92
```
93
94
**Parameters:**
95
- `nodes: RoleName*` - Variable number of role names to execute on
96
- `thunk: => Unit` - Code block to execute (call-by-name)
97
98
### Node Identity Check
99
100
```scala { .api }
101
def isNode(nodes: RoleName*): Boolean
102
```
103
104
Checks if the current node matches any of the specified roles.
105
106
**Usage Example:**
107
108
```scala
109
if (isNode(first)) {
110
log.info("I am the first node")
111
} else if (isNode(second, third)) {
112
log.info("I am either second or third node")
113
}
114
115
// Using in conditionals
116
val seedNode = if (isNode(first)) cluster.selfAddress else node(first).address
117
```
118
119
**Parameters:**
120
- `nodes: RoleName*` - Role names to check against
121
122
**Returns:** `Boolean` - True if current node matches any specified role
123
124
## Barrier Synchronization
125
126
### Basic Barrier Entry
127
128
```scala { .api }
129
def enterBarrier(name: String*): Unit
130
```
131
132
Enters the named barriers in the specified order using the default barrier timeout.
133
134
**Usage Example:**
135
136
```scala
137
"cluster formation" must {
138
"start all nodes" in {
139
// All nodes wait here until everyone reaches this point
140
enterBarrier("startup")
141
}
142
143
"form cluster" in {
144
runOn(first) {
145
Cluster(system).join(Cluster(system).selfAddress)
146
}
147
enterBarrier("cluster-started")
148
149
runOn(second, third) {
150
Cluster(system).join(node(first).address)
151
}
152
enterBarrier("all-joined")
153
}
154
}
155
```
156
157
**Parameters:**
158
- `name: String*` - Variable number of barrier names to enter in sequence
159
160
### Barrier Entry with Timeout
161
162
```scala { .api }
163
def enterBarrier(max: FiniteDuration, name: String*): Unit
164
```
165
166
Enters barriers with a custom timeout, overriding the default barrier timeout.
167
168
**Usage Example:**
169
170
```scala
171
// Use longer timeout for slow operations
172
enterBarrier(60.seconds, "slow-initialization")
173
174
// Multiple barriers with custom timeout
175
enterBarrier(30.seconds, "phase1", "phase2", "phase3")
176
```
177
178
**Parameters:**
179
- `max: FiniteDuration` - Maximum time to wait for all nodes to reach barriers
180
- `name: String*` - Barrier names to enter in sequence
181
182
**Note:** The timeout is automatically scaled using `Duration.dilated` based on the `akka.test.timefactor` configuration.
183
184
## Node Communication
185
186
### Node Address Resolution
187
188
```scala { .api }
189
def node(role: RoleName): ActorPath
190
```
191
192
Returns the root ActorPath for the specified role, enabling actor selection and messaging between nodes.
193
194
**Usage Example:**
195
196
```scala
197
// Get reference to actor on another node
198
val remoteActor = system.actorSelection(node(first) / "user" / "serviceActor")
199
200
// Send message to remote actor
201
remoteActor ! "Hello from another node"
202
203
// Create actor path for deployment
204
val remotePath = RootActorPath(node(second).address) / "user" / "worker"
205
```
206
207
**Parameters:**
208
- `role: RoleName` - Role name to get address for
209
210
**Returns:** `ActorPath` - Root actor path for the specified node
211
212
## Utility Methods
213
214
### Dead Letter Suppression
215
216
```scala { .api }
217
def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit
218
```
219
220
Suppresses dead letter logging for specified message types, useful for tests that intentionally create dead letters.
221
222
**Usage Example:**
223
224
```scala
225
// Mute all dead letters
226
muteDeadLetters()
227
228
// Mute specific message types
229
muteDeadLetters(classOf[String], classOf[MyMessage])
230
231
// Mute on specific system
232
muteDeadLetters(classOf[ClusterEvent.MemberUp])(otherSystem)
233
```
234
235
**Parameters:**
236
- `messageClasses: Class[_]*` - Message classes to mute (empty = mute all)
237
- `sys: ActorSystem` - Actor system to apply muting (defaults to current system)
238
239
### System Restart
240
241
```scala { .api }
242
protected def startNewSystem(): ActorSystem
243
```
244
245
Creates a new ActorSystem with the same configuration and re-registers with the TestConductor. Used for testing system restarts and recovery scenarios.
246
247
**Usage Example:**
248
249
```scala
250
"system recovery" must {
251
"restart and rejoin cluster" in {
252
runOn(second) {
253
// Shutdown current system
254
system.terminate()
255
Await.ready(system.whenTerminated, 10.seconds)
256
257
// Start new system with same configuration
258
val newSystem = startNewSystem()
259
// New system is automatically registered with TestConductor
260
261
enterBarrier("system-restarted")
262
}
263
}
264
}
265
```
266
267
**Returns:** `ActorSystem` - New actor system with injected deployments and TestConductor
268
269
**Note:** Must be called before entering barriers or using TestConductor after system termination.
270
271
## TestKit Inherited Methods
272
273
MultiNodeSpec extends TestKit, providing access to essential testing utilities:
274
275
### Message Expectations
276
277
```scala { .api }
278
def expectMsg[T](obj: T): T
279
def expectMsg[T](d: FiniteDuration, obj: T): T
280
def expectMsgType[T](implicit t: ClassTag[T]): T
281
def expectMsgType[T](d: FiniteDuration)(implicit t: ClassTag[T]): T
282
def expectNoMsg(): Unit
283
def expectNoMsg(d: FiniteDuration): Unit
284
```
285
286
**Usage Example:**
287
288
```scala
289
runOn(first) {
290
val probe = TestProbe()
291
val actor = system.actorOf(Props[MyActor]())
292
293
actor.tell("ping", probe.ref)
294
probe.expectMsg(5.seconds, "pong")
295
probe.expectNoMsg(1.second)
296
}
297
```
298
299
### Conditional Waiting
300
301
```scala { .api }
302
def awaitCond(p: => Boolean): Unit
303
def awaitCond(p: => Boolean, max: Duration): Unit
304
def awaitCond(p: => Boolean, max: Duration, interval: Duration): Unit
305
```
306
307
Repeatedly evaluates condition until it becomes true or timeout is reached.
308
309
**Usage Example:**
310
311
```scala
312
runOn(first, second) {
313
awaitCond(Cluster(system).state.members.size == 2, 10.seconds)
314
}
315
```
316
317
### Time-Bounded Execution
318
319
```scala { .api }
320
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
321
def within[T](max: FiniteDuration)(f: => T): T
322
```
323
324
Executes block and verifies it completes within specified time bounds.
325
326
**Usage Example:**
327
328
```scala
329
within(5.seconds, 15.seconds) {
330
// This block must complete between 5 and 15 seconds
331
val cluster = Cluster(system)
332
cluster.join(node(first).address)
333
awaitCond(cluster.readView.members.size == roles.size)
334
}
335
```
336
337
## MultiNodeSpecCallbacks Interface
338
339
MultiNodeSpec implements the MultiNodeSpecCallbacks trait for test framework integration:
340
341
```scala { .api }
342
trait MultiNodeSpecCallbacks {
343
def multiNodeSpecBeforeAll(): Unit
344
def multiNodeSpecAfterAll(): Unit
345
}
346
```
347
348
### Before All Hook
349
350
```scala { .api }
351
def multiNodeSpecBeforeAll(): Unit
352
```
353
354
Called once before all test cases start. Override for custom initialization.
355
356
**Usage Example:**
357
358
```scala
359
class MyTest extends MultiNodeSpec(MyConfig) {
360
override def multiNodeSpecBeforeAll(): Unit = {
361
multiNodeSpecBeforeAll() // Call parent implementation
362
// Custom setup logic
363
log.info("Starting multi-node test suite")
364
}
365
}
366
```
367
368
### After All Hook
369
370
```scala { .api }
371
def multiNodeSpecAfterAll(): Unit
372
```
373
374
Called once after all test cases complete. Override for custom cleanup.
375
376
**Usage Example:**
377
378
```scala
379
class MyTest extends MultiNodeSpec(MyConfig) {
380
override def multiNodeSpecAfterAll(): Unit = {
381
// Custom cleanup logic
382
log.info("Multi-node test suite completed")
383
multiNodeSpecAfterAll() // Call parent implementation
384
}
385
}
386
```
387
388
### ScalaTest Integration
389
390
For ScalaTest integration, use the STMultiNodeSpec trait:
391
392
```scala { .api }
393
trait STMultiNodeSpec extends MultiNodeSpecCallbacks
394
with AnyWordSpecLike with Matchers with BeforeAndAfterAll {
395
396
override def beforeAll(): Unit = multiNodeSpecBeforeAll()
397
override def afterAll(): Unit = multiNodeSpecAfterAll()
398
}
399
```
400
401
**Usage Example:**
402
403
```scala
404
class MyTest extends MultiNodeSpec(MyConfig) with STMultiNodeSpec {
405
// Test implementation automatically gets proper setup/teardown
406
407
"My distributed system" must {
408
"handle basic operations" in {
409
// Test code here
410
}
411
}
412
}
413
```
414
415
## Properties and State
416
417
### Role Information
418
419
```scala { .api }
420
val myself: RoleName // Current node's role
421
def roles: immutable.Seq[RoleName] // All registered roles
422
val log: LoggingAdapter // Logging adapter
423
```
424
425
### TestConductor Access
426
427
```scala { .api }
428
var testConductor: TestConductorExt // Access to barriers and failure injection
429
```
430
431
The TestConductor is automatically initialized and provides access to advanced coordination features like network failure injection.
432
433
**Usage Example:**
434
435
```scala
436
"network partition test" must {
437
"handle split brain" in {
438
// Create network partition
439
testConductor.blackhole(first, second, Direction.Both).await
440
441
enterBarrier("partition-created")
442
443
// Test behavior during partition
444
445
// Restore network
446
testConductor.passThrough(first, second, Direction.Both).await
447
448
enterBarrier("partition-healed")
449
}
450
}
451
```
452
453
### Configuration Access
454
455
```scala { .api }
456
def shutdownTimeout: FiniteDuration // Timeout for system shutdown (default 15.seconds)
457
def verifySystemShutdown: Boolean // Whether to verify clean shutdown (default false)
458
```
459
460
## Lifecycle Hooks
461
462
### Startup Hook
463
464
```scala { .api }
465
protected def atStartup(): Unit
466
```
467
468
Override to perform initialization when the entire test is starting up.
469
470
**Usage Example:**
471
472
```scala
473
class MyTest extends MultiNodeSpec(MyConfig) {
474
override protected def atStartup(): Unit = {
475
log.info("Multi-node test starting")
476
// Custom initialization logic
477
}
478
}
479
```
480
481
### Termination Hook
482
483
```scala { .api }
484
protected def afterTermination(): Unit
485
```
486
487
Override to perform cleanup when the entire test is terminating.
488
489
**Usage Example:**
490
491
```scala
492
class MyTest extends MultiNodeSpec(MyConfig) {
493
override protected def afterTermination(): Unit = {
494
log.info("Multi-node test completed")
495
// Custom cleanup logic
496
}
497
}
498
```
499
500
## Awaitable Helper
501
502
MultiNodeSpec provides an implicit conversion for enhanced Future handling:
503
504
```scala { .api }
505
implicit def awaitHelper[T](w: Awaitable[T]): AwaitHelper[T]
506
507
class AwaitHelper[T](w: Awaitable[T]) {
508
def await: T // Uses remaining duration from enclosing within block or QueryTimeout
509
}
510
```
511
512
**Usage Example:**
513
514
```scala
515
// Instead of using Await.result explicitly
516
val result = someFuture.await
517
518
// Automatically uses remaining time from within block
519
within(30.seconds) {
520
val result1 = future1.await // Has up to 30 seconds
521
val result2 = future2.await // Has remaining time after future1
522
}
523
```
524
525
## Complete Test Example
526
527
```scala
528
object ClusterTestConfig extends MultiNodeConfig {
529
val first = role("first")
530
val second = role("second")
531
val third = role("third")
532
533
commonConfig(ConfigFactory.parseString("""
534
akka {
535
actor.provider = cluster
536
remote.artery.canonical.port = 0
537
cluster {
538
seed-nodes = []
539
auto-down-unreachable-after = 2s
540
}
541
}
542
"""))
543
}
544
545
class ClusterTest extends MultiNodeSpec(ClusterTestConfig)
546
with AnyWordSpecLike with Matchers {
547
548
import ClusterTestConfig._
549
550
def initialParticipants = roles.size
551
552
"A cluster" must {
553
"start up" in {
554
enterBarrier("startup")
555
}
556
557
"form cluster" in {
558
runOn(first) {
559
Cluster(system).join(Cluster(system).selfAddress)
560
}
561
enterBarrier("first-joined")
562
563
runOn(second, third) {
564
Cluster(system).join(node(first).address)
565
}
566
567
within(15.seconds) {
568
awaitCond(Cluster(system).state.members.size == 3)
569
}
570
571
enterBarrier("cluster-formed")
572
}
573
574
"handle node removal" in {
575
runOn(first) {
576
Cluster(system).leave(node(third).address)
577
}
578
579
runOn(first, second) {
580
within(10.seconds) {
581
awaitCond(Cluster(system).state.members.size == 2)
582
}
583
}
584
585
runOn(third) {
586
within(10.seconds) {
587
awaitCond(Cluster(system).isTerminated)
588
}
589
}
590
591
enterBarrier("node-removed")
592
}
593
}
594
}
595
```